This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a1bf98a [api][runtime][java] Persist reconciler exceptions as 
failures (#624)
1a1bf98a is described below

commit 1a1bf98ac13879737ffc77961c2e850da19664bc
Author: Joey Tong <[email protected]>
AuthorDate: Tue Apr 14 19:08:04 2026 +0800

    [api][runtime][java] Persist reconciler exceptions as failures (#624)
---
 .../java/org/apache/flink/agents/api/context/DurableCallable.java | 5 ++---
 .../apache/flink/agents/runtime/context/RunnerContextImpl.java    | 8 +++-----
 .../context/JavaRunnerContextImplDurableExecuteAsyncTest.java     | 8 ++++----
 .../runtime/context/RunnerContextImplDurableExecuteTest.java      | 8 ++++----
 4 files changed, 13 insertions(+), 16 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/context/DurableCallable.java 
b/api/src/main/java/org/apache/flink/agents/api/context/DurableCallable.java
index 73f062b1..afccf245 100644
--- a/api/src/main/java/org/apache/flink/agents/api/context/DurableCallable.java
+++ b/api/src/main/java/org/apache/flink/agents/api/context/DurableCallable.java
@@ -67,9 +67,8 @@ public interface DurableCallable<T> {
      * <ul>
      *   <li>Return the result to provide the recovered successful outcome for 
this durable call.
      *       The runtime persists and replays that recovered result.
-     *   <li>Throw an exception if reconcile cannot provide a successful 
outcome. The exception is
-     *       propagated to the caller, and the runtime does not persist a 
recovered terminal outcome
-     *       for this durable call.
+     *   <li>Throw an exception to provide the recovered failed outcome for 
this durable call. The
+     *       runtime persists and replays that recovered failure.
      * </ul>
      */
     default @Nullable Callable<T> reconciler() {
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index 3e302854..c479c3a7 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -500,8 +500,8 @@ public class RunnerContextImpl implements RunnerContext {
      *
      * @param durableCallable durable call that provides the durable execution 
identity and result
      *     metadata
-     * @param reconcileCallable reconcile boundary used to recover a 
successful outcome from a
-     *     pending durable call
+     * @param reconcileCallable reconcile boundary used to recover a terminal 
outcome from a pending
+     *     durable call
      * @param executionCallable concrete execution boundary for the current 
path when recovery
      *     starts or restarts the original durable call
      */
@@ -541,9 +541,7 @@ public class RunnerContextImpl implements RunnerContext {
                             durableExecutionContext.getCurrentCallIndex(), 
functionId, argsDigest));
         }
 
-        T reconcileResult = reconcileCallable.call();
-        finalizeCurrentCall(functionId, argsDigest, 
serializeDurableResult(reconcileResult), null);
-        return reconcileResult;
+        return executeAndFinalizeCurrentCall(functionId, argsDigest, 
reconcileCallable);
     }
 
     protected <T> T executeAndFinalizeCurrentCall(
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImplDurableExecuteAsyncTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImplDurableExecuteAsyncTest.java
index ef3023ff..ff378fb6 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImplDurableExecuteAsyncTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImplDurableExecuteAsyncTest.java
@@ -156,7 +156,7 @@ class JavaRunnerContextImplDurableExecuteAsyncTest {
     }
 
     @Test
-    void testDurableExecuteAsyncReconcilableReconcileExceptionPropagates() 
throws Exception {
+    void 
testDurableExecuteAsyncReconcilableReconcileExceptionPersistsFailure() throws 
Exception {
         InspectingContinuationActionExecutor executor = new 
InspectingContinuationActionExecutor();
         ActionState actionState = new ActionState(null);
         actionState.addCallResult(CallResult.pending("recon-async", ""));
@@ -188,11 +188,11 @@ class JavaRunnerContextImplDurableExecuteAsyncTest {
         assertEquals(0, callable.getCallCount());
         assertEquals(1, callable.getReconcileCount());
         assertEquals(0, executor.getExecuteAsyncCallCount());
-        assertEquals(0, persistCallCount.get());
+        assertEquals(1, persistCallCount.get());
         CallResult persisted =
                 
context.getDurableExecutionContext().getActionState().getCallResults().get(0);
-        assertTrue(persisted.isPending());
-        assertEquals(0, 
context.getDurableExecutionContext().getCurrentCallIndex());
+        assertTrue(persisted.isFailure());
+        assertEquals(1, 
context.getDurableExecutionContext().getCurrentCallIndex());
     }
 
     private JavaRunnerContextImpl createContext(
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/context/RunnerContextImplDurableExecuteTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/context/RunnerContextImplDurableExecuteTest.java
index 132a7a6e..10b42dec 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/context/RunnerContextImplDurableExecuteTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/context/RunnerContextImplDurableExecuteTest.java
@@ -217,7 +217,7 @@ class RunnerContextImplDurableExecuteTest {
     }
 
     @Test
-    void testDurableExecuteReconcilableReconcileExceptionPropagates() throws 
Exception {
+    void testDurableExecuteReconcilableReconcileExceptionPersistsFailure() 
throws Exception {
         ActionState actionState = new ActionState(null);
         actionState.addCallResult(CallResult.pending("recon-call", ""));
         RunnerContextImpl context = createContext(actionState);
@@ -237,11 +237,11 @@ class RunnerContextImplDurableExecuteTest {
         assertSame(failure, thrown);
         assertEquals(0, callable.getCallCount());
         assertEquals(1, callable.getReconcileCount());
-        assertEquals(0, persistCallCount.get());
+        assertEquals(1, persistCallCount.get());
         CallResult persisted =
                 
context.getDurableExecutionContext().getActionState().getCallResults().get(0);
-        assertTrue(persisted.isPending());
-        assertEquals(0, 
context.getDurableExecutionContext().getCurrentCallIndex());
+        assertTrue(persisted.isFailure());
+        assertEquals(1, 
context.getDurableExecutionContext().getCurrentCallIndex());
     }
 
     @Test

Reply via email to