使用 LangChain 和 Supabase 进行文档向量化与语义搜索
之前写过3篇文章关于Langchain的基础使用、SequentialChain、以及Agent的使用,今天来研究一下如果通过Langchain来处理文档,并对文档进行向量化处理。
前面提到Agent的应用的重要使用场景之一,就是RAG(Retrieval-Augmented Generation,检索增强生成),即通过私有的知识库,来增强大语言模型的信息丰富程度。而将已有的知识库与LLM进行融合的方式之一,就是通过文档文档向量化(vectorize)并嵌入(embedding)到向量数据库中,再通过向量化的检索关键词,进行相似度查询,进行资料的查询。
研究主题
- 学习如何使用 LangChain 加载和分割 PDF 文档
- 掌握如何将文档内容向量化并存储到 Supabase 数据库
- 实现基于向量相似度的语义搜索功能
文档预处理:加载、分割
我们知道要查询一个文档的话,需要查询文档中的某些段落的内容,因此为了方便检索,需要对文档进行提前预处理,将文档进行适当的切分,
2.1 加载并分割 PDF 文档
def load_and_split_pdf(file_path: str) -> list:
loader = PyPDFLoader(file_path)
data = loader.load()
recursive_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # 单位:字符数
chunk_overlap=200, # 设置文本块之间的重叠字符数,用于保持上下文连贯性
add_start_index=True, # 为每个分割后的文档块添加起始位置索引,方便追踪原始文档中的位置
separators=["\n\n", "\n", " ", ""] # 默认的分割字符顺序
)
split_docs = recursive_splitter.split_documents(data)
return split_docs
- 功能:
- 加载 PDF 文件并将其分割为多个文档块,我们这里采用的是递归字符分割(RecursiveCharacterTextSplitter)的策略。
- 核心思路:按照一组预定义的字符顺序,递归地尝试将文本分割成更小的块,直到每个块的大小符合要求。
- 参数:
- chunk_size:切分后每个chunk的字符数
- chunk_overlap: 设置文本块之间的重叠字符数,用于保持上下文连贯性
- add_start_index: 是否在分割后的文档中添加当前块的起始检索位置,可以近似理解为:为每个文本块添加一个类似书本的页码,方便追踪在原始文档中的位置。
- separators:切分文本字符顺序,它会首先尝试用第一个字符(如 "\n\n")分割文本。如果分割后的块仍然过大,则继续用下一个字符(如 "\n")递归分割,直到所有字符都尝试完毕。
- 返回:分割后的文档列表
2.2 向量化文档
将原始文档切分后,就需要对原始文档进行向量化(vectorize)操作,并存储到向量数据库中。现在网上有很多服务商提供向量化操作和向量数据库,这里采用的是阿里的百炼平台和supabase。
前置任务:
- 注册阿里云,并开通百炼的向量模型服务,并申请api_key:百炼控制台
- 注册supabase, 同样申请api_key:Supabase控制台
- 将上面的key,写入到环境变量中。
def vectorize_docs(docs: list) -> list:
embeddings = []
for doc in docs:
completion = bailian_client.embeddings.create(
model="text-embedding-v3",
input=doc.page_content,
dimensions=1024,
encoding_format="float",
)
embeddings.append(completion.data[0].embedding)
return embeddings
-
功能:将文档内容转换为向量
-
参数:
docs
- 文档列表- model:指定使用的嵌入模型,这个可以在百炼模型广场的向量模型中查看所有可用的模型。
- input:待向量化的文档内容。
- dimensions:向量的维度,维度越高,向量能捕捉的文本特征越丰富,但计算成本也越高。
- encoding_format:向量的编码格式,通常使用浮点数以便更精确地表示文本特征。
-
返回:向量列表
2.3 插入向量数据到 Supabase
将文档进行向量化之后,下一步就需要存储到向量数据库中。我们这里使用的是supabase的客户端。
pip install supabase
在插入数据之前,需要先建好表,在supabase的控制台执行如下SQL:
CREATE TABLE IF NOT EXISTS document_vectors (
id SERIAL PRIMARY KEY,
document_id UUID,
content TEXT,
embedding VECTOR(1536), # 根据向量维度调整(如text-embedding-v3是1536维)
metadata JSONB
);
将向量化的文档,插入向量数据库。
def insert_vector(docs):
for i, doc in enumerate(docs):
document_id = str(uuid.uuid4())
content = doc.page_content
embedding = vectorize_docs([doc])[0]
metadata = doc.metadata
data = {
"document_id": document_id,
"content": content,
"embedding": json.dumps(embedding),
"metadata": metadata,
}
supabase.table("documents_vectors").insert(data).execute()
- 功能:将向量化后的文档插入到 Supabase 数据库
- 参数:
- document_id: 文档的唯一标识符,这里使用UUID。
- content:存储文档的原始文本内容
- embedding:文档的向量表示,这里调用前面的向量化方法,返回向量化后的文档。
- metadata:文档的元数据,存储文档的附加信息,如来源、作者、创建时间等,便于后续的过滤和分类。
成功将文档插入数据库后,我们就可以进行语义搜索了,我们这里使用langchain的vector_store库,调用similarity_search_with_relevance_scores方法。
2.4 语义搜索
先在supabase的控制台中,创建一个搜索函数:
DROP FUNCTION IF EXISTS semantic_search;
-- 创建自定义搜索函数
CREATE OR REPLACE FUNCTION semantic_search(
query_embedding VECTOR(1024), -- 根据实际维度调整(如text-embedding-v3是1536维)
top_k INTEGER DEFAULT 5
)
RETURNS TABLE (
id BIGINT,
content TEXT,
metadata JSONB,
similarity FLOAT
)
LANGUAGE plpgsql
STABLE
AS $$
BEGIN
RETURN QUERY
SELECT
dv.id,
dv.content,
dv.metadata,
dv.embedding <=> query_embedding AS similarity
FROM documents_vectors dv
ORDER BY similarity ASC -- 余弦距离越小表示越相似
LIMIT top_k;
END;
$$;
之后,通过vector_store来调用这个搜索函数。
def semantic_search(NL_query: str, k: int = 5, filter: dict = None):
results = vector_store.similarity_search_with_relevance_scores(
NL_query, k=k, filter=filter
)
for i, (doc, score) in enumerate(results, 1):
print(f"第{i}个结果:")
print(f"内容: {doc.page_content[:100]}")
print(f"相似度: {1-score:.4f}")
print(f"来源: {doc.metadata.get('source','N/A')}")
- 功能:根据自然语言查询进行语义搜索
- 参数:
NL_query
- 自然语言查询k
- 返回结果的数量filter
- 过滤条件
3. 使用示例
def main():
file_path = "/root/workspace/langchain/datasets/pdf/flink.pdf"
split_docs = load_and_split_pdf(file_path)
print(f"分割后的文档数量: {len(split_docs)}")
doc_embeddings = vectorize_docs(split_docs)
insert_vector(split_docs)
NL_query = "flink batch processing"
semantic_search(NL_query, k=5)
执行结果:
NL_query: flink batch processing
第1个结果:
内容: to go back to a point in time to reprocess data, to perform A/B testing and to apply
application up
相似度: 0.6755
来源: /root/workspace/langchain/datasets/pdf/flink.pdf
----------------------------------------------------------------------------------------------------
第2个结果:
内容: to go back to a point in time to reprocess data, to perform A/B testing and to apply
application up
相似度: 0.6755
来源: /root/workspace/langchain/datasets/pdf/flink.pdf
----------------------------------------------------------------------------------------------------
第3个结果:
内容: to go back to a point in time to reprocess data, to perform A/B testing and to apply
application up
相似度: 0.6755
来源: /root/workspace/langchain/datasets/pdf/flink.pdf
----------------------------------------------------------------------------------------------------
第4个结果:
内容: inadequate for such as use -case because the three consecutive reading may be in different
tumbling
相似度: 0.6720
来源: /root/workspace/langchain/datasets/pdf/flink.pdf
----------------------------------------------------------------------------------------------------
第5个结果:
内容: inadequate for such as use -case because the three consecutive reading may be in different
tumbling
相似度: 0.6720
来源: /root/workspace/langchain/datasets/pdf/flink.pdf
----------------------------------------------------------------------------------------------------
评论区