This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 84f990cc [runtime][python] Add Python durable reconciler support (#614)
84f990cc is described below
commit 84f990cc10d1718729c7d44e3cc9eb6830e62aa7
Author: Joey Tong <[email protected]>
AuthorDate: Tue Apr 14 18:57:28 2026 +0800
[runtime][python] Add Python durable reconciler support (#614)
---
python/flink_agents/api/runner_context.py | 30 ++
python/flink_agents/runtime/durable_execution.py | 75 ++++
.../flink_agents/runtime/flink_runner_context.py | 323 +++++++++++++---
python/flink_agents/runtime/local_runner.py | 16 +-
.../runtime/tests/test_durable_execution.py | 64 +++-
.../test_flink_runner_context_reconcilable.py | 410 +++++++++++++++++++++
.../tests/test_local_runner_reconcilable.py | 79 ++++
.../agents/runtime/context/RunnerContextImpl.java | 18 +
8 files changed, 965 insertions(+), 50 deletions(-)
diff --git a/python/flink_agents/api/runner_context.py
b/python/flink_agents/api/runner_context.py
index 7dfd5437..bf8d4ff9 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -196,6 +196,7 @@ class RunnerContext(ABC):
self,
func: Callable[[Any], Any],
*args: Any,
+ reconciler: Callable[[], Any] | None = None,
**kwargs: Any,
) -> Any:
"""Synchronously execute the provided function with durable execution
support.
@@ -212,6 +213,16 @@ class RunnerContext(ABC):
will always make the durable_execute call with the same arguments and
in the
same order during job recovery. Otherwise, the behavior is undefined.
+ If `reconciler` is provided, recovery invokes it only when revisiting
+ this durable call and no terminal outcome from the previous durable
+ invocation has been persisted yet. The reconciler may:
+
+ * return a result to provide the recovered successful outcome for this
+ durable call; the runtime persists and replays that recovered result.
+ * raise an exception to provide the recovered failed outcome for this
+ durable call; the runtime persists and replays that recovered
+ failure.
+
Usage::
def my_action(event, ctx):
@@ -224,6 +235,10 @@ class RunnerContext(ABC):
The function to be executed.
*args : Any
Positional arguments to pass to the function.
+ reconciler : Callable[[], Any] | None
+ Optional zero-argument reconciler callable used only during
recovery.
+ This is a reserved keyword-only parameter and is not forwarded to
+ `func`.
**kwargs : Any
Keyword arguments to pass to the function.
@@ -238,6 +253,7 @@ class RunnerContext(ABC):
self,
func: Callable[[Any], Any],
*args: Any,
+ reconciler: Callable[[], Any] | None = None,
**kwargs: Any,
) -> "AsyncExecutionResult":
"""Asynchronously execute the provided function with durable execution
support.
@@ -251,6 +267,16 @@ class RunnerContext(ABC):
will always make the durable_execute_async call with the same
arguments and in
the same order during job recovery. Otherwise, the behavior is
undefined.
+ If `reconciler` is provided, recovery invokes it only when revisiting
+ this durable call and no terminal outcome from the previous durable
+ invocation has been persisted yet. The reconciler may:
+
+ * return a result to provide the recovered successful outcome for this
+ durable call; the runtime persists and replays that recovered result.
+ * raise an exception to provide the recovered failed outcome for this
+ durable call; the runtime persists and replays that recovered
+ failure.
+
Usage::
async def my_action(event, ctx):
@@ -267,6 +293,10 @@ class RunnerContext(ABC):
The function to be executed asynchronously.
*args : Any
Positional arguments to pass to the function.
+ reconciler : Callable[[], Any] | None
+ Optional zero-argument reconciler callable used only during
recovery.
+ This is a reserved keyword-only parameter and is not forwarded to
+ `func`.
**kwargs : Any
Keyword arguments to pass to the function.
diff --git a/python/flink_agents/runtime/durable_execution.py
b/python/flink_agents/runtime/durable_execution.py
new file mode 100644
index 00000000..9db22bfa
--- /dev/null
+++ b/python/flink_agents/runtime/durable_execution.py
@@ -0,0 +1,75 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import hashlib
+import inspect
+from typing import Any, Callable
+
+import cloudpickle
+
+
def _compute_function_id(func: Callable) -> str:
    """Build a ``module.qualname`` identifier for *func*.

    The module name comes from :func:`inspect.getmodule`; when no module can
    be resolved it falls back to ``func.__module__`` (or ``"<unknown>"``).
    The qualified name falls back to ``__name__`` and then ``"<unknown>"``.
    """
    owning_module = inspect.getmodule(func)
    if owning_module is None:
        module_name = getattr(func, "__module__", "<unknown>")
    else:
        module_name = owning_module.__name__
    plain_name = getattr(func, "__name__", "<unknown>")
    qualified_name = getattr(func, "__qualname__", plain_name)
    return f"{module_name}.{qualified_name}"
+
+
def _compute_args_digest(args: tuple, kwargs: dict) -> str:
    """Return a 16-hex-character SHA-256 digest of ``(args, kwargs)``.

    The pair is serialized with cloudpickle; if serialization raises, the
    digest is taken over ``str((args, kwargs))`` instead.

    NOTE(review): the ``str()`` fallback can embed default ``repr`` object
    ids, which would not be stable across process restarts — confirm that
    arguments reaching this path always have deterministic ``repr``.
    """
    call_arguments = (args, kwargs)
    try:
        payload = cloudpickle.dumps(call_arguments)
    except Exception:
        payload = str(call_arguments).encode()
    return hashlib.sha256(payload).hexdigest()[:16]
+
+
def _can_bind_call(
    func: Callable,
    *args: Any,
    **kwargs: Any,
) -> bool:
    """Check whether ``func`` would accept ``*args`` and ``**kwargs``.

    Returns ``False`` both when binding fails (``TypeError``) and when no
    signature can be obtained for ``func`` (``ValueError``).
    """
    try:
        signature = inspect.signature(func)
        signature.bind(*args, **kwargs)
        return True
    except (TypeError, ValueError):
        return False
+
+
+def _validate_reconciler_callable(
+ reconciler: Callable[[], Any] | None,
+) -> Callable[[], Any] | None:
+ """Validate that the reconciler callable is either absent or
zero-argument."""
+ if reconciler is None:
+ return None
+
+ if not callable(reconciler):
+ err_msg = "reconciler must be callable"
+ raise TypeError(err_msg)
+
+ if not _can_bind_call(reconciler):
+ err_msg = "reconciler must be a callable that takes no arguments"
+ raise TypeError(err_msg)
+
+ return reconciler
diff --git a/python/flink_agents/runtime/flink_runner_context.py
b/python/flink_agents/runtime/flink_runner_context.py
index 631267fa..4daeb627 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -15,11 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-import hashlib
import logging
import os
from concurrent.futures import ThreadPoolExecutor
-from typing import Any, Callable, Dict
+from dataclasses import dataclass
+from functools import partial
+from typing import Any, Callable, Dict, Literal
import cloudpickle
from typing_extensions import override
@@ -34,8 +35,15 @@ from flink_agents.api.memory.long_term_memory import (
from flink_agents.api.memory_object import MemoryType
from flink_agents.api.metric_group import MetricGroup
from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.api.runner_context import AsyncExecutionResult, RunnerContext
-from flink_agents.plan.agent_plan import AgentPlan
+from flink_agents.api.runner_context import (
+ AsyncExecutionResult,
+ RunnerContext,
+)
+from flink_agents.runtime.durable_execution import (
+ _compute_args_digest,
+ _compute_function_id,
+ _validate_reconciler_callable,
+)
from flink_agents.runtime.flink_memory_object import FlinkMemoryObject
from flink_agents.runtime.flink_metric_group import FlinkMetricGroup
from flink_agents.runtime.memory.internal_base_long_term_memory import (
@@ -50,6 +58,23 @@ from flink_agents.runtime.resource_cache import ResourceCache
logger = logging.getLogger(__name__)
+@dataclass(frozen=True)
+class _PersistedCallResult:
+ function_id: str
+ args_digest: str
+ status: str
+ result_payload: bytes | None
+ exception_payload: bytes | None
+
+
+@dataclass(frozen=True)
+class _ReconcilerExecutionPlan:
+ mode: Literal["replay", "execute"]
+ callable: Callable[[], Any] | None = None
+ needs_clear: bool = False
+ needs_append_pending: bool = False
+
+
class _DurableExecutionResult:
"""Wrapper that holds result and triggers recording when unwrapped."""
@@ -155,28 +180,65 @@ class _DurableAsyncExecutionResult(AsyncExecutionResult):
return result
-def _compute_function_id(func: Callable) -> str:
- """Compute a stable function identifier from a callable.
+class _ReconcilerDurableAsyncExecutionResult(AsyncExecutionResult):
+ """An AsyncExecutionResult that resolves reconciler state on await.
+
+ Planning (replay, run the original call, or run the reconciler) is
+ deferred until the result is awaited. The chosen callable runs on the
+ executor, and its outcome is finalized as the current durable call
+ before being returned or re-raised.
+ """
- Returns module.qualname for functions/methods.
- """
- module = getattr(func, "__module__", "<unknown>")
- qualname = getattr(func, "__qualname__", getattr(func, "__name__",
"<unknown>"))
- return f"{module}.{qualname}"
+ def __init__(
+ self,
+ ctx: "FlinkRunnerContext",
+ executor: Any,
+ func: Callable,
+ args: tuple,
+ reconciler: Callable[[], Any],
+ kwargs: dict,
+ ) -> None:
+ super().__init__(executor, func, args, kwargs)
+ self._ctx = ctx
+ self._reconciler = reconciler
+
+ def __await__(self) -> Any:
+ # Plan lazily: __init__ only stores state, so nothing is planned,
+ # appended, or cleared unless the result is actually awaited.
+ plan = self._ctx._plan_reconciler_execution(
+ self._func,
+ self._args,
+ self._reconciler,
+ self._kwargs,
+ )
+ if plan.mode == "replay":
+ result = self._ctx._replay_terminal_call(self._func, self._args,
self._kwargs)
+ # NOTE(review): redundant - the `yield` in the polling loop below
+ # already makes __await__ a generator function.
+ if False:
+ yield
+ return result
-def _compute_args_digest(args: tuple, kwargs: dict) -> str:
- """Compute a stable digest of the serialized arguments.
+ # Apply clear / append-pending side effects required by the plan.
+ self._ctx._prepare_reconciler_execution(
+ plan,
+ self._func,
+ self._args,
+ self._kwargs,
+ )
- The digest is used to validate that the same arguments are passed
- during recovery as during the original execution.
- """
- try:
- serialized = cloudpickle.dumps((args, kwargs))
- return hashlib.sha256(serialized).hexdigest()[:16]
- except Exception:
- # If serialization fails, return a fallback digest
- return hashlib.sha256(str((args, kwargs)).encode()).hexdigest()[:16]
+ # Run the planned callable on the executor and suspend until it is done.
+ future = self._executor.submit(plan.callable)
+ while not future.done():
+ yield
+
+ # Capture any BaseException so the outcome is persisted before re-raising.
+ exception = None
+ result = None
+ try:
+ result = future.result()
+ except BaseException as e:
+ exception = e
+
+ self._ctx._finalize_current_call(
+ self._func,
+ self._args,
+ self._kwargs,
+ result,
+ exception,
+ )
+
+ if exception is not None:
+ raise exception
+ return result
class FlinkRunnerContext(RunnerContext):
@@ -186,7 +248,7 @@ class FlinkRunnerContext(RunnerContext):
durable execution support through execute() and execute_async() methods.
"""
- __agent_plan: AgentPlan | None
+ __agent_plan: Any
__ltm: InternalBaseLongTermMemory = None
def __init__(
@@ -203,6 +265,8 @@ class FlinkRunnerContext(RunnerContext):
j_runner_context : Any
Java runner context used to synchronize data between Python and
Java.
"""
+ from flink_agents.plan.agent_plan import AgentPlan
+
self._j_runner_context = j_runner_context
self.__agent_plan = AgentPlan.model_validate_json(agent_plan_json)
self.__resource_cache = ResourceCache(
@@ -409,11 +473,165 @@ class FlinkRunnerContext(RunnerContext):
if "recordCallCompletion" not in str(e):
logger.warning("Failed to record call completion: %s", e)
+ @staticmethod
+ def _serialize_call_payloads(
+ result: Any,
+ exception: BaseException | None,
+ ) -> tuple[bytes | None, bytes | None]:
+ result_payload = None if exception else cloudpickle.dumps(result)
+ exception_payload = cloudpickle.dumps(exception) if exception else None
+ return result_payload, exception_payload
+
+ def _peek_current_call_result(self) -> _PersistedCallResult | None:
+ current = self._j_runner_context.getCurrentCallResultFields()
+ if current is None:
+ return None
+
+ function_id, args_digest, status, result_payload, exception_payload =
current
+ return _PersistedCallResult(
+ function_id=function_id,
+ args_digest=args_digest,
+ status=status,
+ result_payload=bytes(result_payload) if result_payload is not None
else None,
+ exception_payload=(
+ bytes(exception_payload) if exception_payload is not None else
None
+ ),
+ )
+
+ def _append_pending_call(self, func: Callable, args: tuple, kwargs: dict)
-> None:
+ self._j_runner_context.appendPendingCall(
+ _compute_function_id(func),
+ _compute_args_digest(args, kwargs),
+ )
+
+ def _finalize_current_call(
+ self,
+ func: Callable,
+ args: tuple,
+ kwargs: dict,
+ result: Any,
+ exception: BaseException | None,
+ ) -> None:
+ function_id = _compute_function_id(func)
+ args_digest = _compute_args_digest(args, kwargs)
+ result_payload, exception_payload = self._serialize_call_payloads(
+ result,
+ exception,
+ )
+ self._j_runner_context.finalizeCurrentCall(
+ function_id,
+ args_digest,
+ result_payload,
+ exception_payload,
+ )
+
+ def _clear_call_results_from_current_index_and_persist(self) -> None:
+ self._j_runner_context.clearCallResultsFromCurrentIndexAndPersist()
+
+ def _replay_terminal_call(self, func: Callable, args: tuple, kwargs: dict)
-> Any:
+ is_hit, cached_result = self._try_get_cached_result(func, args, kwargs)
+ if not is_hit:
+ err_msg = "Expected a terminal durable call result but replay did
not hit"
+ raise RuntimeError(err_msg)
+ return cached_result
+
+ def _plan_reconciler_execution(
+ self,
+ func: Callable,
+ args: tuple,
+ reconciler: Callable[[], Any],
+ kwargs: dict,
+ ) -> _ReconcilerExecutionPlan:
+ function_id = _compute_function_id(func)
+ args_digest = _compute_args_digest(args, kwargs)
+ current = self._peek_current_call_result()
+ durable_call = partial(func, *args, **kwargs)
+
+ if current is None:
+ return _ReconcilerExecutionPlan(
+ "execute",
+ callable=durable_call,
+ needs_append_pending=True,
+ )
+
+ if current.function_id != function_id or current.args_digest !=
args_digest:
+ return _ReconcilerExecutionPlan(
+ "execute",
+ callable=durable_call,
+ needs_clear=True,
+ needs_append_pending=True,
+ )
+
+ if current.status != "PENDING":
+ return _ReconcilerExecutionPlan("replay")
+
+ return _ReconcilerExecutionPlan(
+ "execute",
+ callable=reconciler,
+ )
+
+ def _prepare_reconciler_execution(
+ self,
+ plan: _ReconcilerExecutionPlan,
+ func: Callable,
+ args: tuple,
+ kwargs: dict,
+ ) -> None:
+ if plan.needs_clear:
+ self._clear_call_results_from_current_index_and_persist()
+ if plan.needs_append_pending:
+ self._append_pending_call(func, args, kwargs)
+
+ def _execute_current_pending_call(
+ self,
+ execution_callable: Callable[[], Any],
+ func: Callable,
+ args: tuple,
+ kwargs: dict,
+ ) -> Any:
+ exception = None
+ result = None
+ try:
+ result = execution_callable()
+ except BaseException as e:
+ exception = e
+
+ self._finalize_current_call(func, args, kwargs, result, exception)
+
+ if exception is not None:
+ raise exception
+ return result
+
+ def _wrap_completion_only_func(
+ self,
+ func: Callable,
+ args: tuple,
+ kwargs: dict,
+ ) -> Callable[..., Any]:
+ def wrapped_func(*a: Any, **kw: Any) -> Any:
+ exception = None
+ result = None
+ try:
+ result = func(*a, **kw)
+ except BaseException as e:
+ exception = e
+
+ if exception:
+ raise _DurableExecutionException(
+ func, args, kwargs, result, exception,
self._record_call_completion
+ )
+ return _DurableExecutionResult(
+ func, args, kwargs, result, self._record_call_completion
+ )
+
+ return wrapped_func
+
@override
def durable_execute(
self,
func: Callable[[Any], Any],
*args: Any,
+ reconciler: Callable[[], Any] | None = None,
**kwargs: Any,
) -> Any:
"""Synchronously execute the provided function with durable execution
support.
@@ -426,6 +644,26 @@ class FlinkRunnerContext(RunnerContext):
The function is executed synchronously in the current thread, blocking
the operator until completion.
"""
+ validated_reconciler = _validate_reconciler_callable(reconciler)
+
+ if validated_reconciler is not None:
+ plan = self._plan_reconciler_execution(
+ func,
+ args,
+ validated_reconciler,
+ kwargs,
+ )
+ if plan.mode == "replay":
+ return self._replay_terminal_call(func, args, kwargs)
+
+ self._prepare_reconciler_execution(plan, func, args, kwargs)
+ return self._execute_current_pending_call(
+ plan.callable,
+ func,
+ args,
+ kwargs,
+ )
+
# Try to get cached result for recovery
is_hit, cached_result = self._try_get_cached_result(func, args, kwargs)
if is_hit:
@@ -451,6 +689,7 @@ class FlinkRunnerContext(RunnerContext):
self,
func: Callable[[Any], Any],
*args: Any,
+ reconciler: Callable[[], Any] | None = None,
**kwargs: Any,
) -> AsyncExecutionResult:
"""Asynchronously execute the provided function with durable execution
support.
@@ -464,32 +703,30 @@ class FlinkRunnerContext(RunnerContext):
is awaited. Fire-and-forget calls (not awaiting the result) will NOT be
recorded and cannot be recovered.
"""
+ validated_reconciler = _validate_reconciler_callable(reconciler)
+
+ if validated_reconciler is not None:
+ return _ReconcilerDurableAsyncExecutionResult(
+ self,
+ self.executor,
+ func,
+ args,
+ validated_reconciler,
+ kwargs,
+ )
+
# Try to get cached result for recovery
is_hit, cached_result = self._try_get_cached_result(func, args, kwargs)
if is_hit:
# Return a pre-completed AsyncExecutionResult
return _CachedAsyncExecutionResult(cached_result)
- # Create a wrapper function that records completion
- def wrapped_func(*a: Any, **kw: Any) -> Any:
- exception = None
- result = None
- try:
- result = func(*a, **kw)
- except BaseException as e:
- exception = e
-
- # Note: This runs in a thread pool, so we need to be careful
- # The actual recording will happen when the result is awaited
- if exception:
- raise _DurableExecutionException(
- func, args, kwargs, result, exception,
self._record_call_completion
- )
- return _DurableExecutionResult(
- func, args, kwargs, result, self._record_call_completion
- )
-
- return _DurableAsyncExecutionResult(self.executor, wrapped_func, args,
kwargs)
+ return _DurableAsyncExecutionResult(
+ self.executor,
+ self._wrap_completion_only_func(func, args, kwargs),
+ args,
+ kwargs,
+ )
@property
@override
diff --git a/python/flink_agents/runtime/local_runner.py
b/python/flink_agents/runtime/local_runner.py
index 75a73efb..53823b61 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -20,7 +20,7 @@ import logging
import uuid
from collections import deque
from concurrent.futures import Future
-from typing import Any, Callable, Dict, List
+from typing import TYPE_CHECKING, Any, Callable, Dict, List
from typing_extensions import override
@@ -31,12 +31,14 @@ from flink_agents.api.memory_object import MemoryObject,
MemoryType
from flink_agents.api.metric_group import MetricGroup
from flink_agents.api.resource import Resource, ResourceType
from flink_agents.api.runner_context import AsyncExecutionResult, RunnerContext
-from flink_agents.plan.agent_plan import AgentPlan
from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.runtime.agent_runner import AgentRunner
from flink_agents.runtime.local_memory_object import LocalMemoryObject
from flink_agents.runtime.resource_cache import ResourceCache
+if TYPE_CHECKING:
+ from flink_agents.plan.agent_plan import AgentPlan
+
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
@@ -58,7 +60,7 @@ class LocalRunnerContext(RunnerContext):
Name of the action being executed.
"""
- __agent_plan: AgentPlan | None
+ __agent_plan: Any
__key: Any
events: deque[Event]
action_name: str
@@ -69,7 +71,7 @@ class LocalRunnerContext(RunnerContext):
_config: AgentConfiguration
def __init__(
- self, agent_plan: AgentPlan, key: Any, config: AgentConfiguration
+ self, agent_plan: "AgentPlan", key: Any, config: AgentConfiguration
) -> None:
"""Initialize a new context with the given agent and key.
@@ -190,6 +192,7 @@ class LocalRunnerContext(RunnerContext):
self,
func: Callable[[Any], Any],
*args: Any,
+ reconciler: Callable[[], Any] | None = None,
**kwargs: Any,
) -> Any:
"""Synchronously execute the provided function. Access to memory
@@ -208,6 +211,7 @@ class LocalRunnerContext(RunnerContext):
self,
func: Callable[[Any], Any],
*args: Any,
+ reconciler: Callable[[], Any] | None = None,
**kwargs: Any,
) -> AsyncExecutionResult:
"""Asynchronously execute the provided function. Access to memory
@@ -271,7 +275,7 @@ class LocalRunner(AgentRunner):
Internal configration.
"""
- __agent_plan: AgentPlan
+ __agent_plan: Any
__keyed_contexts: Dict[Any, LocalRunnerContext]
__outputs: List[Dict[str, Any]]
__config: AgentConfiguration
@@ -284,6 +288,8 @@ class LocalRunner(AgentRunner):
agent : Agent
The agent class to convert and run.
"""
+ from flink_agents.plan.agent_plan import AgentPlan
+
self.__agent_plan = AgentPlan.from_agent(agent, config)
self.__keyed_contexts = {}
self.__outputs = []
diff --git a/python/flink_agents/runtime/tests/test_durable_execution.py
b/python/flink_agents/runtime/tests/test_durable_execution.py
index 080ad846..52d777a2 100644
--- a/python/flink_agents/runtime/tests/test_durable_execution.py
+++ b/python/flink_agents/runtime/tests/test_durable_execution.py
@@ -18,10 +18,12 @@
"""Tests for durable execution helper functions."""
import cloudpickle
+import pytest
-from flink_agents.runtime.flink_runner_context import (
+from flink_agents.runtime.durable_execution import (
_compute_args_digest,
_compute_function_id,
+ _validate_reconciler_callable,
)
@@ -48,6 +50,22 @@ class SampleClass:
return x * 4
class ReconcilerCallables:
    """Object whose methods serve as reconciler candidates in validation tests."""

    def __init__(self, prefix: str) -> None:
        """Remember ``prefix`` for the strings the helpers return."""
        self.prefix = prefix

    def bound_no_arg(self) -> str:
        """Zero-argument once bound; returns ``"bound:<prefix>"``."""
        tag = "bound"
        return f"{tag}:{self.prefix}"

    def requires_arg(self, value: int) -> str:
        """Still needs ``value`` when bound, so validation must reject it."""
        prefix = self.prefix
        return f"{prefix}:{value}"
+
+
def test_compute_function_id_for_function() -> None:
"""Test function ID computation for regular functions."""
func_id = _compute_function_id(sample_function)
@@ -127,6 +145,49 @@ def test_compute_args_digest_kwargs_vs_args() -> None:
assert digest1 != digest2
def test_validate_reconciler_callable_accepts_none() -> None:
    """A missing reconciler validates to ``None``."""
    validated = _validate_reconciler_callable(None)
    assert validated is None
+
+
def test_validate_reconciler_callable_accepts_zero_arg_function() -> None:
    """A plain zero-argument function passes validation unchanged."""

    def reconciler() -> str:
        return "ok"

    assert _validate_reconciler_callable(reconciler) is reconciler
    assert reconciler() == "ok"
+
+
def test_validate_reconciler_callable_accepts_bound_zero_arg_method() -> None:
    """A bound method with no remaining parameters passes validation."""
    bound_method = ReconcilerCallables("client").bound_no_arg

    validated = _validate_reconciler_callable(bound_method)

    assert validated is bound_method
    assert validated() == "bound:client"
+
+
def test_validate_reconciler_callable_requires_callable() -> None:
    """A non-callable value is rejected with ``TypeError``."""
    not_callable = 1
    with pytest.raises(TypeError, match="reconciler must be callable"):
        _validate_reconciler_callable(not_callable)  # type: ignore[arg-type]
+
+
def test_validate_reconciler_callable_requires_zero_args() -> None:
    """A callable that still needs arguments is rejected with ``TypeError``."""
    needs_value = ReconcilerCallables("client").requires_arg
    expected = "reconciler must be a callable that takes no arguments"

    with pytest.raises(TypeError, match=expected):
        _validate_reconciler_callable(needs_value)
+
+
def test_cloudpickle_serialization() -> None:
"""Test that results can be serialized and deserialized with
cloudpickle."""
# Test basic types
@@ -216,4 +277,3 @@ def test_cloudpickle_none_exception_message() -> None:
assert isinstance(deserialized, RuntimeError)
# str() of an exception with None message is "None"
assert str(deserialized) == "None"
-
diff --git
a/python/flink_agents/runtime/tests/test_flink_runner_context_reconcilable.py
b/python/flink_agents/runtime/tests/test_flink_runner_context_reconcilable.py
new file mode 100644
index 00000000..4d64eae4
--- /dev/null
+++
b/python/flink_agents/runtime/tests/test_flink_runner_context_reconcilable.py
@@ -0,0 +1,410 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import asyncio
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
+from typing import Any, Callable
+
+import cloudpickle
+import pytest
+
+from flink_agents.runtime.durable_execution import (
+ _compute_args_digest,
+ _compute_function_id,
+)
+from flink_agents.runtime.flink_runner_context import FlinkRunnerContext
+
+
+@dataclass
+class _StoredCallResult:
+ function_id: str
+ args_digest: str
+ status: str
+ result_payload: bytes | None = None
+ exception_payload: bytes | None = None
+
+
+class _FakeJavaRunnerContext:
+ def __init__(self) -> None:
+ self.call_results: list[_StoredCallResult] = []
+ self.current_call_index = 0
+ self.operations: list[str] = []
+
+ def getCurrentCallResultFields(self) -> list[Any] | None:
+ self.operations.append("peek")
+ if self.current_call_index < len(self.call_results):
+ current = self.call_results[self.current_call_index]
+ return [
+ current.function_id,
+ current.args_digest,
+ current.status,
+ current.result_payload,
+ current.exception_payload,
+ ]
+ return None
+
+ def matchNextOrClearSubsequentCallResult(
+ self, function_id: str, args_digest: str
+ ) -> list[Any] | None:
+ self.operations.append("match")
+ if self.current_call_index < len(self.call_results):
+ current = self.call_results[self.current_call_index]
+ if (
+ current.function_id == function_id
+ and current.args_digest == args_digest
+ ):
+ self.current_call_index += 1
+ return [True, current.result_payload,
current.exception_payload]
+ self.call_results = self.call_results[: self.current_call_index]
+ return None
+
+ def recordCallCompletion(
+ self,
+ function_id: str,
+ args_digest: str,
+ result_payload: bytes | None,
+ exception_payload: bytes | None,
+ ) -> None:
+ self.operations.append("record")
+ status = "FAILED" if exception_payload is not None else "SUCCEEDED"
+ self.call_results.append(
+ _StoredCallResult(
+ function_id=function_id,
+ args_digest=args_digest,
+ status=status,
+ result_payload=result_payload,
+ exception_payload=exception_payload,
+ )
+ )
+ self.current_call_index += 1
+
+ def appendPendingCall(self, function_id: str, args_digest: str) -> None:
+ self.operations.append("append_pending")
+ self.call_results.append(
+ _StoredCallResult(
+ function_id=function_id,
+ args_digest=args_digest,
+ status="PENDING",
+ )
+ )
+
+ def finalizeCurrentCall(
+ self,
+ function_id: str,
+ args_digest: str,
+ result_payload: bytes | None,
+ exception_payload: bytes | None,
+ ) -> None:
+ self.operations.append("finalize")
+ current = self.call_results[self.current_call_index]
+ assert current.status == "PENDING"
+ assert current.function_id == function_id
+ assert current.args_digest == args_digest
+ self.call_results[self.current_call_index] = _StoredCallResult(
+ function_id=function_id,
+ args_digest=args_digest,
+ status="FAILED" if exception_payload is not None else "SUCCEEDED",
+ result_payload=result_payload,
+ exception_payload=exception_payload,
+ )
+ self.current_call_index += 1
+
+ def clearCallResultsFromCurrentIndexAndPersist(self) -> None:
+ self.operations.append("clear")
+ self.call_results = self.call_results[: self.current_call_index]
+
+
def _create_runner_context(
    j_runner_context: _FakeJavaRunnerContext,
) -> FlinkRunnerContext:
    """Build a ``FlinkRunnerContext`` around ``j_runner_context`` without ``__init__``.

    Only a minimal set of attributes is populated. The executor is a
    single-worker thread pool that callers shut down via
    ``_close_runner_context``.
    """
    runner_ctx = FlinkRunnerContext.__new__(FlinkRunnerContext)
    attributes = {
        "_j_runner_context": j_runner_context,
        "executor": ThreadPoolExecutor(max_workers=1),
        "_FlinkRunnerContext__agent_plan": None,
        "_FlinkRunnerContext__ltm": None,
    }
    for attr_name, attr_value in attributes.items():
        setattr(runner_ctx, attr_name, attr_value)
    return runner_ctx
+
+
def _close_runner_context(ctx: FlinkRunnerContext) -> None:
    """Shut down the context's executor, waiting for submitted work to finish."""
    executor = ctx.executor
    executor.shutdown(wait=True)
+
+
def _run_async(result: Any) -> object:
    """Await ``result`` on a fresh event loop (``asyncio.run``) and return its value."""

    async def _consume() -> Any:
        awaited = await result
        return awaited

    return asyncio.run(_consume())
+
+
def _preload_pending(
    j_runner_context: _FakeJavaRunnerContext,
    func: Callable[..., Any],
    *args: Any,
    **kwargs: Any,
) -> None:
    """Seed a PENDING record for ``func(*args, **kwargs)`` in the fake context.

    This simulates a durable call left without a terminal outcome by an
    earlier attempt.
    """
    pending = _StoredCallResult(
        function_id=_compute_function_id(func),
        args_digest=_compute_args_digest(args, kwargs),
        status="PENDING",
    )
    j_runner_context.call_results.append(pending)
+
+
def _call_value(value: str) -> str:
    """Deterministic stand-in for a durable call: prefix ``value`` with ``call:``."""
    return "call:{}".format(value)
+
+
def test_flink_runner_context_sync_with_reconciler_executes_original_call() -> None:
    """With no persisted record, the original call runs and the reconciler does not."""
    j_runner_context = _FakeJavaRunnerContext()
    ctx = _create_runner_context(j_runner_context)
    reconciler_invocations: list[str] = []

    def reconciler() -> str:
        reconciler_invocations.append("called")
        return "reconciled:order-1"

    try:
        result = ctx.durable_execute(_call_value, "order-1", reconciler=reconciler)
    finally:
        _close_runner_context(ctx)

    assert result == "call:order-1"
    assert not reconciler_invocations
    assert j_runner_context.operations == ["peek", "append_pending", "finalize"]
    assert j_runner_context.call_results[0].status == "SUCCEEDED"
+
+
def test_flink_runner_context_sync_reconciler_success() -> None:
    """A PENDING record is recovered via the reconciler without re-running the call."""
    j_runner_context = _FakeJavaRunnerContext()
    original_calls: list[str] = []

    def tracked_call(value: str) -> str:
        original_calls.append(value)
        return _call_value(value)

    _preload_pending(j_runner_context, tracked_call, "order-1")
    ctx = _create_runner_context(j_runner_context)

    try:
        result = ctx.durable_execute(
            tracked_call, "order-1", reconciler=lambda: "reconciled:order-1"
        )
    finally:
        _close_runner_context(ctx)

    assert result == "reconciled:order-1"
    assert not original_calls
    assert j_runner_context.operations == ["peek", "finalize"]
    persisted_payload = j_runner_context.call_results[0].result_payload
    assert cloudpickle.loads(persisted_payload) == "reconciled:order-1"
+
+
def test_flink_runner_context_sync_reconciler_exception_persists_failure() -> None:
    """A reconciler failure is persisted as FAILED and re-raised to the caller."""
    j_runner_context = _FakeJavaRunnerContext()
    original_calls: list[str] = []

    def tracked_call(value: str) -> str:
        original_calls.append(value)
        return _call_value(value)

    _preload_pending(j_runner_context, tracked_call, "order-1")
    ctx = _create_runner_context(j_runner_context)

    def reconciler() -> str:
        message = "failed:order-1"
        raise ValueError(message)

    try:
        with pytest.raises(ValueError, match="failed:order-1"):
            ctx.durable_execute(tracked_call, "order-1", reconciler=reconciler)
    finally:
        _close_runner_context(ctx)

    assert not original_calls
    assert j_runner_context.operations == ["peek", "finalize"]
    failed_record = j_runner_context.call_results[0]
    assert failed_record.status == "FAILED"
    persisted_exception = cloudpickle.loads(failed_record.exception_payload)
    assert isinstance(persisted_exception, ValueError)
    assert str(persisted_exception) == "failed:order-1"
    assert j_runner_context.current_call_index == 1
+
+
+def test_flink_runner_context_sync_reconciler_mismatch_clears_and_executes() -> None:
+    """Clear mismatched persisted state before executing the original call."""
+    j_runner_context = _FakeJavaRunnerContext()
+    stale_result_payload = cloudpickle.dumps("stale")
+    j_runner_context.call_results.extend(
+        [
+            _StoredCallResult(
+                function_id=_compute_function_id(_call_value),
+                args_digest=_compute_args_digest(("other-order",), {}),
+                status="PENDING",
+            ),
+            _StoredCallResult(
+                function_id="stale.function",
+                args_digest="stale-args",
+                status="SUCCEEDED",
+                result_payload=stale_result_payload,
+            ),
+        ]
+    )
+    ctx = _create_runner_context(j_runner_context)
+    reconciler_called = False
+
+    def reconciler() -> str:
+        nonlocal reconciler_called
+        reconciler_called = True
+        return "reconciled:order-1"
+
+    try:
+        result = ctx.durable_execute(_call_value, "order-1", reconciler=reconciler)
+    finally:
+        _close_runner_context(ctx)
+
+    assert result == "call:order-1"
+    assert reconciler_called is False
+    assert j_runner_context.operations == ["peek", "clear", "append_pending", "finalize"]
+    assert len(j_runner_context.call_results) == 1
+    assert j_runner_context.call_results[0].function_id == _compute_function_id(_call_value)
+    assert j_runner_context.call_results[0].args_digest == _compute_args_digest(
+        ("order-1",), {}
+    )
+    assert j_runner_context.call_results[0].status == "SUCCEEDED"
+
+
+def test_flink_runner_context_async_writes_pending_on_await() -> None:
+    """Defer pending-state writes for async execution until await time."""
+    j_runner_context = _FakeJavaRunnerContext()
+    ctx = _create_runner_context(j_runner_context)
+    reconciler_called = False
+
+    def reconciler() -> str:
+        nonlocal reconciler_called
+        reconciler_called = True
+        return "reconciled:order-1"
+
+    try:
+        async_result = ctx.durable_execute_async(
+            _call_value,
+            "order-1",
+            reconciler=reconciler,
+        )
+        assert j_runner_context.call_results == []
+        result = _run_async(async_result)
+    finally:
+        _close_runner_context(ctx)
+
+    assert result == "call:order-1"
+    assert reconciler_called is False
+    assert j_runner_context.operations == ["peek", "append_pending", "finalize"]
+    assert j_runner_context.call_results[0].status == "SUCCEEDED"
+
+
+def test_flink_runner_context_async_reconciler_success() -> None:
+    """Recover a successful async result through the reconciler and persist it."""
+    j_runner_context = _FakeJavaRunnerContext()
+    call_count = 0
+
+    def tracked_call(value: str) -> str:
+        nonlocal call_count
+        call_count += 1
+        return _call_value(value)
+
+    _preload_pending(j_runner_context, tracked_call, "order-1")
+    ctx = _create_runner_context(j_runner_context)
+
+    try:
+        async_result = ctx.durable_execute_async(
+            tracked_call,
+            "order-1",
+            reconciler=lambda: "reconciled:order-1",
+        )
+        result = _run_async(async_result)
+    finally:
+        _close_runner_context(ctx)
+
+    assert result == "reconciled:order-1"
+    assert call_count == 0
+    assert j_runner_context.operations == ["peek", "finalize"]
+    # Mirror the sync success test: the reconciled value must be the persisted
+    # outcome, not just the returned one.
+    assert cloudpickle.loads(j_runner_context.call_results[0].result_payload) == (
+        "reconciled:order-1"
+    )
+
+
+def test_flink_runner_context_async_reconciler_exception_persists_failure() -> None:
+    """Persist an async reconciler failure and re-raise it."""
+    j_runner_context = _FakeJavaRunnerContext()
+    call_count = 0
+
+    def tracked_call(value: str) -> str:
+        nonlocal call_count
+        call_count += 1
+        return _call_value(value)
+
+    _preload_pending(j_runner_context, tracked_call, "order-1")
+    ctx = _create_runner_context(j_runner_context)
+
+    def reconciler() -> str:
+        error_message = "reconcile unavailable"
+        raise RuntimeError(error_message)
+
+    try:
+        async_result = ctx.durable_execute_async(
+            tracked_call,
+            "order-1",
+            reconciler=reconciler,
+        )
+        with pytest.raises(RuntimeError, match="reconcile unavailable"):
+            _run_async(async_result)
+    finally:
+        _close_runner_context(ctx)
+
+    assert call_count == 0
+    assert j_runner_context.operations == ["peek", "finalize"]
+    assert j_runner_context.call_results[0].status == "FAILED"
+    persisted_exception = cloudpickle.loads(
+        j_runner_context.call_results[0].exception_payload
+    )
+    assert isinstance(persisted_exception, RuntimeError)
+    assert str(persisted_exception) == "reconcile unavailable"
+    assert j_runner_context.current_call_index == 1
+
+
+def test_flink_runner_context_reconciler_kwarg_is_not_forwarded() -> None:
+    """Keep the reserved reconciler kwarg out of the user function call."""
+    j_runner_context = _FakeJavaRunnerContext()
+    ctx = _create_runner_context(j_runner_context)
+
+    def collect_kwargs(**kwargs: Any) -> dict[str, Any]:
+        return kwargs
+
+    try:
+        result = ctx.durable_execute(collect_kwargs, reconciler=lambda: "unused")
+    finally:
+        _close_runner_context(ctx)
+
+    assert result == {}
diff --git a/python/flink_agents/runtime/tests/test_local_runner_reconcilable.py b/python/flink_agents/runtime/tests/test_local_runner_reconcilable.py
new file mode 100644
index 00000000..a213423f
--- /dev/null
+++ b/python/flink_agents/runtime/tests/test_local_runner_reconcilable.py
@@ -0,0 +1,79 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+import asyncio
+from typing import Any
+
+from flink_agents.runtime.local_runner import LocalRunnerContext
+
+
+def reconciled_add(x: int, y: int) -> int:
+    """Add the two operands; deterministic, so tests can assert on the sum."""
+    total = x + y
+    return total
+
+
+def _create_local_runner_context() -> LocalRunnerContext:
+    """Allocate a bare ``LocalRunnerContext`` without running ``__init__``."""
+    context_cls = LocalRunnerContext
+    return context_cls.__new__(context_cls)
+
+
+def test_local_runner_context_reconciler_durable_execute_degrades() -> None:
+    """Sync local execution runs the call directly and never consults the reconciler."""
+    reconciler_calls: list[str] = []
+
+    def reconciler() -> int:
+        reconciler_calls.append("called")
+        return 999
+
+    ctx = _create_local_runner_context()
+    outcome = ctx.durable_execute(reconciled_add, 5, 10, reconciler=reconciler)
+
+    assert outcome == 15
+    assert not reconciler_calls
+
+
+def test_local_runner_context_reconciler_durable_execute_async_degrades() -> None:
+    """Keep async local execution on the existing non-durable path."""
+    ctx = _create_local_runner_context()
+    reconciler_called = False
+
+    def reconciler() -> int:
+        nonlocal reconciler_called
+        reconciler_called = True
+        return 999
+
+    async_result = ctx.durable_execute_async(
+        reconciled_add, 5, 10, reconciler=reconciler
+    )
+
+    async def _await_result() -> Any:
+        return await async_result
+
+    assert asyncio.run(_await_result()) == 15
+    assert reconciler_called is False
+
+
+def test_local_runner_context_reconciler_kwarg_is_not_forwarded() -> None:
+    """The reserved ``reconciler`` kwarg must never reach the user function."""
+    ctx = _create_local_runner_context()
+
+    def collect_kwargs(**kwargs: Any) -> dict[str, Any]:
+        return dict(kwargs)
+
+    forwarded = ctx.durable_execute(collect_kwargs, reconciler=lambda: "unused")
+    assert forwarded == {}
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index f4d28d3b..3e302854 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -441,6 +441,24 @@ public class RunnerContextImpl implements RunnerContext {
}
}
+    /**
+     * Returns the current durable call result as [functionId, argsDigest, status, resultPayload,
+     * exceptionPayload] for bridge consumers, or null if the current index has no persisted slot.
+     */
+    public Object[] getCurrentCallResultFields() {
+        CallResult current = getCurrentCallResult();
+        if (current == null) {
+            return null;
+        }
+        return new Object[] {
+            current.getFunctionId(),
+            current.getArgsDigest(),
+            current.isPending() ? "PENDING" : current.isFailure() ? "FAILED" : "SUCCEEDED",
+            current.getResultPayload(),
+            current.getExceptionPayload()
+        };
+    }
+
protected CallResult getCurrentCallResult() {
mailboxThreadChecker.run();
if (durableExecutionContext != null) {