This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 82c275e643e62fb5e02a70aa58d14494389ed897 Author: WenjinXie <[email protected]> AuthorDate: Thu Sep 25 15:03:22 2025 +0800 [example] Add react agent quick start example and refactor workflow agent example. --- .../docs/get-started/quickstart/workflow_agent.md | 4 +- .../agents/custom_types_and_resources.py | 205 +++++++++++++++++++++ .../quickstart/agents/product_suggestion_agent.py | 65 +------ .../quickstart/agents/review_analysis_agent.py | 92 ++++----- ...t_review_analysis.py => react_agent_example.py} | 46 +++-- .../quickstart/resources/product_review.txt | 3 + ...stion.py => workflow_multiple_agent_example.py} | 11 +- ...nalysis.py => workflow_single_agent_example.py} | 13 +- 8 files changed, 297 insertions(+), 142 deletions(-) diff --git a/docs/content/docs/get-started/quickstart/workflow_agent.md b/docs/content/docs/get-started/quickstart/workflow_agent.md index 05e5853..b7fbb55 100644 --- a/docs/content/docs/get-started/quickstart/workflow_agent.md +++ b/docs/content/docs/get-started/quickstart/workflow_agent.md @@ -103,10 +103,10 @@ git clone https://github.com/apache/flink-agents.git export PYTHONPATH=$(python -c 'import sysconfig; print(sysconfig.get_paths()["purelib"])') # Run review analysis example -./flink-1.20.3/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/product_review_analysis.py +./flink-1.20.3/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/workflow_single_agent_example.py # Run product suggestion example -./flink-1.20.3/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/product_improve_suggestion.py +./flink-1.20.3/bin/flink run -py ./flink-agents/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py ``` Now you should see a Flink job submitted to the Flink Cluster in Flink web UI [localhost:8081](localhost:8081) diff --git a/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py b/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py new file mode 100644 index 0000000..709ae9b --- /dev/null +++ b/python/flink_agents/examples/quickstart/agents/custom_types_and_resources.py @@ -0,0 +1,205 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################# +from typing import List + +from pydantic import BaseModel + +from flink_agents.api.chat_message import ChatMessage, MessageRole +from flink_agents.api.prompts.prompt import Prompt +from flink_agents.api.resource import ResourceDescriptor +from flink_agents.integrations.chat_models.ollama_chat_model import ( + OllamaChatModelConnection, +) + +# Prompt for review analysis agent. +review_analysis_system_prompt_str = """ + Analyze the user review and product information to determine a + satisfaction score (1-5) and potential reasons for dissatisfaction. + + Example input format: + {{ + "id": "12345", + "review": "The headphones broke after one week of use. Very poor quality." + }} + + Ensure your response can be parsed by Python JSON, using this format as an example: + {{ + "id": "12345", + "score": 1, + "reasons": [ + "poor quality" + ] + }} + + Please note that if a product review includes dissatisfaction with the shipping process, + you should first notify the shipping manager using the appropriate tools. After executing + the tools, strictly follow the example above to provide your score and reason — there is + no need to disclose whether the tool was used. + """ + +review_analysis_prompt = Prompt.from_messages( + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + content=review_analysis_system_prompt_str, + ), + # Here we just fill the prompt with input, user should deserialize + # input element to input text self in action. + ChatMessage( + role=MessageRole.USER, + content=""" + "input": + {input} + """, + ), + ], +) + +# Prompt for review analysis react agent. +review_analysis_react_prompt = Prompt.from_messages( + messages=[ + ChatMessage( + role=MessageRole.SYSTEM, + content=review_analysis_system_prompt_str, + ), + # For react agent, framework will deserialize input element + # to dict and fill the prompt. + # Note, the input element should be BaseModel or Row. + ChatMessage( + role=MessageRole.USER, + content=""" + "id": {id}, + "review": {review} + """, + ), + ], +) + +# Prompt for product suggestion agent. +product_suggestion_prompt_str = """ + Based on the rating distribution and user dissatisfaction reasons, generate three actionable suggestions for product improvement. + + Input format: + {{ + "id": "1", + "score_histogram": ["10%", "20%", "10%", "15%", "45%"], + "unsatisfied_reasons": ["reason1", "reason2", "reason3"] + }} + + Ensure that your response can be parsed by Python json,use the following format as an example: + {{ + "suggestion_list": [ + "suggestion1", + "suggestion2", + "suggestion3" + ] + }} + + input: + {input} + """ +product_suggestion_prompt = Prompt.from_text(product_suggestion_prompt_str) + + +# Tool for notifying the shipping manager. For simplicity, just print the message. +def notify_shipping_manager(id: str, review: str) -> None: + """Notify the shipping manager when product received a negative review due to + shipping damage. + + Parameters + ---------- + id : str + The id of the product that received a negative review due to shipping damage + review: str + The negative review content + """ + content = ( + f"Transportation issue for product [{id}], the customer feedback: {review}" + ) + print(content) + + +# Custom types used for product suggestion agent +class ProductReviewSummary(BaseModel): + """Aggregates multiple reviews and insights using LLM for a product. + + Attributes: + id (str): The unique identifier of the product. + score_hist (List[str]): A collection of rating scores from various reviews. + unsatisfied_reasons (List[str]): A list of reasons or insights generated by LLM + to explain the rating. + """ + + id: str + score_hist: List[str] + unsatisfied_reasons: List[str] + + +class ProductSuggestion(BaseModel): + """Provides a summary of review data including suggestions for improvement. + + Attributes: + id (str): The unique identifier of the product. + score_histogram (List[int]): A collection of rating scores from various reviews. + suggestions (List[str]): Suggestions or recommendations generated as a result of + review analysis. + """ + + id: str + score_hist: List[str] + suggestions: List[str] + + +# custom types for review analysis agent. +class ProductReview(BaseModel): + """Data model representing a product review. + + Attributes: + ---------- + id : str + The unique identifier for the product being reviewed. + review : str + The review of the product. + """ + + id: str + review: str + + +class ProductReviewAnalysisRes(BaseModel): + """Data model representing analysis result of a product review. + + Attributes: + ---------- + id : str + The unique identifier for the product being reviewed. + score : int + The satisfaction score given by the reviewer. + reasons : List[str] + A list of reasons provided by the reviewer for dissatisfaction, if any. + """ + + id: str + score: int + reasons: list[str] + + +# ollama chat model connection descriptor +ollama_server_descriptor = ResourceDescriptor( + clazz=OllamaChatModelConnection, model="qwen3:8b", request_timeout=120 +) diff --git a/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py b/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py index 7e7eb06..54762fc 100644 --- a/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py +++ b/python/flink_agents/examples/quickstart/agents/product_suggestion_agent.py @@ -17,9 +17,7 @@ ################################################################################# import json import logging -from typing import List - -from pydantic import BaseModel +from typing import TYPE_CHECKING from flink_agents.api.agent import Agent from flink_agents.api.chat_message import ChatMessage, MessageRole @@ -33,39 +31,18 @@ from flink_agents.api.events.event import InputEvent, OutputEvent from flink_agents.api.prompts.prompt import Prompt from flink_agents.api.resource import ResourceDescriptor from flink_agents.api.runner_context import RunnerContext +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + ProductSuggestion, + product_suggestion_prompt, +) from flink_agents.integrations.chat_models.ollama_chat_model import ( OllamaChatModelSetup, ) - -class ProductReviewSummary(BaseModel): - """Aggregates multiple reviews and insights using LLM for a product. - - Attributes: - id (str): The unique identifier of the product. - score_hist (List[str]): A collection of rating scores from various reviews. - unsatisfied_reasons (List[str]): A list of reasons or insights generated by LLM - to explain the rating. - """ - - id: str - score_hist: List[str] - unsatisfied_reasons: List[str] - - -class ProductSuggestion(BaseModel): - """Provides a summary of review data including suggestions for improvement. - - Attributes: - id (str): The unique identifier of the product. - score_histogram (List[int]): A collection of rating scores from various reviews. - suggestions (List[str]): Suggestions or recommendations generated as a result of - review analysis. - """ - - id: str - score_hist: List[str] - suggestions: List[str] +if TYPE_CHECKING: + from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + ProductReviewSummary, + ) class ProductSuggestionAgent(Agent): @@ -84,29 +61,7 @@ class ProductSuggestionAgent(Agent): """Generate product suggestions based on the rating distribution and user dissatisfaction reasons. """ - prompt_str = """ - Based on the rating distribution and user dissatisfaction reasons, generate three actionable suggestions for product improvement. - - Input format: - {{ - "id": "1", - "score_histogram": ["10%", "20%", "10%", "15%", "45%"], - "unsatisfied_reasons": ["reason1", "reason2", "reason3"] - }} - - Ensure that your response can be parsed by Python json,use the following format as an example: - {{ - "suggestion_list": [ - "suggestion1", - "suggestion2", - "suggestion3" - ] - }} - - input: - {input} - """ - return Prompt.from_text(prompt_str) + return product_suggestion_prompt @chat_model_setup @staticmethod diff --git a/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py b/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py index 0cfa0f4..fe5917c 100644 --- a/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py +++ b/python/flink_agents/examples/quickstart/agents/review_analysis_agent.py @@ -17,8 +17,7 @@ ################################################################################# import json import logging - -from pydantic import BaseModel +from typing import TYPE_CHECKING from flink_agents.api.agent import Agent from flink_agents.api.chat_message import ChatMessage, MessageRole @@ -26,48 +25,26 @@ from flink_agents.api.decorators import ( action, chat_model_setup, prompt, + tool, ) from flink_agents.api.events.chat_event import ChatRequestEvent, ChatResponseEvent from flink_agents.api.events.event import InputEvent, OutputEvent from flink_agents.api.prompts.prompt import Prompt from flink_agents.api.resource import ResourceDescriptor from flink_agents.api.runner_context import RunnerContext +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + ProductReviewAnalysisRes, + notify_shipping_manager, + review_analysis_prompt, +) from flink_agents.integrations.chat_models.ollama_chat_model import ( OllamaChatModelSetup, ) - -class ProductReview(BaseModel): - """Data model representing a product review. - - Attributes: - ---------- - id : str - The unique identifier for the product being reviewed. - review : str - The review of the product. - """ - - id: str - review: str - - -class ProductReviewAnalysisRes(BaseModel): - """Data model representing analysis result of a product review. - - Attributes: - ---------- - id : str - The unique identifier for the product being reviewed. - score : int - The satisfaction score given by the reviewer. - reasons : List[str] - A list of reasons provided by the reviewer for dissatisfaction, if any. - """ - - id: str - score: int - reasons: list[str] +if TYPE_CHECKING: + from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + ProductReview, + ) class ReviewAnalysisAgent(Agent): @@ -83,35 +60,36 @@ class ReviewAnalysisAgent(Agent): @staticmethod def review_analysis_prompt() -> Prompt: """Prompt for review analysis.""" - prompt_str = """ - Analyze the user review and product information to determine a - satisfaction score (1-5) and potential reasons for dissatisfaction. - - Example input format: - {{ - "id": "12345", - "review": "The headphones broke after one week of use. Very poor quality." - }} + return review_analysis_prompt - Ensure your response can be parsed by Python JSON, using this format as an example: - {{ - "score": 1, - "reasons": [ - "poor quality" - ] - }} - - input: - {input} - """ - return Prompt.from_text(prompt_str) + @tool + @staticmethod + def notify_shipping_manager(id: str, review: str) -> None: + """Notify the shipping manager when product received a negative review due to + shipping damage. + + Parameters + ---------- + id : str + The id of the product that received a negative review due to shipping damage + review: str + The negative review content + """ + # reuse the declared function, but for parsing the tool metadata, we write doc + # string here again. + notify_shipping_manager(id=id, review=review) @chat_model_setup @staticmethod def review_analysis_model() -> ResourceDescriptor: """ChatModel which focus on review analysis.""" - return ResourceDescriptor(clazz=OllamaChatModelSetup, connection="ollama_server", - prompt="review_analysis_prompt", extract_reasoning=True) + return ResourceDescriptor( + clazz=OllamaChatModelSetup, + connection="ollama_server", + prompt="review_analysis_prompt", + tools=["notify_shipping_manager"], + extract_reasoning=True, + ) @action(InputEvent) @staticmethod diff --git a/python/flink_agents/examples/quickstart/product_review_analysis.py b/python/flink_agents/examples/quickstart/react_agent_example.py similarity index 63% copy from python/flink_agents/examples/quickstart/product_review_analysis.py copy to python/flink_agents/examples/quickstart/react_agent_example.py index 99a52b9..4227b58 100644 --- a/python/flink_agents/examples/quickstart/product_review_analysis.py +++ b/python/flink_agents/examples/quickstart/react_agent_example.py @@ -21,45 +21,53 @@ from pyflink.common import Duration, WatermarkStrategy from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors.file_system import FileSource, StreamFormat +from flink_agents.api.agents.react_agent import ReActAgent from flink_agents.api.execution_environment import AgentsExecutionEnvironment from flink_agents.api.resource import ResourceDescriptor -from flink_agents.examples.quickstart.agents.review_analysis_agent import ( +from flink_agents.api.tools.tool import Tool +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( ProductReview, - ReviewAnalysisAgent, + ProductReviewAnalysisRes, + notify_shipping_manager, + review_analysis_react_prompt, ) from flink_agents.integrations.chat_models.ollama_chat_model import ( OllamaChatModelConnection, + OllamaChatModelSetup, ) current_dir = Path(__file__).parent def main() -> None: - """Main function for the product review analysis quickstart example. + """Main function for the product shipping question record quickstart example. - This example demonstrates how to use Flink Agents to analyze product reviews in a - streaming pipeline. The pipeline reads product reviews from a file, deserializes - each review, and uses an LLM agent to extract review scores and unsatisfied reasons. - The results are printed to stdout. This serves as a minimal, end-to-end example of - integrating LLM-powered agents with Flink streaming jobs. + This example demonstrates how to use the Flink Agents to analyze product reviews + and record shipping questions in a streaming pipeline. The pipeline reads product + reviews from a file, deserializes each review, and uses an LLM agent to extract + review scores and unsatisfied reasons. If the unsatisfied reasons are related to + shipping, the agent will notify the shipping manager. This serves as a minimal, + end-to-end example of integrating LLM-powered react agent with Flink streaming jobs. """ # Set up the Flink streaming environment and the Agents execution environment. env = StreamExecutionEnvironment.get_execution_environment() agents_env = AgentsExecutionEnvironment.get_execution_environment(env) - # Add Ollama chat model connection to be used by the ReviewAnalysisAgent. + # Add Ollama chat model connection and record shipping question tool to be used + # by the Agent. agents_env.add_resource( "ollama_server", ResourceDescriptor( clazz=OllamaChatModelConnection, model="qwen3:8b", request_timeout=120 ), - ) + ).add_resource("notify_shipping_manager", Tool.from_callable(notify_shipping_manager)) # Read product reviews from a text file as a streaming source. # Each line in the file should be a JSON string representing a ProductReview. product_review_stream = env.from_source( source=FileSource.for_record_stream_format( - StreamFormat.text_line_format(), f"file:///{current_dir}/resources" + StreamFormat.text_line_format(), + f"file:///{current_dir}/resources/", ) .monitor_continuously(Duration.of_minutes(1)) .build(), @@ -71,12 +79,24 @@ def main() -> None: ) # Deserialize JSON to ProductReview. ) - # Use the ReviewAnalysisAgent to analyze each product review. + # Create react agent + review_analysis_react_agent = ReActAgent( + chat_model=ResourceDescriptor( + clazz=OllamaChatModelSetup, + connection="ollama_server", + tools=["notify_shipping_manager"], + ), + prompt=review_analysis_react_prompt, + output_schema=ProductReviewAnalysisRes, + ) + + # Use the ReAct agent to analyze each product review and + # record shipping question. review_analysis_res_stream = ( agents_env.from_datastream( input=product_review_stream, key_selector=lambda x: x.id ) - .apply(ReviewAnalysisAgent()) + .apply(review_analysis_react_agent) .to_datastream() ) diff --git a/python/flink_agents/examples/quickstart/resources/product_review.txt b/python/flink_agents/examples/quickstart/resources/product_review.txt index 8de590a..0e4fae3 100644 --- a/python/flink_agents/examples/quickstart/resources/product_review.txt +++ b/python/flink_agents/examples/quickstart/resources/product_review.txt @@ -1,5 +1,6 @@ {"id": "B010RRWKT4", "review": "Love these! I have 3 pairs...they're so comfortable - I'm on my feet all day & my feet actually don't hurt at the end of the day when I wear these :)\nI've learned to get these 1/2 size smaller....I'm normally a size 7 and the 6 1/2 fit perfectly"} {"id": "B000YFSR4W", "review": "comfy fit"} +{"id": "B000YFSR4W", "review": "The magazine was already damaged when I received it — the whole book was wet!"} {"id": "B010RRWKT4", "review": "I ordered a 1/2 size smaller than my usual size, and they are perfect. These shoes are hard to find. Great customer service as well."} {"id": "B001IKJOLW", "review": "I wear them for Zumba and my heel slips on turns. A little annoying because I have to pay attention but they are so cute and comfy, it balances out."} {"id": "B009MA34NY", "review": "I love these, I had the previous iteration (or maybe two earlier?) and finally wore them out after several years. They are definitely a light-weight training shoe so I wouldn't get them if you are going to be doing a lot of running. But this also makes them extremely comfortable, I hadn't realized how hot running shoes made my feet until I switched to these. Good support and my feet stay cool all day, also noticeably light but well-made and long lasting.\n [...] @@ -15,6 +16,7 @@ {"id": "B0058YEJ5K", "review": "second pair. Comfortable and great for people with bunion issues or previous surgeries. Sole wears great"} {"id": "B010RRWKT4", "review": "Fit is fine, but not much cushioning in the footbed. The first time wearing them on a long walk I developed a blister on the bottom of my foot."} {"id": "B0058YEJ5K", "review": "daughter loves them"} +{"id": "B001IKJOLW", "review": "The magazine was already damaged when I received it — the whole book was wet!"} {"id": "B000YFSR5G", "review": "Did not fit well. Was not comfortable. Switched to \"Fruit of the loom\". Fruit of the loom are somewhat oversized but fit well and are very comfortable."} {"id": "B0014F7B98", "review": "Cute and comfortable"} {"id": "B000YFSR5G", "review": "As described."} @@ -153,6 +155,7 @@ {"id": "B0092UF54A", "review": "Took other reviewers advice and went up half size and it was too large. I thought this was a more sock-like fit but it had a tongue that came up high on my ankle not at all sock like. Nice looking and very light weight with a rounded square toe that left plenty of room front foot and toes. These just weren't what I was looking for."} {"id": "B009MA34NY", "review": "i bought a pair from DSW for 50$ and they are very comfortable but i bought a size 6.5 when I normally where a size 7. and even the 6.5 feels a little roomy. I have wide flat feet btw. im not sure if I will keep them even though i love their look and comfort. I've been having feet problems while taking a gym class and I think they won't give me enough support when running. they're great for wearing at home or gym and doing exercises in one place- extr [...] {"id": "B0014F7B98", "review": "Fit my size perfectly and very comfortable!"} +{"id": "B014IBJKNO", "review": "The magazine was already damaged when I received it — the whole book was wet!"} {"id": "B0092UF54A", "review": "I have gotten so many compliments on the blue color and look of this shoe. It is super comfortable. The grip is wonderful for my indoor gym cross training classes. I always order a half size larger in Nike brand shoes and thy fit perfectly as I did with this shoe."} {"id": "B0058YEJ5K", "review": "This pair fits perfect.....the first pair was wrong size in the right box.....definitely order again!!!!!"} {"id": "B005AGO4LU", "review": "Perfect fit, very comfortable, and a great color. They also fit as expected."} diff --git a/python/flink_agents/examples/quickstart/product_improve_suggestion.py b/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py similarity index 95% rename from python/flink_agents/examples/quickstart/product_improve_suggestion.py rename to python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py index 542a3e9..2ca920f 100644 --- a/python/flink_agents/examples/quickstart/product_improve_suggestion.py +++ b/python/flink_agents/examples/quickstart/workflow_multiple_agent_example.py @@ -27,7 +27,9 @@ from pyflink.datastream.connectors.file_system import FileSource, StreamFormat from pyflink.datastream.window import TumblingProcessingTimeWindows from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.api.resource import ResourceDescriptor +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( + ollama_server_descriptor, +) from flink_agents.examples.quickstart.agents.product_suggestion_agent import ( ProductReviewSummary, ProductSuggestionAgent, @@ -37,9 +39,6 @@ from flink_agents.examples.quickstart.agents.review_analysis_agent import ( ProductReviewAnalysisRes, ReviewAnalysisAgent, ) -from flink_agents.integrations.chat_models.ollama_chat_model import ( - OllamaChatModelConnection, -) current_dir = Path(__file__).parent @@ -93,9 +92,7 @@ def main() -> None: # and ProductSuggestionAgent. agents_env.add_resource( "ollama_server", - ResourceDescriptor( - clazz=OllamaChatModelConnection, model="qwen3:8b", request_timeout=120 - ), + ollama_server_descriptor, ) # Read product reviews from a text file as a streaming source. diff --git a/python/flink_agents/examples/quickstart/product_review_analysis.py b/python/flink_agents/examples/quickstart/workflow_single_agent_example.py similarity index 92% rename from python/flink_agents/examples/quickstart/product_review_analysis.py rename to python/flink_agents/examples/quickstart/workflow_single_agent_example.py index 99a52b9..59039ee 100644 --- a/python/flink_agents/examples/quickstart/product_review_analysis.py +++ b/python/flink_agents/examples/quickstart/workflow_single_agent_example.py @@ -22,13 +22,12 @@ from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors.file_system import FileSource, StreamFormat from flink_agents.api.execution_environment import AgentsExecutionEnvironment -from flink_agents.api.resource import ResourceDescriptor -from flink_agents.examples.quickstart.agents.review_analysis_agent import ( +from flink_agents.examples.quickstart.agents.custom_types_and_resources import ( ProductReview, - ReviewAnalysisAgent, + ollama_server_descriptor, ) -from flink_agents.integrations.chat_models.ollama_chat_model import ( - OllamaChatModelConnection, +from flink_agents.examples.quickstart.agents.review_analysis_agent import ( + ReviewAnalysisAgent, ) current_dir = Path(__file__).parent @@ -50,9 +49,7 @@ def main() -> None: # Add Ollama chat model connection to be used by the ReviewAnalysisAgent. agents_env.add_resource( "ollama_server", - ResourceDescriptor( - clazz=OllamaChatModelConnection, model="qwen3:8b", request_timeout=120 - ), + ollama_server_descriptor, ) # Read product reviews from a text file as a streaming source.
