Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia closed pull request #24698: [FLINK-35158][runtime] Error handling in StateFuture's callback
URL: https://github.com/apache/flink/pull/24698

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on PR #24698:
URL: https://github.com/apache/flink/pull/24698#issuecomment-2099597032

Thanks for the detailed review, rebased onto master. Will merge once CI is green.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1592131320

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncStateException.java: ##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+/**
+ * An exception for wrapping exceptions that are thrown by {@link
+ * org.apache.flink.api.common.state.v2.StateFuture} callback framework.
+ */
+public class AsyncStateException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    public AsyncStateException(Throwable cause) {
+        super(cause);
+    }
+
+    public AsyncStateException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    @Override
+    public String toString() {
+        return "StateCallbackException{" + getCause() + "}";

Review Comment:
nit: StateCallbackException -> AsyncStateException
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1592128250

## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ##
@@ -203,12 +251,29 @@ public void callbackFinished() {
     }
 
     @Override
-    public void thenSyncAccept(Consumer action) {
-        completableFuture.thenAccept(action);
+    public void thenSyncAccept(ThrowingConsumer action) {
+        completableFuture
+                .thenAccept(ThrowingConsumer.unchecked(action))
+                .exceptionally(
+                        (e) -> {
+                            exceptionHandler.handleException(
+                                    "Caught exception when processing completed StateFuture's callback.",
+                                    e);
+                            return null;
+                        });
     }
 
     /** The entry for a state future to submit task to mailbox. */
     public interface CallbackRunner {
-        void submit(Runnable task);
+        void submit(ThrowingRunnable task);
+    }
+
+    /**
+     * Handles an exception thrown by callback framework, borrowed idea from {@code
+     * AsyncExceptionHandler}.
+     */
+    public interface CallbackExceptionHandler {

Review Comment:
Thanks for the suggestion, renamed and squashed commits.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1591888669

## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ##
@@ -203,12 +251,29 @@ public void callbackFinished() {
     }
 
     @Override
-    public void thenSyncAccept(Consumer action) {
-        completableFuture.thenAccept(action);
+    public void thenSyncAccept(ThrowingConsumer action) {
+        completableFuture
+                .thenAccept(ThrowingConsumer.unchecked(action))
+                .exceptionally(
+                        (e) -> {
+                            exceptionHandler.handleException(
+                                    "Caught exception when processing completed StateFuture's callback.",
+                                    e);
+                            return null;
+                        });
     }
 
     /** The entry for a state future to submit task to mailbox. */
     public interface CallbackRunner {
-        void submit(Runnable task);
+        void submit(ThrowingRunnable task);
+    }
+
+    /**
+     * Handles an exception thrown by callback framework, borrowed idea from {@code
+     * AsyncExceptionHandler}.
+     */
+    public interface CallbackExceptionHandler {

Review Comment:
I just went through the final commit. It seems the exception handler only handles exceptions from the framework, so I'd suggest changing this name. BTW, it seems better to rename `StateCallbackException` to something like `AsyncStateException`.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590687424

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ##
@@ -78,13 +81,18 @@ public void setup(
         this.asyncExecutionController =
                 new AsyncExecutionController(
                         mailboxExecutor,
+                        this::handleStateCallbackException,
                         null,
                         maxParallelism,
                         asyncBufferSize,
                         asyncBufferTimeout,
                         inFlightRecordsLimit);
     }
 
+    private void handleStateCallbackException(String message, Throwable exception) {

Review Comment:
Currently `StreamTask.asyncExceptionHandler::handleAsyncException` is used to handle exceptions thrown by processing timers. Here I'd like to keep these two types of exceptions separate, which is why `StateCallbackException` is introduced.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590680460

## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ##
@@ -46,123 +46,169 @@ public class StateFutureImpl implements InternalStateFuture {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    /** The exception handler that handles callback framework's error. */
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public StateFuture thenApply(Function fn) {
+    public StateFuture thenApply(
+            FunctionWithException fn) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+
+        if (completableFuture.isDone()) {
+            try {
                 U r = fn.apply(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedFuture(r);
-            } else {
-                StateFutureImpl ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.completeInCallbackRunner(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
-                return ret;
+            } catch (Throwable e) {
+                exceptionHandler.handleException(
+                        "Caught exception when processing completed StateFuture's callback.", e);
+                return null;
             }
-        } catch (Throwable e) {
-            throw new FlinkRuntimeException("Error binding or executing callback", e);
+        } else {
+            StateFutureImpl ret = makeNewStateFuture();
+            completableFuture
+                    .thenAccept(
+                            (t) -> {
+                                callbackRunner.submit(
+                                        () -> {
+                                            ret.completeInCallbackRunner(fn.apply(t));
+                                            callbackFinished();
+                                        });
+                            })
+                    .exceptionally(
+                            (e) -> {
+                                exceptionHandler.handleException(
+                                        "Caught exception when submitting StateFuture's callback.",
+                                        e);
+                                return null;
+                            });
+            return ret;
         }
     }
 
     @Override
-    public StateFuture thenAccept(Consumer action) {
+    public StateFuture thenAccept(ThrowingConsumer action) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+        if (completableFuture.isDone()) {
+            try {
                 action.accept(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedVoidFuture();

Review Comment:
Thanks for the suggestion, added some description here.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590679773

## flink-core/src/test/java/org/apache/flink/core/state/StateFutureTest.java: ##
@@ -37,19 +38,23 @@
 /** Tests for {@link StateFuture} related implementations. */
 public class StateFutureTest {
+    static StateFutureImpl.CallbackExceptionHandler exceptionHandler =

Review Comment:
This case is covered by `ContextStateFutureImplTest`.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590678974

## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ##
@@ -46,123 +46,169 @@ public class StateFutureImpl implements InternalStateFuture {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    /** The exception handler that handles callback framework's error. */
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public StateFuture thenApply(Function fn) {
+    public StateFuture thenApply(
+            FunctionWithException fn) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+
+        if (completableFuture.isDone()) {
+            try {
                 U r = fn.apply(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedFuture(r);

Review Comment:
The `exceptionHandler` handles framework errors, such as a failure to submit to the mailbox. The exceptions thrown in the `completableFuture.isDone()` branch and by `CompletedStateFuture` come from user code, so they are handled by the mailbox directly.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590672845

## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ##
@@ -203,12 +249,29 @@ public void callbackFinished() {
     }
 
     @Override
-    public void thenSyncAccept(Consumer action) {
-        completableFuture.thenAccept(action);
+    public void thenSyncAccept(ThrowingConsumer action) {
+        completableFuture
+                .thenAccept(ThrowingConsumer.unchecked(action))
+                .exceptionally(
+                        (e) -> {
+                            exceptionHandler.handleException(
+                                    "Caught exception when processing completed StateFuture's callback.",
+                                    e);
+                            return null;
+                        });
     }
 
     /** The entry for a state future to submit task to mailbox. */
     public interface CallbackRunner {
-        void submit(Runnable task);
+        void submit(ThrowingRunnable task);
+    }
+
+    /**
+     * Handles an exception thrown by callback framework, borrowed idea from {@code
+     * AsyncExceptionHandler}.
+     */
+    public interface CallbackExceptionHandler {

Review Comment:
Without this PR, the callbacks throw `RuntimeException` directly. After this PR:
1. Framework exceptions (such as a failure to submit to the mailbox) are caught by `CallbackExceptionHandler` and finally handled by `AbstractAsyncStateStreamOperator#handleStateCallbackException`.
2. Exceptions thrown by user code or state access are caught by the mailbox.
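To make the two paths described in the comment concrete, here is a minimal, self-contained sketch. It is not Flink's `StateFutureImpl`: `FrameworkErrorHandler`, `CheckedCallback`, `ToyMailbox` and `RoutingFuture` are hypothetical stand-ins. The sketch assumes a single-threaded mailbox that surfaces user-code exceptions when it runs a task, and a `CallbackExceptionHandler`-style handler that only sees failures of the framework step, which here means enqueueing the callback.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for CallbackExceptionHandler: sees framework errors only. */
interface FrameworkErrorHandler {
    void handleException(String message, Throwable exception);
}

/** Hypothetical callback type that may throw checked exceptions. */
interface CheckedCallback {
    void run() throws Exception;
}

/** Toy single-threaded mailbox: user-code exceptions surface when a task runs. */
class ToyMailbox {
    private final Queue<CheckedCallback> tasks = new ArrayDeque<>();
    boolean closed = false;

    void submit(CheckedCallback task) {
        if (closed) {
            // Framework failure: the callback cannot even be enqueued.
            throw new IllegalStateException("mailbox closed");
        }
        tasks.add(task);
    }

    /** Runs the next task; an exception from user code propagates to the caller. */
    void runOne() throws Exception {
        CheckedCallback task = tasks.poll();
        if (task != null) {
            task.run();
        }
    }
}

/** Hypothetical future that routes only submission failures to the handler. */
class RoutingFuture {
    final CompletableFuture<Integer> completableFuture = new CompletableFuture<>();

    void thenRun(ToyMailbox mailbox, FrameworkErrorHandler handler, CheckedCallback userCallback) {
        completableFuture
                .thenAccept(v -> mailbox.submit(userCallback)) // framework step: enqueue only
                .exceptionally(
                        e -> {
                            // e wraps the submission failure; user-code errors never reach here.
                            handler.handleException(
                                    "Caught exception when submitting callback.", e);
                            return null;
                        });
    }
}
```

In this toy model a closed mailbox is the only framework failure. An exception thrown by the user callback only appears later, when the mailbox thread calls `runOne()`, which mirrors point 2 of the comment.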
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590597054

## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ##
@@ -61,23 +66,34 @@ public StateFuture thenApply(Function fn) {
                 return StateFutureUtils.completedFuture(r);
             } else {
                 StateFutureImpl ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.complete(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
+                completableFuture
+                        .thenAccept(
+                                (t) -> {
+                                    callbackRunner.submit(
+                                            () -> {
+                                                ret.complete(fn.apply(t));
+                                                callbackFinished();
+                                            });
+                                })
+                        .exceptionally(

Review Comment:
Thanks for the explanation, I'm fine with the current implementation.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1584530942

## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java: ##
@@ -49,7 +49,7 @@ public interface StateFuture {
      * @param action the action to perform before completing the returned StateFuture.
      * @return the new StateFuture.
      */
-    StateFuture thenAccept(Consumer action);
+    StateFuture thenAccept(ConsumerWithException action);

Review Comment:
IIUC, the callbacks passed to a `CompletableFuture` are standard functional interfaces such as `Runnable` or `Consumer`, which cannot throw checked exceptions, so **checked exceptions** must be handled inside the `CompletableFuture` callbacks themselves.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1584175343

## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ##
@@ -46,123 +46,169 @@ public class StateFutureImpl implements InternalStateFuture {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    /** The exception handler that handles callback framework's error. */
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public StateFuture thenApply(Function fn) {
+    public StateFuture thenApply(
+            FunctionWithException fn) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+
+        if (completableFuture.isDone()) {
+            try {
                 U r = fn.apply(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedFuture(r);
-            } else {
-                StateFutureImpl ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.completeInCallbackRunner(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
-                return ret;
+            } catch (Throwable e) {
+                exceptionHandler.handleException(
+                        "Caught exception when processing completed StateFuture's callback.", e);
+                return null;
             }
-        } catch (Throwable e) {
-            throw new FlinkRuntimeException("Error binding or executing callback", e);
+        } else {
+            StateFutureImpl ret = makeNewStateFuture();
+            completableFuture
+                    .thenAccept(
+                            (t) -> {
+                                callbackRunner.submit(
+                                        () -> {
+                                            ret.completeInCallbackRunner(fn.apply(t));
+                                            callbackFinished();
+                                        });
+                            })
+                    .exceptionally(
+                            (e) -> {
+                                exceptionHandler.handleException(
+                                        "Caught exception when submitting StateFuture's callback.",
+                                        e);
+                                return null;
+                            });
+            return ret;
         }
     }
 
     @Override
-    public StateFuture thenAccept(Consumer action) {
+    public StateFuture thenAccept(ThrowingConsumer action) {
         callbackRegistered();
-        try {
-            if (completableFuture.isDone()) {
+        if (completableFuture.isDone()) {
+            try {
                 action.accept(completableFuture.get());
                 callbackFinished();
                 return StateFutureUtils.completedVoidFuture();

Review Comment:
Shall we wrap the three lines above into `callbackRunner.submit` as well? Or shall we add a note to the method's JavaDoc stating that this method must be invoked in the callbackRunner/mailbox thread? Same for the other methods.

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java: ##
@@ -68,7 +72,17 @@ public void postComplete(boolean inCallbackRunner) {
         if (inCallbackRunner) {
             recordContext.release(Runnable::run);
         } else {
-            recordContext.release(callbackRunner::submit);
+            recordContext.release(
+                    runnable -> {
+                        try {
+                            ThrowingRunnable throwingRunnable =
+                                    () -> runnable.run();

Review Comment:
nit: runnable::run.

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ##
@@ -78,13 +81,18 @@ public void setup(
         this.asyncExecutionController =
                 new AsyncExecutionController(
                         mailboxExecutor,
+                        this::handleStateCallbackException,
                         null,
                         maxParallelism,
                         asyncBufferSize,
                         asyncBufferTimeout,
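The `runnable::run` nit above is essentially stylistic. When a `Runnable` is adapted to a throwing functional interface, the method reference and the equivalent lambda run the same code. Below is a tiny sketch in which `ThrowingTask` is a hypothetical stand-in for Flink's `ThrowingRunnable`.

```java
/** Hypothetical stand-in for a Runnable variant that may throw checked exceptions. */
@FunctionalInterface
interface ThrowingTask<E extends Throwable> {
    void run() throws E;
}

class RunnableAdapters {
    /** Adapts with an explicit lambda, as in the reviewed code. */
    static ThrowingTask<Exception> viaLambda(Runnable runnable) {
        return () -> runnable.run();
    }

    /** Adapts with a method reference, as suggested in the nit. */
    static ThrowingTask<Exception> viaMethodRef(Runnable runnable) {
        // Note: runnable is null-checked here, when the reference is created.
        return runnable::run;
    }
}
```

There is one small semantic difference. `runnable::run` evaluates `runnable` when the reference is created and throws a `NullPointerException` immediately if it is null. The lambda defers that failure until the task is actually invoked.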
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1582504623

## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java: ##
@@ -49,7 +49,7 @@ public interface StateFuture {
      * @param action the action to perform before completing the returned StateFuture.
      * @return the new StateFuture.
      */
-    StateFuture thenAccept(Consumer action);
+    StateFuture thenAccept(ConsumerWithException action);

Review Comment:
Here we classify exceptions into two categories:
1. Exceptions in user code: users are **not forced to** handle exceptions. For example, users can handle various internal logic exceptions in callbacks, or they can pass them on to `thenXXX()` without handling them, in which case the exceptions are eventually thrown by the mailbox.
2. Exceptions in the asynchronous framework: the job fails directly.

We don't want to be completely aligned with `CompletableFuture`, because `CompletableFuture` forces checked exceptions to be handled inside the callback. For example, the following code is not allowed with `CompletableFuture`, but is allowed with `StateFuture`:
```java
CompletableFuture future = new CompletableFuture<>();
future.thenAccept((v) -> {
    throw new IOException("test"); // not allowed
});

StateFutureImpl stateFuture = new StateFutureImpl<>(null, exceptionHandler);
stateFuture.thenAccept(
        (v) -> {
            throw new IOException("test"); // allowed
        }
);
```
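The minimal sketch below shows why such a lambda compiles against a throwing functional interface but not against `java.util.function.Consumer`, and how a throwing consumer can still be passed to `CompletableFuture.thenAccept`. `CheckedConsumer` and its `unchecked` adapter are hypothetical stand-ins, modeled loosely on the `ThrowingConsumer.unchecked(action)` call in the diff. Wrapping the checked exception in a `CompletionException` is an assumption of this sketch and is not necessarily what Flink's helper does.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

/** Hypothetical consumer whose accept() may throw a checked exception. */
@FunctionalInterface
interface CheckedConsumer<T, E extends Exception> {
    void accept(T t) throws E;

    /** Adapts to java.util.function.Consumer by rethrowing checked exceptions unchecked. */
    static <T> Consumer<T> unchecked(CheckedConsumer<T, ?> action) {
        return t -> {
            try {
                action.accept(t);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                // Wrap so the checked exception can travel through CompletableFuture.
                throw new CompletionException(e);
            }
        };
    }
}

class CheckedConsumerDemo {
    /** Returns the cause observed by exceptionally(), or null if nothing failed. */
    static Throwable run() {
        // Consumer<Integer> bad = v -> { throw new IOException("test"); }; // does not compile
        CheckedConsumer<Integer, IOException> action =
                v -> {
                    throw new IOException("test"); // allowed: accept() declares E
                };
        CompletableFuture<Integer> future = new CompletableFuture<>();
        Throwable[] seen = new Throwable[1];
        future.thenAccept(CheckedConsumer.unchecked(action))
                .exceptionally(
                        e -> {
                            // e is the CompletionException; its cause is the IOException.
                            seen[0] = e.getCause();
                            return null;
                        });
        future.complete(42);
        return seen[0];
    }
}
```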
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1578908743

## flink-core-api/src/main/java/org/apache/flink/util/function/ConsumerWithException.java: ##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import java.util.function.Consumer;
+
+/**
+ * A checked extension of the {@link Consumer} interface.
+ *
+ * @param type of the first argument
+ * @param type of the thrown exception
+ */
+public interface ConsumerWithException {

Review Comment:
It might be simpler to reuse `org.apache.flink.util.function.ThrowingConsumer`.

## flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java: ##
@@ -49,7 +49,7 @@ public interface StateFuture {
      * @param action the action to perform before completing the returned StateFuture.
      * @return the new StateFuture.
      */
-    StateFuture thenAccept(Consumer action);
+    StateFuture thenAccept(ConsumerWithException action);

Review Comment:
Given that Java's `CompletableFuture` uses the following signatures to handle exceptions:
```java
CompletableFuture<Void> thenAccept(Consumer<? super T> action);
CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
```
would it be better to align the APIs here with `CompletableFuture`?
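For reference, the `whenComplete` style mentioned above observes either the result or the exception in a single `BiConsumer` and leaves the outcome of the future unchanged. Below is a small sketch that uses only the JDK. `WhenCompleteDemo` is an illustrative name and not part of the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteDemo {
    /** Completes a future exceptionally and returns the throwable seen by whenComplete. */
    static Throwable observeFailure() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> observed =
                future.whenComplete(
                        (value, error) -> {
                            // On success error is null; on failure value is null.
                            if (error != null) {
                                seen.set(error);
                            }
                        });
        future.completeExceptionally(new IllegalStateException("state access failed"));
        // whenComplete does not swallow the exception: the returned stage still fails.
        if (!observed.isCompletedExceptionally()) {
            throw new AssertionError("whenComplete should propagate the failure");
        }
        return seen.get();
    }
}
```

Unlike `exceptionally`, `whenComplete` cannot recover a value. Any caller that wants the failure to be treated as handled still needs a later `exceptionally` or `handle` stage.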
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1582032431

## flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java: ##
@@ -61,23 +66,34 @@ public StateFuture thenApply(Function fn) {
                 return StateFutureUtils.completedFuture(r);
             } else {
                 StateFutureImpl ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.complete(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
+                completableFuture
+                        .thenAccept(
+                                (t) -> {
+                                    callbackRunner.submit(
+                                            () -> {
+                                                ret.complete(fn.apply(t));
+                                                callbackFinished();
+                                            });
+                                })
+                        .exceptionally(

Review Comment:
`exceptionally` and `try/catch` are equivalent here. Below is the JavaDoc explanation of `exceptionally`. I prefer writing it with `exceptionally` to reduce nesting.

> Returns a new CompletableFuture that is completed when this CompletableFuture completes, with the result of the given function of the exception triggering this CompletableFuture's completion when it completes exceptionally; otherwise, if this CompletableFuture completes normally, then the returned CompletableFuture also completes normally with the same value.
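The equivalence argued above can be checked with a small sketch that uses only the JDK. `submitOrFail` is a hypothetical stand-in for a failing `callbackRunner.submit(...)`. Both variants hand the failure to a handler: the first uses a `try/catch` nested inside the callback, and the second chains `.exceptionally(...)` after it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ExceptionallyVsTryCatch {
    /** Hypothetical stand-in for a callback submission that fails. */
    static void submitOrFail(int value) {
        throw new IllegalStateException("cannot submit " + value);
    }

    /** Variant 1: try/catch nested inside the callback. */
    static List<Throwable> withTryCatch() {
        List<Throwable> handled = new ArrayList<>();
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.thenAccept(
                v -> {
                    try {
                        submitOrFail(v);
                    } catch (Throwable e) {
                        handled.add(e);
                    }
                });
        future.complete(1);
        return handled;
    }

    /** Variant 2: exceptionally() chained after the callback, one nesting level less. */
    static List<Throwable> withExceptionally() {
        List<Throwable> handled = new ArrayList<>();
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.thenAccept(v -> submitOrFail(v))
                .exceptionally(
                        e -> {
                            // e is a CompletionException wrapping the original failure.
                            handled.add(e.getCause());
                            return null;
                        });
        future.complete(1);
        return handled;
    }
}
```

The two variants are not identical in every case. `exceptionally` receives the failure wrapped in a `CompletionException`, so the original is reached through `getCause()`. Also, if the source future itself completes exceptionally, only the `exceptionally` variant sees that failure, because the `thenAccept` lambda never runs.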
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1579119199

## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ##
@@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre
     private RecordContext currentProcessingContext;
 
+    private Environment environment;
+
     /** Initialize necessary state components for {@link AbstractStreamOperator}. */
     @Override
     public void setup(
             StreamTask containingTask, StreamConfig config, Output> output) {
         super.setup(containingTask, config, output);
-        // TODO: properly read config and setup
-        final MailboxExecutor mailboxExecutor =
-                containingTask.getEnvironment().getMainMailboxExecutor();
-        this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null);
+        final Environment environment = containingTask.getEnvironment();
+        final MailboxExecutor mailboxExecutor = environment.getMainMailboxExecutor();
+        final int inFlightRecordsLimit =
+                environment.getExecutionConfig().getAsyncInflightRecordsLimit();
+        final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize();
+        final long asyncBufferTimeout =
+                environment.getExecutionConfig().getAsyncStateBufferTimeout();
+        // TODO: initial state executor and set state executor for aec
+        this.asyncExecutionController =
+                new AsyncExecutionController(
+                        mailboxExecutor,
+                        this::handleStateCallbackException,
+                        null,
+                        asyncBufferSize,
+                        asyncBufferTimeout,
+                        inFlightRecordsLimit);
+    }
+
+    private void handleStateCallbackException(String message, Throwable exception) {

Review Comment:
Sorry, my bad. I thought it was the AsyncOperator for users.
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577788928

## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ##
@@ -94,29 +109,83 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
+            CallbackExceptionHandler exceptionHandler,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
-        this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
+        this.callbackExceptionHandler = exceptionHandler;
+        this.stateFutureFactory =
+                new StateFutureFactory<>(this, mailboxExecutor, callbackExceptionHandler);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // - initialize buffer timeout ---
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));

Review Comment:
Good suggestion, I will optimize this in https://github.com/apache/flink/pull/24667
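The buffer-timeout mechanism in this hunk pairs a single-threaded `ScheduledThreadPoolExecutor` with an `AtomicLong` trigger sequence. The sketch below is a simplified illustration of that pairing and is not the code from the PR. `TimeoutBuffer`, `scheduleTimeout`, `triggerNow`, `onTimeout` and the stale-check logic are all hypothetical. The sketch assumes that "used to distinguish different triggers" means a scheduled timeout should be ignored if another trigger has fired since it was armed.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical buffer whose timeout flush is skipped if a newer trigger already happened. */
class TimeoutBuffer {
    private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
    private final AtomicLong currentTriggerSeq = new AtomicLong(0);
    private final long bufferTimeoutMs;
    private ScheduledFuture<?> currentScheduledFuture;
    final AtomicInteger flushes = new AtomicInteger(0);

    TimeoutBuffer(long bufferTimeoutMs) {
        this.bufferTimeoutMs = bufferTimeoutMs;
        // Remove cancelled timeouts from the work queue instead of letting them pile up.
        scheduler.setRemoveOnCancelPolicy(true);
    }

    /** Arms a timeout bound to the current trigger sequence number. */
    synchronized void scheduleTimeout() {
        long seq = currentTriggerSeq.get();
        if (currentScheduledFuture != null) {
            currentScheduledFuture.cancel(false);
        }
        currentScheduledFuture =
                scheduler.schedule(() -> onTimeout(seq), bufferTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /** Flushes immediately (e.g. buffer full); bumping the sequence makes pending timeouts stale. */
    synchronized void triggerNow() {
        currentTriggerSeq.incrementAndGet();
        flushes.incrementAndGet();
    }

    /** Package-private so the stale-trigger check can be exercised directly. */
    synchronized void onTimeout(long seq) {
        if (seq != currentTriggerSeq.get()) {
            return; // stale: another trigger fired after this timeout was armed
        }
        currentTriggerSeq.incrementAndGet();
        flushes.incrementAndGet();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

A sequence check like this lets a late timeout be dropped without depending on `cancel` winning a race against a timeout task that has already started running.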
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698: URL: https://github.com/apache/flink/pull/24698#discussion_r1577782892 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java: ## @@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre private RecordContext currentProcessingContext; +private Environment environment; + /** Initialize necessary state components for {@link AbstractStreamOperator}. */ @Override public void setup( StreamTask containingTask, StreamConfig config, Output> output) { super.setup(containingTask, config, output); -// TODO: properly read config and setup -final MailboxExecutor mailboxExecutor = -containingTask.getEnvironment().getMainMailboxExecutor(); -this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null); +final Environment environment = containingTask.getEnvironment(); +final MailboxExecutor mailboxExecutor = environment.getMainMailboxExecutor(); +final int inFlightRecordsLimit = + environment.getExecutionConfig().getAsyncInflightRecordsLimit(); +final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize(); +final long asyncBufferTimeout = +environment.getExecutionConfig().getAsyncStateBufferTimeout(); +// TODO: initial state executor and set state executor for aec +this.asyncExecutionController = +new AsyncExecutionController( +mailboxExecutor, +this::handleStateCallbackException, +null, +asyncBufferSize, +asyncBufferTimeout, +inFlightRecordsLimit); +} + +private void handleStateCallbackException(String message, Throwable exception) { Review Comment: AEC is transparent to the user, and users should not be aware of its internal implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
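A minimal, self-contained sketch of the wiring in the `setup()` diff above: a private method is handed to the controller as a method reference, so users never see it. The single-method `CallbackExceptionHandler` shape is inferred from the `this::handleStateCallbackException` reference; `Controller`, `Operator`, and the `failureSink` consumer (standing in for `environment.failExternally`, named in the PR description) are hypothetical, not Flink's API. The PR adds an `AsyncStateException` wrapper for this purpose; a plain `RuntimeException` keeps the sketch dependency-free.

```java
import java.util.function.Consumer;

// Hypothetical handler interface; the (String, Throwable) shape matches
// handleStateCallbackException in the diff.
@FunctionalInterface
interface CallbackExceptionHandler {
    void handleException(String message, Throwable exception);
}

// Hypothetical stand-in for the async execution controller: it only stores
// the handler and invokes it when a callback throws.
class Controller {
    private final CallbackExceptionHandler exceptionHandler;

    Controller(CallbackExceptionHandler exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    // Runs a callback and routes any failure to the handler instead of the caller.
    void runCallback(Runnable callback) {
        try {
            callback.run();
        } catch (Throwable t) {
            exceptionHandler.handleException(
                    "Caught exception when processing StateFuture's callback.", t);
        }
    }
}

// Hypothetical operator: the handler stays private; only a method reference escapes.
class Operator {
    private final Consumer<Throwable> failureSink; // stands in for environment.failExternally
    final Controller controller;

    Operator(Consumer<Throwable> failureSink) {
        this.failureSink = failureSink;
        this.controller = new Controller(this::handleStateCallbackException);
    }

    private void handleStateCallbackException(String message, Throwable exception) {
        // Keep the message as context and fail the task with the original cause.
        failureSink.accept(new RuntimeException(message, exception));
    }
}
```

Keeping the method private matches the reply that the controller should stay transparent to users; the alternative raised in this thread is to make it `protected`.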
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r159797

##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +109,83 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
+            CallbackExceptionHandler exceptionHandler,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
-        this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
+        this.callbackExceptionHandler = exceptionHandler;
+        this.stateFutureFactory =
+                new StateFutureFactory<>(this, mailboxExecutor, callbackExceptionHandler);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // - initialize buffer timeout ---
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {

Review Comment:
Yes, it's expected.
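The `currentTriggerSeq` field in the diff above suggests that a scheduled timeout should fire only if no other trigger happened in the meantime. The following is a hedged sketch of that idea using plain JDK classes. `TimeoutTrigger`, `scheduleTimeout()`, `trigger()`, and `onTimeout()` are hypothetical names, and the real controller may differ. The executor gets the same three `ScheduledThreadPoolExecutor` policies as the diff, and, as with `if (bufferTimeout > 0)`, no scheduler is created when the timeout is disabled.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a sequence-guarded buffer timeout (not Flink's code).
class TimeoutTrigger {
    private final long bufferTimeoutMs;
    private final AtomicLong currentTriggerSeq = new AtomicLong(0);
    private final ScheduledThreadPoolExecutor scheduler; // null when the timeout is disabled
    private ScheduledFuture<?> currentScheduledFuture;
    private int triggerCount = 0;

    TimeoutTrigger(long bufferTimeoutMs) {
        this.bufferTimeoutMs = bufferTimeoutMs;
        if (bufferTimeoutMs > 0) {
            // One daemon thread, so a forgotten close() cannot keep the JVM alive.
            scheduler = new ScheduledThreadPoolExecutor(1, r -> {
                Thread t = new Thread(r, "timeout-scheduler");
                t.setDaemon(true);
                return t;
            });
            scheduler.setRemoveOnCancelPolicy(true); // drop cancelled timeouts from the queue
            scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        } else {
            scheduler = null;
        }
    }

    // Schedules a timeout tagged with the sequence number current at scheduling time.
    synchronized void scheduleTimeout() {
        if (scheduler == null) {
            return;
        }
        long seq = currentTriggerSeq.get();
        currentScheduledFuture = scheduler.schedule(
                () -> { onTimeout(seq); }, bufferTimeoutMs, TimeUnit.MILLISECONDS);
    }

    // Any trigger (by size or by timeout) advances the sequence, making older timeouts stale.
    synchronized void trigger() {
        triggerCount++;
        currentTriggerSeq.incrementAndGet();
        if (currentScheduledFuture != null) {
            currentScheduledFuture.cancel(false);
            currentScheduledFuture = null;
        }
    }

    // Fires only if nothing was triggered since this timeout was scheduled.
    synchronized boolean onTimeout(long triggerSeq) {
        if (triggerSeq != currentTriggerSeq.get()) {
            return false; // stale timeout: ignore it
        }
        trigger();
        return true;
    }

    synchronized long currentSeq() { return currentTriggerSeq.get(); }

    synchronized int triggerCount() { return triggerCount; }

    void close() {
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }
}
```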
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r158129

##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,62 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    // - Async State Execution --
+
+    /**
+     * The max limit of in-flight records number in async state execution, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-state.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async state execution, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async state execution. Async state execution provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to
+     * control the frequency of triggering.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption ASYNC_STATE_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-state.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async state execution. Async state execution provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform
+     * actively.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption ASYNC_STATE_BUFFER_TIMEOUT =

Review Comment:
@jectpro7 Commits 0c177b...f985f1c are rebased from https://github.com/apache/flink/pull/24667; I will rebase this PR once https://github.com/apache/flink/pull/24667 is finished.

> I suggest to use something like trigger interval here

In fact, the buffer is not triggered at a fixed frequency, and `interval` implies a fixed rate, so I prefer to keep it as is.
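To show why `interval` would be misleading, here is a hypothetical single-threaded simulation of the two options working together. A batch fires when the buffer reaches `bufferSize` requests, or when `bufferTimeout` milliseconds have passed since the first request entered the current buffer. That "first request" starting point is an assumption; the option's description only says "within 'buffer-timeout' milliseconds". `SizeOrTimeoutBuffer` and its virtual clock are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of size-or-timeout triggering with a virtual clock (ms).
class SizeOrTimeoutBuffer {
    private final int bufferSize;
    private final long bufferTimeoutMs;
    private int buffered = 0;
    private long firstRequestTime = -1; // -1 means the buffer is empty
    final List<Long> triggerTimes = new ArrayList<>();

    SizeOrTimeoutBuffer(int bufferSize, long bufferTimeoutMs) {
        this.bufferSize = bufferSize;
        this.bufferTimeoutMs = bufferTimeoutMs;
    }

    // A state request arrives at virtual time `now`.
    void add(long now) {
        advanceTo(now); // a pending timeout may expire before this request arrives
        if (buffered == 0) {
            firstRequestTime = now;
        }
        buffered++;
        if (buffered >= bufferSize) {
            trigger(now); // size-based trigger
        }
    }

    // Moves the virtual clock forward and fires the timeout if it has expired.
    void advanceTo(long now) {
        if (buffered > 0 && bufferTimeoutMs > 0 && now - firstRequestTime >= bufferTimeoutMs) {
            trigger(firstRequestTime + bufferTimeoutMs); // timeout-based trigger
        }
    }

    private void trigger(long at) {
        triggerTimes.add(at);
        buffered = 0;
        firstRequestTime = -1;
    }
}
```

With `bufferSize = 3` and a 10 ms timeout, requests at t=0, 1, 2, then 5, then 30, 31, 32 (advancing the clock to t=20 in between) produce triggers at t=2, 15 and 32. The gaps are irregular, which is the reply's point.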
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577730173

##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public StateFuture thenApply(Function fn) {
                 return StateFutureUtils.completedFuture(r);
             } else {
                 StateFutureImpl ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.complete(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
+                completableFuture
+                        .thenAccept(
+                                (t) -> {
+                                    callbackRunner.submit(
+                                            () -> {
+                                                ret.complete(fn.apply(t));
+                                                callbackFinished();
+                                            });
+                                })
+                        .exceptionally(
+                                (e) -> {
+                                    exceptionHandler.handleException(

Review Comment:
Please note that the `exceptionHandler` here is for non-recoverable errors raised by the framework. Recoverable errors (such as a state access timeout) will be retried automatically in the StateExecutor. At this layer, we only focus on future-related concerns.
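A hypothetical sketch of the split described in this comment. Recoverable failures are retried; here a bounded retry loop stands in for the StateExecutor's automatic retry, whose actual policy is not specified in this thread. Anything else, or a recoverable failure that keeps recurring, goes to the exception handler. `RecoverableStateException`, `RetryingExecutor`, `executeWithRetry`, and `maxAttempts` are illustrative names, not Flink classes.

```java
import java.util.function.BiConsumer;
import java.util.function.Supplier;

// Hypothetical marker for errors worth retrying (e.g. a state access timeout).
class RecoverableStateException extends RuntimeException {
    RecoverableStateException(String message) {
        super(message);
    }
}

// Hypothetical executor: retries recoverable errors, escalates everything else.
class RetryingExecutor {
    private final int maxAttempts;
    private final BiConsumer<String, Throwable> exceptionHandler; // non-recoverable path

    RetryingExecutor(int maxAttempts, BiConsumer<String, Throwable> exceptionHandler) {
        this.maxAttempts = maxAttempts;
        this.exceptionHandler = exceptionHandler;
    }

    // Returns the result, or null after handing the failure to the handler.
    <T> T executeWithRetry(Supplier<T> request) {
        for (int attempt = 1; ; attempt++) {
            try {
                return request.get();
            } catch (RecoverableStateException e) {
                if (attempt >= maxAttempts) {
                    exceptionHandler.accept("Giving up after " + attempt + " attempts.", e);
                    return null;
                }
                // Otherwise loop and retry the request.
            } catch (Throwable t) {
                exceptionHandler.accept("Non-recoverable error.", t);
                return null;
            }
        }
    }
}
```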
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577723817

##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public StateFuture thenApply(Function fn) {
                 return StateFutureUtils.completedFuture(r);
             } else {
                 StateFutureImpl ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.complete(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
+                completableFuture
+                        .thenAccept(
+                                (t) -> {
+                                    callbackRunner.submit(
+                                            () -> {
+                                                ret.complete(fn.apply(t));
+                                                callbackFinished();
+                                            });
+                                })
+                        .exceptionally(

Review Comment:
Shall we do the `exceptionally` here? IIUC, we won't call `completeExceptionally`, right? I was thinking the callback should be protected by:
```
completableFuture
        .thenAccept(
                (t) -> {
                    try {
                        callbackRunner.submit(
                                () -> {
                                    ret.complete(fn.apply(t));
                                    callbackFinished();
                                });
                    } catch (Throwable e) {
                        exceptionHandler.handleException(
                                "Caught exception when submitting StateFuture's callback.", e);
                    }
                });
```
Is that right?
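The question above hinges on where the callback actually runs. This JDK-only sketch assumes a queue-backed runner that executes submitted callbacks later, standing in for the mailbox-based `callbackRunner`. `QueueRunner`, `CallbackWiring`, and their methods are hypothetical names.

Under that assumption, an exception thrown by `fn.apply(t)` inside the submitted callback never reaches an `.exceptionally(...)` stage chained on `thenAccept`, because that stage already completed normally when `submit` returned. For the same reason, a try/catch around `submit` alone would only catch submission failures. The sketch therefore places the catch inside the submitted callback.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical runner: submitted callbacks are queued and only run when drain() is called.
class QueueRunner {
    private final Queue<Runnable> queue = new ArrayDeque<>();

    void submit(Runnable r) {
        queue.add(r);
    }

    // Runs queued callbacks; an exception escaping a callback propagates out of drain().
    void drain() {
        while (!queue.isEmpty()) {
            queue.poll().run();
        }
    }
}

class CallbackWiring {
    final List<Throwable> seenByExceptionally = new ArrayList<>();
    final List<Throwable> seenByHandler = new ArrayList<>();

    // Mirrors the diff: thenAccept only submits, and exceptionally watches that stage.
    void withExceptionally(CompletableFuture<Integer> source, QueueRunner runner,
                           Function<Integer, Integer> fn) {
        source.thenAccept(t -> runner.submit(() -> fn.apply(t)))
                .exceptionally(e -> {
                    seenByExceptionally.add(e);
                    return null;
                });
    }

    // Protects the callback body itself, so failures inside the runner reach the handler.
    void withTryCatch(CompletableFuture<Integer> source, QueueRunner runner,
                      Function<Integer, Integer> fn) {
        source.thenAccept(t -> runner.submit(() -> {
            try {
                fn.apply(t);
            } catch (Throwable e) {
                seenByHandler.add(e);
            }
        }));
    }
}
```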
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577710395

##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -46,13 +46,18 @@ public class StateFutureImpl implements InternalStateFuture {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public StateFuture thenApply(Function fn) {
+    public StateFuture thenApply(

Review Comment:
Are you suggesting this?
```
return thenCompose((v) -> StateFutureUtils.completedFuture(fn.apply(v)));
```
Well, I'd suggest not doing so: since we will consider checkpointing the user-provided callback function, nested wrapping may make things more complex. We could consider optimizing this if it doesn't affect that part.
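For reference, the rewrite under discussion is behaviourally equivalent for the JDK's `CompletableFuture`: `thenApply(fn)` can be expressed as `thenCompose(v -> completedFuture(fn.apply(v)))`. This sketch uses `CompletableFuture` rather than Flink's `StateFuture`, so it says nothing about the checkpointing concern raised in the reply. It only makes visible the extra wrapping layer the reply wants to avoid. `ComposeDemo` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class ComposeDemo {
    // thenApply expressed through thenCompose: each result is wrapped in an
    // already-completed future, which adds one extra stage per call.
    static <T, U> CompletableFuture<U> applyViaCompose(CompletableFuture<T> f, Function<T, U> fn) {
        return f.thenCompose(v -> CompletableFuture.completedFuture(fn.apply(v)));
    }
}
```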
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577181524

##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +109,83 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
+            CallbackExceptionHandler exceptionHandler,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
-        this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
+        this.callbackExceptionHandler = exceptionHandler;
+        this.stateFutureFactory =
+                new StateFutureFactory<>(this, mailboxExecutor, callbackExceptionHandler);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // - initialize buffer timeout ---
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));

Review Comment:
Is it possible to share one `ScheduledThreadPoolExecutor` across all AECs to reduce the overhead of thread context switching? It is only used for scheduling, and the task is non-blocking, so it should run very fast.
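One possible shape for this suggestion, sketched with JDK classes only: a single, lazily created, daemon `ScheduledThreadPoolExecutor` shared by every controller in the JVM. `SharedTimeoutScheduler` is a hypothetical name. Lifecycle questions are deliberately left out: when to shut the shared thread down, and how to isolate tasks from each other. A real design would need to answer both.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical JVM-wide scheduler shared by all controller instances.
final class SharedTimeoutScheduler {
    private static volatile ScheduledThreadPoolExecutor instance;

    private SharedTimeoutScheduler() {}

    // Lazily creates one daemon thread (double-checked locking on a volatile field).
    static ScheduledThreadPoolExecutor get() {
        ScheduledThreadPoolExecutor local = instance;
        if (local == null) {
            synchronized (SharedTimeoutScheduler.class) {
                local = instance;
                if (local == null) {
                    local = new ScheduledThreadPoolExecutor(1, r -> {
                        Thread t = new Thread(r, "shared-timeout-scheduler");
                        t.setDaemon(true);
                        return t;
                    });
                    local.setRemoveOnCancelPolicy(true);
                    instance = local;
                }
            }
        }
        return local;
    }

    // Timeout tasks should only hand work off (e.g. enqueue a mail), never block this thread.
    static ScheduledFuture<?> schedule(Runnable task, long delayMs) {
        return get().schedule(task, delayMs, TimeUnit.MILLISECONDS);
    }
}
```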
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1576541718

##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -46,13 +46,18 @@ public class StateFutureImpl implements InternalStateFuture {
     /** The callback runner. */
     protected final CallbackRunner callbackRunner;
 
-    public StateFutureImpl(CallbackRunner callbackRunner) {
+    protected final CallbackExceptionHandler exceptionHandler;
+
+    public StateFutureImpl(
+            CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
         this.completableFuture = new CompletableFuture<>();
         this.callbackRunner = callbackRunner;
+        this.exceptionHandler = exceptionHandler;
     }
 
     @Override
-    public StateFuture thenApply(Function fn) {
+    public StateFuture thenApply(

Review Comment:
`thenApply` can also be simplified with `thenCompose` by wrapping the result in a `CompletedStateFuture`.

##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre
     private RecordContext currentProcessingContext;
 
+    private Environment environment;
+
     /** Initialize necessary state components for {@link AbstractStreamOperator}. */
     @Override
     public void setup(
             StreamTask<?, ?> containingTask,
             StreamConfig config,
             Output<StreamRecord<OUT>> output) {
         super.setup(containingTask, config, output);
-        // TODO: properly read config and setup
-        final MailboxExecutor mailboxExecutor =
-                containingTask.getEnvironment().getMainMailboxExecutor();
-        this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null);
+        final Environment environment = containingTask.getEnvironment();
+        final MailboxExecutor mailboxExecutor = environment.getMainMailboxExecutor();
+        final int inFlightRecordsLimit =
+                environment.getExecutionConfig().getAsyncInflightRecordsLimit();
+        final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize();
+        final long asyncBufferTimeout =
+                environment.getExecutionConfig().getAsyncStateBufferTimeout();
+        // TODO: initial state executor and set state executor for aec
+        this.asyncExecutionController =
+                new AsyncExecutionController(
+                        mailboxExecutor,
+                        this::handleStateCallbackException,
+                        null,
+                        asyncBufferSize,
+                        asyncBufferTimeout,
+                        inFlightRecordsLimit);
+    }
+
+    private void handleStateCallbackException(String message, Throwable exception) {

Review Comment:
Shall we make it `protected` so that users can customize it?

##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +109,83 @@ public class AsyncExecutionController {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
+            CallbackExceptionHandler exceptionHandler,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
-        this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
+        this.callbackExceptionHandler = exceptionHandler;
+        this.stateFutureFactory =
+                new StateFutureFactory<>(this, mailboxExecutor, callbackExceptionHandler);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // - initialize buffer timeout
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1576511469

##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public StateFuture thenApply(Function fn) {
                 return StateFutureUtils.completedFuture(r);
             } else {
                 StateFutureImpl ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.complete(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
+                completableFuture
+                        .thenAccept(
+                                (t) -> {
+                                    callbackRunner.submit(
+                                            () -> {
+                                                ret.complete(fn.apply(t));
+                                                callbackFinished();
+                                            });
+                                })
+                        .exceptionally(
+                                (e) -> {
+                                    exceptionHandler.handleException(

Review Comment:
Shall we allow the handler to return something (e.g. a default fallback value)? If the exception is recoverable, we can continue the operation. This behaviour would also match the `exceptionally` function.
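The JDK's `CompletableFuture` shows the `exceptionally` behaviour this comment refers to. The function passed to `exceptionally` receives the failure, and its return value becomes the result of the new stage.

The sketch builds a fallback-returning handler type, `FallbackHandler`, on top of that. `FallbackHandler` and `FallbackDemo` are hypothetical and are not part of the PR. In the PR, the handler returns nothing, and the reply elsewhere in this thread reserves it for non-recoverable errors.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical handler that may supply a fallback value instead of failing.
@FunctionalInterface
interface FallbackHandler<T> {
    T handle(Throwable error);
}

class FallbackDemo {
    // Applies fn; if fn (or the source) fails, the handler's return value becomes the result.
    static <T, U> CompletableFuture<U> applyWithFallback(
            CompletableFuture<T> source, Function<T, U> fn, FallbackHandler<U> handler) {
        return source.thenApply(fn).exceptionally(e -> {
            // Failures from a dependent stage arrive wrapped in CompletionException.
            Throwable cause =
                    (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
            return handler.handle(cause);
        });
    }
}
```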
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1576446912

##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public StateFuture thenApply(Function fn) {
                 return StateFutureUtils.completedFuture(r);
             } else {
                 StateFutureImpl ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.complete(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
+                completableFuture
+                        .thenAccept(
+                                (t) -> {
+                                    callbackRunner.submit(
+                                            () -> {
+                                                ret.complete(fn.apply(t));
+                                                callbackFinished();
+                                            });
+                                })
+                        .exceptionally(
+                                (e) -> {
+                                    exceptionHandler.handleException(
+                                            "Caught exception when submitting StateFuture's callback.",
+                                            e);
+                                    return null;
+                                });
                 return ret;
             }
         } catch (Throwable e) {
-            throw new FlinkRuntimeException("Error binding or executing callback", e);
+            exceptionHandler.handleException(
+                    "Caught exception when processing completed StateFuture's callback.", e);
+            return null;
         }
     }
 
     @Override
-    public StateFuture thenAccept(Consumer action) {
+    public StateFuture thenAccept(

Review Comment:
`thenAccept` could be simplified to:
```
return thenApply((v) -> {
    action.accept(v);
    return null;
});
```

##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,62 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    // - Async State Execution --
+
+    /**
+     * The max limit of in-flight records number in async state execution, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-state.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async state execution, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async state execution. Async state execution provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to
+     * control the frequency of triggering.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption ASYNC_STATE_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-state.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async state execution. Async
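The simplification proposed here can be sketched against the JDK's `CompletableFuture`. Note that the `thenApply` used below is the JDK method, not `StateFutureImpl#thenApply`. The idea: `thenAccept` becomes a `thenApply` whose function runs the consumer and returns `null`, typed as `Void`. With this shape, any error handling added to `thenApply` would cover `thenAccept` as well. `AcceptDemo` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class AcceptDemo {
    // thenAccept expressed via thenApply: run the action, then complete with null.
    static <T> CompletableFuture<Void> acceptViaApply(CompletableFuture<T> f, Consumer<? super T> action) {
        return f.thenApply(v -> {
            action.accept(v);
            return null;
        });
    }
}
```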
Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
flinkbot commented on PR #24698:
URL: https://github.com/apache/flink/pull/24698#issuecomment-2069754200

## CI report:

* 2b98fd70bb820730897b52e71931d182ebe2d638 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]
fredia opened a new pull request, #24698:
URL: https://github.com/apache/flink/pull/24698

## What is the purpose of the change

This PR implements error handling in StateFuture's callback, making the job fail when any exception occurs in StateFuture's callback.

## Brief change log

- Add exception handling to the implementation classes of `StateFuture`.
- Wire the exception in `StateFuture`'s callback to `environment.failExternally`.

## Verifying this change

This change added/updated tests and can be verified as follows:

- `AsyncExecutionControllerTest#testException`
- `ContextStateFutureImplTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (JavaDocs)