This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f926d2a58c73c0e4509bb96050bf84f05f5b6223
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Thu May 23 16:27:03 2024 +0800

    [FLINK-35434][state] Allow StateExecutor pass exception to runtime
---
 .../apache/flink/core/state/CompletedStateFuture.java |  5 +++++
 .../apache/flink/core/state/InternalStateFuture.java  |  8 ++++++++
 .../org/apache/flink/core/state/StateFutureImpl.java  | 19 ++++++++++++++++++-
 .../flink/state/forst/ForStDBOperationTestBase.java   |  5 +++++
 4 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
index 4c70ecd4ec5..4a0e61787c2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
@@ -69,6 +69,11 @@ public class CompletedStateFuture<T> implements InternalStateFuture<T> {
         throw new UnsupportedOperationException("This state future has already been completed.");
     }
 
+    @Override
+    public void completeExceptionally(String message, Throwable ex) {
+        throw new UnsupportedOperationException("This state future has already been completed.");
+    }
+
     @Override
     public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
         ThrowingConsumer.unchecked(action).accept(result);
diff --git a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
index ecd55708d50..b41ab839940 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
@@ -31,6 +31,14 @@ public interface InternalStateFuture<T> extends StateFuture<T> {
     /** Complete this future. */
     void complete(T result);
 
+    /**
+     * Fail this future and pass the given exception to the runtime.
+     *
+     * @param message the description of this exception
+     * @param ex the exception
+     */
+    void completeExceptionally(String message, Throwable ex);
+
     /**
      * Accept the action in the same thread with the one of complete (or current thread if it has
      * been completed).
diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
index 285f4e5f271..d065d65edd8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
@@ -181,9 +181,18 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
             callbackRegistered();
             if (completableFuture.isDone()) {
                 // this branch must be invoked in task thread when expected
+                T t;
+                try {
+                    t = completableFuture.get();
+                } catch (Throwable e) {
+                    exceptionHandler.handleException(
+                            "Caught exception when submitting StateFuture's callback.", e);
+                    return null;
+                }
+
                 return other.thenCompose(
                         (u) -> {
-                            V v = fn.apply(completableFuture.get(), u);
+                            V v = fn.apply(t, u);
                             callbackFinished();
                             return StateFutureUtils.completedFuture(v);
                         });
@@ -226,10 +235,18 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
 
     @Override
     public void complete(T result) {
+        if (completableFuture.isCompletedExceptionally()) {
+            throw new IllegalStateException("StateFuture already failed !");
+        }
         completableFuture.complete(result);
         postComplete(false);
     }
 
+    @Override
+    public void completeExceptionally(String message, Throwable ex) {
+        exceptionHandler.handleException(message, ex);
+    }
+
     private void completeInCallbackRunner(T result) {
         completableFuture.complete(result);
         postComplete(true);
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
index 3d31737beb0..bba4777dced 100644
--- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
@@ -121,6 +121,11 @@ public class ForStDBOperationTestBase {
             future.complete(result);
         }
 
+        @Override
+        public void completeExceptionally(String message, Throwable ex) {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
             throw new UnsupportedOperationException();