This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new a34f12e  [runtime] Implement hashCode and equals for ActionTask. (#365)
a34f12e is described below

commit a34f12e350df3f88cb2b421812bb2e7982554b89
Author: Wenjin Xie <[email protected]>
AuthorDate: Tue Dec 2 15:45:23 2025 +0800

    [runtime] Implement hashCode and equals for ActionTask. (#365)
---
 .../main/java/org/apache/flink/agents/api/Event.java   | 15 +++++++++++++++
 .../org/apache/flink/agents/plan/PythonFunction.java   | 16 ++++++++++++++++
 .../org/apache/flink/agents/plan/actions/Action.java   | 16 ++++++++++++++++
 .../flink_agents/e2e_tests/flink_intergration_test.py  |  6 +++---
 .../flink/agents/runtime/operator/ActionTask.java      | 16 ++++++++++++++++
 .../flink/agents/runtime/python/event/PythonEvent.java | 18 ++++++++++++++++++
 6 files changed, 84 insertions(+), 3 deletions(-)

diff --git a/api/src/main/java/org/apache/flink/agents/api/Event.java b/api/src/main/java/org/apache/flink/agents/api/Event.java
index e0e77e5..868447a 100644
--- a/api/src/main/java/org/apache/flink/agents/api/Event.java
+++ b/api/src/main/java/org/apache/flink/agents/api/Event.java
@@ -22,6 +22,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 
 /** Base class for all event types in the system. */
@@ -68,4 +69,18 @@ public abstract class Event {
     public void setSourceTimestamp(long timestamp) {
         this.sourceTimestamp = timestamp;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Event other = (Event) o;
+        return Objects.equals(this.id, other.id)
+                && Objects.equals(this.attributes, other.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, attributes);
+    }
 }
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java b/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
index da112b0..b5beabd 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
@@ -19,6 +19,8 @@ package org.apache.flink.agents.plan;
 
 import pemja.core.PythonInterpreter;
 
+import java.util.Objects;
+
 /** Represent a Python function. */
 public class PythonFunction implements Function {
     private static final String CALL_PYTHON_FUNCTION = "function.call_python_function";
@@ -57,4 +59,18 @@ public class PythonFunction implements Function {
     public String getQualName() {
         return qualName;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        PythonFunction other = (PythonFunction) o;
+        return Objects.equals(this.module, other.module)
+                && Objects.equals(this.qualName, other.qualName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(module, qualName);
+    }
 }
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java
index a9a4cfa..771e278 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Representation of an agent action with event listening and function execution.
@@ -80,4 +81,19 @@ public class Action {
     public Map<String, Object> getConfig() {
         return config;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Action other = (Action) o;
+        return name.equals(other.name)
+                && exec.equals(other.exec)
+                && listenEventTypes.equals(other.listenEventTypes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, exec, listenEventTypes);
+    }
 }
diff --git a/python/flink_agents/e2e_tests/flink_intergration_test.py b/python/flink_agents/e2e_tests/flink_intergration_test.py
index 4f537ed..6647cfb 100644
--- a/python/flink_agents/e2e_tests/flink_intergration_test.py
+++ b/python/flink_agents/e2e_tests/flink_intergration_test.py
@@ -49,9 +49,9 @@ os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
 
 def test_from_datastream_to_datastream(tmp_path: Path) -> None:  # noqa: D103
     config = Configuration()
-    # config.set_string("state.backend.type", "rocksdb")
-    # config.set_string("checkpointing.interval", "1s")
-    # config.set_string("restart-strategy.type", "disable")
+    config.set_string("state.backend.type", "rocksdb")
+    config.set_string("checkpointing.interval", "1s")
+    config.set_string("restart-strategy.type", "disable")
     env = StreamExecutionEnvironment.get_execution_environment(config)
     env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
     env.set_parallelism(1)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
index 5dff8a5..d8a30b3 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 
 /**
@@ -70,6 +71,21 @@ public abstract class ActionTask {
         return key;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ActionTask other = (ActionTask) o;
+        return Objects.equals(this.key, other.key)
+                && Objects.equals(this.event, other.event)
+                && Objects.equals(this.action, other.action);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(key, event, action);
+    }
+
     /** Invokes the action task. */
     public abstract ActionTaskResult invoke() throws Exception;
 
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
index a5692a2..1a99e97 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
@@ -23,7 +23,9 @@ import org.apache.flink.agents.api.Event;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.Arrays;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 
 /** An event generated by the framework, passing a Python event to the Java agent runner. */
@@ -55,4 +57,20 @@ public class PythonEvent extends Event {
     public String getEventType() {
         return eventType;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) {
+            return false;
+        }
+        PythonEvent other = (PythonEvent) o;
+        return Arrays.equals(event, other.event) && Objects.equals(this.eventType, other.eventType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), Arrays.hashCode(event), eventType);
+    }
 }

Reply via email to