This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit 6a9ca8ca6a272a4b10dcb424e614ab03705c6947 Author: youjin <[email protected]> AuthorDate: Thu Dec 25 17:21:56 2025 +0800 [build] Update default flink to 2.2.0 --- .../flink/agents/examples/ReActAgentExample.java | 4 +- .../examples/WorkflowMultipleAgentExample.java | 4 +- .../plan/serializer/ActionJsonSerializer.java | 20 ++--- pom.xml | 15 +++- python/pyproject.toml | 2 +- .../runtime/actionstate/ActionStateUtil.java | 2 +- .../runtime/message/MessageTypeInformation.java | 86 ---------------------- .../runtime/operator/ActionExecutionOperator.java | 12 ++- .../operator/ActionExecutionOperatorFactory.java | 4 +- .../flink/agents/runtime/operator/StateUtils.java | 5 +- .../apache/flink/agents/runtime/RescalingTest.java | 4 +- 11 files changed, 49 insertions(+), 109 deletions(-) diff --git a/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java index e2d6a3c..23af1bb 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java @@ -29,7 +29,7 @@ import org.apache.flink.agents.examples.agents.CustomTypesAndResources; import org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.connector.file.src.FileSource; -import org.apache.flink.connector.file.src.reader.TextLineFormat; +import org.apache.flink.connector.file.src.reader.TextLineInputFormat; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -105,7 +105,7 @@ public class ReActAgentExample { DataStream<Row> productReviewStream = env.fromSource( FileSource.forRecordStreamFormat( - new TextLineFormat(), + new TextLineInputFormat(), new Path(inputDataFile.getAbsolutePath())) .monitorContinuously(Duration.ofMinutes(1)) .build(), diff --git a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java index 8703e9e..fe91c58 100644 --- a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java +++ b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java @@ -37,13 +37,13 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.io.File; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import static org.apache.flink.agents.examples.WorkflowSingleAgentExample.copyResource; import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewAnalysisRes; import static org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewSummary; -import static org.apache.flink.streaming.api.windowing.time.Time.minutes; /** * Java example demonstrating multiple workflow agents for product improvement suggestion. @@ -164,7 +164,7 @@ public class WorkflowMultipleAgentExample { reviewAnalysisResStream .map(element -> (ProductReviewAnalysisRes) element) .keyBy(ProductReviewAnalysisRes::getId) - .window(TumblingProcessingTimeWindows.of(minutes(1))) + .window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1))) .process(new AggregateScoreDistributionAndDislikeReasons()); // Use the ProductSuggestionAgent (LLM) to generate product improvement diff --git a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializer.java b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializer.java index 837be92..77581da 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializer.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializer.java @@ -80,21 +80,23 @@ public class ActionJsonSerializer extends StdSerializer<Action> { String configType = (String) config.get(CONFIG_TYPE); if (configType == null) { configType = "java"; - } else { - config.remove(CONFIG_TYPE); + config.put(CONFIG_TYPE, configType); } - jsonGenerator.writeStringField(CONFIG_TYPE, configType); if (configType.equals("java")) { action.getConfig() .forEach( (name, value) -> { try { - jsonGenerator.writeFieldName(name); - jsonGenerator.writeStartObject(); - jsonGenerator.writeStringField( - "@class", value.getClass().getName()); - jsonGenerator.writeObjectField("value", value); - jsonGenerator.writeEndObject(); + if (CONFIG_TYPE.equals(name)) { + jsonGenerator.writeStringField(name, (String) value); + } else { + jsonGenerator.writeFieldName(name); + jsonGenerator.writeStartObject(); + jsonGenerator.writeStringField( + "@class", value.getClass().getName()); + jsonGenerator.writeObjectField("value", value); + jsonGenerator.writeEndObject(); + } } catch (IOException e) { throw new RuntimeException( "Error writing action: " + name, e); diff --git a/pom.xml b/pom.xml index b8b4225..954cae7 100644 --- a/pom.xml +++ b/pom.xml @@ -41,7 +41,20 @@ under the License. <maven.compiler.target>${target.java.version}</maven.compiler.target> <spotless.version>2.27.1</spotless.version> <spotless.skip>false</spotless.skip> - <flink.version>1.20.3</flink.version> + + <!-- ============================================ --> + <!-- Flink Version Management --> + <!-- ============================================ --> + <!-- Supported Flink versions --> + <flink.1.20.version>1.20.3</flink.1.20.version> + <flink.2.0.version>2.0.1</flink.2.0.version> + <flink.2.1.version>2.1.1</flink.2.1.version> + <flink.2.2.version>2.2.0</flink.2.2.version> + + <!-- Default version used in main modules --> + <flink.version>${flink.2.2.version}</flink.version> + <!-- ============================================ --> + <kafka.version>4.0.0</kafka.version> <junit5.version>5.10.1</junit5.version> <jackson.version>2.18.2</jackson.version> diff --git a/python/pyproject.toml b/python/pyproject.toml index ee282d6..c9c12af 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -42,7 +42,7 @@ classifiers = [ ] dependencies = [ - "apache-flink==1.20.3", + "apache-flink==2.2.0", "pydantic==2.11.4", "docstring-parser==0.16", "pyyaml==6.0.2", diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java index fc3f259..dac2d5d 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java @@ -22,7 +22,7 @@ import org.apache.flink.agents.api.Event; import org.apache.flink.agents.api.InputEvent; import org.apache.flink.agents.plan.actions.Action; import org.apache.flink.agents.runtime.python.event.PythonEvent; -import org.apache.flink.shaded.guava31.com.google.common.base.Preconditions; +import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/message/MessageTypeInformation.java b/runtime/src/main/java/org/apache/flink/agents/runtime/message/MessageTypeInformation.java deleted file mode 100644 index 4c753f1..0000000 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/message/MessageTypeInformation.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.agents.runtime.message; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; - -/** TypeInformation for {@link Message}. */ -public class MessageTypeInformation extends TypeInformation<Message> { - - private MessageTypeInformation() {} - - public static final MessageTypeInformation INSTANCE = new MessageTypeInformation(); - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 0; - } - - @Override - public int getTotalFields() { - return 0; - } - - @Override - public Class<Message> getTypeClass() { - return Message.class; - } - - @Override - public boolean isKeyType() { - return false; - } - - @Override - public TypeSerializer<Message> createSerializer(ExecutionConfig executionConfig) { - return new KryoSerializer<>(Message.class, executionConfig); - } - - @Override - public String toString() { - return "MessageTypeInformation"; - } - - @Override - public boolean equals(Object o) { - return o instanceof MessageTypeInformation; - } - - @Override - public int hashCode() { - return getTypeClass().hashCode(); - } - - @Override - public boolean canEqual(Object o) { - return o instanceof MessageTypeInformation; - } -} diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java index d69808f..544f24b 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java @@ -66,14 +66,16 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; -import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; import org.apache.flink.types.Row; @@ -199,7 +201,6 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT this.agentPlan = agentPlan; this.inputIsJava = inputIsJava; this.processingTimeService = processingTimeService; - this.chainingStrategy = ChainingStrategy.ALWAYS; this.mailboxExecutor = mailboxExecutor; this.eventLogger = EventLoggerFactory.createLogger(EventLoggerConfig.builder().build()); this.eventListeners = new ArrayList<>(); @@ -208,6 +209,13 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT this.actionTaskMemoryContexts = new HashMap<>(); } + protected void setup( + StreamTask<?, ?> containingTask, + StreamConfig config, + Output<StreamRecord<OUT>> output) { + super.setup(containingTask, config, output); + } + @Override public void open() throws Exception { super.open(); diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorFactory.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorFactory.java index 1b863ff..e6a0e21 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorFactory.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorFactory.java @@ -20,13 +20,14 @@ package org.apache.flink.agents.runtime.operator; import org.apache.flink.agents.plan.AgentPlan; import org.apache.flink.agents.runtime.actionstate.ActionStateStore; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; /** Operator factory for {@link ActionExecutionOperator}. */ -public class ActionExecutionOperatorFactory<IN, OUT> +public class ActionExecutionOperatorFactory<IN, OUT> extends AbstractStreamOperatorFactory<OUT> implements OneInputStreamOperatorFactory<IN, OUT> { private final AgentPlan agentPlan; @@ -45,6 +46,7 @@ public class ActionExecutionOperatorFactory<IN, OUT> this.agentPlan = agentPlan; this.inputIsJava = inputIsJava; this.actionStateStore = actionStateStore; + this.chainingStrategy = ChainingStrategy.ALWAYS; } @Override diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/StateUtils.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/StateUtils.java index 55a9d70..6266ced 100644 --- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/StateUtils.java +++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/StateUtils.java @@ -21,7 +21,6 @@ package org.apache.flink.agents.runtime.operator; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.runtime.state.StateInitializationContext; -import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; @@ -67,7 +66,9 @@ public class StateUtils { } T value = values.get(0); - state.update(Lists.newArrayList(value)); + List<T> newList = new ArrayList<>(); + newList.add(value); + state.update(newList); return value; } diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java index dca3ed2..c3c376f 100644 --- a/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java +++ b/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java @@ -51,8 +51,8 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; +import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction; +import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorResource;
