跳到主要内容

人工介入(Human-in-the-Loop)

人工介入(HITL)Hook 允许你为 Agent 工具调用添加人工监督。当模型提出需要审查的操作时——例如写入文件或执行 SQL——Hook 可以暂停执行并等待人工决策。

它通过检查每个工具调用并与可配置的策略进行比对来实现。如果需要人工干预,Hook 会发出中断(interrupt)来暂停执行。图的状态会通过 Spring AI Alibaba 的检查点机制保存,因此执行可以安全暂停并在之后恢复。

人工决策决定接下来发生什么:操作可以被原样批准(approve)、修改后运行(edit)或拒绝并提供反馈(reject)。

中断决策类型

Hook 定义了三种人工响应中断的内置方式:

决策类型描述使用场景示例
approve操作被原样批准并执行,不做任何更改完全按照写好的内容发送电子邮件
✏️ edit工具调用将被修改后执行在发送电子邮件之前更改收件人
reject工具调用被拒绝,并向对话中添加解释拒绝电子邮件草稿并解释如何重写

每个工具可用的决策类型取决于你在 approvalOn 中配置的策略。当多个工具调用同时暂停时,每个操作都需要单独的决策。

配置中断

要使用 HITL,在创建 Agent 时将 Hook 添加到 Agent 的 hooks 列表中。

你可以配置哪些工具需要人工审批,以及为每个工具允许哪些决策类型。

HumanInTheLoopHook 配置示例查看完整代码
import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.alibaba.cloud.ai.graph.agent.hook.hip.HumanInTheLoopHook;
import com.alibaba.cloud.ai.graph.agent.hook.hip.ToolConfig;
import com.alibaba.cloud.ai.graph.checkpoint.savers.MemorySaver;

// 配置检查点保存器(人工介入需要检查点来处理中断)
MemorySaver memorySaver = new MemorySaver();

// 创建人工介入Hook
HumanInTheLoopHook humanInTheLoopHook = HumanInTheLoopHook.builder() // [!code highlight]
.approvalOn("write_file", ToolConfig.builder() // [!code highlight]
.description("文件写入操作需要审批") // [!code highlight]
.build()) // [!code highlight]
.approvalOn("execute_sql", ToolConfig.builder() // [!code highlight]
.description("SQL执行操作需要审批") // [!code highlight]
.build()) // [!code highlight]
.build(); // [!code highlight]

// 创建Agent
ReactAgent agent = ReactAgent.builder()
.name("approval_agent")
.model(chatModel)
.tools(writeFileTool, executeSqlTool, readDataTool)
.hooks(List.of(humanInTheLoopHook)) // [!code highlight]
.saver(memorySaver) // [!code highlight]
.build();
信息

你必须配置检查点保存器来在中断期间持久化图状态。 在生产环境中,使用持久化的检查点保存器(如基于 Redis 或 PostgreSQL 的实现)。对于测试或原型开发,使用 MemorySaver

调用 Agent 时,传递包含线程 IDRunnableConfig 以将执行与会话线程关联。

响应中断

当你调用 Agent 时,它会一直运行直到完成或触发中断。当工具调用匹配你在 approvalOn 中配置的策略时会触发中断。在这种情况下,调用结果将返回 InterruptionMetadata,其中包含需要审查的操作。你可以将这些操作呈现给审查者,并在提供决策后恢复执行。

响应中断示例查看完整代码
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.action.InterruptionMetadata;

// 人工介入利用检查点机制。
// 你必须提供线程ID以将执行与会话线程关联,
// 以便可以暂停和恢复对话(人工审查所需)。
String threadId = "user-session-123"; // [!code highlight]
RunnableConfig config = RunnableConfig.builder() // [!code highlight]
.threadId(threadId) // [!code highlight]
.build(); // [!code highlight]

// 运行图直到触发中断
Optional<NodeOutput> result = agent.invokeAndGetOutput( // [!code highlight]
"删除数据库中的旧记录",
config
);

// 检查是否返回了中断
if (result.isPresent() && result.get() instanceof InterruptionMetadata) { // [!code highlight]
InterruptionMetadata interruptionMetadata = (InterruptionMetadata) result.get(); // [!code highlight]

// 中断包含需要审查的工具反馈
List<InterruptionMetadata.ToolFeedback> toolFeedbacks = // [!code highlight]
interruptionMetadata.toolFeedbacks(); // [!code highlight]

for (InterruptionMetadata.ToolFeedback feedback : toolFeedbacks) {
System.out.println("工具: " + feedback.getName());
System.out.println("参数: " + feedback.getArguments());
System.out.println("描述: " + feedback.getDescription());
}

// 示例输出:
// 工具: execute_sql
// 参数: {"query": "DELETE FROM records WHERE created_at < NOW() - INTERVAL '30 days';"}
// 描述: SQL执行操作需要审批
}

决策类型

执行生命周期

Hook 定义了一个在模型生成响应后但在执行任何工具调用之前运行的 afterModel 钩子:

  1. Agent 调用模型生成响应。
  2. Hook 检查响应中的工具调用。
  3. 如果任何调用需要人工输入,Hook 会构建包含工具反馈信息的 InterruptionMetadata 并触发中断。
  4. Agent 等待人工决策。
  5. 基于 InterruptionMetadata 中的决策,Hook 执行批准或编辑的调用,为拒绝的调用合成工具响应消息,并恢复执行。

完整示例

HumanInTheLoop 完整示例查看完整代码
import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.alibaba.cloud.ai.graph.agent.hook.hip.HumanInTheLoopHook;
import com.alibaba.cloud.ai.graph.agent.hook.hip.ToolConfig;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.action.InterruptionMetadata;
import com.alibaba.cloud.ai.graph.checkpoint.savers.MemorySaver;

public class HumanInTheLoopExample {

public static void main(String[] args) throws Exception {
// 1. 配置检查点
MemorySaver memorySaver = new MemorySaver();

// 2. 创建人工介入Hook
HumanInTheLoopHook humanInTheLoopHook = HumanInTheLoopHook.builder()
.approvalOn("poem", ToolConfig.builder()
.description("请确认诗歌创作操作")
.build())
.build();

// 3. 创建Agent
ReactAgent agent = ReactAgent.builder()
.name("poet_agent")
.model(chatModel)
.tools(List.of(poetToolCallback))
.hooks(List.of(humanInTheLoopHook))
.saver(memorySaver)
.build();

String threadId = "user-session-001";
RunnableConfig config = RunnableConfig.builder()
.threadId(threadId)
.build();

// 4. 第一次调用 - 触发中断
System.out.println("=== 第一次调用:期望中断 ===");
Optional<NodeOutput> result = agent.invokeAndGetOutput(
"帮我写一首100字左右的诗",
config
);

// 5. 检查中断并处理
if (result.isPresent() && result.get() instanceof InterruptionMetadata) {
InterruptionMetadata interruptionMetadata = (InterruptionMetadata) result.get();

System.out.println("检测到中断,需要人工审批");
List<InterruptionMetadata.ToolFeedback> toolFeedbacks =
interruptionMetadata.toolFeedbacks();

for (InterruptionMetadata.ToolFeedback feedback : toolFeedbacks) {
System.out.println("工具: " + feedback.getName());
System.out.println("参数: " + feedback.getArguments());
System.out.println("描述: " + feedback.getDescription());
}

// 6. 模拟人工决策(这里选择批准)
InterruptionMetadata.Builder feedbackBuilder = InterruptionMetadata.builder()
.nodeId(interruptionMetadata.node())
.state(interruptionMetadata.state());

toolFeedbacks.forEach(toolFeedback -> {
InterruptionMetadata.ToolFeedback approvedFeedback =
InterruptionMetadata.ToolFeedback.builder(toolFeedback)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.APPROVED)
.build();
feedbackBuilder.addToolFeedback(approvedFeedback);
});

InterruptionMetadata approvalMetadata = feedbackBuilder.build();

// 7. 第二次调用 - 使用人工反馈恢复执行
System.out.println("
=== 第二次调用:使用批准决策恢复 ===");
RunnableConfig resumeConfig = RunnableConfig.builder()
.threadId(threadId)
.addMetadata(RunnableConfig.HUMAN_FEEDBACK_METADATA_KEY, approvalMetadata)
.build();

Optional<NodeOutput> finalResult = agent.invokeAndGetOutput("", resumeConfig);

if (finalResult.isPresent()) {
System.out.println("执行完成");
System.out.println("最终结果: " + finalResult.get());
}
}
}
}

Workflow 中嵌套 Agent 的人工中断

在复杂的应用场景中,你可能需要在 StateGraph 工作流中嵌套 ReactAgent,并为嵌套的 Agent 配置人工介入能力。这允许你在工作流执行过程中对 Agent 的工具调用进行人工监督。

工作原理

ReactAgent 作为工作流中的一个节点时,如果 Agent 配置了 HumanInTheLoopHook,工作流会在 Agent 节点触发工具调用中断时暂停。工作流级别的中断处理与单独的 Agent 中断处理类似,但需要在 CompiledGraph 层面进行恢复。

配置要点

  1. 检查点配置: 必须在 CompileConfig 中注册检查点保存器,以便在工作流级别保存和恢复状态
  2. Agent 配置: 嵌套的 Agent 也需要配置检查点保存器(使用相同的实例)
  3. 中断处理: 使用 CompiledGraph.invokeAndGetOutput() 检查中断,并使用 addHumanFeedback() 恢复执行
Workflow 中嵌套 Agent 的人工中断示例查看完整代码
import com.alibaba.cloud.ai.graph.CompileConfig;
import com.alibaba.cloud.ai.graph.CompiledGraph;
import com.alibaba.cloud.ai.graph.KeyStrategy;
import com.alibaba.cloud.ai.graph.KeyStrategyFactory;
import com.alibaba.cloud.ai.graph.NodeOutput;
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.RunnableConfig;
import com.alibaba.cloud.ai.graph.StateGraph;
import com.alibaba.cloud.ai.graph.action.InterruptionMetadata;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import com.alibaba.cloud.ai.graph.agent.ReactAgent;
import com.alibaba.cloud.ai.graph.agent.hook.hip.HumanInTheLoopHook;
import com.alibaba.cloud.ai.graph.agent.hook.hip.ToolConfig;
import com.alibaba.cloud.ai.graph.checkpoint.config.SaverConfig;
import com.alibaba.cloud.ai.graph.checkpoint.savers.MemorySaver;
import com.alibaba.cloud.ai.graph.state.strategy.ReplaceStrategy;

import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.tool.ToolCallback;
import org.springframework.ai.tool.function.FunctionToolCallback;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.alibaba.cloud.ai.graph.action.AsyncEdgeAction.edge_async;
import static com.alibaba.cloud.ai.graph.action.AsyncNodeAction.node_async;

// 1. 创建工具回调
ToolCallback searchTool = FunctionToolCallback
.builder("search", (args) -> "搜索结果:AI Agent是能够感知环境、自主决策并采取行动的智能系统。")
.description("搜索工具,用于查找相关信息")
.inputType(String.class)
.build();

// 2. 配置检查点保存器(工作流和Agent共享)
MemorySaver saver = new MemorySaver();

// 3. 创建带有人工介入Hook的ReactAgent
ReactAgent qaAgent = ReactAgent.builder()
.name("qa_agent")
.model(chatModel)
.instruction("你是一个问答专家,负责回答用户的问题。如果需要搜索信息,请使用search工具。
用户问题:{cleaned_input}")
.outputKey("qa_result")
.saver(saver) // [!code highlight]
.hooks(HumanInTheLoopHook.builder() // [!code highlight]
.approvalOn("search", ToolConfig.builder()
.description("搜索操作需要人工审批,请确认是否执行搜索")
.build())
.build())
.tools(searchTool)
.build();

// 4. 创建自定义Node(预处理)
class PreprocessorNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
String input = state.value("input", "").toString();
String cleaned = input.trim();
return Map.of("cleaned_input", cleaned);
}
}

// 5. 创建自定义Node(验证)
class ValidatorNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) throws Exception {
Optional<Object> qaResultOpt = state.value("qa_result");
if (qaResultOpt.isPresent() && qaResultOpt.get() instanceof Message message) {
boolean isValid = message.getText().length() > 30;
return Map.of("is_valid", isValid);
}
return Map.of("is_valid", false);
}
}

// 6. 定义状态管理策略
KeyStrategyFactory keyStrategyFactory = () -> {
HashMap<String, KeyStrategy> strategies = new HashMap<>();
strategies.put("input", new ReplaceStrategy());
strategies.put("cleaned_input", new ReplaceStrategy());
strategies.put("qa_result", new ReplaceStrategy());
strategies.put("is_valid", new ReplaceStrategy());
return strategies;
};

// 7. 构建工作流
StateGraph workflow = new StateGraph(keyStrategyFactory);

// 添加普通Node
workflow.addNode("preprocess", node_async(new PreprocessorNode()));
workflow.addNode("validate", node_async(new ValidatorNode()));

// 添加Agent Node(嵌套的ReactAgent)
workflow.addNode(qaAgent.name(), qaAgent.asNode( // [!code highlight]
true, // includeContents: 传递父图的消息历史
false // includeReasoning: 不返回推理过程
));

// 定义流程:预处理 -> Agent处理 -> 验证
workflow.addEdge(StateGraph.START, "preprocess");
workflow.addEdge("preprocess", qaAgent.name());
workflow.addEdge(qaAgent.name(), "validate");

// 条件边:验证通过则结束,否则重新处理
workflow.addConditionalEdges(
"validate",
edge_async(state -> {
Boolean isValid = (Boolean) state.value("is_valid", false);
return isValid ? "end" : qaAgent.name();
}),
Map.of(
"end", StateGraph.END,
qaAgent.name(), qaAgent.name()
)
);

// 8. 编译工作流(必须在CompileConfig中注册检查点保存器)
CompiledGraph compiledGraph = workflow.compile( // [!code highlight]
CompileConfig.builder()
.saverConfig(SaverConfig.builder().register(saver).build()) // [!code highlight]
.build()
);

// 9. 执行工作流并处理中断
String threadId = "workflow-hilt-001";
Map<String, Object> input = Map.of("input", "请解释量子计算的基本原理");

// 第一次调用 - 可能触发中断
Optional<NodeOutput> nodeOutputOptional = compiledGraph.invokeAndGetOutput( // [!code highlight]
input,
RunnableConfig.builder().threadId(threadId).build()
);

// 检查是否发生中断
if (nodeOutputOptional.isPresent()
&& nodeOutputOptional.get() instanceof InterruptionMetadata interruptionMetadata) { // [!code highlight]

System.out.println("工作流被中断,等待人工审核。");
System.out.println("中断节点: " + interruptionMetadata.node());

List<InterruptionMetadata.ToolFeedback> feedbacks = interruptionMetadata.toolFeedbacks();

// 显示所有需要审批的工具调用
for (InterruptionMetadata.ToolFeedback feedback : feedbacks) {
System.out.println("工具名称: " + feedback.getName());
System.out.println("工具参数: " + feedback.getArguments());
System.out.println("工具描述: " + feedback.getDescription());
}

// 构建人工反馈(批准所有工具调用)
InterruptionMetadata.Builder feedbackBuilder = InterruptionMetadata.builder()
.nodeId(interruptionMetadata.node())
.state(interruptionMetadata.state());

feedbacks.forEach(toolFeedback -> {
feedbackBuilder.addToolFeedback(
InterruptionMetadata.ToolFeedback.builder(toolFeedback)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.APPROVED)
.build()
);
});

InterruptionMetadata approvalMetadata = feedbackBuilder.build();

// 使用批准决策恢复执行
RunnableConfig resumableConfig = RunnableConfig.builder()
.threadId(threadId) // 相同的线程ID
.addHumanFeedback(approvalMetadata) // [!code highlight]
.build();

// 恢复工作流执行(传入空Map,因为状态已保存在检查点中)
nodeOutputOptional = compiledGraph.invokeAndGetOutput(Map.of(), resumableConfig); // [!code highlight]
}

关键区别

与单独使用 Agent 相比,在 Workflow 中使用人工中断有以下关键区别:

特性单独 AgentWorkflow 中的 Agent
检查点配置Agent 级别配置需要在 CompileConfig 中注册
中断检查agent.invokeAndGetOutput()compiledGraph.invokeAndGetOutput()
恢复执行直接调用 Agent调用 CompiledGraph
状态管理Agent 内部状态工作流全局状态
中断位置Agent 节点工作流中的 Agent 节点

注意事项

  1. 共享检查点保存器: 工作流和嵌套的 Agent 应该使用相同的检查点保存器实例,以确保状态一致性
  2. 线程 ID 一致性: 恢复执行时必须使用相同的 threadId
  3. 空输入恢复: 恢复执行时通常传入空的 Map,因为状态已保存在检查点中
  4. 节点标识: InterruptionMetadata.node() 返回的是工作流中 Agent 节点的名称,而不是 Agent 内部的节点名称

实用工具方法

为了简化人工介入的处理,你可以创建实用方法:

HITLHelper 实用工具方法示例查看完整代码
public class HITLHelper {

/**
* 批准所有工具调用
*/
public static InterruptionMetadata approveAll(InterruptionMetadata interruptionMetadata) {
InterruptionMetadata.Builder builder = InterruptionMetadata.builder()
.nodeId(interruptionMetadata.node())
.state(interruptionMetadata.state());

interruptionMetadata.toolFeedbacks().forEach(toolFeedback -> {
builder.addToolFeedback(
InterruptionMetadata.ToolFeedback.builder(toolFeedback)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.APPROVED)
.build()
);
});

return builder.build();
}

/**
* 拒绝所有工具调用
*/
public static InterruptionMetadata rejectAll(
InterruptionMetadata interruptionMetadata,
String reason) {
InterruptionMetadata.Builder builder = InterruptionMetadata.builder()
.nodeId(interruptionMetadata.node())
.state(interruptionMetadata.state());

interruptionMetadata.toolFeedbacks().forEach(toolFeedback -> {
builder.addToolFeedback(
InterruptionMetadata.ToolFeedback.builder(toolFeedback)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.REJECTED)
.description(reason)
.build()
);
});

return builder.build();
}

/**
* 编辑特定工具的参数
*/
public static InterruptionMetadata editTool(
InterruptionMetadata interruptionMetadata,
String toolName,
String newArguments) {
InterruptionMetadata.Builder builder = InterruptionMetadata.builder()
.nodeId(interruptionMetadata.node())
.state(interruptionMetadata.state());

interruptionMetadata.toolFeedbacks().forEach(toolFeedback -> {
if (toolFeedback.getName().equals(toolName)) {
builder.addToolFeedback(
InterruptionMetadata.ToolFeedback.builder(toolFeedback)
.arguments(newArguments)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.EDITED)
.build()
);
} else {
builder.addToolFeedback(
InterruptionMetadata.ToolFeedback.builder(toolFeedback)
.result(InterruptionMetadata.ToolFeedback.FeedbackResult.APPROVED)
.build()
);
}
});

return builder.build();
}
}

// 使用示例
InterruptionMetadata approvalMetadata = HITLHelper.approveAll(interruptionMetadata);
InterruptionMetadata rejectMetadata = HITLHelper.rejectAll(interruptionMetadata, "操作不安全");
InterruptionMetadata editMetadata = HITLHelper.editTool(
interruptionMetadata,
"execute_sql",
"{"query": "SELECT * FROM records LIMIT 10"}"
);

最佳实践

  1. 始终使用检查点: 人工介入需要检查点机制来保存和恢复状态
  2. 提供清晰的描述: 在 ToolConfig 中提供清晰的描述,帮助审查者理解操作
  3. 保守编辑: 编辑工具参数时,尽量保持最小更改
  4. 处理所有工具反馈: 确保为每个需要审查的工具调用提供决策
  5. 使用相同的 threadId: 恢复执行时必须使用相同的线程 ID
  6. 考虑超时: 实现超时机制以处理长时间未响应的人工审批

与 Interceptor 的区别

在 Spring AI Alibaba 中,Hook 和 Interceptor 都可以用于干预 Agent 执行:

特性HookInterceptor
执行位置Agent 级别(before/after agent, before/after model)模型或工具调用级别
中断能力支持中断和恢复(如 HumanInTheLoopHook)不支持中断,仅拦截和修改
使用场景人工审批、Agent 间协调日志记录、重试、降级
配置方式.hooks(List.of(...)).interceptors(List.of(...))

相关文档

Spring AI Alibaba 开源项目基于 Spring AI 构建,是阿里云通义系列模型及服务在 Java AI 应用开发领域的最佳实践,提供高层次的 AI API 抽象与云原生基础设施集成方案,帮助开发者快速构建 AI 应用。