元素码农
基础
UML建模
数据结构
算法
设计模式
网络
TCP/IP协议
HTTPS安全机制
WebSocket实时通信
数据库
sqlite
postgresql
clickhouse
后端
rust
go
java
php
mysql
redis
mongodb
etcd
nats
zincsearch
前端
浏览器
javascript
typescript
vue3
react
游戏
unity
unreal
C++
C#
Lua
App
android
ios
flutter
react-native
安全
Web安全
测试
软件测试
自动化测试 - Playwright
人工智能
Python
langChain
langGraph
运维
linux
docker
工具
git
svn
🌞
🌙
目录
▶
LangGraph基础概念
什么是LangGraph
核心特性解析
典型应用场景
▶
快速入门指南
环境安装配置
第一个LangGraph应用
执行流程演示
▶
核心组件解析
图结构基础
节点(Node)详解
边(Edge)的类型
执行引擎原理
路由策略配置
状态容器使用
错误处理机制
输入输出管道
配置管理系统
发布时间:
2025-04-01 23:07
↑
☰
# 输入输出管道 ## LangGraph输入输出管道详解 LangGraph的输入输出管道是连接外部世界与图执行核心的桥梁,负责数据的预处理、转换和后处理。本文将深入探讨LangGraph中的数据流管理、输入处理、输出格式化和管道组合,帮助您构建高效、灵活的数据处理流程。 ## 输入输出管道基础 ### 什么是输入输出管道? 在LangGraph中,输入输出管道是一系列处理函数的组合,用于: 1. **输入处理**:将外部数据转换为图可处理的状态格式 2. **输出处理**:将图的执行结果转换为所需的外部格式 3. **数据转换**:在图执行过程中转换数据格式 4. **数据验证**:确保数据符合预期格式和约束 管道设计遵循函数式编程原则,每个处理步骤都是纯函数,接收输入并产生输出,不修改原始数据。 ### 管道的角色 输入输出管道在LangGraph应用中扮演关键角色: - **接口适配**:将各种外部接口的数据转换为统一格式 - **数据标准化**:规范化和清理输入数据 - **格式转换**:在不同数据格式间进行转换 - **数据增强**:添加额外信息或元数据 - **结果格式化**:将内部状态转换为用户友好的输出 ## 输入管道 ### 基本输入处理 最简单的输入管道将外部数据转换为图状态: ```python from typing import TypedDict, List, Dict # 定义状态类型 class ChatState(TypedDict): messages: List[Dict[str, str]] context: Dict[str, str] # 基本输入处理函数 def process_input(user_message: str) -> ChatState: """将用户消息转换为初始状态""" return { "messages": [{"role": "user", "content": user_message}], "context": {} } # 使用输入处理函数 user_input = "Hello, can you help me?" initial_state = process_input(user_input) # 执行图 final_state = graph.invoke(initial_state) ``` ### 输入验证与规范化 输入管道可以包含验证和规范化步骤: ```python from pydantic import BaseModel, validator # 使用Pydantic进行输入验证 class UserInput(BaseModel): message: str user_id: str @validator('message') def message_not_empty(cls, v): if not v.strip(): raise ValueError('消息不能为空') return v.strip() # 输入处理函数 def validate_and_process(raw_input: dict) -> ChatState: # 验证输入 validated_input = UserInput(**raw_input) # 转换为状态 return { "messages": [{"role": "user", "content": validated_input.message}], "context": {"user_id": validated_input.user_id} } ``` ### 输入增强 输入管道可以通过外部数据源增强输入: ```python def enhance_input(user_message: str, user_id: str) -> ChatState: # 获取用户历史 user_history = get_user_history(user_id) # 获取相关知识 relevant_knowledge = retrieve_knowledge(user_message) # 创建增强状态 return { "messages": [{"role": "user", "content": user_message}], "context": { "user_id": user_id, "user_history": user_history, "relevant_knowledge": relevant_knowledge } } ``` ## 输出管道 ### 基本输出处理 输出管道将图执行结果转换为所需格式: ```python def process_output(final_state: ChatState) -> str: """提取最后一条助手消息作为输出""" messages = final_state["messages"] for msg in reversed(messages): if msg["role"] == "assistant": return msg["content"] return "No response generated" # 执行图并处理输出 final_state = graph.invoke(initial_state) response = process_output(final_state) ``` ### 输出格式化 输出管道可以将结果格式化为特定结构: ```python def format_output(final_state: ChatState) -> Dict: """将状态格式化为API响应""" # 提取最后一条助手消息 assistant_message = None for msg in reversed(final_state["messages"]): if msg["role"] == "assistant": assistant_message = msg["content"] break # 构建响应 return { "response": assistant_message or "No response", "conversation_id": final_state.get("conversation_id"), "metadata": { "processing_time": final_state.get("processing_time"), "model_used": final_state.get("model_used"), "confidence": final_state.get("confidence") } } ``` ### 输出转换 输出管道可以将文本转换为其他格式: ```python def transform_output(final_state: ChatState) -> Dict: """将助手回复转换为结构化数据""" # 提取助手回复 assistant_reply = None for msg in reversed(final_state["messages"]): if msg["role"] == "assistant": assistant_reply = msg["content"] break if not assistant_reply: return {"error": "No response generated"} # 解析JSON响应(假设助手回复是JSON字符串) try: import json structured_data = json.loads(assistant_reply) return structured_data except json.JSONDecodeError: # 如果不是JSON,尝试其他解析方法 return {"text": assistant_reply} ``` ## 管道组合 ### 函数组合 LangGraph支持将多个处理函数组合成管道: ```python from functools import reduce from typing import Callable, TypeVar, Any T = TypeVar('T') U = TypeVar('U') V = TypeVar('V') # 函数组合辅助函数 def compose(f: Callable[[U], V], g: Callable[[T], U]) -> Callable[[T], V]: """组合两个函数""" return lambda x: f(g(x)) # 创建输入管道 def normalize_text(text: str) -> str: """规范化文本""" return text.strip().lower() def extract_entities(text: str) -> Dict[str, Any]: """提取实体""" # 实体提取逻辑 return {"text": text, "entities": []} def create_state(data: Dict[str, Any]) -> ChatState: """创建初始状态""" return { "messages": [{"role": "user", "content": data["text"]}], "context": {"entities": data["entities"]} } # 组合输入管道 input_pipeline = compose(create_state, compose(extract_entities, normalize_text)) # 使用组合管道 user_input = " Hello, what's the Weather in New York? " initial_state = input_pipeline(user_input) ``` ### 管道类 LangGraph也支持使用类来定义更复杂的管道: ```python class Pipeline: """管道类,支持多个处理步骤""" def __init__(self, steps=None): self.steps = steps or [] def add_step(self, func): """添加处理步骤""" self.steps.append(func) return self def process(self, data): """执行所有处理步骤""" result = data for step in self.steps: result = step(result) return result # 创建输入管道 input_pipeline = Pipeline() input_pipeline.add_step(normalize_text) input_pipeline.add_step(extract_entities) input_pipeline.add_step(create_state) # 使用管道 initial_state = input_pipeline.process(user_input) ``` ## 高级管道技术 ### 异步管道 LangGraph支持异步管道处理: ```python import asyncio # 异步处理函数 async def fetch_user_data(user_id: str) -> Dict: """异步获取用户数据""" # 模拟异步API调用 await asyncio.sleep(0.5) return {"user_id": user_id, "preferences": {"language": "en"}} async def async_input_processor(user_message: str, user_id: str) -> ChatState: """异步输入处理""" # 并行获取多个数据源 user_data_task = fetch_user_data(user_id) knowledge_task = async_retrieve_knowledge(user_message) # 等待所有任务完成 user_data, knowledge = await asyncio.gather(user_data_task, knowledge_task) # 创建状态 return { "messages": [{"role": "user", "content": user_message}], "context": { "user_data": user_data, "knowledge": knowledge } } # 使用异步管道 async def process_request(user_message: str, user_id: str): # 处理输入 initial_state = await async_input_processor(user_message, user_id) # 执行图(假设图支持异步执行) final_state = await graph.ainvoke(initial_state) # 处理输出 return process_output(final_state) ``` ### 条件管道 LangGraph支持基于条件的管道处理: ```python def conditional_pipeline(data: Any) -> ChatState: """基于条件选择不同处理路径""" # 检查数据类型 if isinstance(data, str): # 文本输入处理 return text_input_processor(data) elif isinstance(data, dict): # 结构化数据处理 return structured_input_processor(data) elif isinstance(data, bytes): # 二进制数据处理 return binary_input_processor(data) else: # 默认处理 return default_input_processor(data) ``` ### 流式管道 LangGraph支持流式处理,适用于大型数据集: ```python def stream_processor(data_stream, chunk_size=100): """流式处理大型数据集""" # 初始化状态 state = {"messages": [], "context": {}, "processed_chunks": 0} # 分块处理 for chunk in data_stream.iter_chunks(chunk_size): # 处理当前块 chunk_result = process_chunk(chunk) # 更新状态 state["messages"].extend(chunk_result["messages"]) state["context"].update(chunk_result["context"]) state["processed_chunks"] += 1 # 可选:产生中间结果 yield state # 返回最终状态 return state ``` ## 实际应用案例 ### 构建聊天机器人API ```python from typing import TypedDict, List, Dict, Any from fastapi import FastAPI, HTTPException from pydantic import BaseModel from langgraph.graph import StateGraph # 定义API模型 class ChatRequest(BaseModel): message: str user_id: str conversation_id: str = None class ChatResponse(BaseModel): response: str conversation_id: str metadata: Dict[str, Any] = {} # 定义状态类型 class ChatState(TypedDict): messages: List[Dict[str, str]] conversation_id: str user_id: str metadata: Dict[str, Any] # 创建图(假设已定义) graph = StateGraph(ChatState) # ... 添加节点和边 ... app = graph.compile() # 输入处理 def process_chat_request(request: ChatRequest) -> ChatState: # 获取对话历史 conversation_id = request.conversation_id or generate_id() conversation_history = get_conversation(conversation_id) if request.conversation_id else [] # 创建初始状态 return { "messages": conversation_history + [{"role": "user", "content": request.message}], "conversation_id": conversation_id, "user_id": request.user_id, "metadata": {"timestamp": get_current_timestamp()} } # 输出处理 def process_chat_response(state: ChatState) -> ChatResponse: # 提取最后一条助手消息 assistant_message = None for msg in reversed(state["messages"]): if msg["role"] == "assistant": assistant_message = msg["content"] break # 保存对话历史 save_conversation(state["conversation_id"], state["messages"]) # 构建响应 return ChatResponse( response=assistant_message or "No response generated", conversation_id=state["conversation_id"], metadata=state["metadata"] ) # 创建FastAPI应用 api = FastAPI(title="LangGraph Chat API") @api.post("/chat", response_model=ChatResponse) async def chat_endpoint(request: ChatRequest): try: # 处理输入 initial_state = process_chat_request(request) # 执行图 final_state = app.invoke(initial_state) # 处理输出 return process_chat_response(final_state) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) ``` ### 数据处理管道 ```python from typing import TypedDict, List, Dict, Any import pandas as pd # 定义状态类型 class DataPipelineState(TypedDict): data: Any metadata: Dict[str, Any] transformations: List[str] # 创建图(假设已定义) graph = StateGraph(DataPipelineState) # ... 添加节点和边 ... app = graph.compile() # 输入处理函数 def process_csv_input(file_path: str) -> DataPipelineState: """处理CSV输入""" # 读取CSV文件 df = pd.read_csv(file_path) # 基本数据清理 df = df.dropna() # 创建初始状态 return { "data": df, "metadata": { "rows": len(df), "columns": list(df.columns), "source": file_path }, "transformations": ["initial_load"] } # 输出处理函数 def process_pipeline_output(state: DataPipelineState) -> Dict[str, Any]: """处理管道输出""" df = state["data"] # 生成摘要统计 summary = { "row_count": len(df), "column_count": len(df.columns), "transformations": state["transformations"], "statistics": { col: { "mean": df[col].mean() if pd.api.types.is_numeric_dtype(df[col]) else None, "min": df[col].min() if pd.api.types.is_numeric_dtype(df[col]) else None, "max": df[col].max() if pd.api.types.is_numeric_dtype(df[col]) else None, "null_count": df[col].isna().sum() } for col in df.columns } } # 返回处理后的数据和摘要 return { "processed_data": df, "summary": summary, "metadata": state["metadata"] } # 使用管道 def run_data_pipeline(file_path: str) -> Dict[str, Any]: # 处理输入 initial_state = process_csv_input(file_path) # 执行图 final_state = app.invoke(initial_state) # 处理输出 return process_pipeline_output(final_state) ``` ## 最佳实践 ### 管道设计原则 1. **单一职责**:每个管道函数应专注于一个特定任务 2. **可组合性**:设计可以灵活组合的小型函数 3. **不可变性**:避免修改输入数据,而是返回新的数据结构 4. **错误处理**:在管道中包含适当的错误处理和验证 5. **类型安全**:使用类型注解确保数据流的类型安全 ### 性能优化 ```python # 缓存昂贵的操作 from functools import lru_cache @lru_cache(maxsize=100) def expensive_knowledge_retrieval(query: str) -> List[Dict]: """昂贵的知识检索操作(带缓存)""" # 复杂的检索逻辑 return retrieve_knowledge(query) # 批处理 def batch_processor(items: List[Any], batch_size=10) -> List[Any]: """批量处理项目""" results = [] for i in range(0, len(items), batch_size): batch = items[i:i+batch_size] batch_results = process_batch(batch) results.extend(batch_results) return results ``` ### 测试管道 ```python import unittest class TestInputPipeline(unittest.TestCase): def test_normalize_text(self): self.assertEqual(normalize_text(" Hello World "), "hello world") def test_extract_entities(self): result = extract_entities("Meet John in New York") self.assertIn("entities", result) # 更多断言... def test_full_pipeline(self): pipeline = Pipeline() pipeline.add_step(normalize_text) pipeline.add_step(extract_entities) pipeline.add_step(create_state) result = pipeline.process(" Hello, John! ") self.assertIn("messages", result) self.assertIn("context", result) # 更多断言... ``` ## 总结 LangGraph的输入输出管道是构建灵活、可维护应用程序的关键组件。通过合理设计输入处理、输出格式化和管道组合,可以实现: 1. **数据流的一致性**:确保数据在整个应用中以一致的格式流动 2. **关注点分离**:将数据处理逻辑与核心业务逻辑分离 3. **可重用性**:创建可在多个应用中重用的管道组件 4. **可扩展性**:轻松添加新的处理步骤而不影响现有功能 5. **可测试性**:独立测试每个管道组件 掌握LangGraph的输入输出管道,是构建高质量LLM应用的必备技能。