This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f926d2a58c73c0e4509bb96050bf84f05f5b6223
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Thu May 23 16:27:03 2024 +0800

    [FLINK-35434][state] Allow StateExecutor pass exception to runtime
---
 .../apache/flink/core/state/CompletedStateFuture.java |  5 +++++
 .../apache/flink/core/state/InternalStateFuture.java  |  8 ++++++++
 .../org/apache/flink/core/state/StateFutureImpl.java  | 19 ++++++++++++++++++-
 .../flink/state/forst/ForStDBOperationTestBase.java   |  5 +++++
 4 files changed, 36 insertions(+), 1 deletion(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
 
b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
index 4c70ecd4ec5..4a0e61787c2 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java
@@ -69,6 +69,11 @@ public class CompletedStateFuture<T> implements 
InternalStateFuture<T> {
         throw new UnsupportedOperationException("This state future has already 
been completed.");
     }
 
+    @Override
+    public void completeExceptionally(String message, Throwable ex) {
+        throw new UnsupportedOperationException("This state future has already 
been completed.");
+    }
+
     @Override
     public void thenSyncAccept(ThrowingConsumer<? super T, ? extends 
Exception> action) {
         ThrowingConsumer.unchecked(action).accept(result);
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java 
b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
index ecd55708d50..b41ab839940 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java
@@ -31,6 +31,14 @@ public interface InternalStateFuture<T> extends 
StateFuture<T> {
     /** Complete this future. */
     void complete(T result);
 
+    /**
+     * Fail this future and pass the given exception to the runtime.
+     *
+     * @param message the description of this exception
+     * @param ex the exception
+     */
+    void completeExceptionally(String message, Throwable ex);
+
     /**
      * Accept the action in the same thread with the one of complete (or 
current thread if it has
      * been completed).
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java 
b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
index 285f4e5f271..d065d65edd8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
+++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java
@@ -181,9 +181,18 @@ public class StateFutureImpl<T> implements 
InternalStateFuture<T> {
         callbackRegistered();
         if (completableFuture.isDone()) {
             // this branch must be invoked in task thread when expected
+            T t;
+            try {
+                t = completableFuture.get();
+            } catch (Throwable e) {
+                exceptionHandler.handleException(
+                        "Caught exception when submitting StateFuture's 
callback.", e);
+                return null;
+            }
+
             return other.thenCompose(
                     (u) -> {
-                        V v = fn.apply(completableFuture.get(), u);
+                        V v = fn.apply(t, u);
                         callbackFinished();
                         return StateFutureUtils.completedFuture(v);
                     });
@@ -226,10 +235,18 @@ public class StateFutureImpl<T> implements 
InternalStateFuture<T> {
 
     @Override
     public void complete(T result) {
+        if (completableFuture.isCompletedExceptionally()) {
+            throw new IllegalStateException("StateFuture already failed !");
+        }
         completableFuture.complete(result);
         postComplete(false);
     }
 
+    @Override
+    public void completeExceptionally(String message, Throwable ex) {
+        exceptionHandler.handleException(message, ex);
+    }
+
     private void completeInCallbackRunner(T result) {
         completableFuture.complete(result);
         postComplete(true);
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
index 3d31737beb0..bba4777dced 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStDBOperationTestBase.java
@@ -121,6 +121,11 @@ public class ForStDBOperationTestBase {
             future.complete(result);
         }
 
+        @Override
+        public void completeExceptionally(String message, Throwable ex) {
+            throw new UnsupportedOperationException();
+        }
+
         @Override
         public void thenSyncAccept(ThrowingConsumer<? super T, ? extends 
Exception> action) {
             throw new UnsupportedOperationException();

Reply via email to