流式输出
Spring AI Alibaba Graph 内置了对流式处理的原生支持,框架统一是使用 Flux 来在框架中定义和传递流,与 Spring 生态的流式处理保持一致。以下是从 Graph 运行中流式返回输出的不同方式。
调用 Graph 的流式输出
.stream() 是一个用于从图运行中流式返回输出的方法。它返回一个 Flux,请记住由于 Flux 流式的特性,流返回后并不会立即出触发图引擎的执行,你需要执行类似 Flux.subscribe() 的操作才能真正启动流引擎。
目前 Flux 返回的是 NodeOutput 类的实例,该类基本上报告执行的节点名称和结果状态。
流的组合(嵌入和组合)
Flux 支持多个流的合并、转换、组合等操作,具备非常强大的能力,这在处理图中多个流式节点时会非常有用。具体使用方式可搜索 Spring Reactor 学习。
理解
在节点操作中整合流式输出
在 Spring AI Alibaba Graph 中,您可以在节点操作中直接返回 Flux 对象,框架会自动处理流式输出。
流式节 点实现
流式节点实现查看完整代码
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatResponse;
import java.util.Map;
import reactor.core.publisher.Flux;
/**
* 流式节点实现 - 使用 GraphFluxGenerator 处理流式响应
*/
public static class StreamingNode implements NodeAction {
private final ChatClient chatClient;
private final String nodeId;
public StreamingNode(ChatClient.Builder chatClientBuilder, String nodeId) {
this.chatClient = chatClientBuilder.build();
this.nodeId = nodeId;
}
@Override
public Map<String, Object> apply(OverAllState state) {
String query = (String) state.value("query").orElse("");
// 获取流式响应
Flux<ChatResponse> chatResponseFlux = chatClient.prompt()
.user(query)
.stream()
.chatResponse();
// 将流式响应存储在状态中
return Map.of("messages", chatResponseFlux);
}
}
处理流式输出的节点
处理流式输出的节点查看完整代码
import com.alibaba.cloud.ai.graph.OverAllState;
import com.alibaba.cloud.ai.graph.action.NodeAction;
import java.util.Map;
/**
* 处理流式输出的节点 - 接收并处理流式响应
*/
public static class ProcessStreamingNode implements NodeAction {
@Override
public Map<String, Object> apply(OverAllState state) {
// 请注意,虽然上一个节点返回的是Flux对象,但是在引擎运行到当前节点时,
// 框架已经完成了对上一个节点Flux对象的自动订阅与消费,并将最终的结果汇总后添加到了 messages key 中(基于 AppendStrategy)
Object messages = state.value("messages").orElse("");
String result = "流式响应已处理完成: " + messages;
return Map.of("result", result);
}
}