This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new a34f12e [runtime] Implement hashCode and equals for ActionTask. (#365)
a34f12e is described below
commit a34f12e350df3f88cb2b421812bb2e7982554b89
Author: Wenjin Xie <[email protected]>
AuthorDate: Tue Dec 2 15:45:23 2025 +0800
[runtime] Implement hashCode and equals for ActionTask. (#365)
---
.../main/java/org/apache/flink/agents/api/Event.java | 15 +++++++++++++++
.../org/apache/flink/agents/plan/PythonFunction.java | 16 ++++++++++++++++
.../org/apache/flink/agents/plan/actions/Action.java | 16 ++++++++++++++++
.../flink_agents/e2e_tests/flink_intergration_test.py | 6 +++---
.../flink/agents/runtime/operator/ActionTask.java | 16 ++++++++++++++++
.../flink/agents/runtime/python/event/PythonEvent.java | 18 ++++++++++++++++++
6 files changed, 84 insertions(+), 3 deletions(-)
diff --git a/api/src/main/java/org/apache/flink/agents/api/Event.java b/api/src/main/java/org/apache/flink/agents/api/Event.java
index e0e77e5..868447a 100644
--- a/api/src/main/java/org/apache/flink/agents/api/Event.java
+++ b/api/src/main/java/org/apache/flink/agents/api/Event.java
@@ -22,6 +22,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
/** Base class for all event types in the system. */
@@ -68,4 +69,18 @@ public abstract class Event {
public void setSourceTimestamp(long timestamp) {
this.sourceTimestamp = timestamp;
}
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) return true; // same reference
+        if (o == null || o.getClass() != getClass()) return false; // exact runtime class only
+        final Event that = (Event) o;
+        final boolean sameId = Objects.equals(id, that.id);
+        return sameId && Objects.equals(attributes, that.attributes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.id, this.attributes); // the two fields equals() compares
+    }
}
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java b/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
index da112b0..b5beabd 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/PythonFunction.java
@@ -19,6 +19,8 @@ package org.apache.flink.agents.plan;
import pemja.core.PythonInterpreter;
+import java.util.Objects;
+
/** Represent a Python function. */
public class PythonFunction implements Function {
private static final String CALL_PYTHON_FUNCTION =
"function.call_python_function";
@@ -57,4 +59,18 @@ public class PythonFunction implements Function {
public String getQualName() {
return qualName;
}
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) return true;
+        if (o == null || o.getClass() != getClass()) return false;
+        final PythonFunction that = (PythonFunction) o; // equality = module + qualified name
+        if (!Objects.equals(module, that.module)) return false;
+        return Objects.equals(qualName, that.qualName);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.module, this.qualName); // the two fields equals() compares
+    }
}
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java
index a9a4cfa..771e278 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/Action.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
/**
* Representation of an agent action with event listening and function execution.
@@ -80,4 +81,19 @@ public class Action {
public Map<String, Object> getConfig() {
return config;
}
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Action other = (Action) o; // Objects.equals below tolerates null fields, like hashCode()
+        return Objects.equals(this.name, other.name)
+                && Objects.equals(this.exec, other.exec)
+                && Objects.equals(this.listenEventTypes, other.listenEventTypes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.name, this.exec, this.listenEventTypes); // config is not hashed
+    }
}
diff --git a/python/flink_agents/e2e_tests/flink_intergration_test.py b/python/flink_agents/e2e_tests/flink_intergration_test.py
index 4f537ed..6647cfb 100644
--- a/python/flink_agents/e2e_tests/flink_intergration_test.py
+++ b/python/flink_agents/e2e_tests/flink_intergration_test.py
@@ -49,9 +49,9 @@ os.environ["PYTHONPATH"] = sysconfig.get_paths()["purelib"]
def test_from_datastream_to_datastream(tmp_path: Path) -> None: # noqa: D103
config = Configuration()
- # config.set_string("state.backend.type", "rocksdb")
- # config.set_string("checkpointing.interval", "1s")
- # config.set_string("restart-strategy.type", "disable")
+ config.set_string("state.backend.type", "rocksdb")
+ config.set_string("checkpointing.interval", "1s")
+ config.set_string("restart-strategy.type", "disable")
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
index 5dff8a5..d8a30b3 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionTask.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
/**
@@ -70,6 +71,21 @@ public abstract class ActionTask {
return key;
}
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) return true;
+        if (o == null || o.getClass() != getClass()) return false; // subclasses never mix
+        final ActionTask that = (ActionTask) o;
+        if (!Objects.equals(key, that.key)) return false;
+        if (!Objects.equals(event, that.event)) return false;
+        return Objects.equals(action, that.action);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(this.key, this.event, this.action); // the fields equals() compares
+    }
+
/** Invokes the action task. */
public abstract ActionTaskResult invoke() throws Exception;
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
index a5692a2..1a99e97 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/event/PythonEvent.java
@@ -23,7 +23,9 @@ import org.apache.flink.agents.api.Event;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Arrays;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
/** An event generated by the framework, passing a Python event to the Java agent runner. */
@@ -55,4 +57,20 @@ public class PythonEvent extends Event {
public String getEventType() {
return eventType;
}
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) {
+            return false;
+        }
+        PythonEvent other = (PythonEvent) o; // event is an array: compare contents, not refs
+        return Arrays.equals(event, other.event) && Objects.equals(this.eventType, other.eventType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), Arrays.hashCode(event), eventType);
+    }
}