This is an automated email from the ASF dual-hosted git repository.
sxnan pushed a commit to branch release-0.1
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.1 by this push:
new 59fede5 [runtime] Move PythonActionExecutor to PythonRunnerContextImpl (#301)
59fede5 is described below
commit 59fede5b9ce663ddb63d61717471f4df912e0d52
Author: Xuannan <[email protected]>
AuthorDate: Wed Nov 5 10:55:17 2025 +0800
[runtime] Move PythonActionExecutor to PythonRunnerContextImpl (#301)
---
.../e2e_tests/integrate_datastream_with_agent_example.py | 9 ++++++---
.../agents/runtime/operator/ActionExecutionOperator.java | 5 +++--
.../runtime/python/context/PythonRunnerContextImpl.java | 11 ++++++++++-
.../agents/runtime/python/operator/PythonActionTask.java | 16 +++++++++-------
.../python/operator/PythonGeneratorActionTask.java | 13 ++++---------
5 files changed, 32 insertions(+), 22 deletions(-)
diff --git a/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py b/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py
index 5f2e391..b88e96f 100644
--- a/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py
+++ b/python/flink_agents/e2e_tests/integrate_datastream_with_agent_example.py
@@ -17,7 +17,7 @@
#################################################################################
from pathlib import Path
-from pyflink.common import Duration, WatermarkStrategy
+from pyflink.common import Configuration, Duration, WatermarkStrategy
from pyflink.datastream import (
KeySelector,
RuntimeExecutionMode,
@@ -43,8 +43,11 @@ current_dir = Path(__file__).parent
# PYTHONPATH like "os.environ["PYTHONPATH"] = ($VENV_HOME/lib/$PYTHON_VERSION/
# site-packages) in this file.
if __name__ == "__main__":
- env = StreamExecutionEnvironment.get_execution_environment()
-
+ config = Configuration()
+ config.set_string("state.backend.type", "rocksdb")
+ config.set_string("checkpointing.interval", "1s")
+ config.set_string("restart-strategy.type", "disable")
+ env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
env.set_parallelism(1)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index a17299e..95b991b 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -641,7 +641,7 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
return new JavaActionTask(
key, event, action, getRuntimeContext().getUserCodeClassLoader());
} else if (action.getExec() instanceof PythonFunction) {
- return new PythonActionTask(key, event, action, pythonActionExecutor);
+ return new PythonActionTask(key, event, action);
} else {
throw new IllegalStateException(
"Unsupported action type: " + action.getExec().getClass());
@@ -669,7 +669,8 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
new CachedMemoryStore(shortTermMemState),
metricGroup,
this::checkMailboxThread,
- agentPlan);
+ agentPlan,
+ pythonActionExecutor);
} else {
throw new IllegalStateException(
"Unsupported action type: " + actionTask.action.getExec().getClass());
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 43e3f62..89f741d 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -24,6 +24,7 @@ import org.apache.flink.agents.runtime.context.RunnerContextImpl;
import org.apache.flink.agents.runtime.memory.CachedMemoryStore;
import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
+import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
import org.apache.flink.util.Preconditions;
import javax.annotation.concurrent.NotThreadSafe;
@@ -32,12 +33,16 @@ import javax.annotation.concurrent.NotThreadSafe;
@NotThreadSafe
public class PythonRunnerContextImpl extends RunnerContextImpl {
+ private final PythonActionExecutor pythonActionExecutor;
+
public PythonRunnerContextImpl(
CachedMemoryStore store,
FlinkAgentsMetricGroupImpl agentMetricGroup,
Runnable mailboxThreadChecker,
- AgentPlan agentPlan) {
+ AgentPlan agentPlan,
+ PythonActionExecutor pythonActionExecutor) {
super(store, agentMetricGroup, mailboxThreadChecker, agentPlan);
+ this.pythonActionExecutor = pythonActionExecutor;
}
@Override
@@ -51,4 +56,8 @@ public class PythonRunnerContextImpl extends RunnerContextImpl {
// this method will be invoked by PythonActionExecutor's python interpreter.
sendEvent(new PythonEvent(event, type));
}
+
+ public PythonActionExecutor getPythonActionExecutor() {
+ return pythonActionExecutor;
+ }
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index ab3bec8..c3e1f84 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -21,6 +21,7 @@ import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.plan.PythonFunction;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.operator.ActionTask;
+import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
@@ -34,16 +35,12 @@ import static org.apache.flink.util.Preconditions.checkState;
*/
public class PythonActionTask extends ActionTask {
- protected final PythonActionExecutor pythonActionExecutor;
-
- public PythonActionTask(
- Object key, Event event, Action action, PythonActionExecutor pythonActionExecutor) {
+ public PythonActionTask(Object key, Event event, Action action) {
super(key, event, action);
checkState(action.getExec() instanceof PythonFunction);
checkState(
event instanceof PythonEvent,
"Python action only accept python event, but got " + event);
- this.pythonActionExecutor = pythonActionExecutor;
}
public ActionTaskResult invoke() throws Exception {
@@ -54,6 +51,7 @@ public class PythonActionTask extends ActionTask {
key);
runnerContext.checkNoPendingEvents();
+ PythonActionExecutor pythonActionExecutor = getPythonActionExecutor();
String pythonGeneratorRef =
pythonActionExecutor.executePythonFunction(
(PythonFunction) action.getExec(), (PythonEvent) event, runnerContext);
@@ -64,12 +62,16 @@ public class PythonActionTask extends ActionTask {
// The Python action generates a generator. We need to execute it once, which will
// submit an asynchronous task and return whether the action has been completed.
ActionTask tempGeneratedActionTask =
- new PythonGeneratorActionTask(
- key, event, action, pythonActionExecutor, pythonGeneratorRef);
+ new PythonGeneratorActionTask(key, event, action, pythonGeneratorRef);
tempGeneratedActionTask.setRunnerContext(runnerContext);
return tempGeneratedActionTask.invoke();
}
return new ActionTaskResult(
true, runnerContext.drainEvents(event.getSourceTimestamp()), null);
}
+
+ protected PythonActionExecutor getPythonActionExecutor() {
+ checkState(runnerContext != null && runnerContext instanceof PythonRunnerContextImpl);
+ return ((PythonRunnerContextImpl) runnerContext).getPythonActionExecutor();
+ }
}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index 9b33d85..4119fcd 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -20,30 +20,25 @@ package org.apache.flink.agents.runtime.python.operator;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.operator.ActionTask;
-import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
/** An {@link ActionTask} wrapper a Python Generator to represent a code block in Python action. */
public class PythonGeneratorActionTask extends PythonActionTask {
private final String pythonGeneratorRef;
public PythonGeneratorActionTask(
- Object key,
- Event event,
- Action action,
- PythonActionExecutor pythonActionExecutor,
- String pythonGeneratorRef) {
- super(key, event, action, pythonActionExecutor);
+ Object key, Event event, Action action, String pythonGeneratorRef) {
+ super(key, event, action);
this.pythonGeneratorRef = pythonGeneratorRef;
}
@Override
- public ActionTaskResult invoke() throws Exception {
+ public ActionTaskResult invoke() {
LOG.debug(
"Try execute python generator action {} for event {} with key {}.",
action.getName(),
event,
key);
- boolean finished = pythonActionExecutor.callPythonGenerator(pythonGeneratorRef);
+ boolean finished = getPythonActionExecutor().callPythonGenerator(pythonGeneratorRef);
ActionTask generatedActionTask = finished ? null : this;
return new ActionTaskResult(
finished,