
Vector Databases

Connect ChromaDB, Pinecone, Weaviate, pgvector, Cosmos DB, Databricks, LangChain, and LlamaIndex

Last updated: August 20, 2025
Category: data-sources

Vector Database Connectors

Vecta provides connectors for all major vector databases. Each connector requires a VectorDBSchema that maps your database's field structure to Vecta's chunk format. See Accessor Syntax for the full path syntax reference.
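As a rough mental model of how accessor paths resolve (a hypothetical helper, not Vecta's implementation): a dotted segment selects a dict key or object attribute, and a bracketed segment such as "[2]" selects a position in a row.

```python
import re

def resolve_accessor(record, path):
    """Resolve an accessor path like 'metadata.source_path' or
    '[2].page_nums' against a raw record (dicts, objects, or sequences).
    Illustrative only -- not Vecta's actual resolver."""
    # Tokenize the path into bracketed indices and dot-separated names.
    for token in re.findall(r"\[\d+\]|[^.\[\]]+", path):
        if token.startswith("["):
            record = record[int(token[1:-1])]   # positional index, e.g. a row tuple
        elif isinstance(record, dict):
            record = record[token]              # dict key lookup
        else:
            record = getattr(record, token)     # attribute lookup on objects
    return record

# A raw hit shaped like [id, content, metadata]:
row = ["chunk-1", "Hello world", {"source_path": "docs/a.pdf", "page_nums": [1, 2]}]
print(resolve_accessor(row, "[2].source_path"))  # docs/a.pdf
```

Paths with a leading dot (such as Weaviate's ".uuid") tokenize the same way, with the dot simply skipped.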

ChromaDB (Local)

from vecta import ChromaLocalConnector, VectorDBSchema
import chromadb

chroma_client = chromadb.Client()

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="document",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = ChromaLocalConnector(
    client=chroma_client,
    collection_name="my_collection",
    schema=schema,
)
| Parameter | Type | Description |
| --- | --- | --- |
| client | chromadb.api.ClientAPI | ChromaDB client instance |
| collection_name | str | Name of the collection |
| schema | VectorDBSchema | Schema for field extraction |

ChromaDB (Cloud)

from vecta import ChromaCloudConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="document",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = ChromaCloudConnector(
    tenant="my-tenant",
    database="my-database",
    api_key="ck-...",
    collection_name="my_collection",
    schema=schema,
)
| Parameter | Type | Description |
| --- | --- | --- |
| tenant | str | Chroma Cloud tenant ID |
| database | str | Database name |
| api_key | str | Chroma Cloud API key |
| collection_name | str | Collection name |
| schema | VectorDBSchema | Schema for field extraction |

Pinecone

from vecta import PineconeConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor=".id",
    content_accessor="metadata.content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = PineconeConnector(
    api_key="pk-...",
    index_name="my-index",
    namespace="",
    openai_api_key="sk-...",  # for query embedding
    schema=schema,
)

Note: Pinecone requires an embedding API key for semantic_search because queries must be converted to vectors client-side.
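In other words, the connector embeds the query itself, then searches by vector. A self-contained toy of that flow, with a fixed-vocabulary embedding standing in for a real model and an in-memory dict standing in for a Pinecone index (none of these names are Vecta or Pinecone APIs):

```python
import math

VOCAB = ["postgres", "setup", "guide", "kubernetes", "networking"]

def toy_embed(text: str) -> list[float]:
    """Stand-in for a real embedding model: unit-normalized word counts."""
    words = text.lower().split()
    vec = [float(words.count(w)) for w in VOCAB]
    norm = math.sqrt(sum(x * x for x in vec)) or 1.0
    return [x / norm for x in vec]

def cosine(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))

# In-memory stand-in for a Pinecone index: id -> (vector, metadata)
index = {
    "c1": (toy_embed("postgres setup guide"), {"content": "postgres setup guide"}),
    "c2": (toy_embed("kubernetes networking"), {"content": "kubernetes networking"}),
}

def semantic_search(query: str, k: int = 1) -> list[str]:
    qvec = toy_embed(query)  # the embedding happens client-side, as the note says
    ranked = sorted(index.items(), key=lambda kv: cosine(qvec, kv[1][0]), reverse=True)
    return [chunk_id for chunk_id, _ in ranked[:k]]

print(semantic_search("postgres guide"))  # ['c1']
```

The real connector does the same two steps, with the embedding API producing the query vector and Pinecone performing the ranked lookup.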

Weaviate

from vecta import WeaviateConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor=".uuid",
    content_accessor="properties.content",
    metadata_accessor="properties.metadata",
    source_path_accessor="properties.metadata.source_path",
    page_nums_accessor="properties.metadata.page_nums",
)

connector = WeaviateConnector(
    cluster_url="https://my-cluster.weaviate.network",
    api_key="wk-...",
    collection_name="Documents",
    use_cloud=True,
    schema=schema,
)
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| cluster_url | str | — | Weaviate Cloud cluster URL |
| api_key | str | — | API key |
| collection_name | str | "Documents" | Collection name |
| use_cloud | bool | True | Whether to use a cloud or local instance |
| host | str | "localhost" | Local host (when use_cloud=False) |
| port | int | 8080 | HTTP port |
| grpc_port | int | 50051 | gRPC port |
| schema | VectorDBSchema | — | Schema for field extraction |

pgvector

from vecta import PgVectorConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = PgVectorConnector(
    dsn="postgresql://user:pass@host:5432/mydb",
    table="chunks",
    schema=schema,
    openai_api_key="sk-...",  # for query embedding
)

Azure Cosmos DB

from vecta import AzureCosmosConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = AzureCosmosConnector(
    endpoint="https://my-cosmos.documents.azure.com:443/",
    key="...",
    database_name="my-db",
    container_name="my-container",
    schema=schema,
    openai_api_key="sk-...",  # for query embedding
)

Databricks

Databricks Vector Search returns each hit as a positional row of columns, so the schema uses index accessors:

from vecta import DatabricksConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="[0]",
    content_accessor="[1]",
    metadata_accessor="[2]",
    source_path_accessor="[2].source_path",
    page_nums_accessor="[2].page_nums",
)

connector = DatabricksConnector(
    workspace_url="https://my-workspace.databricks.com",
    index_name="my_catalog.my_schema.my_index",
    personal_access_token="dapi...",
    schema=schema,
)

LangChain

Wrap any LangChain VectorStore or BaseRetriever:

from vecta import LangChainVectorStoreConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="page_content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = LangChainVectorStoreConnector(
    vectorstore=my_langchain_vectorstore,
    schema=schema,
)

# Or with a retriever
connector = LangChainVectorStoreConnector(
    retriever=my_langchain_retriever,
    schema=schema,
)

Note: get_all_chunks() is supported for Chroma-backed and FAISS-backed LangChain vector stores. Other stores may only support semantic_search().

LlamaIndex

Wrap a LlamaIndex VectorStoreIndex or BaseRetriever:

from vecta import LlamaIndexConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="node_id",
    content_accessor="content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = LlamaIndexConnector(
    index=my_llama_index,
    schema=schema,
)

Custom Connectors

Build your own connector by extending BaseVectorDBConnector:

from vecta.connectors.base import BaseVectorDBConnector
from vecta import ChunkData, VectorDBSchema

class MyCustomConnector(BaseVectorDBConnector):
    def __init__(self, db_client, schema: VectorDBSchema):
        super().__init__(schema)
        self.db = db_client

    def get_all_chunks(self) -> list[ChunkData]:
        results = self.db.get_all()
        return [self._create_chunk_data_from_raw(r) for r in results]

    def semantic_search(self, query_str: str, k: int = 10) -> list[ChunkData]:
        results = self.db.search(query_str, limit=k)
        return [self._create_chunk_data_from_raw(r) for r in results]

    def get_chunk_by_id(self, chunk_id: str) -> ChunkData:
        result = self.db.get(chunk_id)
        return self._create_chunk_data_from_raw(result)

The inherited _create_chunk_data_from_raw() method uses your schema to extract fields automatically.
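To make the pattern concrete, here is a self-contained toy that mirrors the interface above over an in-memory dict. The ChunkData stand-in and the keyword-overlap "search" are illustrative only, not Vecta's implementation:

```python
from dataclasses import dataclass, field

@dataclass
class ChunkData:  # stand-in for vecta.ChunkData
    id: str
    content: str
    metadata: dict = field(default_factory=dict)

class InMemoryConnector:
    """Toy connector over a plain dict, mirroring the connector interface."""

    def __init__(self, records: dict[str, dict]):
        self.db = records

    def _create_chunk_data_from_raw(self, raw: dict) -> ChunkData:
        # The real helper resolves your VectorDBSchema accessors; this stub
        # just reads fixed keys.
        return ChunkData(id=raw["id"], content=raw["content"],
                         metadata=raw.get("metadata", {}))

    def get_all_chunks(self) -> list[ChunkData]:
        return [self._create_chunk_data_from_raw(r) for r in self.db.values()]

    def semantic_search(self, query_str: str, k: int = 10) -> list[ChunkData]:
        # Naive keyword overlap stands in for real vector similarity.
        q = set(query_str.lower().split())
        scored = sorted(self.db.values(),
                        key=lambda r: len(q & set(r["content"].lower().split())),
                        reverse=True)
        return [self._create_chunk_data_from_raw(r) for r in scored[:k]]

    def get_chunk_by_id(self, chunk_id: str) -> ChunkData:
        return self._create_chunk_data_from_raw(self.db[chunk_id])

records = {
    "c1": {"id": "c1", "content": "postgres setup guide"},
    "c2": {"id": "c2", "content": "kubernetes networking"},
}
conn = InMemoryConnector(records)
print(conn.semantic_search("postgres guide", k=1)[0].id)  # c1
```

A real subclass keeps the same three methods but delegates storage and similarity to your database client, as in the BaseVectorDBConnector example above.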

Next Steps

  • Accessor Syntax — Full reference for schema paths
  • Benchmarks — Create evaluation datasets from your data source
