核心库:概念指南
图(Graphs)
Spring AI Alibaba Graph 将智能体工作流建模为图。您可以使用三个关键组件来定义智能体的行为:
-
State:共享的数据结构,表示应用程序的当前快照。它由
OverAllState对象表示。 -
Nodes:一个函数式接口 (
AsyncNodeAction),编码智能体的逻辑。它们接收当前的State作为输入,执行一些计算或副作用,并返回更新后的State。或者使用AsyncNodeActionWithConfig,它可以额外接收RunnableConfig用于传递上下文。 -
Edges:一个函数式接口 (
AsyncEdgeAction),根据当前的State确定接下来执行哪个Node。它们可以是条件分支或固定转换。或者使用AsyncEdgeActionWithConfig,它可以额外接收RunnableConfig用于传递上下文。
通过组合 Nodes 和 Edges,您可以创建复杂的循环工作流,工作流在工作过程中持续更新 State,Spring AI Alibaba 会管理好 State,并确保 State 在工作流中传递并持久化。
在 Graph 中,Nodes 和 Edges 就像函数一样 - 它们可以包含 LLM 调用或只是普通的 Java 代码。
简而言之:节点完成工作,边决定下一步做什么。
StateGraph
StateGraph 类 Spring AI Alibaba Graph 中的核心定义,它通过用户定义的状态策略进行参数化。
编译图
要构建您的图,首先定义 state,然后添加 nodes 和 edges,最后编译它。编译图是什么意思?为什么需要编译?
编译是一个非常简单的步骤,它提供了对图结构的一些基本检查(没有孤立节点等),这也是您可以指定运行时参数(如检查点器和中断点)的地方。
编译本身并没有什么额外复杂的操作,它只是帮你做图编排的检查、预设置一些 config 参数而已。调用 .compile() 方法来编译图:
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.CompiledGraph;
// 编译您的图
CompiledGraph graph = stateGraph.compile();
在使用图之前,您必须编译它。
Schema
OverAllState(状态)
定义图时首先要做的是定义图的 State。State 由图的 Key 以及 KeyStrategy 函数 组成,KeyStrategy 函数 用于多个节点更新同一个 key 时应该如何处理多个值(比如合并或覆盖)。State 的 key 将是图中所有 Nodes 和 Edges 的输入 schema。所有 Nodes 将发出对 State 的更新,通过返回一个包含一系列 key-value 对的 Map,然后图引擎会使用指定的 KeyStrategy 函数应用这些更新到 State。
KeyStrategy
KeyStrategy 是理解如何将节点的更新应用到 State 的关键。State 中的每个键都有自己独立的 Strategy 策略。如果没有显式指定 Strategy 策略,则默认使用 AppendStrategy,即假定该键的所有更新都应覆盖它。
让我们看几个例子来更好地理解它们。
示例 A:
import com.alibaba.cloud.ai.graph.KeyStrategyFactory;
import com.alibaba.cloud.ai.graph.KeyStrategy;
import com.alibaba.cloud.ai.graph.state.strategy.AppendStrategy;
import java.util.HashMap;
import java.util.Map;
public static KeyStrategyFactory createKeyStrategyFactory() {
return () -> {
Map<String, KeyStrategy> keyStrategyMap = new HashMap<>();
keyStrategyMap.put("messages", new AppendStrategy());
return keyStrategyMap;
};
}
var graphBuilder = new StateGraph(createKeyStrategyFactory());
ReplaceStrategy(覆盖策略)
ReplaceStrategy 会用新值完全替换旧值。当多个节点返回同一个 key 的 Map 时,后执行的节点会覆盖先执行节点的值。
示例:演示 ReplaceStrategy 的替换效果
// 定义状态策略,使用 ReplaceStrategy
KeyStrategyFactory keyStrategyFactory = () -> {
Map<String, KeyStrategy> keyStrategyMap = new HashMap<>();
keyStrategyMap.put("value", new ReplaceStrategy()); // 使用替换策略
return keyStrategyMap;
};
// 节点 A:返回 value = "初始值"
var nodeA = node_async(state -> {
return Map.of("value", "初始值");
});
// 节点 B:返回 value = "更新后的值"(会覆盖节点 A 的值)
var nodeB = node_async(state -> {
return Map.of("value", "更新后的值");
});
// 构建图
StateGraph stateGraph = new StateGraph(keyStrategyFactory)
.addNode("node_a", nodeA)
.addNode("node_b", nodeB)
.addEdge(START, "node_a")
.addEdge("node_a", "node_b")
.addEdge("node_b", END);
// 编译并执行
CompiledGraph graph = stateGraph.compile();
RunnableConfig config = RunnableConfig.builder()
.threadId("replace-strategy-demo")
.build();
// 执行图
Optional<OverAllStaste> stateOptional = graph.invoke(Map.of(), config);
// 获取最终状态
System.out.println("最终状态中的 value: " + (String)stateOptional.get().value("value"));
// 输出: 最终状态中的 value: 更新后的值
// 注意:节点 A 的值 "初始值" 已被节点 B 的值 "更新后的值" 完全替换
执行流程说明:
- 节点 A 执行:状态中
value = "初始值" - 节点 B 执行:由于使用
ReplaceStrategy,value被替换为"更新后的值" - 最终状态:
value = "更新后的值"(节点 A 的值已被完全覆盖)
AppendStrategy(追加策略)
AppendStrategy 会将新值追加到旧值中。当多个节点返回同一个 key 的 Map 时,后执行的节点的值会被追加到先执行节点的值之后。
示例:演示 AppendStrategy 的追加效果
// 定义状态策略,使用 AppendStrategy
KeyStrategyFactory keyStrategyFactory = () -> {
Map<String, KeyStrategy> keyStrategyMap = new HashMap<>();
keyStrategyMap.put("messages", new AppendStrategy()); // 使用追加策略
return keyStrategyMap;
};
// 节点 A:返回 messages = "消息1"
var nodeA = node_async(state -> {
return Map.of("messages", "消息1");
});
// 节点 B:返回 messages = "消息2"(会追加到节点 A 的值之后)
var nodeB = node_async(state -> {
return Map.of("messages", "消息2");
});
// 节点 C:返回 messages = "消息3"(会追加到之前的消息之后)
var nodeC = node_async(state -> {
return Map.of("messages", "消息3");
});
// 构建图
StateGraph stateGraph = new StateGraph(keyStrategyFactory)
.addNode("node_a", nodeA)
.addNode("node_b", nodeB)
.addNode("node_c", nodeC)
.addEdge(START, "node_a")
.addEdge("node_a", "node_b")
.addEdge("node_b", "node_c")
.addEdge("node_c", END);
// 编译并执行
CompiledGraph graph = stateGraph.compile();
RunnableConfig config = RunnableConfig.builder()
.threadId("append-strategy-demo")
.build();
// 执行图
Optional<OverAllState> stateOptional = graph.invoke(Map.of(), config);
// 获取最终状态
List<String> messages = (List<String>) stateOptional.get().value("messages").orElse(List.of());
System.out.println("最终状态中的 messages: " + messages);
// 输出: 最终状态中的 messages: [消息1, 消息2, 消息3]
// 注意:所有节点的值都被追加到列表中,而不是被替换
执行流程说明:
- 节点 A 执行:状态中
messages = ["消息1"] - 节点 B 执行:由于使用
AppendStrategy,messages变为["消息1", "消息2"] - 节点 C 执行:继续追加,
messages变为["消息1", "消息2", "消息3"] - 最终状态:
messages = ["消息1", "消息2", "消息3"](所有节点的值都被保留并追加)
如何在 AppendStrategy 策略的删除消息
[AppendStrategy] 支持通过 [RemoveByHash] 删除消息。
Spring AI Alibaba 提供了内置的 [RemoveByHash],允许通过比较其 hashCode 来删除消息,下面是其用法示例:
import static com.alibaba.cloud.ai.graph.StateGraph.END;
import static com.alibaba.cloud.ai.graph.StateGraph.START;
import static com.alibaba.cloud.ai.graph.action.AsyncNodeAction.node_async;
import com.alibaba.cloud.ai.graph.state.RemoveByHash;
var workflow = new StateGraph(createKeyStrategyFactory())
.addNode("agent_1", node_async(state ->
Map.of("messages", "message1")))
.addNode("agent_2", node_async(state ->
Map.of("messages", "message2.1")))
.addNode("agent_3", node_async(state ->
Map.of("messages", RemoveByHash.of("message2.1")))) // 从消息值中删除 "message2.1"
.addEdge(START, "agent_1")
.addEdge("agent_1", "agent_2")
.addEdge("agent_2", "agent_3")
.addEdge("agent_3", END);
自定义 KeyStrategy
您也可以为特定的状态属性指定自定义的 Strategy
序列化器(Serializer)
在图执行期间,状态需要被序列化(主要用于克隆目的),同时也为跨不同执行持久化状态提供能力。Spring AI Alibaba 目前提供了 Jackson、JDK 两种序列化策略实现。默认使用 Jackson 实现。
在序列化过程中,重点关注如下内容:
- 不依赖不安全的标准序列化框架
- 允许为第三方(非可序列化)类实现序列化
- 尽可能避免类加载问题
- 在序列化过程中管理可空值
自定义序列化器
默认情况下,Graph 使用 Jackson 作为序列化器,并且对几个主流模型厂商的 Message 类型做了兼容。但是对于用户自定义的消息类型,可能无法做到完全兼容,这里有三种常见做法: