This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit b547931faf0a2144fa197304c4f6819cbc530d6f Author: WenjinXie <[email protected]> AuthorDate: Thu Oct 9 11:09:57 2025 +0800 [hotfix] Support read input data from jar. --- .../flink/agents/examples/ReActAgentExample.java | 17 ++++---- .../examples/WorkflowMultipleAgentExample.java | 12 ++---- .../examples/WorkflowSingleAgentExample.java | 46 ++++++++++++++++++---- 3 files changed, 49 insertions(+), 26 deletions(-) diff --git a/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java index 1ac96ac..e2d6a3c 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java @@ -35,11 +35,11 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; +import java.io.File; import java.time.Duration; import java.util.Collections; -import java.util.Objects; -import static org.apache.flink.agents.api.resource.ResourceType.TOOL; +import static org.apache.flink.agents.examples.WorkflowSingleAgentExample.copyResource; /** * Java example demonstrating the ReActAgent for product review analysis and shipping notification. @@ -92,24 +92,21 @@ public class ReActAgentExample { CustomTypesAndResources.OLLAMA_SERVER_DESCRIPTOR) .addResource( "notifyShippingManager", - TOOL, + ResourceType.TOOL, org.apache.flink.agents.api.tools.Tool.fromMethod( ReActAgentExample.class.getMethod( "notifyShippingManager", String.class, String.class))); // Read product reviews from input_data.txt file as a streaming source. // Each element represents a ProductReview. + + File inputDataFile = copyResource("input_data.txt"); + DataStream<Row> productReviewStream = env.fromSource( FileSource.forRecordStreamFormat( new TextLineFormat(), - new Path( - Objects.requireNonNull( - ReActAgentExample.class - .getClassLoader() - .getResource( - "input_data.txt")) - .getPath())) + new Path(inputDataFile.getAbsolutePath())) .monitorContinuously(Duration.ofMinutes(1)) .build(), WatermarkStrategy.noWatermarks(), diff --git a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java index 65c8ddc..8703e9e 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java @@ -36,10 +36,11 @@ import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTime import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; +import java.io.File; import java.util.ArrayList; import java.util.List; -import java.util.Objects; +import static org.apache.flink.agents.examples.WorkflowSingleAgentExample.copyResource; import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewAnalysisRes; import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewSummary; import static org.apache.flink.streaming.api.windowing.time.Time.minutes; @@ -137,17 +138,12 @@ public class WorkflowMultipleAgentExample { // Read product reviews from input_data.txt file as a streaming source. // Each element represents a ProductReview. + File inputDataFile = copyResource("input_data.txt"); DataStream<String> productReviewStream = env.fromSource( FileSource.forRecordStreamFormat( new TextLineInputFormat(), - new Path( - Objects.requireNonNull( - WorkflowSingleAgentExample.class - .getClassLoader() - .getResource( - "input_data.txt")) - .getPath())) + new Path(inputDataFile.getAbsolutePath())) .build(), WatermarkStrategy.noWatermarks(), "streaming-agent-example"); diff --git a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowSingleAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowSingleAgentExample.java index 19a2b80..f685421 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowSingleAgentExample.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowSingleAgentExample.java @@ -27,8 +27,14 @@ import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.FileUtils; -import java.util.Objects; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; /** * Java example demonstrating a single workflow agent for product review analysis. @@ -57,17 +63,12 @@ public class WorkflowSingleAgentExample { // Read product reviews from input_data.txt file as a streaming source. // Each element represents a ProductReview. + File inputDataFile = copyResource("input_data.txt"); DataStream<String> productReviewStream = env.fromSource( FileSource.forRecordStreamFormat( new TextLineInputFormat(), - new Path( - Objects.requireNonNull( - WorkflowSingleAgentExample.class - .getClassLoader() - .getResource( - "input_data.txt")) - .getPath())) + new Path(inputDataFile.getAbsolutePath())) .build(), WatermarkStrategy.noWatermarks(), "streaming-agent-example"); @@ -85,4 +86,33 @@ public class WorkflowSingleAgentExample { // Execute the Flink pipeline. agentsEnv.execute(); } + + public static File copyResource(String resourcePath) throws Exception { + try (InputStream inputStream = + ReActAgentExample.class.getClassLoader().getResourceAsStream(resourcePath)) { + if (inputStream == null) { + throw new FileNotFoundException(resourcePath + " not found"); + } + + String[] parts = resourcePath.split("/"); + String filename = parts.length < 1 ? null : parts[parts.length - 1]; + if (filename == null || filename.isEmpty()) { + throw new IllegalArgumentException("Invalid resource path: " + resourcePath); + } + + File tmpDir = + new File( + System.getProperty("java.io.tmpdir"), + "flink-agents-" + System.nanoTime()); + if (!tmpDir.mkdirs()) { + throw new IOException("Cannot mkdirs: " + tmpDir.getAbsolutePath()); + } + Runtime.getRuntime() + .addShutdownHook(new Thread(() -> FileUtils.deleteDirectoryQuietly(tmpDir))); + + File targetFile = new File(tmpDir, filename); + Files.copy(inputStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + return targetFile; + } + } }
