This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit 6a9ca8ca6a272a4b10dcb424e614ab03705c6947
Author: youjin <[email protected]>
AuthorDate: Thu Dec 25 17:21:56 2025 +0800

    [build] Update default flink to 2.2.0
---
 .../flink/agents/examples/ReActAgentExample.java   |  4 +-
 .../examples/WorkflowMultipleAgentExample.java     |  4 +-
 .../plan/serializer/ActionJsonSerializer.java      | 20 ++---
 pom.xml                                            | 15 +++-
 python/pyproject.toml                              |  2 +-
 .../runtime/actionstate/ActionStateUtil.java       |  2 +-
 .../runtime/message/MessageTypeInformation.java    | 86 ----------------------
 .../runtime/operator/ActionExecutionOperator.java  | 12 ++-
 .../operator/ActionExecutionOperatorFactory.java   |  4 +-
 .../flink/agents/runtime/operator/StateUtils.java  |  5 +-
 .../apache/flink/agents/runtime/RescalingTest.java |  4 +-
 11 files changed, 49 insertions(+), 109 deletions(-)

diff --git 
a/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java
 
b/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java
index e2d6a3c..23af1bb 100644
--- 
a/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java
+++ 
b/examples/src/main/java/org/apache/flink/agents/examples/ReActAgentExample.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.agents.examples.agents.CustomTypesAndResources;
 import 
org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.connector.file.src.FileSource;
-import org.apache.flink.connector.file.src.reader.TextLineFormat;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -105,7 +105,7 @@ public class ReActAgentExample {
         DataStream<Row> productReviewStream =
                 env.fromSource(
                                 FileSource.forRecordStreamFormat(
-                                                new TextLineFormat(),
+                                                new TextLineInputFormat(),
                                                 new 
Path(inputDataFile.getAbsolutePath()))
                                         
.monitorContinuously(Duration.ofMinutes(1))
                                         .build(),
diff --git 
a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
 
b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
index 8703e9e..fe91c58 100644
--- 
a/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
+++ 
b/examples/src/main/java/org/apache/flink/agents/examples/WorkflowMultipleAgentExample.java
@@ -37,13 +37,13 @@ import 
org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.util.Collector;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
 import static 
org.apache.flink.agents.examples.WorkflowSingleAgentExample.copyResource;
 import static 
org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewAnalysisRes;
 import static 
org.apache.flink.agents.examples.agents.CustomTypesAndResources.ProductReviewSummary;
-import static org.apache.flink.streaming.api.windowing.time.Time.minutes;
 
 /**
  * Java example demonstrating multiple workflow agents for product improvement 
suggestion.
@@ -164,7 +164,7 @@ public class WorkflowMultipleAgentExample {
                 reviewAnalysisResStream
                         .map(element -> (ProductReviewAnalysisRes) element)
                         .keyBy(ProductReviewAnalysisRes::getId)
-                        .window(TumblingProcessingTimeWindows.of(minutes(1)))
+                        
.window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(1)))
                         .process(new 
AggregateScoreDistributionAndDislikeReasons());
 
         // Use the ProductSuggestionAgent (LLM) to generate product improvement
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializer.java
 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializer.java
index 837be92..77581da 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializer.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/serializer/ActionJsonSerializer.java
@@ -80,21 +80,23 @@ public class ActionJsonSerializer extends 
StdSerializer<Action> {
             String configType = (String) config.get(CONFIG_TYPE);
             if (configType == null) {
                 configType = "java";
-            } else {
-                config.remove(CONFIG_TYPE);
+                config.put(CONFIG_TYPE, configType);
             }
-            jsonGenerator.writeStringField(CONFIG_TYPE, configType);
             if (configType.equals("java")) {
                 action.getConfig()
                         .forEach(
                                 (name, value) -> {
                                     try {
-                                        jsonGenerator.writeFieldName(name);
-                                        jsonGenerator.writeStartObject();
-                                        jsonGenerator.writeStringField(
-                                                "@class", 
value.getClass().getName());
-                                        
jsonGenerator.writeObjectField("value", value);
-                                        jsonGenerator.writeEndObject();
+                                        if (CONFIG_TYPE.equals(name)) {
+                                            
jsonGenerator.writeStringField(name, (String) value);
+                                        } else {
+                                            jsonGenerator.writeFieldName(name);
+                                            jsonGenerator.writeStartObject();
+                                            jsonGenerator.writeStringField(
+                                                    "@class", 
value.getClass().getName());
+                                            
jsonGenerator.writeObjectField("value", value);
+                                            jsonGenerator.writeEndObject();
+                                        }
                                     } catch (IOException e) {
                                         throw new RuntimeException(
                                                 "Error writing action: " + 
name, e);
diff --git a/pom.xml b/pom.xml
index b8b4225..954cae7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,20 @@ under the License.
         <maven.compiler.target>${target.java.version}</maven.compiler.target>
         <spotless.version>2.27.1</spotless.version>
         <spotless.skip>false</spotless.skip>
-        <flink.version>1.20.3</flink.version>
+
+        <!-- ============================================ -->
+        <!-- Flink Version Management                     -->
+        <!-- ============================================ -->
+        <!-- Supported Flink versions -->
+        <flink.1.20.version>1.20.3</flink.1.20.version>
+        <flink.2.0.version>2.0.1</flink.2.0.version>
+        <flink.2.1.version>2.1.1</flink.2.1.version>
+        <flink.2.2.version>2.2.0</flink.2.2.version>
+
+        <!-- Default version used in main modules -->
+        <flink.version>${flink.2.2.version}</flink.version>
+        <!-- ============================================ -->
+
         <kafka.version>4.0.0</kafka.version>
         <junit5.version>5.10.1</junit5.version>
         <jackson.version>2.18.2</jackson.version>
diff --git a/python/pyproject.toml b/python/pyproject.toml
index ee282d6..c9c12af 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -42,7 +42,7 @@ classifiers = [
 ]
 
 dependencies = [
-    "apache-flink==1.20.3",
+    "apache-flink==2.2.0",
     "pydantic==2.11.4",
     "docstring-parser==0.16",
     "pyyaml==6.0.2",
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
index fc3f259..dac2d5d 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/actionstate/ActionStateUtil.java
@@ -22,7 +22,7 @@ import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.InputEvent;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
-import org.apache.flink.shaded.guava31.com.google.common.base.Preconditions;
+import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/message/MessageTypeInformation.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/message/MessageTypeInformation.java
deleted file mode 100644
index 4c753f1..0000000
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/message/MessageTypeInformation.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.agents.runtime.message;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-
-/** TypeInformation for {@link Message}. */
-public class MessageTypeInformation extends TypeInformation<Message> {
-
-    private MessageTypeInformation() {}
-
-    public static final MessageTypeInformation INSTANCE = new 
MessageTypeInformation();
-
-    @Override
-    public boolean isBasicType() {
-        return false;
-    }
-
-    @Override
-    public boolean isTupleType() {
-        return false;
-    }
-
-    @Override
-    public int getArity() {
-        return 0;
-    }
-
-    @Override
-    public int getTotalFields() {
-        return 0;
-    }
-
-    @Override
-    public Class<Message> getTypeClass() {
-        return Message.class;
-    }
-
-    @Override
-    public boolean isKeyType() {
-        return false;
-    }
-
-    @Override
-    public TypeSerializer<Message> createSerializer(ExecutionConfig 
executionConfig) {
-        return new KryoSerializer<>(Message.class, executionConfig);
-    }
-
-    @Override
-    public String toString() {
-        return "MessageTypeInformation";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        return o instanceof MessageTypeInformation;
-    }
-
-    @Override
-    public int hashCode() {
-        return getTypeClass().hashCode();
-    }
-
-    @Override
-    public boolean canEqual(Object o) {
-        return o instanceof MessageTypeInformation;
-    }
-}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index d69808f..544f24b 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -66,14 +66,16 @@ import 
org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
 import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
 import org.apache.flink.types.Row;
@@ -199,7 +201,6 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         this.agentPlan = agentPlan;
         this.inputIsJava = inputIsJava;
         this.processingTimeService = processingTimeService;
-        this.chainingStrategy = ChainingStrategy.ALWAYS;
         this.mailboxExecutor = mailboxExecutor;
         this.eventLogger = 
EventLoggerFactory.createLogger(EventLoggerConfig.builder().build());
         this.eventListeners = new ArrayList<>();
@@ -208,6 +209,13 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         this.actionTaskMemoryContexts = new HashMap<>();
     }
 
+    protected void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<OUT>> output) {
+        super.setup(containingTask, config, output);
+    }
+
     @Override
     public void open() throws Exception {
         super.open();
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorFactory.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorFactory.java
index 1b863ff..e6a0e21 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorFactory.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperatorFactory.java
@@ -20,13 +20,14 @@ package org.apache.flink.agents.runtime.operator;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 
 /** Operator factory for {@link ActionExecutionOperator}. */
-public class ActionExecutionOperatorFactory<IN, OUT>
+public class ActionExecutionOperatorFactory<IN, OUT> extends 
AbstractStreamOperatorFactory<OUT>
         implements OneInputStreamOperatorFactory<IN, OUT> {
 
     private final AgentPlan agentPlan;
@@ -45,6 +46,7 @@ public class ActionExecutionOperatorFactory<IN, OUT>
         this.agentPlan = agentPlan;
         this.inputIsJava = inputIsJava;
         this.actionStateStore = actionStateStore;
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
     }
 
     @Override
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/StateUtils.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/StateUtils.java
index 55a9d70..6266ced 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/StateUtils.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/StateUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.agents.runtime.operator;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -67,7 +66,9 @@ public class StateUtils {
         }
 
         T value = values.get(0);
-        state.update(Lists.newArrayList(value));
+        List<T> newList = new ArrayList<>();
+        newList.add(value);
+        state.update(newList);
 
         return value;
     }
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java 
b/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
index dca3ed2..c3c376f 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/RescalingTest.java
@@ -51,8 +51,8 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import 
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;

Reply via email to