fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590678974

##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -46,123 +46,169 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;

-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    /** The exception handler that handles callback framework's error. */
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }

     @Override
-    public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
+    public <U> StateFuture<U> thenApply(
+            FunctionWithException<? super T, ? extends U, ? extends Exception> fn) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+
+        if (completableFuture.isDone()) {
+            try {
                 U r = fn.apply(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedFuture(r);

Review Comment:
   The `exceptionHandler` is meant to handle framework errors, such as a failure to submit a callback to the mailbox. Exceptions thrown in the `completableFuture.isDone()` branch and by `CompletedStateFuture`, on the other hand, come from user code, so they would be handled by the mailbox directly.
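
To illustrate the distinction drawn in the comment, here is a minimal sketch built on plain `CompletableFuture`. It is not the code in this PR: `ThrowingFunction`, `Runner`, `FrameworkErrorHandler`, and the class itself are hypothetical stand-ins, and the runner is assumed to queue the task for later execution rather than run it inline. Only a failed submission reaches the handler; an exception thrown by the user function propagates to whoever runs it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;

/** Illustrative only: none of these types are Flink's real classes. */
final class CallbackErrorRoutingSketch {

    /** Hypothetical stand-in for a function that may throw a checked exception. */
    interface ThrowingFunction<T, R> {
        R apply(T value) throws Exception;
    }

    /** Hypothetical stand-in for the callback runner; assumed to queue the task. */
    interface Runner {
        void submit(Runnable task) throws Exception;
    }

    /** Hypothetical stand-in for the framework-level exception handler. */
    interface FrameworkErrorHandler {
        void handle(String message, Throwable cause);
    }

    static <T, R> CompletableFuture<R> thenApply(
            CompletableFuture<T> source,
            ThrowingFunction<T, R> fn,
            Runner runner,
            FrameworkErrorHandler handler) {
        if (source.isDone()) {
            // Fast path: the user function runs inline on the calling thread,
            // so a failure is a user-code error and propagates to the caller.
            try {
                return CompletableFuture.completedFuture(fn.apply(source.join()));
            } catch (Exception e) {
                throw new RuntimeException("User callback failed", e);
            }
        }
        CompletableFuture<R> result = new CompletableFuture<>();
        source.thenAccept(value -> {
            try {
                runner.submit(() -> {
                    try {
                        result.complete(fn.apply(value));
                    } catch (Exception e) {
                        // User-code error: surfaces wherever the queued task is run.
                        throw new RuntimeException("User callback failed", e);
                    }
                });
            } catch (Exception e) {
                // Framework error: the callback could not be handed to the runner.
                handler.handle("Failed to submit callback", e);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        List<String> frameworkErrors = new ArrayList<>();
        FrameworkErrorHandler handler = (msg, cause) -> frameworkErrors.add(msg);
        Queue<Runnable> mailbox = new ArrayDeque<>();

        // Submission succeeds; the callback runs when the "mailbox" is drained.
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        CompletableFuture<Integer> doubled =
                thenApply(pending, v -> v * 2, mailbox::add, handler);
        pending.complete(21);
        mailbox.remove().run();
        System.out.println(doubled.join());

        // Submission itself fails; only this case reaches the handler.
        CompletableFuture<Integer> other = new CompletableFuture<>();
        thenApply(other, v -> v, task -> {
            throw new RejectedExecutionException("mailbox closed");
        }, handler);
        other.complete(1);
        System.out.println(frameworkErrors);
    }
}
```

The sketch only separates the two categories if the runner defers execution. With a runner that ran the task inline, a user exception would pass back through `submit` and be reported as a framework error.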