weiqingy commented on code in PR #830: URL: https://github.com/apache/flink-agents/pull/830#discussion_r3394054297
########## python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py: ########## @@ -0,0 +1,174 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import json +import os +from typing import Any, Dict, Tuple + +from pydantic import BaseModel + +from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent +from flink_agents.api.agents.types import OutputSchema +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.decorators import action, chat_model_setup +from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent +from flink_agents.api.events.event import Event, InputEvent, OutputEvent +from flink_agents.api.resource import ResourceDescriptor, ResourceName +from flink_agents.api.runner_context import RunnerContext +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + AspectResponse, + SummaryResponse, +) + +OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b") + +INPUT_TEXT = "The food here is great, but the service is too slow" +ASPECTS: Tuple[str, ...] = ("taste", "service", "price") +N_ASPECTS = len(ASPECTS) + +PARALLEL_SYSTEM_PROMPT = ( + "You are a sentiment analysis assistant. Return JSON: " + '{"aspect":"<dimension>", "result":"<positive|negative|not_mentioned>"}' + " — no explanation, no extra fields." +) +AGGREGATE_SYSTEM_PROMPT = ( + "You are a summary assistant. Based on the sentiment judgments for three " + "dimensions, compose a brief one-line evaluation. Return JSON: " + '{"summary":"taste: service: price:"} — return only this JSON.' +) + + +def _init_row(event: Event) -> Dict[str, Any]: + """Build a row skeleton from the InputEvent.""" + payload = InputEvent.from_event(event).input + return {"id": payload["id"], "text": payload["text"], "sentiments": {}} + + +def _save_row(ctx: RunnerContext, row: Dict[str, Any]) -> None: + """Write the row to sensory memory.""" + ctx.sensory_memory.set("res", json.dumps(row, ensure_ascii=False)) + + +def _load_row(ctx: RunnerContext) -> Dict[str, Any]: + """Read the row from sensory memory.""" + return json.loads(ctx.sensory_memory.get("res")) + + +def _build_aspect_request(text: str, aspect: str) -> ChatRequestEvent: + """Build a ChatRequestEvent for a single aspect dimension.""" + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=PARALLEL_SYSTEM_PROMPT), + ChatMessage( + role=MessageRole.USER, + content=f'Judge the "{aspect}" dimension: {text}', + ), + ], + output_schema=OutputSchema(output_schema=AspectResponse), + ) + + +def _build_summarize_request(row: Dict[str, Any]) -> ChatRequestEvent: + """Build a ChatRequestEvent for the aggregation phase.""" + sentiments = row["sentiments"] + body = ( + f"Original: {row['text']}\n" + + "Judgments: " + + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS) + ) + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=AGGREGATE_SYSTEM_PROMPT), + ChatMessage(role=MessageRole.USER, content=body), + ], + output_schema=OutputSchema(output_schema=SummaryResponse), + ) + + +def _build_output_event(row: Dict[str, Any], parsed: SummaryResponse) -> OutputEvent: + """Pack row fields and summary into the final OutputEvent.""" + return OutputEvent( + output={"id": row["id"], "text": row["text"], "summary": parsed.summary} + ) + + +def _parse_response(event: Event) -> AspectResponse | SummaryResponse: + """Parse a ChatResponseEvent into a structured response object.""" + response = ChatResponseEvent.from_event(event).response + raw = response.extra_args[STRUCTURED_OUTPUT] + if isinstance(raw, BaseModel): + return raw + if "summary" in raw: + return SummaryResponse.model_validate(raw) + return AspectResponse.model_validate(raw) + + +def _is_final(parsed: AspectResponse | SummaryResponse) -> bool: + """Return True if the parsed response is from the aggregation phase.""" + return isinstance(parsed, SummaryResponse) + + +def _all_aspects_received(row: Dict[str, Any]) -> bool: + """Return True if all aspect judgments have been collected.""" + return len(row["sentiments"]) == N_ASPECTS + + +class ParallelChatAgent(Agent): + """An agent that demonstrates parallel LLM invocations via fan-out of + multiple ChatRequestEvent events. + + This agent receives a restaurant review and uses an LLM to judge sentiment + along multiple dimensions in parallel, then aggregates the results into a + one-line summary with a final LLM call. It handles prompt construction, + parallel chat dispatch, response accumulation, and output assembly. + """ + + @chat_model_setup + @staticmethod + def sentiment_model() -> ResourceDescriptor: + """ChatModel for sentiment analysis.""" + return ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_SETUP, + connection="ollama_server", + model=OLLAMA_MODEL, + extract_reasoning=True, + ) + + @action(InputEvent.EVENT_TYPE) + @staticmethod + def request_aspect_judgments(event: Event, ctx: RunnerContext) -> None: + """Process input event and send chat requests for each aspect.""" + row = _init_row(event) + _save_row(ctx, row) + for aspect in ASPECTS: + ctx.send_event(_build_aspect_request(row["text"], aspect)) + + @action(ChatResponseEvent.EVENT_TYPE) + @staticmethod + def handle_response(event: Event, ctx: RunnerContext) -> None: + """Process chat response event and send output event.""" + parsed = _parse_response(event) + row = _load_row(ctx) + if _is_final(parsed): + ctx.send_event(_build_output_event(row, parsed)) + return + row["sentiments"][parsed.aspect] = parsed.result Review Comment: The accumulator is keyed on `parsed.aspect` — the string the LLM put in its response — while the completion check is `len(row["sentiments"]) == N_ASPECTS`. Since the key is whatever the model echoes back rather than the aspect each request was dispatched for, two failure modes open up with a small/non-deterministic model like the documented `qwen3:1.7b`: if the model returns the same `aspect` value twice, the map never reaches size 3 and the summarize request is never sent (the input silently produces no output); if it returns a label outside `ASPECTS`, the later `_build_summarize_request` lookup over the fixed `ASPECTS` list misses. The two languages even diverge on that miss — Python raises `KeyError` (line 92), while Java yields a silent `"taste:null"` (`ParallelChatAgent.java:123`). Was correlating by the dispatched aspect considered here — e.g. carrying it through a `prompt_args` round-trip or a `request_id` — rather than trusting the model to echo it back? If the "trust the echoed aspect" simplification is intentional for the demo, would it be worth a doc note that the design assumes the model returns exactly one of the three labels, and that a non-conforming response breaks collection? That assumption is currently the load-bearing part of the example and it's invisible to a reader. ########## python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py: ########## @@ -0,0 +1,174 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import json +import os +from typing import Any, Dict, Tuple + +from pydantic import BaseModel + +from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent +from flink_agents.api.agents.types import OutputSchema +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.decorators import action, chat_model_setup +from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent +from flink_agents.api.events.event import Event, InputEvent, OutputEvent +from flink_agents.api.resource import ResourceDescriptor, ResourceName +from flink_agents.api.runner_context import RunnerContext +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + AspectResponse, + SummaryResponse, +) + +OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b") + +INPUT_TEXT = "The food here is great, but the service is too slow" +ASPECTS: Tuple[str, ...] = ("taste", "service", "price") +N_ASPECTS = len(ASPECTS) + +PARALLEL_SYSTEM_PROMPT = ( + "You are a sentiment analysis assistant. Return JSON: " + '{"aspect":"<dimension>", "result":"<positive|negative|not_mentioned>"}' + " — no explanation, no extra fields." +) +AGGREGATE_SYSTEM_PROMPT = ( Review Comment: The two `AGGREGATE_SYSTEM_PROMPT`s ask the model for materially different summary formats. The Java side (`ParallelChatAgent.java:71-76`) spells out the value vocabulary inline — `{"summary":"taste:<positive/negative/not_mentioned>, service:<...>, price:<...>"}` — while this one gives an empty template, `{"summary":"taste: service: price:"}`. From the same input the two examples will produce visibly different summaries. The `PARALLEL_SYSTEM_PROMPT` is identical across both languages, which is what makes this read as an oversight rather than a deliberate per-language choice — and CLAUDE.md asks for Java/Python parity on cross-language features. Would it make sense to bring these two in line? ########## python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py: ########## @@ -196,6 +196,21 @@ class ProductReviewAnalysisRes(BaseModel): reasons: list[str] +# Custom types for parallel chat agent. +class AspectResponse(BaseModel): + """LLM response for a single aspect judgment.""" + + aspect: str + result: str + + +class SummaryResponse(BaseModel): + """LLM response for the aggregation phase.""" + + summary: str + Review Comment: Three blank lines between `SummaryResponse` and the next statement — ruff flags this as `E303 too many blank lines (3)`, so `./tools/lint.sh -c` will fail in CI. Reduce to two. ########## examples/src/main/java/org/apache/flink/agents/examples/agents/ParallelChatAgent.java: ########## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.agents.examples.agents; + +import org.apache.flink.agents.api.Event; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.agents.Agent; +import org.apache.flink.agents.api.annotation.Action; +import org.apache.flink.agents.api.annotation.ChatModelSetup; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.event.ChatRequestEvent; +import org.apache.flink.agents.api.event.ChatResponseEvent; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.api.resource.ResourceName; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.StringJoiner; + +import static org.apache.flink.agents.api.agents.Agent.STRUCTURED_OUTPUT; + +/** + * An agent that demonstrates parallel LLM invocations via fan-out of multiple {@link + * ChatRequestEvent} events. + * + * <p>This agent receives a restaurant review and uses an LLM to judge sentiment along multiple + * dimensions (taste / service / price) in parallel, then aggregates the results into a one-line + * summary with a final LLM call. It handles prompt construction, parallel chat dispatch, response + * accumulation, and output assembly. + * + * <p><b>JDK version note:</b> On JDK 21+, the framework uses the Continuation API to execute + * concurrent chat actions in parallel, so the wall clock time is roughly "slowest single branch + + * aggregation call". On JDK < 21, the framework silently falls back to sequential execution — + * the result is identical, but the LLM calls run one after another. + */ +public class ParallelChatAgent extends Agent { + + /** Ollama model name, configurable via environment variable. */ + public static final String OLLAMA_MODEL = + System.getenv().getOrDefault("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b"); + + /** Input text for the demo. */ + public static final String INPUT_TEXT = "The food here is great, but the service is too slow"; + + private static final String[] ASPECTS = {"taste", "service", "price"}; + private static final int N_ASPECTS = ASPECTS.length; + + private static final String PARALLEL_SYSTEM_PROMPT = + "You are a sentiment analysis assistant. Return JSON: " + + "{\"aspect\":\"<dimension>\", \"result\":\"<positive|negative|not_mentioned>\"}" + + " — no explanation, no extra fields."; + private static final String AGGREGATE_SYSTEM_PROMPT = + "You are a summary assistant. Based on the sentiment judgments for three " + + "dimensions, compose a brief one-line evaluation. Return JSON: " + + "{\"summary\":\"taste:<positive/negative/not_mentioned>, " + + "service:<positive/negative/not_mentioned>, " + + "price:<positive/negative/not_mentioned>\"} — return only this JSON."; + + @ChatModelSetup + public static ResourceDescriptor sentimentModel() { + return ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_SETUP) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument("model", OLLAMA_MODEL) + .addInitialArgument("extract_reasoning", true) + .build(); + } + + private static Map<String, Object> initRow(Event event) { + InputEvent inputEvent = InputEvent.fromEvent(event); + CustomTypesAndResources.SentimentRequest request = + (CustomTypesAndResources.SentimentRequest) inputEvent.getInput(); + Map<String, Object> row = new HashMap<>(); + row.put("id", request.getId()); + row.put("text", request.getText()); + row.put("sentiments", new HashMap<String, String>()); + return row; + } + + private static void saveRow(RunnerContext ctx, Map<String, Object> row) throws Exception { + ctx.getSensoryMemory().set("res", row); Review Comment: `saveRow`/`loadRow` here store and read the row `Map<String,Object>` object directly, while the Python `_save_row`/`_load_row` round-trip through `json.dumps`/`json.loads` (`parallel_chat_agent.py:63`). Both work for this example, but in a teaching example built around cross-language parity a reader comparing the two sides sees Python defensively serializing (Pemja-backed memory can't always hold arbitrary Python objects) while Java stores a raw nested map. I'm not sure this matters for the demo, so genuinely a question: is the divergence intentional, and if so would a one-line doc note on why Python serializes help — so a Python reader doesn't infer that `set()` of an arbitrary nested dict is always safe? ########## python/flink_agents/examples/quickstart/agents/parallel_chat_agent.py: ########## @@ -0,0 +1,174 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +import json +import os +from typing import Any, Dict, Tuple + +from pydantic import BaseModel + +from flink_agents.api.agents.agent import STRUCTURED_OUTPUT, Agent +from flink_agents.api.agents.types import OutputSchema +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.decorators import action, chat_model_setup +from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent +from flink_agents.api.events.event import Event, InputEvent, OutputEvent +from flink_agents.api.resource import ResourceDescriptor, ResourceName +from flink_agents.api.runner_context import RunnerContext +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + AspectResponse, + SummaryResponse, +) + +OLLAMA_MODEL = os.environ.get("PARALLEL_CHAT_OLLAMA_MODEL", "qwen3:1.7b") + +INPUT_TEXT = "The food here is great, but the service is too slow" +ASPECTS: Tuple[str, ...] = ("taste", "service", "price") +N_ASPECTS = len(ASPECTS) + +PARALLEL_SYSTEM_PROMPT = ( + "You are a sentiment analysis assistant. Return JSON: " + '{"aspect":"<dimension>", "result":"<positive|negative|not_mentioned>"}' + " — no explanation, no extra fields." +) +AGGREGATE_SYSTEM_PROMPT = ( + "You are a summary assistant. Based on the sentiment judgments for three " + "dimensions, compose a brief one-line evaluation. Return JSON: " + '{"summary":"taste: service: price:"} — return only this JSON.' +) + + +def _init_row(event: Event) -> Dict[str, Any]: + """Build a row skeleton from the InputEvent.""" + payload = InputEvent.from_event(event).input + return {"id": payload["id"], "text": payload["text"], "sentiments": {}} + + +def _save_row(ctx: RunnerContext, row: Dict[str, Any]) -> None: + """Write the row to sensory memory.""" + ctx.sensory_memory.set("res", json.dumps(row, ensure_ascii=False)) + + +def _load_row(ctx: RunnerContext) -> Dict[str, Any]: + """Read the row from sensory memory.""" + return json.loads(ctx.sensory_memory.get("res")) + + +def _build_aspect_request(text: str, aspect: str) -> ChatRequestEvent: + """Build a ChatRequestEvent for a single aspect dimension.""" + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=PARALLEL_SYSTEM_PROMPT), + ChatMessage( + role=MessageRole.USER, + content=f'Judge the "{aspect}" dimension: {text}', + ), + ], + output_schema=OutputSchema(output_schema=AspectResponse), + ) + + +def _build_summarize_request(row: Dict[str, Any]) -> ChatRequestEvent: + """Build a ChatRequestEvent for the aggregation phase.""" + sentiments = row["sentiments"] + body = ( + f"Original: {row['text']}\n" + + "Judgments: " + + " ".join(f"{a}:{sentiments[a]}" for a in ASPECTS) + ) + return ChatRequestEvent( + model="sentiment_model", + messages=[ + ChatMessage(role=MessageRole.SYSTEM, content=AGGREGATE_SYSTEM_PROMPT), + ChatMessage(role=MessageRole.USER, content=body), + ], + output_schema=OutputSchema(output_schema=SummaryResponse), + ) + + +def _build_output_event(row: Dict[str, Any], parsed: SummaryResponse) -> OutputEvent: + """Pack row fields and summary into the final OutputEvent.""" + return OutputEvent( + output={"id": row["id"], "text": row["text"], "summary": parsed.summary} + ) + + +def _parse_response(event: Event) -> AspectResponse | SummaryResponse: Review Comment: With an `OutputSchema(output_schema=...)` of a `BaseModel`, `_generate_structured_output` always stores a validated pydantic instance, so `isinstance(raw, BaseModel)` is always true and the two `model_validate` fallbacks below it are dead for this example. Not incorrect, but in a teaching example a dead branch invites the reader to wonder when it fires. Would dropping the dict fallback (or a one-line note on when `raw` would be a dict) keep the example's intent clearer? ########## docs/content/docs/get-started/quickstart/parallel_llm.md: ########## @@ -0,0 +1,349 @@ +--- +title: 'Parallel LLM Calls' +weight: 3 +type: docs +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +## Overview + +Flink Agents supports parallel LLM invocations via multi-action fan-out. By emitting multiple `ChatRequestEvent` events from a single action, the framework's built-in chat action executes the corresponding LLM calls concurrently — no external orchestration is required. + +This quickstart introduces an example that demonstrates how to build a parallel LLM workflow with Flink Agents: + +The **Parallel Sentiment Analysis** agent processes a restaurant review and judges sentiment along three dimensions (taste / service / price) in parallel, then aggregates the results into a one-line summary with a final LLM call. The end-to-end wall clock time is roughly "slowest single branch + aggregation call", rather than the sum of all four calls. + +{{< hint info >}} +**JDK version note (Java only):** On JDK 21+, the framework uses the Continuation API to execute concurrent chat actions in parallel. On JDK < 21, the framework silently falls back to sequential execution — the result is identical, but the LLM calls run one after another. Python uses native coroutines and always executes in parallel regardless of the JDK version. +{{< /hint >}} + +## Code Walkthrough + +### Prepare Agents Execution Environment + +Create the agents execution environment, and register the available chat model connection to the environment. + +{{< tabs "Prepare Agents Execution Environment" >}} + +{{< tab "Python" >}} +```python +# Set up the Flink streaming environment and the Agents execution environment. +env = StreamExecutionEnvironment.get_execution_environment() +env.set_parallelism(1) +agents_env = AgentsExecutionEnvironment.get_execution_environment(env) + +# Add Ollama chat model connection to be used by the ParallelChatAgent. +agents_env.add_resource( + "ollama_server", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_CONNECTION, + request_timeout=240.0, + ), +) +``` +{{< /tab >}} + +{{< tab "Java" >}} +```Java +// Set up the Flink streaming environment and the Agents execution environment. +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.setParallelism(1); +AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + +// Add Ollama chat model connection to be used by the ParallelChatAgent. +agentsEnv.addResource( + "ollamaChatModelConnection", + ResourceType.CHAT_MODEL_CONNECTION, + ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OLLAMA_CONNECTION) + .addInitialArgument("endpoint", "http://localhost:11434") + .addInitialArgument("requestTimeout", 240) + .build()); +``` +{{< /tab >}} + +{{< /tabs >}} + +### Create the Agent + +Below is the example code for the `ParallelChatAgent`. The agent defines a chat model for sentiment analysis and two actions: `request_aspect_judgments` fans out one `ChatRequestEvent` per dimension, and `handle_response` collects the results, triggers the aggregation call, and emits the final output. For more details, please refer to the [Workflow Agent]({{< ref "docs/development/workflow_agent" >}}) documentation. + +{{< tabs "Create the Agent" >}} + +{{< tab "Python" >}} +```python +class ParallelChatAgent(Agent): + """An agent that demonstrates parallel LLM invocations via fan-out of + multiple ChatRequestEvent events. + + This agent receives a restaurant review and uses an LLM to judge sentiment + along multiple dimensions in parallel, then aggregates the results into a + one-line summary with a final LLM call. It handles prompt construction, + parallel chat dispatch, response accumulation, and output assembly. + """ + + @chat_model_setup + @staticmethod + def sentiment_model() -> ResourceDescriptor: + """ChatModel for sentiment analysis.""" + return ResourceDescriptor( + clazz=ResourceName.ChatModel.OLLAMA_SETUP, + connection="ollama_server", + model=OLLAMA_MODEL, + extract_reasoning=True, + ) + + @action(InputEvent.EVENT_TYPE) + @staticmethod + def request_aspect_judgments(event: Event, ctx: RunnerContext) -> None: + """Process input event and send chat requests for each aspect.""" + row = _init_row(event) + _save_row(ctx, row) + for aspect in ASPECTS: + ctx.send_event(_build_aspect_request(row["text"], aspect)) + + @action(ChatResponseEvent.EVENT_TYPE) + @staticmethod + def handle_response(event: Event, ctx: RunnerContext) -> None: + """Process chat response event and send output event.""" + parsed = _parse_response(event) Review Comment: The walkthrough shows `handle_response` calling `_parse_response`, but that helper and the others (`_init_row`, `_build_aspect_request`, `_all_aspects_received`, `_build_summarize_request`, and the Java counterparts) aren't shown anywhere on the page. The collection logic that the inline questions probe — the aspect-keying and the type-based response dispatch — lives entirely in those helpers, so the part a reader most needs to understand the parallel pattern is the part that's elided. Would inlining the helper bodies (or linking to `parallel_chat_agent.py` / `ParallelChatAgent.java`) help the lesson land? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
