letaoj commented on code in PR #225:
URL: https://github.com/apache/flink-agents/pull/225#discussion_r2384746754


##########
examples/src/main/resources/input_data.txt:
##########


Review Comment:
   updated



##########
examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples.agents;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.Agent;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelConnection;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Prompt;
+import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.annotation.ToolParam;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import 
org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection;
+import 
org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.agents.examples.agents.CustomTypesAndResources.REVIEW_ANALYSIS_PROMPT;
+
+/**
+ * An agent that uses a large language model (LLM) to analyze product reviews 
and generate a
+ * satisfaction score and potential reasons for dissatisfaction.
+ *
+ * <p>This agent receives a product review and produces a satisfaction score 
and a list of reasons
+ * for dissatisfaction. It handles prompt construction, LLM interaction, and 
output parsing.
+ */
+public class ReviewAnalysisAgent extends Agent {
+
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+
+    @ChatModelConnection
+    public static ResourceDescriptor ollamaChatModelConnection() {
+        return 
ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName())
+                .addInitialArgument("endpoint", "http://localhost:11434")
+                .build();
+    }
+
+    @Prompt
+    public static org.apache.flink.agents.api.prompt.Prompt 
reviewAnalysisPrompt() {
+        return REVIEW_ANALYSIS_PROMPT;
+    }
+
+    @ChatModelSetup
+    public static ResourceDescriptor reviewAnalysisModel() {
+        return 
ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName())
+                .addInitialArgument("connection", "ollamaChatModelConnection")
+                .addInitialArgument("model", "qwen3:8b")
+                .addInitialArgument("prompt", "reviewAnalysisPrompt")
+                .addInitialArgument("tools", 
Collections.singletonList("notifyShippingManager"))
+                .addInitialArgument("extract_reasoning", "true")
+                .build();
+    }
+
+    /**
+     * Tool for notifying the shipping manager when product received a 
negative review due to
+     * shipping damage.
+     *
+     * @param id The id of the product that received a negative review due to 
shipping damage
+     * @param review The negative review content
+     */
+    @Tool(
+            description =
+                    "Notify the shipping manager when product received a 
negative review due to shipping damage.")
+    public static void notifyShippingManager(
+            @ToolParam(name = "id") String id, @ToolParam(name = "review") 
String review) {
+        CustomTypesAndResources.notifyShippingManager(id, review);
+    }
+
+    /** Process input event and send chat request for review analysis. */
+    @Action(listenEvents = {InputEvent.class})
+    public static void processInput(InputEvent event, RunnerContext ctx) 
throws Exception {
+        String input = (String) event.getInput();
+        MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+        CustomTypesAndResources.ProductReview inputObj =
+                MAPPER.readValue(input, 
CustomTypesAndResources.ProductReview.class);
+
+        ctx.getShortTermMemory().set("id", inputObj.getId());
+
+        String content =
+                String.format(
+                        "{\n" + "\"id\": %s,\n" + "\"review\": \"%s\"\n" + "}",
+                        inputObj.getId(), inputObj.getReview());
+        ChatMessage msg = new ChatMessage(MessageRole.USER, content);

Review Comment:
   Thanks for the detailed explanation. I was not able to figure it out and 
thought it might be a model issue :|



##########
examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java:
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples.agents;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import 
org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Custom types and resources for the quickstart agents.
+ *
+ * <p>This class contains the Java equivalents of the Python custom types and 
resources, including
+ * data models, prompts, and tools for product review analysis.
+ */
+public class CustomTypesAndResources {
+
+    // Prompt for review analysis agent
+    public static final String REVIEW_ANALYSIS_SYSTEM_PROMPT_STR =
+            "Analyze the user review and product information to determine a "
+                    + "satisfaction score (1-5) and potential reasons for 
dissatisfaction.\n\n"
+                    + "Example input format:\n"
+                    + "{\n"
+                    + "    \"id\": \"12345\",\n"
+                    + "    \"review\": \"The headphones broke after one week 
of use. Very high quality.\"\n"
+                    + "}\n\n"
+                    + "Ensure your response can be parsed by Java JSON, using 
this format as an example:\n"
+                    + "{\n"
+                    + " \"id\": \"12345\",\n"
+                    + " \"score\": {score},\n"
+                    + " \"reasons\": [\n"
+                    + "   \"{reason}\"\n"
+                    + "   ]\n"
+                    + "}\n\n"
+                    + "Please note that if a product review includes 
dissatisfaction with the shipping process,\n"
+                    + "you should first notify the shipping manager using the 
appropriate tools. After executing\n"
+                    + "the tools, strictly follow the example above to provide 
your score and reason — there is\n"
+                    + "no need to disclose whether the tool was used.";
+
+    public static final Prompt REVIEW_ANALYSIS_PROMPT =
+            new Prompt(
+                    Arrays.asList(
+                            new ChatMessage(MessageRole.SYSTEM, 
REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
+                            new ChatMessage(MessageRole.USER, "\"input\":\n" + 
"{input}")));
+
+    // Prompt for review analysis react agent
+    public static final Prompt REVIEW_ANALYSIS_REACT_PROMPT =
+            new Prompt(
+                    Arrays.asList(
+                            new ChatMessage(MessageRole.SYSTEM, 
REVIEW_ANALYSIS_SYSTEM_PROMPT_STR),
+                            new ChatMessage(
+                                    MessageRole.USER, "\"id\": {id},\n" + 
"\"review\": {review}")));
+
+    // Prompt for product suggestion agent
+    public static final String PRODUCT_SUGGESTION_PROMPT_STR =
+            "Based on the rating distribution and user dissatisfaction 
reasons, generate three actionable suggestions for product improvement.\n\n"
+                    + "Input format:\n"
+                    + "{\n"
+                    + "    \"id\": \"1\",\n"
+                    + "    \"score_histogram\": [\"10%\", \"20%\", \"10%\", 
\"15%\", \"45%\"],\n"
+                    + "    \"unsatisfied_reasons\": [\"reason1\", \"reason2\", 
\"reason3\"]\n"
+                    + "}\n\n"
+                    + "Ensure that your response can be parsed by Java JSON, 
use the following format as an example:\n"
+                    + "{\n"
+                    + "    \"suggestion_list\": [\n"
+                    + "        \"suggestion1\",\n"
+                    + "        \"suggestion2\",\n"
+                    + "        \"suggestion3\"\n"
+                    + "    ]\n"
+                    + "}\n\n"
+                    + "input:\n"
+                    + "{input}";
+
+    public static final Prompt PRODUCT_SUGGESTION_PROMPT =
+            new Prompt(PRODUCT_SUGGESTION_PROMPT_STR);
+
+    /**
+     * Tool for notifying the shipping manager when product received a 
negative review due to
+     * shipping damage.
+     *
+     * @param id The id of the product that received a negative review due to 
shipping damage
+     * @param review The negative review content
+     */
+    public static String notifyShippingManager(String id, String review) {
+        String content =
+                String.format(
+                        "Transportation issue for product [%s], the customer 
feedback: %s",
+                        id, review);
+        System.out.println(content);
+        return content;
+    }
+
+    // Ollama chat model connection descriptor
+    public static final ResourceDescriptor OLLAMA_SERVER_DESCRIPTOR =
+            
ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName())
+                    .addInitialArgument("model", "qwen3:8b")

Review Comment:
   Removed.



##########
examples/src/main/java/org/apache/flink/agents/examples/WorkflowSingleAgentExample.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.examples;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.examples.agents.CustomTypesAndResources;
+import org.apache.flink.agents.examples.agents.ReviewAnalysisAgent;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * Java example demonstrating a single workflow agent for product review 
analysis.
+ *
+ * <p>This example demonstrates how to use Flink Agents to analyze product 
reviews in a streaming
+ * pipeline. The pipeline reads product reviews from a source, deserializes 
each review, and uses an
+ * LLM agent to extract review scores and unsatisfied reasons. The results are 
printed to stdout.
+ * This serves as a minimal, end-to-end example of integrating LLM-powered 
agents with Flink
+ * streaming jobs.
+ */
+public class WorkflowSingleAgentExample {
+
+    /** Runs the example pipeline. */
+    public static void main(String[] args) throws Exception {
+        // Set up the Flink streaming environment and the Agents execution 
environment.
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        AgentsExecutionEnvironment agentsEnv =
+                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+        // Add Ollama chat model connection to be used by the 
ReviewAnalysisAgent.
+        agentsEnv.addChatModelConnection(
+                "ollama_server", 
CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR);
+
+        // Read product reviews from input_data.txt file as a streaming source.
+        // Each element represents a ProductReview.
+        DataStream<String> productReviewStream =
+                env.fromSource(
+                        FileSource.forRecordStreamFormat(
+                                        new TextLineFormat(),
+                                        new 
Path("src/main/resources/input_data.txt"))

Review Comment:
   Thanks for the suggestion. Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to