Vector Databases
Connect ChromaDB, Pinecone, Weaviate, pgvector, Cosmos DB, Databricks, LangChain, and LlamaIndex
Vector Database Connectors
Vecta provides connectors for all major vector databases. Each connector requires a VectorDBSchema that maps your database's field structure to Vecta's chunk format. See Accessor Syntax for the full path syntax reference.
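As a rough illustration of what a schema does: an accessor string such as metadata.source_path is a path evaluated against each raw record to pull out a field. The helper below is a hypothetical sketch that only handles dotted dict keys; the full rules, including index accessors, are defined in Accessor Syntax:

```python
def resolve(record: dict, path: str):
    # Walk a dotted path through nested dict keys, e.g. "metadata.source_path".
    value = record
    for part in path.split("."):
        value = value[part]
    return value

record = {
    "id": "chunk-1",
    "document": "Hello world",
    "metadata": {"source_path": "docs/intro.pdf", "page_nums": [1, 2]},
}
print(resolve(record, "metadata.source_path"))  # docs/intro.pdf
print(resolve(record, "document"))              # Hello world
```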
ChromaDB (Local)
```python
from vecta import ChromaLocalConnector, VectorDBSchema
import chromadb

chroma_client = chromadb.Client()

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="document",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = ChromaLocalConnector(
    client=chroma_client,
    collection_name="my_collection",
    schema=schema,
)
```
| Parameter | Type | Description |
|---|---|---|
| client | chromadb.api.ClientAPI | ChromaDB client instance |
| collection_name | str | Name of the collection |
| schema | VectorDBSchema | Schema for field extraction |
ChromaDB (Cloud)
```python
from vecta import ChromaCloudConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="document",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = ChromaCloudConnector(
    tenant="my-tenant",
    database="my-database",
    api_key="ck-...",
    collection_name="my_collection",
    schema=schema,
)
```
| Parameter | Type | Description |
|---|---|---|
| tenant | str | Chroma Cloud tenant ID |
| database | str | Database name |
| api_key | str | Chroma Cloud API key |
| collection_name | str | Collection name |
| schema | VectorDBSchema | Schema for field extraction |
Pinecone
```python
from vecta import PineconeConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor=".id",
    content_accessor="metadata.content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = PineconeConnector(
    api_key="pk-...",
    index_name="my-index",
    namespace="",
    openai_api_key="sk-...",  # for query embedding
    schema=schema,
)
```
Note: Pinecone requires an embedding API key for semantic_search because queries must be converted to vectors client-side.
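To illustrate the client-side flow: the connector embeds the query text first, then ranks stored vectors by similarity. The sketch below substitutes a toy hashing function for a real embedding model, so the names and logic here are illustrative only, not the connector's actual implementation:

```python
import math

def embed(text: str) -> list[float]:
    # Toy stand-in for an embedding model: hash characters into a tiny vector.
    vec = [0.0] * 4
    for i, ch in enumerate(text):
        vec[i % 4] += ord(ch)
    norm = math.sqrt(sum(x * x for x in vec)) or 1.0
    return [x / norm for x in vec]

def semantic_search(query: str, index: list[tuple[str, list[float]]], k: int = 2):
    # The query is embedded client-side; results are ranked by cosine similarity.
    qv = embed(query)
    scored = [(sum(a * b for a, b in zip(qv, v)), doc) for doc, v in index]
    return [doc for _, doc in sorted(scored, reverse=True)[:k]]

index = [(doc, embed(doc)) for doc in ["alpha report", "beta notes", "gamma summary"]]
print(semantic_search("alpha report", index, k=1))
```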
Weaviate
```python
from vecta import WeaviateConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor=".uuid",
    content_accessor="properties.content",
    metadata_accessor="properties.metadata",
    source_path_accessor="properties.metadata.source_path",
    page_nums_accessor="properties.metadata.page_nums",
)

connector = WeaviateConnector(
    cluster_url="https://my-cluster.weaviate.network",
    api_key="wk-...",
    collection_name="Documents",
    use_cloud=True,
    schema=schema,
)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| cluster_url | str | — | Weaviate Cloud cluster URL |
| api_key | str | — | API key |
| collection_name | str | "Documents" | Collection name |
| use_cloud | bool | True | Whether to use cloud or local instance |
| host | str | "localhost" | Local host (when use_cloud=False) |
| port | int | 8080 | HTTP port |
| grpc_port | int | 50051 | gRPC port |
| schema | VectorDBSchema | — | Schema for field extraction |
pgvector
```python
from vecta import PgVectorConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = PgVectorConnector(
    dsn="postgresql://user:pass@host:5432/mydb",
    table="chunks",
    schema=schema,
    openai_api_key="sk-...",  # for query embedding
)
```
Azure Cosmos DB
```python
from vecta import AzureCosmosConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = AzureCosmosConnector(
    endpoint="https://my-cosmos.documents.azure.com:443/",
    key="...",
    database_name="my-db",
    container_name="my-container",
    schema=schema,
    openai_api_key="sk-...",  # for query embedding
)
```
Databricks
```python
from vecta import DatabricksConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="[0]",
    content_accessor="[1]",
    metadata_accessor="[2]",
    source_path_accessor="[2].source_path",
    page_nums_accessor="[2].page_nums",
)

connector = DatabricksConnector(
    workspace_url="https://my-workspace.databricks.com",
    index_name="my_catalog.my_schema.my_index",
    personal_access_token="dapi...",
    schema=schema,
)
```
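The index accessors above ([0], [2].source_path) reflect results that arrive as positional rows rather than keyed documents. A hypothetical sketch of how such mixed index/key paths could be resolved (this parser is illustrative, not the library's actual one):

```python
import re

def resolve(row, path: str):
    # Tokenize "[2].source_path" into: index 2, then key "source_path".
    value = row
    for idx, key in re.findall(r"\[(\d+)\]|([^.\[\]]+)", path):
        value = value[int(idx)] if idx else value[key]
    return value

row = ["chunk-42", "some text", {"source_path": "s3://bucket/doc.pdf", "page_nums": [3]}]
print(resolve(row, "[0]"))              # chunk-42
print(resolve(row, "[2].source_path"))  # s3://bucket/doc.pdf
```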
LangChain
Wrap any LangChain VectorStore or BaseRetriever:
```python
from vecta import LangChainVectorStoreConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="id",
    content_accessor="page_content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = LangChainVectorStoreConnector(
    vectorstore=my_langchain_vectorstore,
    schema=schema,
)

# Or with a retriever
connector = LangChainVectorStoreConnector(
    retriever=my_langchain_retriever,
    schema=schema,
)
```
Note: get_all_chunks() is supported for Chroma-backed and FAISS-backed LangChain vector stores. Other stores may only support semantic_search().
LlamaIndex
Wrap a LlamaIndex VectorStoreIndex or BaseRetriever:
```python
from vecta import LlamaIndexConnector, VectorDBSchema

schema = VectorDBSchema(
    id_accessor="node_id",
    content_accessor="content",
    metadata_accessor="metadata",
    source_path_accessor="metadata.source_path",
    page_nums_accessor="metadata.page_nums",
)

connector = LlamaIndexConnector(
    index=my_llama_index,
    schema=schema,
)
```
Custom Connectors
Build your own connector by extending BaseVectorDBConnector:
```python
from vecta.connectors.base import BaseVectorDBConnector
from vecta import ChunkData, VectorDBSchema

class MyCustomConnector(BaseVectorDBConnector):
    def __init__(self, db_client, schema: VectorDBSchema):
        super().__init__(schema)
        self.db = db_client

    def get_all_chunks(self) -> list[ChunkData]:
        results = self.db.get_all()
        return [self._create_chunk_data_from_raw(r) for r in results]

    def semantic_search(self, query_str: str, k: int = 10) -> list[ChunkData]:
        results = self.db.search(query_str, limit=k)
        return [self._create_chunk_data_from_raw(r) for r in results]

    def get_chunk_by_id(self, chunk_id: str) -> ChunkData:
        result = self.db.get(chunk_id)
        return self._create_chunk_data_from_raw(result)
```
The inherited _create_chunk_data_from_raw() method uses your schema to extract fields automatically.
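Conceptually, that method applies each schema accessor to the raw record and assembles the results into a chunk. The stand-in below is a guess at that behavior for illustration only, with a simplified ChunkData and a resolver that handles just dotted dict keys:

```python
from dataclasses import dataclass

@dataclass
class ChunkData:  # simplified stand-in for vecta's ChunkData
    id: str
    content: str
    metadata: dict
    source_path: str
    page_nums: list

def create_chunk_data_from_raw(raw: dict, schema: dict) -> ChunkData:
    def resolve(path: str):
        value = raw
        for part in path.split("."):
            value = value[part]
        return value
    # Each ChunkData field comes from evaluating the matching accessor path.
    return ChunkData(
        id=resolve(schema["id_accessor"]),
        content=resolve(schema["content_accessor"]),
        metadata=resolve(schema["metadata_accessor"]),
        source_path=resolve(schema["source_path_accessor"]),
        page_nums=resolve(schema["page_nums_accessor"]),
    )

schema = {
    "id_accessor": "id",
    "content_accessor": "content",
    "metadata_accessor": "metadata",
    "source_path_accessor": "metadata.source_path",
    "page_nums_accessor": "metadata.page_nums",
}
raw = {"id": "c1", "content": "text", "metadata": {"source_path": "a.pdf", "page_nums": [1]}}
chunk = create_chunk_data_from_raw(raw, schema)
print(chunk.source_path)  # a.pdf
```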
Next Steps
- Accessor Syntax — Full reference for schema paths
- Benchmarks — Create evaluation datasets from your data source