fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1592128250


##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -203,12 +251,29 @@ public void callbackFinished() {
     }
 
     @Override
-    public void thenSyncAccept(Consumer<? super T> action) {
-        completableFuture.thenAccept(action);
+    public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) {
+        completableFuture
+                .thenAccept(ThrowingConsumer.unchecked(action))
+                .exceptionally(
+                        (e) -> {
+                            exceptionHandler.handleException(
+                                    "Caught exception when processing completed StateFuture's callback.",
+                                    e);
+                            return null;
+                        });
     }
 
     /** The entry for a state future to submit task to mailbox. */
     public interface CallbackRunner {
-        void submit(Runnable task);
+        void submit(ThrowingRunnable<? extends Exception> task);
+    }
+
+    /**
+     * Handles an exception thrown by callback framework, borrowed idea from {@code
+     * AsyncExceptionHandler}.
+     */
+    public interface CallbackExceptionHandler {

Review Comment:
   Thanks for the suggestion. I renamed it and squashed the commits.


