Swarms 智能体框架核心实现源码总结

  |   0 评论   |   0 浏览

Swarms 智能体框架核心实现源码总结

核心架构图

┌─────────────────────────────────────────────────────────┐
│   Workflow Layer (流程编排层)                            │
│   📁 swarms/structs/agent_rearrange.py                  │
│   📁 swarms/structs/sequential_workflow.py              │
│   📁 swarms/structs/concurrent_workflow.py              │
└─────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────┐
│   Agent Layer (智能体层)                                 │
│   📁 swarms/structs/agent.py                            │
│   • Agent._run() - 核心执行循环                          │
│   • Agent.call_llm() - LLM调用                           │
│   • Agent.tool_execution_retry() - 工具执行              │
└─────────────────────────────────────────────────────────┘
                        ↓
┌─────────────────────────────────────────────────────────┐
│   Communication Layer (通信层)                          │
│   📁 swarms/structs/conversation.py                     │
│   • Conversation.add() - 消息管理                        │
│   • Conversation.get_str() - 上下文获取                  │
└─────────────────────────────────────────────────────────┘

一、核心类与关键位置

1. Agent 类(核心智能体)

位置: swarms/structs/agent.py:187

class Agent:
    """
    核心智能体类,连接LLM、工具和内存
    关键方法:
    - __init__() - 初始化(行187-887)
    - run() - 入口方法(调用_run)
    - _run() - 核心执行循环(行1342-1652)
    - call_llm() - LLM调用
    - tool_execution_retry() - 工具执行
    - short_memory_init() - 内存初始化(行848-887)
    """

2. AgentRearrange 类(流程编排)

位置: swarms/structs/agent_rearrange.py:20

class AgentRearrange:
    """
    多智能体流程编排器
    关键方法:
    - __init__() - 初始化(行86-199)
    - run() - 入口方法(行727-737)
    - _run() - 核心编排逻辑(行583-698)
    - _run_sequential_workflow() - 顺序执行(行525-581)
    - _run_concurrent_workflow() - 并发执行(行469-523)
    """

3. Conversation 类(对话管理)

位置: swarms/structs/conversation.py:51

class Conversation:
    """
    对话历史管理器
    关键方法:
    - __init__() - 初始化(行75-129)
    - add() - 添加消息(行237-280)
    - get_str() - 获取字符串格式历史
    - return_history_as_string() - 返回完整历史
    """

二、核心执行流程(ReAct模式)

阶段1:初始化

位置: swarms/structs/agent.py:848-887

def short_memory_init(self):
    """初始化短期内存(对话历史)"""
    # 1. 构建系统提示词
    prompt_dict = {}
    if self.agent_name:
        prompt_dict["name"] = f"Your Name: {self.agent_name}"
    if self.system_prompt:
        prompt_dict["instructions"] = f"Your Instructions: {self.system_prompt}"
  
    # 2. 创建Conversation对象
    memory = Conversation(
        name=f"{self.agent_name}_id_{self.id}_conversation",
        system_prompt=prompt,
        user=self.user_name,
        context_length=self.context_length,
    )
    return memory

阶段2:主执行循环

位置: swarms/structs/agent.py:1342-1652

def _run(self, task, img=None, streaming_callback=None, *args, **kwargs):
    """核心执行循环 - ReAct模式的核心实现"""
  
    # ========== 步骤1: 初始化任务 ==========
    self.short_memory.add(role=self.user_name, content=task)  # 行1382
  
    # ========== 步骤2: RAG查询(可选)==========
    if self.long_term_memory and not self.rag_every_loop:
        self.handle_rag_query(task)  # 行1389
  
    # ========== 步骤3: 规划(可选)==========
    if self.plan_enabled:
        self.plan(task)  # 行1392
  
    # ========== 步骤4: 主循环(ReAct循环)==========
    loop_count = 0
    while loop_count < self.max_loops:  # 行1406-1630
        loop_count += 1
      
        # 4.1 添加推理提示(反思机制)
        if self.reasoning_prompt_on and self.max_loops >= 2:
            self.short_memory.add(
                role=self.agent_name,
                content=f"Current Internal Reasoning Loop: {loop_count}/{self.max_loops}"
            )  # 行1427-1431
      
        # 4.2 构建任务提示词(包含完整历史)
        task_prompt = self.short_memory.return_history_as_string()  # 行1459-1461
      
        # 4.3 调用LLM(Thought阶段)
        response = self.call_llm(
            task=task_prompt,
            current_loop=loop_count,
            streaming_callback=streaming_callback,
        )  # 行1479-1485
      
        # 4.4 解析响应
        response = self.parse_llm_output(response)  # 行1495
      
        # 4.5 存入内存
        self.short_memory.add(
            role=self.agent_name,
            content=response
        )  # 行1497-1500
      
        # 4.6 工具执行(Action阶段)
        if self.tools:
            self.tool_execution_retry(response, loop_count)  # 行1520-1523
      
        # 4.7 检查停止条件
        if self.stopping_condition and self._check_stopping_condition(response):
            break  # 行1587-1595
  
    # ========== 步骤5: 返回格式化结果 ==========
    return history_output_formatter(
        self.short_memory, type=self.output_type
    )  # 行1638-1640

阶段3:工具执行(Action)

位置: swarms/structs/agent.py:1520-1523(调用tool_execution_retry)

def tool_execution_retry(self, response, loop_count):
    """工具执行重试机制"""
    # 1. 检测LLM响应中是否包含工具调用
    # 2. 解析工具调用参数
    # 3. 执行工具(Action)
    # 4. 将工具结果(Observation)存入内存
    # 5. 重试机制处理错误

关键点

  • LLM通过Function Calling返回工具调用
  • 框架自动执行工具
  • 工具结果作为Observation存入对话历史
  • 下一轮循环LLM可以看到Observation

阶段4:多智能体编排

位置: swarms/structs/agent_rearrange.py:583-698

def _run(self, task=None, img=None, *args, **kwargs):
    """流程编排核心逻辑"""
  
    # ========== 步骤1: 初始化共享对话 ==========
    self.conversation.add("User", task)  # 行627
  
    # ========== 步骤2: 解析Flow字符串 ==========
    tasks = self.flow.split("->")  # 例如: "agent1 -> agent2, agent3"
    # 行633
  
    # ========== 步骤3: 执行每个阶段 ==========
    for task_idx, task in enumerate(tasks):  # 行659
        agent_names = [name.strip() for name in task.split(",")]
      
        if len(agent_names) > 1:
            # 并发执行
            concurrent_results = self._run_concurrent_workflow(
                agent_names=agent_names, img=img
            )  # 行666-673
        else:
            # 顺序执行
            agent_name = agent_names[0]
            result = self._run_sequential_workflow(
                agent_name=agent_name, tasks=tasks, img=img
            )  # 行679-685
  
    # ========== 步骤4: 返回结果 ==========
    return history_output_formatter(
        conversation=self.conversation, type=self.output_type
    )  # 行692-695

顺序执行关键代码(行525-581):

def _run_sequential_workflow(self, agent_name, tasks, img=None):
    """顺序执行单个agent"""
    agent = self.agents[agent_name]
  
    # 添加团队感知信息
    if self.team_awareness:
        awareness_info = self._get_sequential_awareness(agent_name, tasks)
        self.conversation.add("system", awareness_info)  # 行566
  
    # 关键:agent从共享对话历史获取完整上下文
    current_task = agent.run(
        task=self.conversation.get_str(),  # 获取完整对话历史
        img=img
    )  # 行571-576
  
    # 将结果存入共享对话历史
    self.conversation.add(agent.agent_name, current_task)  # 行579
  
    return current_task

三、关键设计模式

1. 共享对话历史模式

位置: swarms/structs/conversation.py:237-280

def add(self, role: str, content: Union[str, dict, list]):
    """添加消息到对话历史"""
    message = {
        "role": role,
        "content": content,
        "timestamp": datetime.now() if self.time_enabled else None
    }
    self.conversation_history.append(message)
  
    # 自动保存(如果启用)
    if self.autosave:
        self._autosave()

使用场景

  • Agent执行后:conversation.add("agent_name", response)
  • 下一个Agent获取:conversation.get_str() → 包含所有历史

2. Flow DSL解析模式

位置: swarms/structs/agent_rearrange.py:295-337

def validate_flow(self):
    """验证Flow字符串"""
    # Flow语法:
    # "->" 表示顺序
    # "," 表示并发
    # 例如: "agent1 -> agent2, agent3 -> agent4"
  
    tasks = self.flow.split("->")  # 分割顺序阶段
    for task in tasks:
        agent_names = [name.strip() for name in task.split(",")]  # 检测并发
        # 验证agent是否存在

3. ReAct循环模式

位置: swarms/structs/agent.py:1406-1630

# ReAct循环结构:
while loop_count < self.max_loops:
    # Thought: LLM生成响应(可能包含工具调用)
    response = self.call_llm(task_prompt)
  
    # Action: 执行工具(如果LLM返回工具调用)
    if self.tools:
        self.tool_execution_retry(response)
  
    # Observation: 工具结果自动存入内存
    # 下一轮循环LLM可以看到Observation

四、关键源码位置速查表

功能文件位置行号范围说明
Agent类定义swarms/structs/agent.py187-4012核心智能体类
核心执行循环swarms/structs/agent.py1342-1652_run()方法
内存初始化swarms/structs/agent.py848-887short_memory_init()
LLM调用swarms/structs/agent.py~1000+call_llm()方法
工具执行swarms/structs/agent.py~2000+tool_execution_retry()
流程编排swarms/structs/agent_rearrange.py583-698_run()方法
顺序执行swarms/structs/agent_rearrange.py525-581_run_sequential_workflow()
并发执行swarms/structs/agent_rearrange.py469-523_run_concurrent_workflow()
对话管理swarms/structs/conversation.py51-1258Conversation类
消息添加swarms/structs/conversation.py237-280add()方法
ReAct提示词swarms/prompts/react_base_prompt.py1-42REACT_SYS_PROMPT

五、完整执行流程示例

# ========== 用户代码 ==========
from swarms import Agent
from swarms.prompts.react_base_prompt import REACT_SYS_PROMPT

agent = Agent(
    system_prompt=REACT_SYS_PROMPT,
    tools=[search_tool],
    max_loops=3,
)

result = agent.run("研究AI发展趋势")

# ========== 内部执行流程 ==========

# 1. agent.run() 调用
#    ↓
# 2. Agent._run() [agent.py:1342]
#    ├─ short_memory.add("User", "研究AI发展趋势") [行1382]
#    ├─ plan() [可选,行1392]
#    └─ while loop_count < max_loops: [行1406]
#        ├─ Thought: call_llm() [行1479]
#        │   └─ 返回: "我需要搜索AI相关信息..."
#        ├─ 存入内存: short_memory.add("agent", response) [行1497]
#        ├─ Action: tool_execution_retry() [行1520]
#        │   ├─ 检测工具调用: search_tool(query="AI发展趋势")
#        │   ├─ 执行工具: search_tool.run()
#        │   └─ Observation: short_memory.add("tool", result)
#        └─ 下一轮循环(LLM看到Observation)
#    └─ 返回: history_output_formatter() [行1638]

六、核心设计要点

  1. 内存驱动:通过Conversation累积上下文,每轮循环LLM看到完整历史
  2. 循环执行:max_loops控制迭代次数,实现多轮推理
  3. 工具集成:Function Calling自动检测和执行工具,结果作为Observation
  4. 流程编排:Flow DSL(->,)定义执行顺序
  5. 共享上下文:多智能体通过共享Conversation对象传递上下文

七、快速上手路径

  1. 先看:agent.py:1342-1652(核心执行循环)
  2. 再看:agent_rearrange.py:583-698(多智能体编排)
  3. 最后:conversation.py:237-280(对话管理)

这三个位置覆盖了框架的核心逻辑。


标题:Swarms 智能体框架核心实现源码总结
作者:guobing
地址:http://guobingwei.tech/articles/2026/01/12/1768191340345.html