letaoj commented on code in PR #225: URL: https://github.com/apache/flink-agents/pull/225#discussion_r2384746754
########## examples/src/main/resources/input_data.txt: ########## Review Comment: updated ########## examples/src/main/java/org/apache/flink/agents/examples/agents/ReviewAnalysisAgent.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.examples.agents; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.agents.api.Agent; +import org.apache.flink.agents.api.InputEvent; +import org.apache.flink.agents.api.OutputEvent; +import org.apache.flink.agents.api.annotation.Action; +import org.apache.flink.agents.api.annotation.ChatModelConnection; +import org.apache.flink.agents.api.annotation.ChatModelSetup; +import org.apache.flink.agents.api.annotation.Prompt; +import org.apache.flink.agents.api.annotation.Tool; +import org.apache.flink.agents.api.annotation.ToolParam; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.context.RunnerContext; +import org.apache.flink.agents.api.event.ChatRequestEvent; +import org.apache.flink.agents.api.event.ChatResponseEvent; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.REVIEW_ANALYSIS_PROMPT; + +/** + * An agent that uses a large language model (LLM) to analyze product reviews and generate a + * satisfaction score and potential reasons for dissatisfaction. + * + * <p>This agent receives a product review and produces a satisfaction score and a list of reasons + * for dissatisfaction. It handles prompt construction, LLM interaction, and output parsing. 
+ */ +public class ReviewAnalysisAgent extends Agent { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @ChatModelConnection + public static ResourceDescriptor ollamaChatModelConnection() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName()) + .addInitialArgument("endpoint", "http://localhost:11434") + .build(); + } + + @Prompt + public static org.apache.flink.agents.api.prompt.Prompt reviewAnalysisPrompt() { + return REVIEW_ANALYSIS_PROMPT; + } + + @ChatModelSetup + public static ResourceDescriptor reviewAnalysisModel() { + return ResourceDescriptor.Builder.newBuilder(OllamaChatModelSetup.class.getName()) + .addInitialArgument("connection", "ollamaChatModelConnection") + .addInitialArgument("model", "qwen3:8b") + .addInitialArgument("prompt", "reviewAnalysisPrompt") + .addInitialArgument("tools", Collections.singletonList("notifyShippingManager")) + .addInitialArgument("extract_reasoning", "true") + .build(); + } + + /** + * Tool for notifying the shipping manager when product received a negative review due to + * shipping damage. + * + * @param id The id of the product that received a negative review due to shipping damage + * @param review The negative review content + */ + @Tool( + description = + "Notify the shipping manager when product received a negative review due to shipping damage.") + public static void notifyShippingManager( + @ToolParam(name = "id") String id, @ToolParam(name = "review") String review) { + CustomTypesAndResources.notifyShippingManager(id, review); + } + + /** Process input event and send chat request for review analysis. 
*/ + @Action(listenEvents = {InputEvent.class}) + public static void processInput(InputEvent event, RunnerContext ctx) throws Exception { + String input = (String) event.getInput(); + MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + CustomTypesAndResources.ProductReview inputObj = + MAPPER.readValue(input, CustomTypesAndResources.ProductReview.class); + + ctx.getShortTermMemory().set("id", inputObj.getId()); + + String content = + String.format( + "{\n" + "\"id\": %s,\n" + "\"review\": \"%s\"\n" + "}", + inputObj.getId(), inputObj.getReview()); + ChatMessage msg = new ChatMessage(MessageRole.USER, content); Review Comment: Thanks for the detailed explanation. I was not able to figure it out and thought it might be a model issue :| ########## examples/src/main/java/org/apache/flink/agents/examples/agents/CustomTypesAndResources.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.examples.agents; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.flink.agents.api.chat.messages.ChatMessage; +import org.apache.flink.agents.api.chat.messages.MessageRole; +import org.apache.flink.agents.api.prompt.Prompt; +import org.apache.flink.agents.api.resource.ResourceDescriptor; +import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection; + +import java.util.Arrays; +import java.util.List; + +/** + * Custom types and resources for the quickstart agents. + * + * <p>This class contains the Java equivalents of the Python custom types and resources, including + * data models, prompts, and tools for product review analysis. + */ +public class CustomTypesAndResources { + + // Prompt for review analysis agent + public static final String REVIEW_ANALYSIS_SYSTEM_PROMPT_STR = + "Analyze the user review and product information to determine a " + + "satisfaction score (1-5) and potential reasons for dissatisfaction.\n\n" + + "Example input format:\n" + + "{\n" + + " \"id\": \"12345\",\n" + + " \"review\": \"The headphones broke after one week of use. Very poor quality.\"\n" + + "}\n\n" + + "Ensure your response can be parsed by Java JSON, using this format as an example:\n" + + "{\n" + + " \"id\": \"12345\",\n" + + " \"score\": {score},\n" + + " \"reasons\": [\n" + + " \"{reason}\"\n" + + " ]\n" + + "}\n\n" + + "Please note that if a product review includes dissatisfaction with the shipping process,\n" + + "you should first notify the shipping manager using the appropriate tools. 
After executing\n" + + "the tools, strictly follow the example above to provide your score and reason — there is\n" + + "no need to disclose whether the tool was used."; + + public static final Prompt REVIEW_ANALYSIS_PROMPT = + new Prompt( + Arrays.asList( + new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR), + new ChatMessage(MessageRole.USER, "\"input\":\n" + "{input}"))); + + // Prompt for review analysis react agent + public static final Prompt REVIEW_ANALYSIS_REACT_PROMPT = + new Prompt( + Arrays.asList( + new ChatMessage(MessageRole.SYSTEM, REVIEW_ANALYSIS_SYSTEM_PROMPT_STR), + new ChatMessage( + MessageRole.USER, "\"id\": {id},\n" + "\"review\": {review}"))); + + // Prompt for product suggestion agent + public static final String PRODUCT_SUGGESTION_PROMPT_STR = + "Based on the rating distribution and user dissatisfaction reasons, generate three actionable suggestions for product improvement.\n\n" + + "Input format:\n" + + "{\n" + + " \"id\": \"1\",\n" + + " \"score_histogram\": [\"10%\", \"20%\", \"10%\", \"15%\", \"45%\"],\n" + + " \"unsatisfied_reasons\": [\"reason1\", \"reason2\", \"reason3\"]\n" + + "}\n\n" + + "Ensure that your response can be parsed by Java JSON, use the following format as an example:\n" + + "{\n" + + " \"suggestion_list\": [\n" + + " \"suggestion1\",\n" + + " \"suggestion2\",\n" + + " \"suggestion3\"\n" + + " ]\n" + + "}\n\n" + + "input:\n" + + "{input}"; + + public static final Prompt PRODUCT_SUGGESTION_PROMPT = + new Prompt(PRODUCT_SUGGESTION_PROMPT_STR); + + /** + * Tool for notifying the shipping manager when product received a negative review due to + * shipping damage. 
+ * + * @param id The id of the product that received a negative review due to shipping damage + * @param review The negative review content + */ + public static String notifyShippingManager(String id, String review) { + String content = + String.format( + "Transportation issue for product [%s], the customer feedback: %s", + id, review); + System.out.println(content); + return content; + } + + // Ollama chat model connection descriptor + public static final ResourceDescriptor OLLAMA_SERVER_DESCRIPTOR = + ResourceDescriptor.Builder.newBuilder(OllamaChatModelConnection.class.getName()) + .addInitialArgument("model", "qwen3:8b") Review Comment: Removed. ########## examples/src/main/java/org/apache/flink/agents/examples/WorkflowSingleAgentExample.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.agents.examples; + +import org.apache.flink.agents.api.AgentsExecutionEnvironment; +import org.apache.flink.agents.examples.agents.CustomTypesAndResources; +import org.apache.flink.agents.examples.agents.ReviewAnalysisAgent; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.connector.file.src.reader.TextLineFormat; +import org.apache.flink.core.fs.Path; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; + +/** + * Java example demonstrating a single workflow agent for product review analysis. + * + * <p>This example demonstrates how to use Flink Agents to analyze product reviews in a streaming + * pipeline. The pipeline reads product reviews from a source, deserializes each review, and uses an + * LLM agent to extract review scores and unsatisfied reasons. The results are printed to stdout. + * This serves as a minimal, end-to-end example of integrating LLM-powered agents with Flink + * streaming jobs. + */ +public class WorkflowSingleAgentExample { + + /** Runs the example pipeline. */ + public static void main(String[] args) throws Exception { + // Set up the Flink streaming environment and the Agents execution environment. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + AgentsExecutionEnvironment agentsEnv = + AgentsExecutionEnvironment.getExecutionEnvironment(env); + + // Add Ollama chat model connection to be used by the ReviewAnalysisAgent. + agentsEnv.addChatModelConnection( + "ollama_server", CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR); + + // Read product reviews from input_data.txt file as a streaming source. + // Each element represents a ProductReview. 
+ DataStream<String> productReviewStream = + env.fromSource( + FileSource.forRecordStreamFormat( + new TextLineFormat(), + new Path("src/main/resources/input_data.txt")) Review Comment: Thanks for the suggestion. Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
