This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 1a1bf98a [api][runtime][java] Persist reconciler exceptions as
failures (#624)
1a1bf98a is described below
commit 1a1bf98ac13879737ffc77961c2e850da19664bc
Author: Joey Tong <[email protected]>
AuthorDate: Tue Apr 14 19:08:04 2026 +0800
[api][runtime][java] Persist reconciler exceptions as failures (#624)
---
.../java/org/apache/flink/agents/api/context/DurableCallable.java | 5 ++---
.../apache/flink/agents/runtime/context/RunnerContextImpl.java | 8 +++-----
.../context/JavaRunnerContextImplDurableExecuteAsyncTest.java | 8 ++++----
.../runtime/context/RunnerContextImplDurableExecuteTest.java | 8 ++++----
4 files changed, 13 insertions(+), 16 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/context/DurableCallable.java
b/api/src/main/java/org/apache/flink/agents/api/context/DurableCallable.java
index 73f062b1..afccf245 100644
--- a/api/src/main/java/org/apache/flink/agents/api/context/DurableCallable.java
+++ b/api/src/main/java/org/apache/flink/agents/api/context/DurableCallable.java
@@ -67,9 +67,8 @@ public interface DurableCallable<T> {
* <ul>
* <li>Return the result to provide the recovered successful outcome for
this durable call.
* The runtime persists and replays that recovered result.
- * <li>Throw an exception if reconcile cannot provide a successful
outcome. The exception is
- * propagated to the caller, and the runtime does not persist a
recovered terminal outcome
- * for this durable call.
+ * <li>Throw an exception to provide the recovered failed outcome for
this durable call. The
+ * runtime persists and replays that recovered failure.
* </ul>
*/
default @Nullable Callable<T> reconciler() {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index 3e302854..c479c3a7 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -500,8 +500,8 @@ public class RunnerContextImpl implements RunnerContext {
*
* @param durableCallable durable call that provides the durable execution
identity and result
* metadata
- * @param reconcileCallable reconcile boundary used to recover a
successful outcome from a
- * pending durable call
+ * @param reconcileCallable reconcile boundary used to recover a terminal
outcome from a pending
+ * durable call
* @param executionCallable concrete execution boundary for the current
path when recovery
* starts or restarts the original durable call
*/
@@ -541,9 +541,7 @@ public class RunnerContextImpl implements RunnerContext {
durableExecutionContext.getCurrentCallIndex(),
functionId, argsDigest));
}
- T reconcileResult = reconcileCallable.call();
- finalizeCurrentCall(functionId, argsDigest,
serializeDurableResult(reconcileResult), null);
- return reconcileResult;
+ return executeAndFinalizeCurrentCall(functionId, argsDigest,
reconcileCallable);
}
protected <T> T executeAndFinalizeCurrentCall(
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImplDurableExecuteAsyncTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImplDurableExecuteAsyncTest.java
index ef3023ff..ff378fb6 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImplDurableExecuteAsyncTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImplDurableExecuteAsyncTest.java
@@ -156,7 +156,7 @@ class JavaRunnerContextImplDurableExecuteAsyncTest {
}
@Test
- void testDurableExecuteAsyncReconcilableReconcileExceptionPropagates()
throws Exception {
+ void
testDurableExecuteAsyncReconcilableReconcileExceptionPersistsFailure() throws
Exception {
InspectingContinuationActionExecutor executor = new
InspectingContinuationActionExecutor();
ActionState actionState = new ActionState(null);
actionState.addCallResult(CallResult.pending("recon-async", ""));
@@ -188,11 +188,11 @@ class JavaRunnerContextImplDurableExecuteAsyncTest {
assertEquals(0, callable.getCallCount());
assertEquals(1, callable.getReconcileCount());
assertEquals(0, executor.getExecuteAsyncCallCount());
- assertEquals(0, persistCallCount.get());
+ assertEquals(1, persistCallCount.get());
CallResult persisted =
context.getDurableExecutionContext().getActionState().getCallResults().get(0);
- assertTrue(persisted.isPending());
- assertEquals(0,
context.getDurableExecutionContext().getCurrentCallIndex());
+ assertTrue(persisted.isFailure());
+ assertEquals(1,
context.getDurableExecutionContext().getCurrentCallIndex());
}
private JavaRunnerContextImpl createContext(
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/context/RunnerContextImplDurableExecuteTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/context/RunnerContextImplDurableExecuteTest.java
index 132a7a6e..10b42dec 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/context/RunnerContextImplDurableExecuteTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/context/RunnerContextImplDurableExecuteTest.java
@@ -217,7 +217,7 @@ class RunnerContextImplDurableExecuteTest {
}
@Test
- void testDurableExecuteReconcilableReconcileExceptionPropagates() throws
Exception {
+ void testDurableExecuteReconcilableReconcileExceptionPersistsFailure()
throws Exception {
ActionState actionState = new ActionState(null);
actionState.addCallResult(CallResult.pending("recon-call", ""));
RunnerContextImpl context = createContext(actionState);
@@ -237,11 +237,11 @@ class RunnerContextImplDurableExecuteTest {
assertSame(failure, thrown);
assertEquals(0, callable.getCallCount());
assertEquals(1, callable.getReconcileCount());
- assertEquals(0, persistCallCount.get());
+ assertEquals(1, persistCallCount.get());
CallResult persisted =
context.getDurableExecutionContext().getActionState().getCallResults().get(0);
- assertTrue(persisted.isPending());
- assertEquals(0,
context.getDurableExecutionContext().getCurrentCallIndex());
+ assertTrue(persisted.isFailure());
+ assertEquals(1,
context.getDurableExecutionContext().getCurrentCallIndex());
}
@Test