This is an automated email from the ASF dual-hosted git repository.

sxnan pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/release-0.1 by this push:
     new 59fede5  [runtime] Move PythonActionExecutor to PythonRunnerContextImpl (#301)
59fede5 is described below

commit 59fede5b9ce663ddb63d61717471f4df912e0d52
Author: Xuannan <[email protected]>
AuthorDate: Wed Nov 5 10:55:17 2025 +0800

    [runtime] Move PythonActionExecutor to PythonRunnerContextImpl (#301)
---
 .../e2e_tests/integrate_datastream_with_agent_example.py |  9 ++++++---
 .../agents/runtime/operator/ActionExecutionOperator.java |  5 +++--
 .../runtime/python/context/PythonRunnerContextImpl.java  | 11 ++++++++++-
 .../agents/runtime/python/operator/PythonActionTask.java | 16 +++++++++-------
 .../python/operator/PythonGeneratorActionTask.java       | 13 ++++---------
 5 files changed, 32 insertions(+), 22 deletions(-)

diff --git a/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py b/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py
index 5f2e391..b88e96f 100644
--- a/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py
+++ b/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py
@@ -17,7 +17,7 @@
 #################################################################################
 from pathlib import Path
 
-from pyflink.common import Duration, WatermarkStrategy
+from pyflink.common import Configuration, Duration, WatermarkStrategy
 from pyflink.datastream import (
     KeySelector,
     RuntimeExecutionMode,
@@ -43,8 +43,11 @@ current_dir = Path(__file__).parent
 # PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/
 # site-packages) in this file.
 if __name__ == "__main__":
-    env = StreamExecutionEnvironment.get_execution_environment()
-
+    config = Configuration()
+    config.set_string("state.backend.type", "rocksdb")
+    config.set_string("checkpointing.interval", "1s")
+    config.set_string("restart-strategy.type", "disable")
+    env = StreamExecutionEnvironment.get_execution_environment(config)
     env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
     env.set_parallelism(1)
 
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index a17299e..95b991b 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -641,7 +641,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
             return new JavaActionTask(
                     key, event, action, getRuntimeContext().getUserCodeClassLoader());
         } else if (action.getExec() instanceof PythonFunction) {
-            return new PythonActionTask(key, event, action, pythonActionExecutor);
+            return new PythonActionTask(key, event, action);
         } else {
             throw new IllegalStateException(
                     "Unsupported action type: " + action.getExec().getClass());
@@ -669,7 +669,8 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
                             new CachedMemoryStore(shortTermMemState),
                             metricGroup,
                             this::checkMailboxThread,
-                            agentPlan);
+                            agentPlan,
+                            pythonActionExecutor);
         } else {
             throw new IllegalStateException(
                     "Unsupported action type: " + actionTask.action.getExec().getClass());
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 43e3f62..89f741d 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -24,6 +24,7 @@ import org.apache.flink.agents.runtime.context.RunnerContextImpl;
 import org.apache.flink.agents.runtime.memory.CachedMemoryStore;
 import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
+import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.concurrent.NotThreadSafe;
@@ -32,12 +33,16 @@ import javax.annotation.concurrent.NotThreadSafe;
 @NotThreadSafe
 public class PythonRunnerContextImpl extends RunnerContextImpl {
 
+    private final PythonActionExecutor pythonActionExecutor;
+
     public PythonRunnerContextImpl(
             CachedMemoryStore store,
             FlinkAgentsMetricGroupImpl agentMetricGroup,
             Runnable mailboxThreadChecker,
-            AgentPlan agentPlan) {
+            AgentPlan agentPlan,
+            PythonActionExecutor pythonActionExecutor) {
         super(store, agentMetricGroup, mailboxThreadChecker, agentPlan);
+        this.pythonActionExecutor = pythonActionExecutor;
     }
 
     @Override
@@ -51,4 +56,8 @@ public class PythonRunnerContextImpl extends RunnerContextImpl {
         // this method will be invoked by PythonActionExecutor's python interpreter.
         sendEvent(new PythonEvent(event, type));
     }
+
+    public PythonActionExecutor getPythonActionExecutor() {
+        return pythonActionExecutor;
+    }
 }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index ab3bec8..c3e1f84 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.plan.PythonFunction;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 
@@ -34,16 +35,12 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public class PythonActionTask extends ActionTask {
 
-    protected final PythonActionExecutor pythonActionExecutor;
-
-    public PythonActionTask(
-            Object key, Event event, Action action, PythonActionExecutor pythonActionExecutor) {
+    public PythonActionTask(Object key, Event event, Action action) {
         super(key, event, action);
         checkState(action.getExec() instanceof PythonFunction);
         checkState(
                 event instanceof PythonEvent,
                 "Python action only accept python event, but got " + event);
-        this.pythonActionExecutor = pythonActionExecutor;
     }
 
     public ActionTaskResult invoke() throws Exception {
@@ -54,6 +51,7 @@ public class PythonActionTask extends ActionTask {
                 key);
         runnerContext.checkNoPendingEvents();
 
+        PythonActionExecutor pythonActionExecutor = getPythonActionExecutor();
         String pythonGeneratorRef =
                 pythonActionExecutor.executePythonFunction(
                         (PythonFunction) action.getExec(), (PythonEvent) event, runnerContext);
@@ -64,12 +62,16 @@ public class PythonActionTask extends ActionTask {
             // The Python action generates a generator. We need to execute it once, which will
             // submit an asynchronous task and return whether the action has been completed.
             ActionTask tempGeneratedActionTask =
-                    new PythonGeneratorActionTask(
-                            key, event, action, pythonActionExecutor, pythonGeneratorRef);
+                    new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
             tempGeneratedActionTask.setRunnerContext(runnerContext);
             return tempGeneratedActionTask.invoke();
         }
         return new ActionTaskResult(
                 true, runnerContext.drainEvents(event.getSourceTimestamp()), null);
     }
+
+    protected PythonActionExecutor getPythonActionExecutor() {
+        checkState(runnerContext != null && runnerContext instanceof PythonRunnerContextImpl);
+        return ((PythonRunnerContextImpl) runnerContext).getPythonActionExecutor();
+    }
 }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index 9b33d85..4119fcd 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -20,30 +20,25 @@ package org.apache.flink.agents.runtime.python.operator;
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.operator.ActionTask;
-import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 
 /** An {@link ActionTask} wrapper a Python Generator to represent a code block in Python action. */
 public class PythonGeneratorActionTask extends PythonActionTask {
     private final String pythonGeneratorRef;
 
     public PythonGeneratorActionTask(
-            Object key,
-            Event event,
-            Action action,
-            PythonActionExecutor pythonActionExecutor,
-            String pythonGeneratorRef) {
-        super(key, event, action, pythonActionExecutor);
+            Object key, Event event, Action action, String pythonGeneratorRef) {
+        super(key, event, action);
         this.pythonGeneratorRef = pythonGeneratorRef;
     }
 
     @Override
-    public ActionTaskResult invoke() throws Exception {
+    public ActionTaskResult invoke() {
         LOG.debug(
                 "Try execute python generator action {} for event {} with key {}.",
                 action.getName(),
                 event,
                 key);
-        boolean finished = pythonActionExecutor.callPythonGenerator(pythonGeneratorRef);
+        boolean finished = getPythonActionExecutor().callPythonGenerator(pythonGeneratorRef);
         ActionTask generatedActionTask = finished ? null : this;
         return new ActionTaskResult(
                 finished,

Reply via email to