This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ab9354  [Feature] Improve Python Async Execution API (#409)
8ab9354 is described below

commit 8ab9354556ed1c043e205dc32651d1297d8134e6
Author: Xuannan <[email protected]>
AuthorDate: Wed Jan 7 16:27:31 2026 +0800

    [Feature] Improve Python Async Execution API (#409)
---
 docs/content/docs/development/workflow_agent.md    | 37 +++++++++++-
 python/flink_agents/api/runner_context.py          | 65 ++++++++++++++++++++--
 .../flink_integration_agent.py                     |  4 +-
 .../e2e_tests/long_term_memory_test.py             |  8 +--
 python/flink_agents/plan/function.py               | 59 ++++++++------------
 python/flink_agents/plan/tests/test_function.py    |  6 +-
 .../flink_agents/runtime/flink_runner_context.py   | 13 +----
 python/flink_agents/runtime/local_runner.py        | 41 +++++++++++---
 .../tests/test_local_execution_environment.py      |  4 +-
 .../runtime/python/operator/PythonActionTask.java  | 12 ++--
 .../python/operator/PythonGeneratorActionTask.java | 12 ++--
 .../runtime/python/utils/PythonActionExecutor.java | 50 ++++++++---------
 12 files changed, 202 insertions(+), 109 deletions(-)

diff --git a/docs/content/docs/development/workflow_agent.md 
b/docs/content/docs/development/workflow_agent.md
index 346f80d..76431d0 100644
--- a/docs/content/docs/development/workflow_agent.md
+++ b/docs/content/docs/development/workflow_agent.md
@@ -219,7 +219,7 @@ An action is a piece of code that can be executed. Each 
action listens to at lea
 
 To declare an action in Agent, user can use `@action` to decorate a function 
of Agent class in python (or annotate a method of Agent class in java), and 
declare the listened event types as decorator/annotation parameters. 
 
-The decorated/annotated function signature should be `(Event, RunnerContext) 
-> None`
+The decorated/annotated function signature should be `(Event, RunnerContext) 
-> None`. In Python, actions can also be defined as `async def` when using 
async execution (see [Async Execution](#async-execution)).
 
 {{< tabs "Action Function" >}}
 
@@ -277,6 +277,41 @@ public static void processInput(InputEvent event, 
RunnerContext ctx) throws Exce
 
 {{< /tabs >}}
 
+### Async Execution
+
+{{< hint warning >}}
+Async execution is currently supported only in Python.
+{{< /hint >}}
+
+When an action needs to perform time-consuming I/O operations (such as calling 
external APIs, database queries, or network requests), you can use 
`ctx.execute_async()` to execute these operations asynchronously. This allows 
Flink to efficiently manage resources and avoid blocking the main processing 
thread.
+
+To use async execution, define your action as an `async def` function and use 
`await` with `ctx.execute_async()`:
+
+```python
+@action(InputEvent)
+@staticmethod
+async def process_with_async(event: InputEvent, ctx: RunnerContext) -> None:
+    def slow_external_call(data: str) -> str:
+        # Simulate a slow external API call
+        time.sleep(2)
+        return f"Processed: {data}"
+    
+    # Execute the slow operation asynchronously
+    result = await ctx.execute_async(slow_external_call, event.input)
+    
+    ctx.send_event(OutputEvent(output=result))
+```
+
+**Key points:**
+- Use `async def` to define the action function
+- Use `await ctx.execute_async(func, *args, **kwargs)` to execute slow 
operations
+- The function passed to `execute_async` will be submitted to a thread pool
+- Access to memory is prohibited within the function passed to `execute_async`
+
+{{< hint info >}}
+Only `await ctx.execute_async(...)` is supported. Standard asyncio functions 
like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and 
`asyncio.sleep` are **NOT** supported because there is no asyncio event loop 
running.
+{{< /hint >}}
+
 ## Event
 Events are messages passed between actions. Events may carry payloads. A 
single event may trigger multiple actions if they are all listening to its type.
 
diff --git a/python/flink_agents/api/runner_context.py 
b/python/flink_agents/api/runner_context.py
index 4def0ac..43d115a 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -28,6 +28,51 @@ if TYPE_CHECKING:
     from flink_agents.api.memory_object import MemoryObject
 
 
+class AsyncExecutionResult:
+    """This class wraps an asynchronous task that will be submitted to a 
thread pool
+    only when awaited. This ensures lazy submission and serial execution 
semantics.
+
+    Note: Only `await ctx.execute_async(...)` is supported. asyncio functions 
like
+    `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and 
`asyncio.sleep`
+    are NOT supported because there is no asyncio event loop.
+    """
+
+    def __init__(self, executor: Any, func: Callable, args: tuple, kwargs: 
dict) -> None:
+        """Initialize an AsyncExecutionResult.
+
+        Parameters
+        ----------
+        executor : Any
+            The thread pool executor to submit the task to.
+        func : Callable
+            The function to execute asynchronously.
+        args : tuple
+            Positional arguments to pass to the function.
+        kwargs : dict
+            Keyword arguments to pass to the function.
+        """
+        self._executor = executor
+        self._func = func
+        self._args = args
+        self._kwargs = kwargs
+
+    def __await__(self) -> Any:
+        """Make this object awaitable.
+
+        When awaited, submits the task to the thread pool and yields control
+        until the task completes.
+
+        Returns:
+        -------
+        Any
+            The result of the function execution.
+        """
+        future = self._executor.submit(self._func, *self._args, **self._kwargs)
+        while not future.done():
+            yield
+        return future.result()
+
+
 class RunnerContext(ABC):
     """Abstract base class providing context for agent execution.
 
@@ -147,14 +192,24 @@ class RunnerContext(ABC):
         func: Callable[[Any], Any],
         *args: Any,
         **kwargs: Any,
-    ) -> Any:
+    ) -> "AsyncExecutionResult":
         """Asynchronously execute the provided function. Access to memory
-         is prohibited within the function.
+        is prohibited within the function.
+
+        Usage::
+
+            async def my_action(event, ctx):
+                result = await ctx.execute_async(slow_function, arg1, arg2)
+                ctx.send_event(OutputEvent(output=result))
+
+        Note: Only `await ctx.execute_async(...)` is supported. asyncio 
functions
+        like `asyncio.gather`, `asyncio.wait`, `asyncio.create_task`, and
+        `asyncio.sleep` are NOT supported.
 
         Parameters
         ----------
         func : Callable
-            The function need to be asynchronously processing.
+            The function to be executed asynchronously.
         *args : Any
             Positional arguments to pass to the function.
         **kwargs : Any
@@ -162,8 +217,8 @@ class RunnerContext(ABC):
 
         Returns:
         -------
-        Any
-            The result of the function.
+        AsyncExecutionResult
+            An awaitable object that yields the function result when awaited.
         """
 
     @property
diff --git 
a/python/flink_agents/e2e_tests/e2e_tests_integration/flink_integration_agent.py
 
b/python/flink_agents/e2e_tests/e2e_tests_integration/flink_integration_agent.py
index 8892631..5e6249d 100644
--- 
a/python/flink_agents/e2e_tests/e2e_tests_integration/flink_integration_agent.py
+++ 
b/python/flink_agents/e2e_tests/e2e_tests_integration/flink_integration_agent.py
@@ -89,7 +89,7 @@ class DataStreamAgent(Agent):
 
     @action(InputEvent)
     @staticmethod
-    def first_action(event: Event, ctx: RunnerContext):  # noqa D102
+    async def first_action(event: Event, ctx: RunnerContext):  # noqa D102
         def log_to_stdout(input: Any, total: int) -> bool:
             # Simulating asynchronous time consumption
             time.sleep(random.random())
@@ -103,7 +103,7 @@ class DataStreamAgent(Agent):
         total = current_total + 1
         stm.set("status.total_reviews", total)
 
-        log_success = yield from ctx.execute_async(log_to_stdout, input_data, 
total)
+        log_success = await ctx.execute_async(log_to_stdout, input_data, total)
 
         content = copy.deepcopy(input_data)
         content.review += " first action, log success=" + str(log_success) + 
","
diff --git a/python/flink_agents/e2e_tests/long_term_memory_test.py 
b/python/flink_agents/e2e_tests/long_term_memory_test.py
index beb91b3..58ec7cc 100644
--- a/python/flink_agents/e2e_tests/long_term_memory_test.py
+++ b/python/flink_agents/e2e_tests/long_term_memory_test.py
@@ -170,7 +170,7 @@ class LongTermMemoryAgent(Agent):
 
     @action(InputEvent)
     @staticmethod
-    def add_items(event: Event, ctx: RunnerContext):  # noqa D102
+    async def add_items(event: Event, ctx: RunnerContext):  # noqa D102
         input_data = event.input
         ltm = ctx.long_term_memory
 
@@ -181,7 +181,7 @@ class LongTermMemoryAgent(Agent):
             capacity=5,
             compaction_strategy=SummarizationStrategy(model="ollama_qwen3"),
         )
-        yield from ctx.execute_async(memory_set.add, items=input_data.review)
+        await ctx.execute_async(memory_set.add, items=input_data.review)
         timestamp_after_add = datetime.now(timezone.utc).isoformat()
 
         stm = ctx.short_term_memory
@@ -201,11 +201,11 @@ class LongTermMemoryAgent(Agent):
 
     @action(MyEvent)
     @staticmethod
-    def retrieve_items(event: Event, ctx: RunnerContext):  # noqa D102
+    async def retrieve_items(event: Event, ctx: RunnerContext):  # noqa D102
         record: Record = event.value
         record.timestamp_second_action = datetime.now(timezone.utc).isoformat()
         memory_set = ctx.long_term_memory.get_memory_set(name="test_ltm")
-        items = yield from ctx.execute_async(memory_set.get)
+        items = await ctx.execute_async(memory_set.get)
         if (
             (record.id == 1 and record.count == 3)
             or (record.id == 2 and record.count == 5)
diff --git a/python/flink_agents/plan/function.py 
b/python/flink_agents/plan/function.py
index b457eac..82b6f47 100644
--- a/python/flink_agents/plan/function.py
+++ b/python/flink_agents/plan/function.py
@@ -20,7 +20,7 @@ import importlib
 import inspect
 import logging
 from abc import ABC, abstractmethod
-from typing import Any, Callable, Dict, Generator, List, Tuple, get_type_hints
+from typing import Any, Callable, Dict, List, Tuple, get_type_hints
 
 from pydantic import BaseModel, model_serializer
 
@@ -262,27 +262,6 @@ class JavaFunction(Function):
         """Check function signature is legal or not."""
 
 
-class PythonGeneratorWrapper:
-    """A temporary wrapper class for Python generators to work around a
-    known issue in PEMJA, where the generator type is incorrectly handled.
-
-    TODO: This wrapper is intended to be a temporary solution. Once PEMJA
-    version 0.5.5 (or later) fixes the bug related to generator type 
conversion,
-    this wrapper should be removed. For more details, please refer to
-    https://github.com/apache/flink-agents/issues/83.
-    """
-
-    def __init__(self, generator: Generator) -> None:
-        """Initialize a PythonGeneratorWrapper."""
-        self.generator = generator
-
-    def __str__(self) -> str:
-        return "PythonGeneratorWrapper, generator=" + str(self.generator)
-
-    def __next__(self) -> Any:
-        return next(self.generator)
-
-
 def call_python_function(module: str, qualname: str, func_args: Tuple[Any, 
...]) -> Any:
     """Used to call a Python function in the Pemja environment.
 
@@ -316,8 +295,6 @@ def call_python_function(module: str, qualname: str, 
func_args: Tuple[Any, ...])
         python_func = _PYTHON_FUNCTION_CACHE[cache_key]
 
     func_result = python_func(*func_args)
-    if isinstance(func_result, Generator):
-        return PythonGeneratorWrapper(func_result)
     return func_result
 
 
@@ -353,31 +330,41 @@ def get_python_function_cache_keys() -> List[Tuple[str, 
str]]:
     return list(_PYTHON_FUNCTION_CACHE.keys())
 
 
-def call_python_generator(generator_wrapper: PythonGeneratorWrapper) -> (bool, 
Any):
-    """Invokes the next step of a wrapped Python generator and returns whether
+_ASYNCIO_ERROR_MESSAGE = (
+    "asyncio functions (gather/wait/create_task/sleep) are not supported "
+    "in Flink Agents. Only 'await ctx.execute_async(...)' is supported."
+)
+
+
+def call_python_awaitable(awaitable: Any) -> Tuple[bool, Any]:
+    """Invokes the next step of a Python coroutine or generator and returns 
whether
     it is done, along with the yielded or returned value.
 
     Args:
-        generator_wrapper (PythonGeneratorWrapper): A wrapper object that
-        contains a `generator` attribute. This attribute should be an instance
-        of a Python generator.
+        awaitable: A Python coroutine or generator object that can be driven
+        by the send() method.
 
     Returns:
         Tuple[bool, Any]:
-            - The first element is a boolean flag indicating whether the 
generator
+            - The first element is a boolean flag indicating whether the 
awaitable
             has finished:
-                * False: The generator has more values to yield.
-                * True: The generator has completed.
+                * False: The awaitable has more values to yield.
+                * True: The awaitable has completed.
             - The second element is either:
-                * The value yielded by the generator (when not exhausted), or
-                * The return value of the generator (when it has finished).
+                * The value yielded by the awaitable (when not exhausted), or
+                * The return value of the awaitable (when it has finished).
     """
     try:
-        result = next(generator_wrapper.generator)
+        result = awaitable.send(None)
     except StopIteration as e:
         return True, e.value if hasattr(e, "value") else None
+    except RuntimeError as e:
+        err_msg = str(e)
+        if "no running event loop" in err_msg or "await wasn't used with 
future" in err_msg:
+            raise RuntimeError(_ASYNCIO_ERROR_MESSAGE) from e
+        raise
     except Exception:
-        logger.exception("Error in generator execution")
+        logger.exception("Error in awaitable execution")
         raise
     else:
         return False, result
diff --git a/python/flink_agents/plan/tests/test_function.py 
b/python/flink_agents/plan/tests/test_function.py
index 4bc092a..832b336 100644
--- a/python/flink_agents/plan/tests/test_function.py
+++ b/python/flink_agents/plan/tests/test_function.py
@@ -26,7 +26,6 @@ import pytest
 from flink_agents.api.events.event import Event, InputEvent, OutputEvent
 from flink_agents.plan.function import (
     PythonFunction,
-    PythonGeneratorWrapper,
     _is_function_cacheable,
     call_python_function,
     clear_python_function_cache,
@@ -315,9 +314,10 @@ def test_selective_caching_generator_functions() -> None:
     result = call_python_function(
         "flink_agents.plan.tests.test_function", "generator_function", (3,)
     )
-    assert isinstance(result, PythonGeneratorWrapper)
+    # Result is now a generator directly (no wrapper)
+    assert isinstance(result, Generator)
     # Convert generator to list for testing
-    result_list = list(result.generator)
+    result_list = list(result)
     assert result_list == [0, 1, 2]
 
     # Should not be cached
diff --git a/python/flink_agents/runtime/flink_runner_context.py 
b/python/flink_agents/runtime/flink_runner_context.py
index c63fc7c..7c89ef1 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -31,7 +31,7 @@ from flink_agents.api.memory.long_term_memory import (
 )
 from flink_agents.api.memory_object import MemoryType
 from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.runner_context import AsyncExecutionResult, RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.runtime.flink_memory_object import FlinkMemoryObject
 from flink_agents.runtime.flink_metric_group import FlinkMetricGroup
@@ -191,18 +191,11 @@ class FlinkRunnerContext(RunnerContext):
         func: Callable[[Any], Any],
         *args: Any,
         **kwargs: Any,
-    ) -> Any:
+    ) -> AsyncExecutionResult:
         """Asynchronously execute the provided function. Access to memory
         is prohibited within the function.
         """
-        future = self.executor.submit(func, *args, **kwargs)
-        while not future.done():
-            # TODO: Currently, we are using a polling mechanism to check 
whether
-            #  the future has completed. This approach should be optimized in 
the
-            #  future by switching to a notification-based model, where the 
Flink
-            #  operator is notified directly once the future is completed.
-            yield
-        return future.result()
+        return AsyncExecutionResult(self.executor, func, args, kwargs)
 
     @property
     @override
diff --git a/python/flink_agents/runtime/local_runner.py 
b/python/flink_agents/runtime/local_runner.py
index 21307f3..084f7bd 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -15,10 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
+import asyncio
 import logging
 import uuid
 from collections import deque
-from typing import Any, Callable, Dict, Generator, List
+from concurrent.futures import Future
+from typing import Any, Callable, Dict, List
 
 from typing_extensions import override
 
@@ -28,7 +30,7 @@ from flink_agents.api.memory.long_term_memory import 
BaseLongTermMemory
 from flink_agents.api.memory_object import MemoryObject, MemoryType
 from flink_agents.api.metric_group import MetricGroup
 from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.api.runner_context import RunnerContext
+from flink_agents.api.runner_context import AsyncExecutionResult, RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.plan.configuration import AgentConfiguration
 from flink_agents.runtime.agent_runner import AgentRunner
@@ -182,16 +184,33 @@ class LocalRunnerContext(RunnerContext):
         func: Callable[[Any], Any],
         *args: Any,
         **kwargs: Any,
-    ) -> Any:
+    ) -> AsyncExecutionResult:
         """Asynchronously execute the provided function. Access to memory
         is prohibited within the function.
+
+        Note: Local runner executes the function synchronously at call time, not
+        when awaited, and returns an AsyncExecutionResult for API consistency.
         """
         logger.warning(
             "Local runner does not support asynchronous execution; falling 
back to synchronous execution."
         )
-        func_result = func(*args, **kwargs)
-        yield
-        return func_result
+        # Execute synchronously and wrap the result in a completed Future
+        future: Future = Future()
+        try:
+            result = func(*args, **kwargs)
+            future.set_result(result)
+        except Exception as e:
+            future.set_exception(e)
+
+        # Create a mock executor that returns the pre-completed future
+        class _SyncExecutor:
+            def __init__(self, completed_future: Future) -> None:
+                self._future = completed_future
+
+            def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> 
Future:
+                return self._future
+
+        return AsyncExecutionResult(_SyncExecutor(future), func, args, kwargs)
 
     @property
     @override
@@ -285,10 +304,14 @@ class LocalRunner(AgentRunner):
                 logger.info("key: %s, performing action: %s", key, action.name)
                 context.action_name = action.name
                 func_result = action.exec(event, context)
-                if isinstance(func_result, Generator):
+                if asyncio.iscoroutine(func_result):
+                    # Drive the coroutine to completion using send()
                     try:
-                        for _ in func_result:
-                            pass
+                        while True:
+                            func_result.send(None)
+                    except StopIteration:
+                        # Coroutine completed normally
+                        pass
                     except Exception:
                         logger.exception("Error in async execution")
                         raise
diff --git 
a/python/flink_agents/runtime/tests/test_local_execution_environment.py 
b/python/flink_agents/runtime/tests/test_local_execution_environment.py
index 3c15926..2201016 100644
--- a/python/flink_agents/runtime/tests/test_local_execution_environment.py
+++ b/python/flink_agents/runtime/tests/test_local_execution_environment.py
@@ -38,13 +38,13 @@ class Agent1(Agent):  # noqa: D101
 class Agent1WithAsync(Agent):  # noqa: D101
     @action(InputEvent)
     @staticmethod
-    def increment(event: Event, ctx: RunnerContext):  # noqa D102
+    async def increment(event: Event, ctx: RunnerContext):  # noqa D102
         def my_func(value: int) -> int:
             time.sleep(1)
             return value + 1
 
         input = event.input
-        value = yield from ctx.execute_async(my_func, input)
+        value = await ctx.execute_async(my_func, input)
         ctx.send_event(OutputEvent(output=value))
 
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
index 8f1e2d5..779297a 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonActionTask.java
@@ -51,17 +51,17 @@ public class PythonActionTask extends ActionTask {
                 key);
         runnerContext.checkNoPendingEvents();
 
-        String pythonGeneratorRef =
+        String pythonAwaitableRef =
                 executor.executePythonFunction(
                         (PythonFunction) action.getExec(), (PythonEvent) 
event, key.hashCode());
         // If a user-defined action uses an interface to submit asynchronous 
tasks, it will return a
-        // Python generator object instance upon its first execution. 
Otherwise, it means that no
-        // asynchronous tasks were submitted and the action has already 
completed.
-        if (pythonGeneratorRef != null) {
-            // The Python action generates a generator. We need to execute it 
once, which will
+        // Python coroutine (awaitable) object instance upon its first 
execution. Otherwise, it
+        // means that no asynchronous tasks were submitted and the action has 
already completed.
+        if (pythonAwaitableRef != null) {
+            // The Python action generates an awaitable. We need to execute it 
once, which will
             // submit an asynchronous task and return whether the action has 
been completed.
             ActionTask tempGeneratedActionTask =
-                    new PythonGeneratorActionTask(key, event, action, 
pythonGeneratorRef);
+                    new PythonGeneratorActionTask(key, event, action, 
pythonAwaitableRef);
             tempGeneratedActionTask.setRunnerContext(runnerContext);
             return tempGeneratedActionTask.invoke(userCodeClassLoader, 
executor);
         }
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
index 969cb8f..c683436 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/operator/PythonGeneratorActionTask.java
@@ -22,24 +22,24 @@ import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.runtime.operator.ActionTask;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 
-/** An {@link ActionTask} wrapper a Python Generator to represent a code block 
in Python action. */
+/** An {@link ActionTask} wrapping a Python awaitable to represent a code block 
in a Python action. */
 public class PythonGeneratorActionTask extends PythonActionTask {
-    private final String pythonGeneratorRef;
+    private final String pythonAwaitableRef;
 
     public PythonGeneratorActionTask(
-            Object key, Event event, Action action, String pythonGeneratorRef) 
{
+            Object key, Event event, Action action, String pythonAwaitableRef) 
{
         super(key, event, action);
-        this.pythonGeneratorRef = pythonGeneratorRef;
+        this.pythonAwaitableRef = pythonAwaitableRef;
     }
 
     @Override
     public ActionTaskResult invoke(ClassLoader userCodeClassLoader, 
PythonActionExecutor executor) {
         LOG.debug(
-                "Try execute python generator action {} for event {} with key 
{}.",
+                "Try execute python awaitable action {} for event {} with key 
{}.",
                 action.getName(),
                 event,
                 key);
-        boolean finished = executor.callPythonGenerator(pythonGeneratorRef);
+        boolean finished = executor.callPythonAwaitable(pythonAwaitableRef);
         ActionTask generatedActionTask = finished ? null : this;
         return new ActionTaskResult(
                 finished,
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index e0108fb..4d7792a 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -49,10 +49,10 @@ public class PythonActionExecutor {
     private static final String CLOSE_ASYNC_THREAD_POOL =
             "flink_runner_context.close_async_thread_pool";
 
-    // =========== PYTHON GENERATOR ===========
-    private static final String CALL_PYTHON_GENERATOR = 
"function.call_python_generator";
-    private static final String PYTHON_GENERATOR_VAR_NAME_PREFIX = 
"python_generator_";
-    private static final AtomicLong PYTHON_GENERATOR_VAR_ID = new 
AtomicLong(0);
+    // =========== PYTHON AWAITABLE ===========
+    private static final String CALL_PYTHON_AWAITABLE = 
"function.call_python_awaitable";
+    private static final String PYTHON_AWAITABLE_VAR_NAME_PREFIX = 
"python_awaitable_";
+    private static final AtomicLong PYTHON_AWAITABLE_VAR_ID = new 
AtomicLong(0);
 
     // =========== PYTHON AND JAVA OBJECT CONVERT ===========
     private static final String CONVERT_TO_PYTHON_OBJECT =
@@ -98,14 +98,14 @@ public class PythonActionExecutor {
     }
 
     /**
-     * Execute the Python function, which may return a Python generator that 
needs to be processed
-     * in the future. Due to an issue in Pemja regarding incorrect object 
reference counting, this
-     * may lead to garbage collection of the object. To prevent this, we use 
the set and get methods
-     * to manually increment the object's reference count, then return the 
name of the Python
-     * generator variable.
+     * Execute the Python function, which may return a Python coroutine 
(awaitable) that needs to be
+     * processed in the future. Due to an issue in Pemja regarding incorrect 
object reference
+     * counting, this may lead to garbage collection of the object. To prevent 
this, we use the set
+     * and get methods to manually increment the object's reference count, 
then return the name of
+     * the Python awaitable variable.
      *
-     * @return The name of the Python generator variable. It may be null if 
the Python function does
-     *     not return a generator.
+     * @return The name of the Python awaitable variable. It may be null if 
the Python function does
+     *     not return a coroutine.
      */
     public String executePythonFunction(PythonFunction function, PythonEvent 
event, int hashOfKey)
             throws Exception {
@@ -125,12 +125,12 @@ public class PythonActionExecutor {
             if (calledResult == null) {
                 return null;
             } else {
-                // must be a generator
-                String pythonGeneratorRef =
-                        PYTHON_GENERATOR_VAR_NAME_PREFIX
-                                + PYTHON_GENERATOR_VAR_ID.incrementAndGet();
-                interpreter.set(pythonGeneratorRef, calledResult);
-                return pythonGeneratorRef;
+                // must be a coroutine (awaitable)
+                String pythonAwaitableRef =
+                        PYTHON_AWAITABLE_VAR_NAME_PREFIX
+                                + PYTHON_AWAITABLE_VAR_ID.incrementAndGet();
+                interpreter.set(pythonAwaitableRef, calledResult);
+                return pythonAwaitableRef;
             }
         } catch (Exception e) {
             runnerContext.drainEvents(null);
@@ -154,19 +154,19 @@ public class PythonActionExecutor {
     }
 
     /**
-     * Invokes the next step of a Python generator.
+     * Invokes the next step of a Python awaitable (coroutine or generator).
      *
-     * <p>This method is typically used after initializing or resuming a 
Python generator that was
+     * <p>This method is typically used after initializing or resuming a 
Python coroutine that was
      * created via a user-defined action involving asynchronous execution.
      *
-     * @param pythonGeneratorRef the reference name of the Python generator 
object stored in the
+     * @param pythonAwaitableRef the reference name of the Python awaitable 
object stored in the
      *     interpreter's context
-     * @return true if the generator has completed; false otherwise
+     * @return true if the awaitable has completed; false otherwise
      */
-    public boolean callPythonGenerator(String pythonGeneratorRef) {
-        // Calling next(generator) in Python returns a tuple of (finished, 
output).
-        Object pythonGenerator = interpreter.get(pythonGeneratorRef);
-        Object invokeResult = interpreter.invoke(CALL_PYTHON_GENERATOR, 
pythonGenerator);
+    public boolean callPythonAwaitable(String pythonAwaitableRef) {
+        // The Python helper calls awaitable.send(None) and returns a 
(finished, output) tuple.
+        Object pythonAwaitable = interpreter.get(pythonAwaitableRef);
+        Object invokeResult = interpreter.invoke(CALL_PYTHON_AWAITABLE, 
pythonAwaitable);
         checkState(invokeResult.getClass().isArray() && ((Object[]) 
invokeResult).length == 2);
         return (boolean) ((Object[]) invokeResult)[0];
     }

Reply via email to