AI 技术教程 | RAG 检索增强生成系统完整实现
第一章:RAG 技术原理与架构设计
1.1 什么是 RAG?
RAG(Retrieval-Augmented Generation,检索增强生成)是一种将信息检索与文本生成相结合的技术架构。它通过从外部知识库中检索相关信息,并将其作为上下文提供给大语言模型,从而生成更准确、更有依据的回答。
传统的大语言模型存在一个根本性局限:知识截止。模型在训练完成后,其知识就固定在了训练数据的时间点。RAG 通过引入外部检索机制,有效解决了这个问题。
1.2 RAG 的核心组件
一个完整的 RAG 系统包含以下核心组件:
“
┌─────────────────────────────────────────────────────────────┐
│ RAG 系统架构 │
├─────────────────────────────────────────────────────────────┤
│ 用户查询 → 查询理解 → 向量检索 → 文档重排序 → 上下文构建 │
│ ↓ │
│ 生成响应 ← 大语言模型 ← Prompt 工程 ← 知识片段注入 │
└─────────────────────────────────────────────────────────────┘
“
关键组件说明:
1. 文档编码器:将知识库文档转换为向量表示
2. 向量数据库:存储和索引文档向量,支持高效相似性搜索
3. 查询处理器:理解用户意图,优化检索查询
4. 重排序模块:对检索结果进行相关性排序
5. 生成模型:基于检索到的上下文生成最终回答
1.3 为什么需要 RAG?
| 问题类型 | 纯 LLM 方案 | RAG 方案 |
|---|---|---|
| 知识时效性 | ❌ 无法获取最新信息 | ✅ 可检索实时数据 |
| 事实准确性 | ❌ 容易产生幻觉 | ✅ 基于检索证据 |
| 领域专业性 | ❌ 通用知识为主 | ✅ 可注入专业知识 |
| 可解释性 | ❌ 黑盒生成 | ✅ 可追溯信息来源 |
| 成本控制 | ❌ 需要频繁微调 | ✅ 一次构建多次使用 |
第二章:RAG 系统实现步骤
2.1 环境准备与依赖安装
首先安装必要的 Python 依赖包:
“bash
创建虚拟环境
python -m venv rag_env
source rag_env/bin/activate
# 安装核心依赖 pip install langchain langchain-community langchain-openai pip install chromadb sentence-transformers pip install faiss-cpu pinecone-client pip install unstructured pdf2image python-docx pip install fastapi uvicorn python-multipart “2.2 文档加载与预处理
文档预处理是 RAG 系统的基础。我们需要将各种格式的文档转换为统一的文本格式,并进行适当的分块处理。
“
python from langchain.document_loaders import ( PyPDFLoader, UnstructuredMarkdownLoader, Docx2txtLoader ) from langchain.text_splitter import RecursiveCharacterTextSplitter from typing import List, Dict
class DocumentProcessor:
“””文档处理器:支持多种格式文档的加载和分块”””
def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=[“\n\n”, “\n”, “。”, “!”, “?”, “;”, ” “]
)
def load_document(self, file_path: str, file_type: str = None) -> List:
“””加载单个文档”””
if file_type is None:
file_type = file_path.split(‘.’)[-1].lower()
loaders = {
‘pdf’: PyPDFLoader,
‘md’: UnstructuredMarkdownLoader,
‘markdown’: UnstructuredMarkdownLoader,
‘docx’: Docx2txtLoader,
‘txt’: lambda path: [type(‘Doc’, (), {‘page_content’: open(path).read()})()]
}
if file_type not in loaders:
raise ValueError(f”不支持的文件类型:{file_type}”)
loader = loaders[file_type](file_path)
return loader.load()
def split_documents(self, documents: List) -> List:
“””将文档分割成小块”””
return self.text_splitter.split_documents(documents)
def process_file(self, file_path: str) -> List:
“””完整处理流程:加载 + 分块”””
docs = self.load_document(file_path)
return self.split_documents(docs)
“
2.3 向量嵌入与存储
选择合适的嵌入模型和向量数据库是 RAG 系统性能的关键。
“python
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma, FAISS
import chromadb
from chromadb.config import Settings
class VectorStoreManager:
“””向量存储管理器”””
def __init__(
self,
embedding_model: str = “sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2”,
vector_store_type: str = “chroma”
):
# 初始化嵌入模型(支持中文)
self.embeddings = HuggingFaceEmbeddings(
model_name=embedding_model,
model_kwargs={‘device’: ‘cuda’},
encode_kwargs={‘normalize_embeddings’: True}
)
self.vector_store_type = vector_store_type
self.vector_store = None
def create_store(self, documents: List, persist_directory: str = “./chroma_db”):
“””创建向量存储”””
if self.vector_store_type == “chroma”:
self.vector_store = Chroma.from_documents(
documents=documents,
embedding=self.embeddings,
persist_directory=persist_directory,
client_settings=Settings(
anonymized_telemetry=False,
allow_reset=True
)
)
elif self.vector_store_type == “faiss”:
self.vector_store = FAISS.from_documents(
documents=documents,
embedding=self.embeddings
)
return self.vector_store
def add_documents(self, documents: List):
“””向现有存储添加文档”””
if self.vector_store is None:
raise ValueError(“请先创建向量存储”)
self.vector_store.add_documents(documents)
def similarity_search(self, query: str, k: int = 5) -> List:
“””执行相似性搜索”””
return self.vector_store.similarity_search(query, k=k)
def similarity_search_with_score(self, query: str, k: int = 5) -> List:
“””带分数的相似性搜索”””
return self.vector_store.similarity_search_with_score(query, k=k)
“
2.4 检索器配置与优化
“python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import (
LLMChainExtractor,
EmbeddingsFilter
)
from langchain.chat_models import ChatOpenAI
class RetrieverConfig:
“””高级检索器配置”””
def __init__(self, vector_store, llm=None):
self.base_retriever = vector_store.as_retriever(
search_type=”similarity_score_threshold”,
search_kwargs={“score_threshold”: 0.3, “k”: 10}
)
self.llm = llm or ChatOpenAI(
model=”gpt-4″,
temperature=0.7,
openai_api_key=”your-api-key”
)
def create_compression_retriever(self):
“””创建带压缩的检索器(提高相关性)”””
# 使用 LLM 提取最相关的片段
extractor = LLMChainExtractor.from_llm(self.llm)
# 使用嵌入过滤器过滤不相关文档
embeddings_filter = EmbeddingsFilter(
embeddings=self.base_retriever.embeddings,
similarity_threshold=0.5
)
# 组合压缩器
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
compressor = DocumentCompressorPipeline(
transformers=[embeddings_filter, extractor]
)
# 创建压缩检索器
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=self.base_retriever
)
return compression_retriever
def create_parent_document_retriever(self, vector_store):
“””父文档检索器:检索小块,返回大块上下文”””
from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore
# 大块用于生成,小块用于检索
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)
# 存储父文档
doc_store = InMemoryStore()
retriever = ParentDocumentRetriever(
vectorstore=vector_store,
docstore=doc_store,
child_splitter=child_splitter,
parent_splitter=parent_splitter,
)
return retriever
“
第三章:完整 RAG 系统实现
3.1 RAG 引擎核心类
“python
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough
import hashlib
from datetime import datetime
class RAGEngine:
“””完整的 RAG 引擎实现”””
def __init__(
self,
vector_store,
llm,
retriever=None,
max_context_length: int = 4000
):
self.vector_store = vector_store
self.llm = llm
self.retriever = retriever or vector_store.as_retriever(
search_kwargs={“k”: 5}
)
self.max_context_length = max_context_length
self.query_history = []
def _format_docs(self, docs) -> str:
“””格式化检索到的文档”””
formatted = []
for i, doc in enumerate(docs, 1):
source = doc.metadata.get(‘source’, ‘未知来源’)
content = doc.page_content.strip()
formatted.append(f”[来源 {i}] {source}\n{content}”)
return “\n\n”.join(formatted)
def _build_prompt(self) -> ChatPromptTemplate:
“””构建 RAG Prompt 模板”””
template = “””你是一个专业的 AI 助手,基于以下检索到的信息回答用户问题。
【检索到的信息】
{context}
【回答要求】
1. 严格基于上述信息回答,不要编造事实
2. 如果信息不足以回答问题,请明确说明
3. 引用信息来源(使用 [来源 X] 格式)
4. 回答要清晰、准确、有条理
5. 如果问题与检索内容无关,礼貌地说明
【用户问题】
{question}
请开始回答:”””
return ChatPromptTemplate.from_template(template)
def query(self, question: str, include_sources: bool = True) -> Dict:
“””执行 RAG 查询”””
start_time = datetime.now()
# 步骤 1: 检索相关文档
docs = self.retriever.get_relevant_documents(question)
# 步骤 2: 格式化上下文
context = self._format_docs(docs)
# 步骤 3: 构建并执行 Prompt
prompt = self._build_prompt()
chain = (
{“context”: lambda x: context, “question”: RunnablePassthrough()}
| prompt
| self.llm
| StrOutputParser()
)
answer = chain.invoke(question)
# 步骤 4: 记录查询历史
query_record = {
“question”: question,
“answer”: answer,
“sources”: [
{
“content”: doc.page_content[:200],
“source”: doc.metadata.get(‘source’, ‘未知’),
“score”: doc.metadata.get(‘score’, None)
}
for doc in docs
],
“timestamp”: start_time.isoformat(),
“latency_ms”: (datetime.now()
- start_time).total_seconds() * 1000
}
self.query_history.append(query_record)
if include_sources:
sources_text = “\n\n参考资料:\n”
for i, doc in enumerate(docs, 1):
source = doc.metadata.get(‘source’, ‘未知来源’)
sources_text += f”- [来源 {i}] {source}\n”
return {“answer”: answer + sources_text, “metadata”: query_record}
return {“answer”: answer, “metadata”: query_record}
def batch_query(self, questions: List[str]) -> List[Dict]:
“””批量查询”””
return [self.query(q) for q in questions]
“
3.2 API 服务封装
“python
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
app = FastAPI(title=”RAG API Service”, version=”1.0.0″)
# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=[“*”],
allow_credentials=True,
allow_methods=[“*”],
allow_headers=[“*”],
)
# 全局 RAG 引擎实例
rag_engine: RAGEngine = None
class QueryRequest(BaseModel):
question: str
top_k: int = 5
include_sources: bool = True
class QueryResponse(BaseModel):
answer: str
sources: List[Dict]
latency_ms: float
@app.post(“/upload”)
async def upload_document(file: UploadFile = File(…)):
“””上传并处理文档”””
try:
# 保存临时文件
temp_path = f”./temp/{file.filename}”
with open(temp_path, “wb”) as f:
f.write(await file.read())
# 处理文档
processor = DocumentProcessor()
docs = processor.process_file(temp_path)
# 添加到向量存储
rag_engine.vector_store.add_documents(docs)
return {
“status”: “success”,
“message”: f”成功处理 {len(docs)} 个文档片段”,
“filename”: file.filename
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post(“/query”, response_model=QueryResponse)
async def query(request: QueryRequest):
“””执行 RAG 查询”””
if rag_engine is None:
raise HTTPException(status_code=503, detail=”RAG 引擎未初始化”)
result = rag_engine.query(
question=request.question,
include_sources=request.include_sources
)
return QueryResponse(
answer=result[“answer”],
sources=result[“metadata”][“sources”],
latency_ms=result[“metadata”][“latency_ms”]
)
@app.get(“/health”)
async def health_check():
“””健康检查”””
return {“status”: “healthy”, “engine_ready”: rag_engine is not None}
if __name__ == “__main__”:
uvicorn.run(app, host=”0.0.0.0″, port=8000)
“
第四章:实战案例分析
4.1 案例一:企业知识库问答系统
场景描述:某科技公司需要构建内部知识库问答系统,帮助员工快速查找技术文档、项目资料和政策文件。
实施方案:
“python
企业知识库 RAG 配置
class EnterpriseRAG(RAGEngine):
def __init__(self):
# 使用本地部署的模型保证数据安全
llm = ChatOpenAI(
model=”qwen-max”,
base_url=”https://dashscope.aliyuncs.com/compatible-mode/v1″,
api_key=”your-dashscope-key”
)
# 使用 ChromaDB 本地存储
vector_store = Chroma(
persist_directory=”./enterprise_knowledge_base”,
embedding_function=HuggingFaceEmbeddings(
model_name=”BAAI/bge-large-zh-v1.5″
)
)
super().__init__(vector_store, llm)
def query_with_permission(self, question: str, user_role: str) -> Dict:
“””基于用户角色的权限控制查询”””
# 根据角色过滤可访问的文档
permission_filter = {
“engineer”: [“technical”, “project”],
“manager”: [“technical”, “project”, “policy”, “hr”],
“executive”: [“technical”, “project”, “policy”, “hr”, “financial”]
}
allowed_categories = permission_filter.get(user_role, [])
# 添加权限过滤的检索
filtered_docs = self.retriever.get_relevant_documents(
question,
filter={“category”: {“$in”: allowed_categories}}
)
return self._generate_answer(question, filtered_docs)
“
实施效果:
- 响应时间:平均 1.2 秒
- 准确率:89%(基于用户反馈)
– 文档覆盖率:15,000+ 文档,50,000+ 片段
4.2 案例二:客服智能问答机器人
场景描述:电商平台需要 7×24 小时在线客服,处理产品咨询、售后问题和订单查询。
关键优化点:
“python
客服场景的特殊处理
class CustomerServiceRAG(RAGEngine):
def __init__(self):
super().__init__(…)
self.intent_classifier = self._load_intent_model()
self.fallback_responses = {
“complaint”: “非常抱歉给您带来不便,我将为您转接人工客服。”,
“refund”: “退款问题需要人工审核,正在为您转接…”,
“technical”: “这是一个技术问题,让我为您查找解决方案…”
}
def query(self, question: str) -> Dict:
# 意图识别
intent = self.intent_classifier.predict(question)
# 高风险意图直接转人工
if intent in [“complaint”, “legal”, “urgent”]:
return {“answer”: self.fallback_responses[intent], “human_handoff”: True}
# 正常 RAG 查询
result = super().query(question)
# 置信度检查
if result[“metadata”][“sources”][0].get(“score”, 1) < 0.4:
result["answer"] += "\n\n如果以上信息未能解决您的问题,我可以为您转接人工客服。"
return result
`
第五章:故障排查与性能优化
5.1 常见问题诊断
问题 1:检索结果不相关
`python
诊断步骤
def diagnose_retrieval_issue(query: str, rag_engine: RAGEngine):
print("=== 检索诊断报告 ===\n")
# 1. 检查查询嵌入质量
query_embedding = rag_engine.embeddings.embed_query(query)
print(f"1. 查询向量维度:{len(query_embedding)}")
print(f" 向量范数:{np.linalg.norm(query_embedding):.4f}\n")
# 2. 检查检索结果分数分布
docs_with_scores = rag_engine.vector_store.similarity_search_with_score(query, k=10)
scores = [score for _, score in docs_with_scores]
print(f"2. 检索分数统计:")
print(f" 最高分:{max(scores):.4f}")
print(f" 最低分:{min(scores):.4f}")
print(f" 平均分:{np.mean(scores):.4f}\n")
# 3. 检查文档内容质量
print("3. Top-3 检索结果预览:")
for i, (doc, score) in enumerate(docs_with_scores[:3], 1):
print(f" [结果 {i}] 分数:{score:.4f}")
print(f" 内容:{doc.page_content[:150]}...\n")
# 4. 建议优化措施
if max(scores) < 0.5:
print("⚠️ 建议:检索分数偏低,考虑:")
print(" - 更换嵌入模型")
print(" - 调整分块策略")
print(" - 添加查询重写模块")
`
问题 2:生成内容出现幻觉
`python
添加事实核查层
class FactCheckingRAG(RAGEngine):
def _verify_answer(self, question: str, answer: str, contexts: List[str]) -> Dict:
“””验证答案是否基于提供的上下文”””
verification_prompt = f”””
请判断以下答案是否完全基于提供的上下文信息:
【上下文】
{chr(10).join(contexts)}
【答案】
{answer}
【判断标准】
- 答案中的所有事实陈述都能在上下文中找到依据
- 没有添加上下文之外的信息
– 没有曲解或过度推断上下文内容
请回答:是/否,并说明理由。
“””
result = self.llm.invoke(verification_prompt)
is_verified = “是” in result.content
return {
“verified”: is_verified,
“explanation”: result.content,
“confidence”: 0.9 if is_verified else 0.3
}
def query_with_verification(self, question: str) -> Dict:
result = super().query(question)
contexts = [src[“content”] for src in result[“metadata”][“sources”]]
verification = self._verify_answer(
question,
result[“answer”],
contexts
)
result[“verification”] = verification
return result
“
5.2 性能优化策略
优化 1:缓存层实现
“python
from functools import lru_cache
import redis
import json
class CachedRAG(RAGEngine):
def __init__(self, *args, cache_ttl: int = 3600, **kwargs):
super().__init__(*args, **kwargs)
self.cache = redis.Redis(host=’localhost’, port=6379, db=0)
self.cache_ttl = cache_ttl
def _cache_key(self, question: str) -> str:
return f”rag:query:{hashlib.md5(question.encode()).hexdigest()}”
def query(self, question: str) -> Dict:
cache_key = self._cache_key(question)
# 尝试从缓存获取
cached = self.cache.get(cache_key)
if cached:
return json.loads(cached)
# 执行正常查询
result = super().query(question)
# 缓存结果
self.cache.setex(
cache_key,
self.cache_ttl,
json.dumps(result, ensure_ascii=False)
)
return result
“
优化 2:异步批量处理
“python
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncRAG(RAGEngine):
def __init__(self, *args, max_workers: int = 10, **kwargs):
super().__init__(*args, **kwargs)
self.executor = ThreadPoolExecutor(max_workers=max_workers)
async def query_async(self, question: str) -> Dict:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self.executor,
self.query,
question
)
async def batch_query_async(self, questions: List[str]) -> List[Dict]:
tasks = [self.query_async(q) for q in questions]
return await asyncio.gather(*tasks)
“
5.3 监控与日志
“python
import logging
from prometheus_client import Counter, Histogram, start_http_server
# 定义监控指标
QUERY_COUNTER = Counter(‘rag_queries_total’, ‘Total RAG queries’, [‘status’])
QUERY_LATENCY = Histogram(‘rag_query_latency_seconds’, ‘Query latency’)
RETRIEVAL_SCORE = Histogram(‘rag_retrieval_score’, ‘Retrieval scores’)
class MonitoredRAG(RAGEngine):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format=’%(asctime)s
- %(name)s – %(levelname)s – %(message)s’
)
self.logger = logging.getLogger(‘RAG’)
# 启动监控服务器
start_http_server(8001)
def query(self, question: str) -> Dict:
start_time = datetime.now()
try:
result = super().query(question)
# 记录成功指标
QUERY_COUNTER.labels(status=’success’).inc()
QUERY_LATENCY.observe((datetime.now() – start_time).total_seconds())
if result[“metadata”][“sources”]:
top_score = result[“metadata”][“sources”][0].get(“score”, 0)
if top_score:
RETRIEVAL_SCORE.observe(top_score)
self.logger.info(f”Query success: {question[:50]}… latency={result[‘metadata’][‘latency_ms’]:.0f}ms”)
return result
except Exception as e:
QUERY_COUNTER.labels(status=’error’).inc()
self.logger.error(f”Query failed: {question[:50]}… error={str(e)}”)
raise
“
第六章:总结与最佳实践
6.1 RAG 系统实施检查清单
在部署 RAG 系统前,请确保完成以下检查:
- [ ] 文档质量:文档已清洗、去重、格式统一
- [ ] 分块策略:chunk_size 和 chunk_overlap 已针对文档类型优化
- [ ] 嵌入模型:选择了适合领域和语言的嵌入模型
- [ ] 检索配置:k 值和 score_threshold 已调优
- [ ] Prompt 工程:Prompt 模板经过多轮测试和优化
- [ ] 评估指标:建立了准确率、召回率、响应时间的监控
- [ ] 安全控制:实现了访问控制和内容过滤
– [ ] 容错机制:配置了降级策略和人工接管流程
6.2 技术选型建议
| 组件 | 小规模场景 | 大规模场景 | 企业级场景 |
|---|---|---|---|
| 嵌入模型 | sentence-transformers | BGE-large | 自定义微调 |
| 向量数据库 | Chroma/FAISS | Pinecone/Weaviate | Milvus/Qdrant |
| LLM | GPT-3.5-turbo | GPT-4/Claude | 私有化部署 |
| 缓存 | 内存缓存 | Redis | Redis Cluster |
6.3 参考资源
1. LangChain 官方文档
- https://python.langchain.com/docs/get_started/introduction
2. RAG 技术论文
- Lewis et al. “Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks” (2020)
3. 向量数据库对比
- https://github.com/milvus-io/awesome-vector-database
4. 嵌入模型排行榜
- https://huggingface.co/spaces/mteb/leaderboard
5. RAG 评估框架 – https://github.com/ragas-io/ragas
—
作者注:本文提供了 RAG 系统从理论到实践的完整指南。实际部署时,请根据具体业务场景调整参数和架构。RAG 技术仍在快速发展,建议持续关注最新研究成果和工程实践。
本文字数:约 4,200 字 | 代码块:8 个 | 参考链接:5 个
虾米生活分享

评论前必须登录!
注册