Swarms 智能体框架核心实现源码总结
Swarms 智能体框架核心实现源码总结
核心架构图
┌─────────────────────────────────────────────────────────┐
│ Workflow Layer (流程编排层) │
│ 📁 swarms/structs/agent_rearrange.py │
│ 📁 swarms/structs/sequential_workflow.py │
│ 📁 swarms/structs/concurrent_workflow.py │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Agent Layer (智能体层) │
│ 📁 swarms/structs/agent.py │
│ • Agent._run() - 核心执行循环 │
│ • Agent.call_llm() - LLM调用 │
│ • Agent.tool_execution_retry() - 工具执行 │
└─────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────┐
│ Communication Layer (通信层) │
│ 📁 swarms/structs/conversation.py │
│ • Conversation.add() - 消息管理 │
│ • Conversation.get_str() - 上下文获取 │
└─────────────────────────────────────────────────────────┘
一、核心类与关键位置
1. Agent 类(核心智能体)
位置: swarms/structs/agent.py:187
class Agent:
"""
核心智能体类,连接LLM、工具和内存
关键方法:
- __init__() - 初始化(行187-887)
- run() - 入口方法(调用_run)
- _run() - 核心执行循环(行1342-1652)
- call_llm() - LLM调用
- tool_execution_retry() - 工具执行
- short_memory_init() - 内存初始化(行848-887)
"""
2. AgentRearrange 类(流程编排)
位置: swarms/structs/agent_rearrange.py:20
class AgentRearrange:
"""
多智能体流程编排器
关键方法:
- __init__() - 初始化(行86-199)
- run() - 入口方法(行727-737)
- _run() - 核心编排逻辑(行583-698)
- _run_sequential_workflow() - 顺序执行(行525-581)
- _run_concurrent_workflow() - 并发执行(行469-523)
"""
3. Conversation 类(对话管理)
位置: swarms/structs/conversation.py:51
class Conversation:
"""
对话历史管理器
关键方法:
- __init__() - 初始化(行75-129)
- add() - 添加消息(行237-280)
- get_str() - 获取字符串格式历史
- return_history_as_string() - 返回完整历史
"""
二、核心执行流程(ReAct模式)
阶段1:初始化
位置: swarms/structs/agent.py:848-887
def short_memory_init(self):
"""初始化短期内存(对话历史)"""
# 1. 构建系统提示词
prompt_dict = {}
if self.agent_name:
prompt_dict["name"] = f"Your Name: {self.agent_name}"
if self.system_prompt:
prompt_dict["instructions"] = f"Your Instructions: {self.system_prompt}"
# 2. 创建Conversation对象
memory = Conversation(
name=f"{self.agent_name}_id_{self.id}_conversation",
system_prompt=prompt,
user=self.user_name,
context_length=self.context_length,
)
return memory
阶段2:主执行循环
位置: swarms/structs/agent.py:1342-1652
def _run(self, task, img=None, streaming_callback=None, *args, **kwargs):
"""核心执行循环 - ReAct模式的核心实现"""
# ========== 步骤1: 初始化任务 ==========
self.short_memory.add(role=self.user_name, content=task) # 行1382
# ========== 步骤2: RAG查询(可选)==========
if self.long_term_memory and not self.rag_every_loop:
self.handle_rag_query(task) # 行1389
# ========== 步骤3: 规划(可选)==========
if self.plan_enabled:
self.plan(task) # 行1392
# ========== 步骤4: 主循环(ReAct循环)==========
loop_count = 0
while loop_count < self.max_loops: # 行1406-1630
loop_count += 1
# 4.1 添加推理提示(反思机制)
if self.reasoning_prompt_on and self.max_loops >= 2:
self.short_memory.add(
role=self.agent_name,
content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}"
) # 行1427-1431
# 4.2 构建任务提示词(包含完整历史)
task_prompt = self.short_memory.return_history_as_string() # 行1459-1461
# 4.3 调用LLM(Thought阶段)
response = self.call_llm(
task=task_prompt,
current_loop=loop_count,
streaming_callback=streaming_callback,
) # 行1479-1485
# 4.4 解析响应
response = self.parse_llm_output(response) # 行1495
# 4.5 存入内存
self.short_memory.add(
role=self.agent_name,
content=response
) # 行1497-1500
# 4.6 工具执行(Action阶段)
if self.tools:
self.tool_execution_retry(response, loop_count) # 行1520-1523
# 4.7 检查停止条件
if self.stopping_condition and self._check_stopping_condition(response):
break # 行1587-1595
# ========== 步骤5: 返回格式化结果 ==========
return history_output_formatter(
self.short_memory, type=self.output_type
) # 行1638-1640
阶段3:工具执行(Action)
位置: swarms/structs/agent.py:1520-1523(调用tool_execution_retry)
def tool_execution_retry(self, response, loop_count):
"""工具执行重试机制"""
# 1. 检测LLM响应中是否包含工具调用
# 2. 解析工具调用参数
# 3. 执行工具(Action)
# 4. 将工具结果(Observation)存入内存
# 5. 重试机制处理错误
关键点:
- LLM通过Function Calling返回工具调用
- 框架自动执行工具
- 工具结果作为Observation存入对话历史
- 下一轮循环LLM可以看到Observation
阶段4:多智能体编排
位置: swarms/structs/agent_rearrange.py:583-698
def _run(self, task=None, img=None, *args, **kwargs):
"""流程编排核心逻辑"""
# ========== 步骤1: 初始化共享对话 ==========
self.conversation.add("User", task) # 行627
# ========== 步骤2: 解析Flow字符串 ==========
tasks = self.flow.split("->") # 例如: "agent1 -> agent2, agent3"
# 行633
# ========== 步骤3: 执行每个阶段 ==========
for task_idx, task in enumerate(tasks): # 行659
agent_names = [name.strip() for name in task.split(",")]
if len(agent_names) > 1:
# 并发执行
concurrent_results = self._run_concurrent_workflow(
agent_names=agent_names, img=img
) # 行666-673
else:
# 顺序执行
agent_name = agent_names[0]
result = self._run_sequential_workflow(
agent_name=agent_name, tasks=tasks, img=img
) # 行679-685
# ========== 步骤4: 返回结果 ==========
return history_output_formatter(
conversation=self.conversation, type=self.output_type
) # 行692-695
顺序执行关键代码(行525-581):
def _run_sequential_workflow(self, agent_name, tasks, img=None):
"""顺序执行单个agent"""
agent = self.agents[agent_name]
# 添加团队感知信息
if self.team_awareness:
awareness_info = self._get_sequential_awareness(agent_name, tasks)
self.conversation.add("system", awareness_info) # 行566
# 关键:agent从共享对话历史获取完整上下文
current_task = agent.run(
task=self.conversation.get_str(), # 获取完整对话历史
img=img
) # 行571-576
# 将结果存入共享对话历史
self.conversation.add(agent.agent_name, current_task) # 行579
return current_task
三、关键设计模式
1. 共享对话历史模式
位置: swarms/structs/conversation.py:237-280
def add(self, role: str, content: Union[str, dict, list]):
"""添加消息到对话历史"""
message = {
"role": role,
"content": content,
"timestamp": datetime.now() if self.time_enabled else None
}
self.conversation_history.append(message)
# 自动保存(如果启用)
if self.autosave:
self._autosave()
使用场景:
- Agent执行后:
conversation.add("agent_name", response) - 下一个Agent获取:
conversation.get_str()→ 包含所有历史
2. Flow DSL解析模式
位置: swarms/structs/agent_rearrange.py:295-337
def validate_flow(self):
"""验证Flow字符串"""
# Flow语法:
# "->" 表示顺序
# "," 表示并发
# 例如: "agent1 -> agent2, agent3 -> agent4"
tasks = self.flow.split("->") # 分割顺序阶段
for task in tasks:
agent_names = [name.strip() for name in task.split(",")] # 检测并发
# 验证agent是否存在
3. ReAct循环模式
位置: swarms/structs/agent.py:1406-1630
# ReAct循环结构:
while loop_count < self.max_loops:
# Thought: LLM生成响应(可能包含工具调用)
response = self.call_llm(task_prompt)
# Action: 执行工具(如果LLM返回工具调用)
if self.tools:
self.tool_execution_retry(response)
# Observation: 工具结果自动存入内存
# 下一轮循环LLM可以看到Observation
四、关键源码位置速查表
| 功能 | 文件位置 | 行号范围 | 说明 |
|---|---|---|---|
| Agent类定义 | swarms/structs/agent.py | 187-4012 | 核心智能体类 |
| 核心执行循环 | swarms/structs/agent.py | 1342-1652 | _run()方法 |
| 内存初始化 | swarms/structs/agent.py | 848-887 | short_memory_init() |
| LLM调用 | swarms/structs/agent.py | ~1000+ | call_llm()方法 |
| 工具执行 | swarms/structs/agent.py | ~2000+ | tool_execution_retry() |
| 流程编排 | swarms/structs/agent_rearrange.py | 583-698 | _run()方法 |
| 顺序执行 | swarms/structs/agent_rearrange.py | 525-581 | _run_sequential_workflow() |
| 并发执行 | swarms/structs/agent_rearrange.py | 469-523 | _run_concurrent_workflow() |
| 对话管理 | swarms/structs/conversation.py | 51-1258 | Conversation类 |
| 消息添加 | swarms/structs/conversation.py | 237-280 | add()方法 |
| ReAct提示词 | swarms/prompts/react_base_prompt.py | 1-42 | REACT_SYS_PROMPT |
五、完整执行流程示例
# ========== 用户代码 ==========
from swarms import Agent
from swarms.prompts.react_base_prompt import REACT_SYS_PROMPT
agent = Agent(
system_prompt=REACT_SYS_PROMPT,
tools=[search_tool],
max_loops=3,
)
result = agent.run("研究AI发展趋势")
# ========== 内部执行流程 ==========
# 1. agent.run() 调用
# ↓
# 2. Agent._run() [agent.py:1342]
# ├─ short_memory.add("User", "研究AI发展趋势") [行1382]
# ├─ plan() [可选,行1392]
# └─ while loop_count < max_loops: [行1406]
# ├─ Thought: call_llm() [行1479]
# │ └─ 返回: "我需要搜索AI相关信息..."
# ├─ 存入内存: short_memory.add("agent", response) [行1497]
# ├─ Action: tool_execution_retry() [行1520]
# │ ├─ 检测工具调用: search_tool(query="AI发展趋势")
# │ ├─ 执行工具: search_tool.run()
# │ └─ Observation: short_memory.add("tool", result)
# └─ 下一轮循环(LLM看到Observation)
# └─ 返回: history_output_formatter() [行1638]
六、核心设计要点
- 内存驱动:通过
Conversation累积上下文,每轮循环LLM看到完整历史 - 循环执行:
max_loops控制迭代次数,实现多轮推理 - 工具集成:Function Calling自动检测和执行工具,结果作为Observation
- 流程编排:Flow DSL(
->和,)定义执行顺序 - 共享上下文:多智能体通过共享
Conversation对象传递上下文
七、快速上手路径
- 先看:
agent.py:1342-1652(核心执行循环) - 再看:
agent_rearrange.py:583-698(多智能体编排) - 最后:
conversation.py:237-280(对话管理)
这三个位置覆盖了框架的核心逻辑。
关于我