codenohup commented on code in PR #80:
URL: https://github.com/apache/flink-agents/pull/80#discussion_r2241416406
##########
runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java:
##########
@@ -39,62 +37,103 @@ public class PythonActionExecutor {
"from flink_agents.plan import function\n"
+ "from flink_agents.runtime import flink_runner_context\n"
+ "from flink_agents.runtime import python_java_utils";
+
+ // =========== RUNNER CONTEXT ===========
private static final String CREATE_FLINK_RUNNER_CONTEXT =
"flink_runner_context.create_flink_runner_context";
+ private static final String FLINK_RUNNER_CONTEXT_VAR_NAME_PREFIX =
"flink_runner_context_";
+ private static final AtomicLong FLINK_RUNNER_CONTEXT_VAR_ID = new
AtomicLong(0);
+
+ // ========== ASYNC THREAD POOL ===========
+ private static final String CREATE_ASYNC_THREAD_POOL =
+ "flink_runner_context.create_async_thread_pool";
+ private static final String PYTHON_ASYNC_THREAD_POOL_VAR_NAME_PREFIX =
+ "python_async_thread_pool";
+ private static final AtomicLong PYTHON_ASYNC_THREAD_POOL_VAR_ID = new
AtomicLong(0);
+
+ // =========== PYTHON GENERATOR ===========
+ private static final String CALL_PYTHON_GENERATOR =
"function.call_python_generator";
+ private static final String PYTHON_GENERATOR_VAR_NAME_PREFIX =
"python_generator_";
+ private static final AtomicLong PYTHON_GENERATOR_VAR_ID = new
AtomicLong(0);
+
+ // =========== PYTHON AND JAVA OBJECT CONVERT ===========
private static final String CONVERT_TO_PYTHON_OBJECT =
"python_java_utils.convert_to_python_object";
private static final String WRAP_TO_INPUT_EVENT =
"python_java_utils.wrap_to_input_event";
private static final String GET_OUTPUT_FROM_OUTPUT_EVENT =
"python_java_utils.get_output_from_output_event";
- private static final String FLINK_RUNNER_CONTEXT_VAR_NAME =
"flink_runner_context";
private final PythonEnvironmentManager environmentManager;
private final String agentPlanJson;
- private PythonRunnerContextImpl runnerContext;
-
private PythonInterpreter interpreter;
+ private String pythonAsyncThreadPoolObjectName;
public PythonActionExecutor(PythonEnvironmentManager environmentManager,
String agentPlanJson) {
this.environmentManager = environmentManager;
this.agentPlanJson = agentPlanJson;
}
- public void open(
- MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState,
- Runnable mailboxThreadChecker)
- throws Exception {
+ public void open() throws Exception {
environmentManager.open();
EmbeddedPythonEnvironment env = environmentManager.createEnvironment();
interpreter = env.getInterpreter();
interpreter.exec(PYTHON_IMPORTS);
- runnerContext = new PythonRunnerContextImpl(shortTermMemState,
mailboxThreadChecker);
-
- // TODO: remove the set and get runner context after updating pemja to
version 0.5.3
- Object pythonRunnerContextObject =
- interpreter.invoke(CREATE_FLINK_RUNNER_CONTEXT, runnerContext,
agentPlanJson);
- interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME,
pythonRunnerContextObject);
+ // TODO: remove the set and get thread pool after updating pemja to
version 0.5.3
+ Object pythonAsyncThreadPool =
interpreter.invoke(CREATE_ASYNC_THREAD_POOL);
+ this.pythonAsyncThreadPoolObjectName =
+ PYTHON_ASYNC_THREAD_POOL_VAR_NAME_PREFIX
+ + PYTHON_ASYNC_THREAD_POOL_VAR_ID.incrementAndGet();
+ interpreter.set(pythonAsyncThreadPoolObjectName,
pythonAsyncThreadPool);
Review Comment:
> 1. What does this todo mean?
Due to a bug in Pemja that may prevent proper cleanup of Python objects, we
are using the interpreter's set and get methods as a temporary workaround. For
detailed context, please refer to #83.
> 2. Is it necessary to have separate thread pools for each operator?
Since each operator has its own Python interpreter, and the thread pool is
created and managed within the interpreter, it is necessary to allocate a
separate thread pool for each operator.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]