This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 8ab9354 [Feature] Improve Python Async Execution API (#409)
8ab9354 is described below
commit 8ab9354556ed1c043e205dc32651d1297d8134e6
Author: Xuannan <[email protected]>
AuthorDate: Wed Jan 7 16:27:31 2026 +0800
[Feature] Improve Python Async Execution API (#409)
---
docs/content/docs/development/workflow_agent.md | 37 +++++++++++-
python/flink_agents/api/runner_context.py | 65 ++++++++++++++++++++--
.../flink_integration_agent.py | 4 +-
.../e2e_tests/long_term_memory_test.py | 8 +--
python/flink_agents/plan/function.py | 59 ++++++++------------
python/flink_agents/plan/tests/test_function.py | 6 +-
.../flink_agents/runtime/flink_runner_context.py | 13 +----
python/flink_agents/runtime/local_runner.py | 41 +++++++++++---
.../tests/test_local_execution_environment.py | 4 +-
.../runtime/python/operator/PythonActionTask.java | 12 ++--
.../python/operator/PythonGeneratorActionTask.java | 12 ++--
.../runtime/python/utils/PythonActionExecutor.java | 50 ++++++++---------
12 files changed, 202 insertions(+), 109 deletions(-)
diff --git a/docs/content/docs/development/workflow_agent.md
b/docs/content/docs/development/workflow_agent.md
index 346f80d..76431d0 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -219,7 +219,7 @@ An action is a piece of code that can be executed. Each
action listens to at lea
To declare an action in Agent, user can use `@action` to decorate a function
of Agent class in python (or annotate a method of Agent class in java), and
declare the listened event types as decorator/annotation parameters.
-The decorated/annotated function signature should be `(Event, RunnerContext)
-> None`
+The decorated/annotated function signature should be `(Event, RunnerContext)
-> None`. In Python, actions can also be defined as `async def` when using
async execution (see [Async Execution](#async-execution)).
{{< tabs "Action Function" >}}
@@ -277,6 +277,41 @@ public static void processInput(InputEvent event,
RunnerContext ctx) throws Exce
{{< /tabs >}}
+### Async Execution
+
+{{< hint warning >}}
+Async Execution is only supported in Python currently.
+{{< /hint >}}
+
+When an action needs to perform time-consuming I/O operations (such as calling
external APIs, database queries, or network requests), you can use
`ctx.execute_async()` to execute these operations asynchronously. This allows
Flink to efficiently manage resources and avoid blocking the main processing
thread.
+
+To use async execution, define your action as an `async def` function and use
`await` with `ctx.execute_async()`:
+
+```python
+@action(InputEvent)
+@staticmethod
+async def process_with_async(event: InputEvent, ctx: RunnerContext) -> None:
+ def slow_external_call(data: str) -> str:
+ # Simulate a slow external API call
+ time.sleep(2)
+ return f"Processed: {data}"
+
+ # Execute the slow operation asynchronously
+ result = await ctx.execute_async(slow_external_call, event.input)
+
+ ctx.send_event(OutputEvent(output=result))
+```
+
+**Key points:**
+- Use `async def` to define the action function
+- Use `await ctx.execute_async(func, *args, **kwargs)` to execute slow
operations
+- The function passed to `execute_async` will be submitted to a thread pool
+- Access to memory is prohibited within the function passed to `execute_async`
+
+{{< hint info >}}
+Only `await ctx.execute_async(...)` is supported. Standard asyncio functions
like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and
`asyncio.sleep` are **NOT** supported because there is no asyncio event loop
running.
+{{< /hint >}}
+
## Event
Events are messages passed between actions. Events may carry payloads. A
single event may trigger multiple actions if they are all listening to its type.
diff --git a/python/flink_agents/api/runner_context.py
b/python/flink_agents/api/runner_context.py
index 4def0ac..43d115a 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -28,6 +28,51 @@ if TYPE_CHECKING:
from flink_agents.api.memory_object import MemoryObject
+class AsyncExecutionResult:
+ """This class wraps an asynchronous task that will be submitted to a
thread pool
+ only when awaited. This ensures lazy submission and serial execution
semantics.
+
+ Note: Only `await ctx.execute_async(...)` is supported. asyncio functions
like
+ `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and
`asyncio.sleep`
+ are NOT supported because there is no asyncio event loop.
+ """
+
+ def __init__(self, executor: Any, func: Callable, args: tuple, kwargs:
dict) -> None:
+ """Initialize an AsyncExecutionResult.
+
+ Parameters
+ ----------
+ executor : Any
+ The thread pool executor to submit the task to.
+ func : Callable
+ The function to execute asynchronously.
+ args : tuple
+ Positional arguments to pass to the function.
+ kwargs : dict
+ Keyword arguments to pass to the function.
+ """
+ self._executor = executor
+ self._func = func
+ self._args = args
+ self._kwargs = kwargs
+
+ def __await__(self) -> Any:
+ """Make this object awaitable.
+
+ When awaited, submits the task to the thread pool and yields control
+ until the task completes.
+
+ Returns:
+ -------
+ Any
+ The result of the function execution.
+ """
+ future = self._executor.submit(self._func, *self._args, **self._kwargs)
+ while not future.done():
+ yield
+ return future.result()
+
+
class RunnerContext(ABC):
"""Abstract base class providing context for agent execution.
@@ -147,14 +192,24 @@ class RunnerContext(ABC):
func: Callable[[Any], Any],
*args: Any,
**kwargs: Any,
- ) -> Any:
+ ) -> "AsyncExecutionResult":
"""Asynchronously execute the provided function. Access to memory
- is prohibited within the function.
+ is prohibited within the function.
+
+ Usage::
+
+ async def my_action(event, ctx):
+ result = await ctx.execute_async(slow_function, arg1, arg2)
+ ctx.send_event(OutputEvent(output=result))
+
+ Note: Only `await ctx.execute_async(...)` is supported. asyncio
functions
+ like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and
+ `asyncio.sleep` are NOT supported.
Parameters
----------
func : Callable
- The function need to be asynchronously processing.
+ The function to be executed asynchronously.
*args : Any
Positional arguments to pass to the function.
**kwargs : Any
@@ -162,8 +217,8 @@ class RunnerContext(ABC):
Returns:
-------
- Any
- The result of the function.
+ AsyncExecutionResult
+ An awaitable object that yields the function result when awaited.
"""
@property
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_integration/flink_integration_agent.py
b/python/flink_agents/e2e_tests/e2e_tests_integration/flink_integration_agent.py
index 8892631..5e6249d 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_integration/flink_integration_agent.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_integration/flink_integration_agent.py
@@ -89,7 +89,7 @@ class DataStreamAgent(Agent):
@action(InputEvent)
@staticmethod
- def first_action(event: Event, ctx: RunnerContext): # noqa D102
+ async def first_action(event: Event, ctx: RunnerContext): # noqa D102
def log_to_stdout(input: Any, total: int) -> bool:
# Simulating asynchronous time consumption
time.sleep(random.random())
@@ -103,7 +103,7 @@ class DataStreamAgent(Agent):
total = current_total + 1
stm.set("status.total_reviews", total)
- log_success = yield from ctx.execute_async(log_to_stdout, input_data,
total)
+ log_success = await ctx.execute_async(log_to_stdout, input_data, total)
content = copy.deepcopy(input_data)
content.review += " first action, log success=" + str(log_success) +
","
diff --git a/python/flink_agents/e2e_tests/long_term_memory_test.py
b/python/flink_agents/e2e_tests/long_term_memory_test.py
index beb91b3..58ec7cc 100644
--- a/python/flink_agents/e2e_tests/long_term_memory_test.py
+++ b/python/flink_agents/e2e_tests/long_term_memory_test.py
@@ -170,7 +170,7 @@ class LongTermMemoryAgent(Agent):
@action(InputEvent)
@staticmethod
- def add_items(event: Event, ctx: RunnerContext): # noqa D102
+ async def add_items(event: Event, ctx: RunnerContext): # noqa D102
input_data = event.input
ltm = ctx.long_term_memory
@@ -181,7 +181,7 @@ class LongTermMemoryAgent(Agent):
capacity=5,
compaction_strategy=SummarizationStrategy(model="ollama_qwen3"),
)
- yield from ctx.execute_async(memory_set.add, items=input_data.review)
+ await ctx.execute_async(memory_set.add, items=input_data.review)
timestamp_after_add = datetime.now(timezone.utc).isoformat()
stm = ctx.short_term_memory
@@ -201,11 +201,11 @@ class LongTermMemoryAgent(Agent):
@action(MyEvent)
@staticmethod
- def retrieve_items(event: Event, ctx: RunnerContext): # noqa D102
+ async def retrieve_items(event: Event, ctx: RunnerContext): # noqa D102
record: Record = event.value
record.timestamp_second_action = datetime.now(timezone.utc).isoformat()
memory_set = ctx.long_term_memory.get_memory_set(name="test_ltm")
- items = yield from ctx.execute_async(memory_set.get)
+ items = await ctx.execute_async(memory_set.get)
if (
(record.id == 1 and record.count == 3)
or (record.id == 2 and record.count == 5)
diff --git a/python/flink_agents/plan/function.py
b/python/flink_agents/plan/function.py
index b457eac..82b6f47 100644
--- a/python/flink_agents/plan/function.py
+++ b/python/flink_agents/plan/function.py
@@ -20,7 +20,7 @@ import importlib
import inspect
import logging
from abc import ABC, abstractmethod
-from typing import Any, Callable, Dict, Generator, List, Tuple, get_type_hints
+from typing import Any, Callable, Dict, List, Tuple, get_type_hints
from pydantic import BaseModel, model_serializer
@@ -262,27 +262,6 @@ class JavaFunction(Function):
"""Check function signature is legal or not."""
-class PythonGeneratorWrapper:
- """A temporary wrapper class for Python generators to work around a
- known issue in PEMJA, where the generator type is incorrectly handled.
-
- TODO: This wrapper is intended to be a temporary solution. Once PEMJA
- version 0.5.5 (or later) fixes the bug related to generator type
conversion,
- this wrapper should be removed. For more details, please refer to
- https://github.com/apache/flink-agents/issues/83.
- """
-
- def __init__(self, generator: Generator) -> None:
- """Initialize a PythonGeneratorWrapper."""
- self.generator = generator
-
- def __str__(self) -> str:
- return "PythonGeneratorWrapper, generator=" + str(self.generator)
-
- def __next__(self) -> Any:
- return next(self.generator)
-
-
def call_python_function(module: str, qualname: str, func_args: Tuple[Any,
...]) -> Any:
"""Used to call a Python function in the Pemja environment.
@@ -316,8 +295,6 @@ def call_python_function(module: str, qualname: str,
func_args: Tuple[Any, ...])
python_func = _PYTHON_FUNCTION_CACHE[cache_key]
func_result = python_func(*func_args)
- if isinstance(func_result, Generator):
- return PythonGeneratorWrapper(func_result)
return func_result
@@ -353,31 +330,41 @@ def get_python_function_cache_keys() -> List[Tuple[str,
str]]:
return list(_PYTHON_FUNCTION_CACHE.keys())
-def call_python_generator(generator_wrapper: PythonGeneratorWrapper) -> (bool,
Any):
- """Invokes the next step of a wrapped Python generator and returns whether
+_ASYNCIO_ERROR_MESSAGE = (
+ "asyncio functions (gather/wait/create_task/sleep) are not supported "
+ "in Flink Agents. Only 'await ctx.execute_async(...)' is supported."
+)
+
+
+def call_python_awaitable(awaitable: Any) -> Tuple[bool, Any]:
+ """Invokes the next step of a Python coroutine or generator and returns
whether
it is done, along with the yielded or returned value.
Args:
- generator_wrapper (PythonGeneratorWrapper): A wrapper object that
- contains a `generator` attribute. This attribute should be an instance
- of a Python generator.
+ awaitable: A Python coroutine or generator object that can be driven
+ by the send() method.
Returns:
Tuple[bool, Any]:
- - The first element is a boolean flag indicating whether the
generator
+ - The first element is a boolean flag indicating whether the
awaitable
has finished:
- * False: The generator has more values to yield.
- * True: The generator has completed.
+ * False: The awaitable has more values to yield.
+ * True: The awaitable has completed.
- The second element is either:
- * The value yielded by the generator (when not exhausted), or
- * The return value of the generator (when it has finished).
+ * The value yielded by the awaitable (when not exhausted), or
+ * The return value of the awaitable (when it has finished).
"""
try:
- result = next(generator_wrapper.generator)
+ result = awaitable.send(None)
except StopIteration as e:
return True, e.value if hasattr(e, "value") else None
+ except RuntimeError as e:
+ err_msg = str(e)
+ if "no running event loop" in err_msg or "await wasn't used with
future" in err_msg:
+ raise RuntimeError(_ASYNCIO_ERROR_MESSAGE) from e
+ raise
except Exception:
- logger.exception("Error in generator execution")
+ logger.exception("Error in awaitable execution")
raise
else:
return False, result
diff --git a/python/flink_agents/plan/tests/test_function.py
b/python/flink_agents/plan/tests/test_function.py
index 4bc092a..832b336 100644
--- a/python/flink_agents/plan/tests/test_function.py
+++ b/python/flink_agents/plan/tests/test_function.py
@@ -26,7 +26,6 @@ import pytest
from flink_agents.api.events.event import Event, InputEvent, OutputEvent
from flink_agents.plan.function import (
PythonFunction,
- PythonGeneratorWrapper,
_is_function_cacheable,
call_python_function,
clear_python_function_cache,
@@ -315,9 +314,10 @@ def test_selective_caching_generator_functions() -> None:
result = call_python_function(
"flink_agents.plan.tests.test_function", "generator_function", (3,)
)
- assert isinstance(result, PythonGeneratorWrapper)
+ # Result is now a generator directly (no wrapper)
+ assert isinstance(result, Generator)
# Convert generator to list for testing
- result_list = list(result.generator)
+ result_list = list(result)
assert result_list == [0, 1, 2]
# Should not be cached
diff --git a/python/flink_agents/runtime/flink_runner_context.py
b/python/flink_agents/runtime/flink_runner_context.py
index c63fc7c..7c89ef1 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -31,7 +31,7 @@ from flink_agents.api.memory.long_term_memory import (
)
from flink_agents.api.memory_object import MemoryType
from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.runner_context import AsyncExecutionResult, RunnerContext
from flink_agents.plan.agent_plan import AgentPlan
from flink_agents.runtime.flink_memory_object import FlinkMemoryObject
from flink_agents.runtime.flink_metric_group import FlinkMetricGroup
@@ -191,18 +191,11 @@ class FlinkRunnerContext(RunnerContext):
func: Callable[[Any], Any],
*args: Any,
**kwargs: Any,
- ) -> Any:
+ ) -> AsyncExecutionResult:
"""Asynchronously execute the provided function. Access to memory
is prohibited within the function.
"""
- future = self.executor.submit(func, *args, **kwargs)
- while not future.done():
- # TODO: Currently, we are using a polling mechanism to check
whether
- # the future has completed. This approach should be optimized in
the
- # future by switching to a notification-based model, where the
Flink
- # operator is notified directly once the future is completed.
- yield
- return future.result()
+ return AsyncExecutionResult(self.executor, func, args, kwargs)
@property
@override
diff --git a/python/flink_agents/runtime/local_runner.py
b/python/flink_agents/runtime/local_runner.py
index 21307f3..084f7bd 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -15,10 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
+import asyncio
import logging
import uuid
from collections import deque
-from typing import Any, Callable, Dict, Generator, List
+from concurrent.futures import Future
+from typing import Any, Callable, Dict, List
from typing_extensions import override
@@ -28,7 +30,7 @@ from flink_agents.api.memory.long_term_memory import
BaseLongTermMemory
from flink_agents.api.memory_object import MemoryObject, MemoryType
from flink_agents.api.metric_group import MetricGroup
from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.runner_context import AsyncExecutionResult, RunnerContext
from flink_agents.plan.agent_plan import AgentPlan
from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.runtime.agent_runner import AgentRunner
@@ -182,16 +184,33 @@ class LocalRunnerContext(RunnerContext):
func: Callable[[Any], Any],
*args: Any,
**kwargs: Any,
- ) -> Any:
+ ) -> AsyncExecutionResult:
"""Asynchronously execute the provided function. Access to memory
is prohibited within the function.
+
+ Note: Local runner executes synchronously but returns an
AsyncExecutionResult
+ for API consistency.
"""
logger.warning(
"Local runner does not support asynchronous execution; falling
back to synchronous execution."
)
- func_result = func(*args, **kwargs)
- yield
- return func_result
+ # Execute synchronously and wrap the result in a completed Future
+ future: Future = Future()
+ try:
+ result = func(*args, **kwargs)
+ future.set_result(result)
+ except Exception as e:
+ future.set_exception(e)
+
+ # Create a mock executor that returns the pre-completed future
+ class _SyncExecutor:
+ def __init__(self, completed_future: Future) -> None:
+ self._future = completed_future
+
+ def submit(self, fn: Callable, *args: Any, **kwargs: Any) ->
Future:
+ return self._future
+
+ return AsyncExecutionResult(_SyncExecutor(future), func, args, kwargs)
@property
@override
@@ -285,10 +304,14 @@ class LocalRunner(AgentRunner):
logger.info("key: %s, performing action: %s", key, action.name)
context.action_name = action.name
func_result = action.exec(event, context)
- if isinstance(func_result, Generator):
+ if asyncio.iscoroutine(func_result):
+ # Drive the coroutine to completion using send()
try:
- for _ in func_result:
- pass
+ while True:
+ func_result.send(None)
+ except StopIteration:
+ # Coroutine completed normally
+ pass
except Exception:
logger.exception("Error in async execution")
raise
diff --git
a/python/flink_agents/runtime/tests/test_local_execution_environment.py
b/python/flink_agents/runtime/tests/test_local_execution_environment.py
index 3c15926..2201016 100644
--- a/python/flink_agents/runtime/tests/test_local_execution_environment.py
+++ b/python/flink_agents/runtime/tests/test_local_execution_environment.py
@@ -38,13 +38,13 @@ class Agent1(Agent): # noqa: D101
class Agent1WithAsync(Agent): # noqa: D101
@action(InputEvent)
@staticmethod
- def increment(event: Event, ctx: RunnerContext): # noqa D102
+ async def increment(event: Event, ctx: RunnerContext): # noqa D102
def my_func(value: int) -> int:
time.sleep(1)
return value + 1
input = event.input
- value = yield from ctx.execute_async(my_func, input)
+ value = await ctx.execute_async(my_func, input)
ctx.send_event(OutputEvent(output=value))
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index 8f1e2d5..779297a 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -51,17 +51,17 @@ public class PythonActionTask extends ActionTask {
key);
runnerContext.checkNoPendingEvents();
- String pythonGeneratorRef =
+ String pythonAwaitableRef =
executor.executePythonFunction(
(PythonFunction) action.getExec(), (PythonEvent)
event, key.hashCode());
// If a user-defined action uses an interface to submit asynchronous
tasks, it will return a
- // Python generator object instance upon its first execution.
Otherwise, it means that no
- // asynchronous tasks were submitted and the action has already
completed.
- if (pythonGeneratorRef != null) {
- // The Python action generates a generator. We need to execute it
once, which will
+ // Python coroutine (awaitable) object instance upon its first
execution. Otherwise, it
+ // means that no asynchronous tasks were submitted and the action has
already completed.
+ if (pythonAwaitableRef != null) {
+ // The Python action generates an awaitable. We need to execute it
once, which will
// submit an asynchronous task and return whether the action has
been completed.
ActionTask tempGeneratedActionTask =
- new PythonGeneratorActionTask(key, event, action,
pythonGeneratorRef);
+ new PythonGeneratorActionTask(key, event, action,
pythonAwaitableRef);
tempGeneratedActionTask.setRunnerContext(runnerContext);
return tempGeneratedActionTask.invoke(userCodeClassLoader,
executor);
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index 969cb8f..c683436 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -22,24 +22,24 @@ import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.runtime.operator.ActionTask;
import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
-/** An {@link ActionTask} wrapper a Python Generator to represent a code block
in Python action. */
+/** An {@link ActionTask} wrapper a Python awaitable to represent a code block
in Python action. */
public class PythonGeneratorActionTask extends PythonActionTask {
- private final String pythonGeneratorRef;
+ private final String pythonAwaitableRef;
public PythonGeneratorActionTask(
- Object key, Event event, Action action, String pythonGeneratorRef)
{
+ Object key, Event event, Action action, String pythonAwaitableRef)
{
super(key, event, action);
- this.pythonGeneratorRef = pythonGeneratorRef;
+ this.pythonAwaitableRef = pythonAwaitableRef;
}
@Override
public ActionTaskResult invoke(ClassLoader userCodeClassLoader,
PythonActionExecutor executor) {
LOG.debug(
- "Try execute python generator action {} for event {} with key
{}.",
+ "Try execute python awaitable action {} for event {} with key
{}.",
action.getName(),
event,
key);
- boolean finished = executor.callPythonGenerator(pythonGeneratorRef);
+ boolean finished = executor.callPythonAwaitable(pythonAwaitableRef);
ActionTask generatedActionTask = finished ? null : this;
return new ActionTaskResult(
finished,
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index e0108fb..4d7792a 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -49,10 +49,10 @@ public class PythonActionExecutor {
private static final String CLOSE_ASYNC_THREAD_POOL =
"flink_runner_context.close_async_thread_pool";
- // =========== PYTHON GENERATOR ===========
- private static final String CALL_PYTHON_GENERATOR =
"function.call_python_generator";
- private static final String PYTHON_GENERATOR_VAR_NAME_PREFIX =
"python_generator_";
- private static final AtomicLong PYTHON_GENERATOR_VAR_ID = new
AtomicLong(0);
+ // =========== PYTHON AWAITABLE ===========
+ private static final String CALL_PYTHON_AWAITABLE =
"function.call_python_awaitable";
+ private static final String PYTHON_AWAITABLE_VAR_NAME_PREFIX =
"python_awaitable_";
+ private static final AtomicLong PYTHON_AWAITABLE_VAR_ID = new
AtomicLong(0);
// =========== PYTHON AND JAVA OBJECT CONVERT ===========
private static final String CONVERT_TO_PYTHON_OBJECT =
@@ -98,14 +98,14 @@ public class PythonActionExecutor {
}
/**
- * Execute the Python function, which may return a Python generator that
needs to be processed
- * in the future. Due to an issue in Pemja regarding incorrect object
reference counting, this
- * may lead to garbage collection of the object. To prevent this, we use
the set and get methods
- * to manually increment the object's reference count, then return the
name of the Python
- * generator variable.
+ * Execute the Python function, which may return a Python coroutine
(awaitable) that needs to be
+ * processed in the future. Due to an issue in Pemja regarding incorrect
object reference
+ * counting, this may lead to garbage collection of the object. To prevent
this, we use the set
+ * and get methods to manually increment the object's reference count,
then return the name of
+ * the Python awaitable variable.
*
- * @return The name of the Python generator variable. It may be null if
the Python function does
- * not return a generator.
+ * @return The name of the Python awaitable variable. It may be null if
the Python function does
+ * not return a coroutine.
*/
public String executePythonFunction(PythonFunction function, PythonEvent
event, int hashOfKey)
throws Exception {
@@ -125,12 +125,12 @@ public class PythonActionExecutor {
if (calledResult == null) {
return null;
} else {
- // must be a generator
- String pythonGeneratorRef =
- PYTHON_GENERATOR_VAR_NAME_PREFIX
- + PYTHON_GENERATOR_VAR_ID.incrementAndGet();
- interpreter.set(pythonGeneratorRef, calledResult);
- return pythonGeneratorRef;
+ // must be a coroutine (awaitable)
+ String pythonAwaitableRef =
+ PYTHON_AWAITABLE_VAR_NAME_PREFIX
+ + PYTHON_AWAITABLE_VAR_ID.incrementAndGet();
+ interpreter.set(pythonAwaitableRef, calledResult);
+ return pythonAwaitableRef;
}
} catch (Exception e) {
runnerContext.drainEvents(null);
@@ -154,19 +154,19 @@ public class PythonActionExecutor {
}
/**
- * Invokes the next step of a Python generator.
+ * Invokes the next step of a Python awaitable (coroutine or generator).
*
- * <p>This method is typically used after initializing or resuming a
Python generator that was
+ * <p>This method is typically used after initializing or resuming a
Python coroutine that was
* created via a user-defined action involving asynchronous execution.
*
- * @param pythonGeneratorRef the reference name of the Python generator
object stored in the
+ * @param pythonAwaitableRef the reference name of the Python awaitable
object stored in the
* interpreter's context
- * @return true if the generator has completed; false otherwise
+ * @return true if the awaitable has completed; false otherwise
*/
- public boolean callPythonGenerator(String pythonGeneratorRef) {
- // Calling next(generator) in Python returns a tuple of (finished,
output).
- Object pythonGenerator = interpreter.get(pythonGeneratorRef);
- Object invokeResult = interpreter.invoke(CALL_PYTHON_GENERATOR,
pythonGenerator);
+ public boolean callPythonAwaitable(String pythonAwaitableRef) {
+ // Calling awaitable.send(None) in Python returns a tuple of
(finished, output).
+ Object pythonAwaitable = interpreter.get(pythonAwaitableRef);
+ Object invokeResult = interpreter.invoke(CALL_PYTHON_AWAITABLE,
pythonAwaitable);
checkState(invokeResult.getClass().isArray() && ((Object[])
invokeResult).length == 2);
return (boolean) ((Object[]) invokeResult)[0];
}