fredia commented on code in PR #24698: URL: https://github.com/apache/flink/pull/24698#discussion_r1590672845
########## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ########## @@ -203,12 +249,29 @@ public void callbackFinished() { } @Override - public void thenSyncAccept(Consumer<? super T> action) { - completableFuture.thenAccept(action); + public void thenSyncAccept(ThrowingConsumer<? super T, ? extends Exception> action) { + completableFuture + .thenAccept(ThrowingConsumer.unchecked(action)) + .exceptionally( + (e) -> { + exceptionHandler.handleException( + "Caught exception when processing completed StateFuture's callback.", + e); + return null; + }); } /** The entry for a state future to submit task to mailbox. */ public interface CallbackRunner { - void submit(Runnable task); + void submit(ThrowingRunnable<? extends Exception> task); + } + + /** + * Handles an exception thrown by callback framework, borrowed idea from {@code + * AsyncExceptionHandler}. + */ + public interface CallbackExceptionHandler { Review Comment: The callbacks throw RuntimeException directly without this PR. After this PR: 1. the framework exception(such as fail to submit to mailbox) would be caught by `CallbackExceptionHandler`, and finally caught by `AbstractAsyncStateStreamOperator#handleStateCallbackException`. 2. The exceptions thrown by user code or state access would be caught by mailbox. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org