Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-08 Thread via GitHub


fredia closed pull request #24698: [FLINK-35158][runtime] Error handling in 
StateFuture's callback
URL: https://github.com/apache/flink/pull/24698


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-07 Thread via GitHub


fredia commented on PR #24698:
URL: https://github.com/apache/flink/pull/24698#issuecomment-2099597032

   Thanks for the detailed review. I have rebased onto master and will merge once CI is green.





Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-07 Thread via GitHub


yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1592131320


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncStateException.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.asyncprocessing;
+
+/**
+ * An exception for wrapping exceptions that are thrown by {@link
+ * org.apache.flink.api.common.state.v2.StateFuture} callback framework.
+ */
+public class AsyncStateException extends RuntimeException {
+private static final long serialVersionUID = 1L;
+
+public AsyncStateException(Throwable cause) {
+super(cause);
+}
+
+public AsyncStateException(String message, Throwable cause) {
+super(message, cause);
+}
+
+@Override
+public String toString() {
+return "StateCallbackException{" + getCause() + "}";

Review Comment:
   nit: StateCallbackException -> AsyncStateException






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-07 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1592128250


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -203,12 +251,29 @@ public void callbackFinished() {
 }
 
 @Override
-public void thenSyncAccept(Consumer action) {
-completableFuture.thenAccept(action);
+public void thenSyncAccept(ThrowingConsumer action) {
+completableFuture
+.thenAccept(ThrowingConsumer.unchecked(action))
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(
+"Caught exception when processing completed StateFuture's callback.",
+e);
+return null;
+});
 }
 
 /** The entry for a state future to submit task to mailbox. */
 public interface CallbackRunner {
-void submit(Runnable task);
+void submit(ThrowingRunnable task);
+}
+
+/**
+ * Handles an exception thrown by callback framework, borrowed idea from {@code
+ * AsyncExceptionHandler}.
+ */
+public interface CallbackExceptionHandler {

Review Comment:
   Thanks for the suggestion, renamed and squashed commits.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-07 Thread via GitHub


Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1591888669


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -203,12 +251,29 @@ public void callbackFinished() {
 }
 
 @Override
-public void thenSyncAccept(Consumer action) {
-completableFuture.thenAccept(action);
+public void thenSyncAccept(ThrowingConsumer action) {
+completableFuture
+.thenAccept(ThrowingConsumer.unchecked(action))
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(
+"Caught exception when processing completed StateFuture's callback.",
+e);
+return null;
+});
 }
 
 /** The entry for a state future to submit task to mailbox. */
 public interface CallbackRunner {
-void submit(Runnable task);
+void submit(ThrowingRunnable task);
+}
+
+/**
+ * Handles an exception thrown by callback framework, borrowed idea from {@code
+ * AsyncExceptionHandler}.
+ */
+public interface CallbackExceptionHandler {

Review Comment:
   I just went through the final commit. It seems the exception handler only handles exceptions from the framework, so I'd suggest changing this name.
   
   BTW, it seems better to rename `StateCallbackException` to something like `AsyncStateException`.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-06 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590687424


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -78,13 +81,18 @@ public void setup(
 this.asyncExecutionController =
 new AsyncExecutionController(
 mailboxExecutor,
+this::handleStateCallbackException,
 null,
 maxParallelism,
 asyncBufferSize,
 asyncBufferTimeout,
 inFlightRecordsLimit);
 }
 
+private void handleStateCallbackException(String message, Throwable exception) {

Review Comment:
   Currently `StreamTask.asyncExceptionHandler::handleAsyncException` is used to handle the exceptions thrown by the processing timer. I would prefer to keep these two types of exceptions separate, so `StateCallbackException` is introduced here.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-06 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590680460


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -46,123 +46,169 @@ public class StateFutureImpl implements 
InternalStateFuture {
 /** The callback runner. */
 protected final CallbackRunner callbackRunner;
 
-public StateFutureImpl(CallbackRunner callbackRunner) {
+/** The exception handler that handles callback framework's error. */
+protected final CallbackExceptionHandler exceptionHandler;
+
+public StateFutureImpl(
+CallbackRunner callbackRunner, CallbackExceptionHandler 
exceptionHandler) {
 this.completableFuture = new CompletableFuture<>();
 this.callbackRunner = callbackRunner;
+this.exceptionHandler = exceptionHandler;
 }
 
 @Override
-public  StateFuture thenApply(Function fn) {
+public  StateFuture thenApply(
+FunctionWithException 
fn) {
 callbackRegistered();
-try {
-if (completableFuture.isDone()) {
+
+if (completableFuture.isDone()) {
+try {
 U r = fn.apply(completableFuture.get());
 callbackFinished();
 return StateFutureUtils.completedFuture(r);
-} else {
-StateFutureImpl ret = makeNewStateFuture();
-completableFuture.thenAccept(
-(t) -> {
-callbackRunner.submit(
-() -> {
-
ret.completeInCallbackRunner(fn.apply(t));
-callbackFinished();
-});
-});
-return ret;
+} catch (Throwable e) {
+exceptionHandler.handleException(
+"Caught exception when processing completed StateFuture's callback.", e);
+return null;
 }
-} catch (Throwable e) {
-throw new FlinkRuntimeException("Error binding or executing callback", e);
+} else {
+StateFutureImpl ret = makeNewStateFuture();
+completableFuture
+.thenAccept(
+(t) -> {
+callbackRunner.submit(
+() -> {
+
ret.completeInCallbackRunner(fn.apply(t));
+callbackFinished();
+});
+})
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(
+"Caught exception when submitting StateFuture's callback.",
+e);
+return null;
+});
+return ret;
 }
 }
 
 @Override
-public StateFuture thenAccept(Consumer action) {
+public StateFuture thenAccept(ThrowingConsumer action) {
 callbackRegistered();
-try {
-if (completableFuture.isDone()) {
+if (completableFuture.isDone()) {
+try {
 action.accept(completableFuture.get());
 callbackFinished();
 return StateFutureUtils.completedVoidFuture();

Review Comment:
   Thanks for the suggestion, added some description here.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-06 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590679773


##
flink-core/src/test/java/org/apache/flink/core/state/StateFutureTest.java:
##
@@ -37,19 +38,23 @@
 
 /** Tests for {@link StateFuture} related implementations. */
 public class StateFutureTest {
+static StateFutureImpl.CallbackExceptionHandler exceptionHandler =

Review Comment:
   It was handled by `ContextStateFutureImplTest`.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-06 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590678974


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -46,123 +46,169 @@ public class StateFutureImpl implements 
InternalStateFuture {
 /** The callback runner. */
 protected final CallbackRunner callbackRunner;
 
-public StateFutureImpl(CallbackRunner callbackRunner) {
+/** The exception handler that handles callback framework's error. */
+protected final CallbackExceptionHandler exceptionHandler;
+
+public StateFutureImpl(
+CallbackRunner callbackRunner, CallbackExceptionHandler 
exceptionHandler) {
 this.completableFuture = new CompletableFuture<>();
 this.callbackRunner = callbackRunner;
+this.exceptionHandler = exceptionHandler;
 }
 
 @Override
-public  StateFuture thenApply(Function fn) {
+public  StateFuture thenApply(
+FunctionWithException 
fn) {
 callbackRegistered();
-try {
-if (completableFuture.isDone()) {
+
+if (completableFuture.isDone()) {
+try {
 U r = fn.apply(completableFuture.get());
 callbackFinished();
 return StateFutureUtils.completedFuture(r);

Review Comment:
   The `exceptionHandler` handles framework errors, such as a failure to submit to the mailbox.
   
   The exceptions thrown in the `completableFuture.isDone()` branch and in `CompletedStateFuture` are user-code exceptions, and they are handled by the mailbox directly.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-06 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590672845


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -203,12 +249,29 @@ public void callbackFinished() {
 }
 
 @Override
-public void thenSyncAccept(Consumer action) {
-completableFuture.thenAccept(action);
+public void thenSyncAccept(ThrowingConsumer action) {
+completableFuture
+.thenAccept(ThrowingConsumer.unchecked(action))
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(
+"Caught exception when processing completed StateFuture's callback.",
+e);
+return null;
+});
 }
 
 /** The entry for a state future to submit task to mailbox. */
 public interface CallbackRunner {
-void submit(Runnable task);
+void submit(ThrowingRunnable task);
+}
+
+/**
+ * Handles an exception thrown by callback framework, borrowed idea from {@code
+ * AsyncExceptionHandler}.
+ */
+public interface CallbackExceptionHandler {

Review Comment:
   Without this PR, the callbacks throw `RuntimeException` directly.
   After this PR:
   1. Framework exceptions (such as a failure to submit to the mailbox) are caught by `CallbackExceptionHandler` and finally handled by `AbstractAsyncStateStreamOperator#handleStateCallbackException`.
   2. Exceptions thrown by user code or state access are caught by the mailbox.
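
As an illustration of the two routes described above, here is a minimal, self-contained sketch. It is not code from this PR: `Mailbox`, `FrameworkErrorHandler`, and `route` are hypothetical stand-ins for the mailbox executor, `CallbackExceptionHandler`, and the operator wiring, and the real classes may behave differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class ExceptionRoutingSketch {

    /** Hypothetical stand-in for the handler that receives framework errors. */
    interface FrameworkErrorHandler {
        void handle(String message, Throwable cause);
    }

    /** Hypothetical single-threaded "mailbox": queued tasks run later via drain(). */
    static final class Mailbox {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final boolean rejectSubmissions;

        Mailbox(boolean rejectSubmissions) {
            this.rejectSubmissions = rejectSubmissions;
        }

        void submit(Runnable task) {
            if (rejectSubmissions) {
                throw new IllegalStateException("mailbox closed");
            }
            tasks.add(task);
        }

        /** Runs queued tasks; an exception thrown by a task propagates to the caller. */
        void drain() {
            Runnable task;
            while ((task = tasks.poll()) != null) {
                task.run();
            }
        }
    }

    /** Returns a log showing which side observed the failure. */
    static List<String> route(boolean rejectSubmissions) {
        List<String> events = new ArrayList<>();
        Mailbox mailbox = new Mailbox(rejectSubmissions);
        FrameworkErrorHandler handler =
                (message, cause) -> events.add("handler: " + cause.getMessage());

        CompletableFuture<String> future = new CompletableFuture<>();
        future.thenAccept(
                        v ->
                                mailbox.submit(
                                        () -> {
                                            throw new RuntimeException("user code failed");
                                        }))
                .exceptionally(
                        e -> {
                            // A failed submission reaches this stage wrapped in a
                            // CompletionException, so the cause is unwrapped first.
                            handler.handle("Caught exception when submitting callback.", e.getCause());
                            return null;
                        });
        future.complete("value");

        try {
            mailbox.drain();
        } catch (RuntimeException e) {
            events.add("mailbox: " + e.getMessage());
        }
        return events;
    }
}
```

With `route(true)` the submission itself fails and only the handler sees the error. With `route(false)` the task is queued, and the user-code exception surfaces when the mailbox runs it.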






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-05-06 Thread via GitHub


Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1590597054


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public  StateFuture thenApply(Function fn) {
 return StateFutureUtils.completedFuture(r);
 } else {
 StateFutureImpl ret = makeNewStateFuture();
-completableFuture.thenAccept(
-(t) -> {
-callbackRunner.submit(
-() -> {
-ret.complete(fn.apply(t));
-callbackFinished();
-});
-});
+completableFuture
+.thenAccept(
+(t) -> {
+callbackRunner.submit(
+() -> {
+ret.complete(fn.apply(t));
+callbackFinished();
+});
+})
+.exceptionally(

Review Comment:
   Thanks for the explanation, I'm fine with the current implementation.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-30 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1584530942


##
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java:
##
@@ -49,7 +49,7 @@ public interface StateFuture {
  * @param action the action to perform before completing the returned 
StateFuture.
  * @return the new StateFuture.
  */
-StateFuture thenAccept(Consumer action);
+StateFuture thenAccept(ConsumerWithException action);

Review Comment:
   IIUC, the callbacks passed to a `CompletableFuture` must be of types such as `Runnable`, which cannot declare checked exceptions, so **checked exceptions** must be handled inside the callback when using `CompletableFuture`.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-30 Thread via GitHub


yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1584175343


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -46,123 +46,169 @@ public class StateFutureImpl implements 
InternalStateFuture {
 /** The callback runner. */
 protected final CallbackRunner callbackRunner;
 
-public StateFutureImpl(CallbackRunner callbackRunner) {
+/** The exception handler that handles callback framework's error. */
+protected final CallbackExceptionHandler exceptionHandler;
+
+public StateFutureImpl(
+CallbackRunner callbackRunner, CallbackExceptionHandler 
exceptionHandler) {
 this.completableFuture = new CompletableFuture<>();
 this.callbackRunner = callbackRunner;
+this.exceptionHandler = exceptionHandler;
 }
 
 @Override
-public  StateFuture thenApply(Function fn) {
+public  StateFuture thenApply(
+FunctionWithException 
fn) {
 callbackRegistered();
-try {
-if (completableFuture.isDone()) {
+
+if (completableFuture.isDone()) {
+try {
 U r = fn.apply(completableFuture.get());
 callbackFinished();
 return StateFutureUtils.completedFuture(r);
-} else {
-StateFutureImpl ret = makeNewStateFuture();
-completableFuture.thenAccept(
-(t) -> {
-callbackRunner.submit(
-() -> {
-
ret.completeInCallbackRunner(fn.apply(t));
-callbackFinished();
-});
-});
-return ret;
+} catch (Throwable e) {
+exceptionHandler.handleException(
+"Caught exception when processing completed StateFuture's callback.", e);
+return null;
 }
-} catch (Throwable e) {
-throw new FlinkRuntimeException("Error binding or executing callback", e);
+} else {
+StateFutureImpl ret = makeNewStateFuture();
+completableFuture
+.thenAccept(
+(t) -> {
+callbackRunner.submit(
+() -> {
+
ret.completeInCallbackRunner(fn.apply(t));
+callbackFinished();
+});
+})
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(
+"Caught exception when submitting StateFuture's callback.",
+e);
+return null;
+});
+return ret;
 }
 }
 
 @Override
-public StateFuture thenAccept(Consumer action) {
+public StateFuture thenAccept(ThrowingConsumer action) {
 callbackRegistered();
-try {
-if (completableFuture.isDone()) {
+if (completableFuture.isDone()) {
+try {
 action.accept(completableFuture.get());
 callbackFinished();
 return StateFutureUtils.completedVoidFuture();

Review Comment:
   Shall we wrap the three lines above in `callbackRunner.submit` as well? Or shall we add a note to the method's JavaDoc stating that this method must be invoked in the callbackRunner/mailbox thread? The same applies to the other methods.



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java:
##
@@ -68,7 +72,17 @@ public void postComplete(boolean inCallbackRunner) {
 if (inCallbackRunner) {
 recordContext.release(Runnable::run);
 } else {
-recordContext.release(callbackRunner::submit);
+recordContext.release(
+runnable -> {
+try {
+ThrowingRunnable 
throwingRunnable =
+() -> runnable.run();

Review Comment:
   nit: runnable::run.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -78,13 +81,18 @@ public void setup(
 this.asyncExecutionController =
 new AsyncExecutionController(
 mailboxExecutor,
+this::handleStateCallbackException,
 null,
 maxParallelism,
 asyncBufferSize,
 asyncBufferTimeout,
   

Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-28 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1582504623


##
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java:
##
@@ -49,7 +49,7 @@ public interface StateFuture {
  * @param action the action to perform before completing the returned 
StateFuture.
  * @return the new StateFuture.
  */
-StateFuture thenAccept(Consumer action);
+StateFuture thenAccept(ConsumerWithException action);

Review Comment:
   Here we classify exceptions into two categories:
   1. Exceptions in user code: users are **not forced to** handle exceptions. For example, users can handle internal logic exceptions in their callbacks, or they can pass them on to `thenXXX()` unhandled, in which case the exceptions are eventually thrown by the mailbox.
   2. Exceptions in the asynchronous framework: these directly fail the job.
   
   We don't want to align completely with `CompletableFuture`, because `CompletableFuture` forces callers to handle checked exceptions. For example, the following code is not allowed with `CompletableFuture`, but is allowed with `StateFuture`:
   
   ```Java
   CompletableFuture future = new CompletableFuture<>();
   future.thenAccept((v) -> {
       throw new IOException("test");  // not allowed: Consumer.accept declares no checked exceptions
   });
   StateFutureImpl stateFuture = new StateFutureImpl<>(null, exceptionHandler);
   stateFuture.thenAccept(
       (v) -> {
           throw new IOException("test"); // allowed
       }
   );
   ```
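
To make the difference concrete, the following is a minimal sketch of how a checked-exception-friendly callback type can be adapted to a plain `Consumer`. It is an illustration only: `CheckedConsumer` and `unchecked` are hypothetical names defined here and are not Flink's `ThrowingConsumer` API.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

public class CheckedCallbackSketch {

    /** Hypothetical consumer type whose accept method may throw a checked exception. */
    @FunctionalInterface
    interface CheckedConsumer<T> {
        void accept(T value) throws Exception;
    }

    /** Adapts a CheckedConsumer to a Consumer by wrapping checked exceptions. */
    static <T> Consumer<T> unchecked(CheckedConsumer<T> consumer) {
        return value -> {
            try {
                consumer.accept(value);
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    /** Returns the innermost exception observed when the callback throws an IOException. */
    static Throwable runAndCapture() {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Passing this lambda directly to thenAccept would not compile, because
        // Consumer.accept declares no checked exceptions. The adapter makes it legal.
        CompletableFuture<Void> stage =
                future.thenAccept(
                        unchecked(
                                v -> {
                                    throw new IOException("test");
                                }));
        future.complete("value");
        try {
            stage.join();
            return null;
        } catch (CompletionException e) {
            // CompletionException -> RuntimeException (from the adapter) -> IOException
            return e.getCause().getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCapture());
    }
}
```

The adapter moves the burden of dealing with checked exceptions from the caller to the framework, which is the trade-off discussed in this comment.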






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-28 Thread via GitHub


yunfengzhou-hub commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1578908743


##
flink-core-api/src/main/java/org/apache/flink/util/function/ConsumerWithException.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util.function;
+
+import java.util.function.Consumer;
+
+/**
+ * A checked extension of the {@link Consumer} interface.
+ *
+ * @param  type of the first argument
+ * @param  type of the thrown exception
+ */
+public interface ConsumerWithException {

Review Comment:
   It might be simpler to reuse 
`org.apache.flink.util.function.ThrowingConsumer`.



##
flink-core-api/src/main/java/org/apache/flink/api/common/state/v2/StateFuture.java:
##
@@ -49,7 +49,7 @@ public interface StateFuture {
  * @param action the action to perform before completing the returned 
StateFuture.
  * @return the new StateFuture.
  */
-StateFuture thenAccept(Consumer action);
+StateFuture thenAccept(ConsumerWithException action);

Review Comment:
   Given that Java's `CompletableFuture` uses the following signatures to handle exceptions
   ```java
   CompletableFuture thenAccept(Consumer action);
   CompletableFuture whenComplete(BiConsumer action);
   ```
   Would it be better to align the APIs here with `CompletableFuture`?
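
For reference, a small sketch of the `whenComplete` style mentioned above, using only the standard `java.util.concurrent.CompletableFuture` API. The class and method names (`WhenCompleteSketch`, `describe`) are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;

public class WhenCompleteSketch {

    /** Describes how an already-completed future finished, via the (value, error) pair. */
    static String describe(CompletableFuture<String> future) {
        StringBuilder out = new StringBuilder();
        // For a future that is already complete, the action runs immediately
        // on the calling thread.
        future.whenComplete(
                (value, error) -> {
                    if (error != null) {
                        out.append("failed: ").append(error.getMessage());
                    } else {
                        out.append("ok: ").append(value);
                    }
                });
        return out.toString();
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describe(CompletableFuture.completedFuture("v")));
        System.out.println(describe(failed));
    }
}
```

In this style the callback receives the error as a value and decides what to do with it, rather than being allowed to throw a checked exception itself.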






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-28 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1582032431


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public  StateFuture thenApply(Function fn) {
 return StateFutureUtils.completedFuture(r);
 } else {
 StateFutureImpl ret = makeNewStateFuture();
-completableFuture.thenAccept(
-(t) -> {
-callbackRunner.submit(
-() -> {
-ret.complete(fn.apply(t));
-callbackFinished();
-});
-});
+completableFuture
+.thenAccept(
+(t) -> {
+callbackRunner.submit(
+() -> {
+ret.complete(fn.apply(t));
+callbackFinished();
+});
+})
+.exceptionally(

Review Comment:
   `exceptionally` and `try/catch` are equivalent here. I prefer writing it with `exceptionally` to reduce nesting levels. The Javadoc describes `exceptionally` as follows:
   
   > Returns a new CompletableFuture that is completed when this 
CompletableFuture completes, with the result of the given function of the 
exception triggering this CompletableFuture's completion when it completes 
exceptionally; otherwise, if this CompletableFuture completes normally, then 
the returned CompletableFuture also completes normally with the same value
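
A short sketch of this behaviour, using only the standard `CompletableFuture` API (the class name `ExceptionallySketch` is made up for the illustration). One detail worth noting: when the failure comes from an upstream stage's callback, the throwable passed to `exceptionally` is a `CompletionException` whose cause is the original exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ExceptionallySketch {

    /** Runs a failing callback and records what the exceptionally() stage observes. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Integer> future = new CompletableFuture<>();

        future.thenAccept(
                        v -> {
                            throw new IllegalStateException("callback failed");
                        })
                .exceptionally(
                        e -> {
                            // The upstream failure arrives wrapped in a CompletionException;
                            // the original exception is its cause.
                            log.add(e.getClass().getSimpleName());
                            log.add(e.getCause().getMessage());
                            return null;
                        });

        // Completing the future runs both stages on the calling thread.
        future.complete(1);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Compared with a `try/catch` inside the callback, this keeps the error handling in a separate stage at the cost of the extra wrapper exception.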






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-25 Thread via GitHub


jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1579119199


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator 
extends AbstractStre
 
 private RecordContext currentProcessingContext;
 
+private Environment environment;
+
 /** Initialize necessary state components for {@link 
AbstractStreamOperator}. */
 @Override
 public void setup(
 StreamTask containingTask,
 StreamConfig config,
 Output> output) {
 super.setup(containingTask, config, output);
-// TODO: properly read config and setup
-final MailboxExecutor mailboxExecutor =
-containingTask.getEnvironment().getMainMailboxExecutor();
-this.asyncExecutionController = new 
AsyncExecutionController(mailboxExecutor, null);
+final Environment environment = containingTask.getEnvironment();
+final MailboxExecutor mailboxExecutor = 
environment.getMainMailboxExecutor();
+final int inFlightRecordsLimit =
+
environment.getExecutionConfig().getAsyncInflightRecordsLimit();
+final int asyncBufferSize = 
environment.getExecutionConfig().getAsyncStateBufferSize();
+final long asyncBufferTimeout =
+environment.getExecutionConfig().getAsyncStateBufferTimeout();
+// TODO: initial state executor and set state executor for aec
+this.asyncExecutionController =
+new AsyncExecutionController(
+mailboxExecutor,
+this::handleStateCallbackException,
+null,
+asyncBufferSize,
+asyncBufferTimeout,
+inFlightRecordsLimit);
+}
+
+private void handleStateCallbackException(String message, Throwable exception) {

Review Comment:
   Sorry, my bad. I thought this was the user-facing AsyncOperator.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577788928


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +109,83 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
+/** The executor service that schedules and calls the triggers of this 
task. */
+ScheduledExecutorService scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current trigger sequence number, used to distinguish different 
triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by one.
+ */
+AtomicLong currentTriggerSeq;
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
+CallbackExceptionHandler exceptionHandler,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
-this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
+this.callbackExceptionHandler = exceptionHandler;
+this.stateFutureFactory =
+new StateFutureFactory<>(this, mailboxExecutor, 
callbackExceptionHandler);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.currentTriggerSeq = new AtomicLong(0);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeout > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1, new 
ExecutorThreadFactory("AEC-timeout-scheduler"));

Review Comment:
    Good suggestion, I will optimize this in 
https://github.com/apache/flink/pull/24667






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577782892


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator extends AbstractStre
 
 private RecordContext currentProcessingContext;
 
+private Environment environment;
+
 /** Initialize necessary state components for {@link AbstractStreamOperator}. */
 @Override
 public void setup(
 StreamTask containingTask,
 StreamConfig config,
 Output> output) {
 super.setup(containingTask, config, output);
-// TODO: properly read config and setup
-final MailboxExecutor mailboxExecutor =
-containingTask.getEnvironment().getMainMailboxExecutor();
-this.asyncExecutionController = new AsyncExecutionController(mailboxExecutor, null);
+final Environment environment = containingTask.getEnvironment();
+final MailboxExecutor mailboxExecutor = environment.getMainMailboxExecutor();
+final int inFlightRecordsLimit =
+environment.getExecutionConfig().getAsyncInflightRecordsLimit();
+final int asyncBufferSize = environment.getExecutionConfig().getAsyncStateBufferSize();
+final long asyncBufferTimeout =
+environment.getExecutionConfig().getAsyncStateBufferTimeout();
+// TODO: initial state executor and set state executor for aec
+this.asyncExecutionController =
+new AsyncExecutionController(
+mailboxExecutor,
+this::handleStateCallbackException,
+null,
+asyncBufferSize,
+asyncBufferTimeout,
+inFlightRecordsLimit);
+}
+
+private void handleStateCallbackException(String message, Throwable exception) {

Review Comment:
   AEC is transparent to the user, and users should not be aware of its 
internal implementation.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r159797


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +109,83 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
+/** The executor service that schedules and calls the triggers of this task. */
+ScheduledExecutorService scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current trigger sequence number, used to distinguish different triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by one.
+ */
+AtomicLong currentTriggerSeq;
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
+CallbackExceptionHandler exceptionHandler,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
-this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
+this.callbackExceptionHandler = exceptionHandler;
+this.stateFutureFactory =
+new StateFutureFactory<>(this, mailboxExecutor, callbackExceptionHandler);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.currentTriggerSeq = new AtomicLong(0);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeout > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+// make sure shutdown removes all pending tasks
+((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+} else {
+this.scheduledExecutor = null;
+}
+
 LOG.info(
-"Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+"Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
 batchSize,
+bufferTimeout,
 maxInFlightRecords);
 }
 
+void scheduleTimeout(long triggerSeq) {
+if (bufferTimeout > 0) {

Review Comment:
   Yes, it's expected.






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-24 Thread via GitHub


fredia commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r158129


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,62 @@ public class ExecutionOptions {
 + " operators. NOTE: It takes effect only 
in the BATCH runtime mode and requires sorted inputs"
 + SORT_INPUTS.key()
 + " to be enabled.");
+
+// - Async State Execution 
--
+
+/**
+ * The max limit of in-flight records number in async state execution, 
'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed 
and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the 
newly records entering
+ * will be blocked until the in-flight records number drops below the 
limit.
+ */
+@Experimental
+@Documentation.ExcludeFromDocumentation(
+"This is an experimental option, internal use only for now.")
+public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+ConfigOptions.key("execution.async-state.in-flight-records-limit")
+.intType()
+.defaultValue(6000)
+.withDescription(
+"The max limit of in-flight records number in 
async state execution, 'in-flight' refers"
++ " to the records that have entered the 
operator but have not yet been processed and"
++ " emitted to the downstream. If the 
in-flight records number exceeds the limit,"
++ " the newly records entering will be 
blocked until the in-flight records number drops below the limit.");
+
+/**
+ * The size of buffer under async state execution. Async state execution 
provides a buffer
+ * mechanism to reduce state access. When the number of state requests in 
the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch 
sizes will bring
+ * higher end-to-end latency, this option works with {@link 
#ASYNC_STATE_BUFFER_TIMEOUT} to
+ * control the frequency of triggering.
+ */
+@Experimental
+@Documentation.ExcludeFromDocumentation(
+"This is an experimental option, internal use only for now.")
+public static final ConfigOption ASYNC_STATE_BUFFER_SIZE =
+ConfigOptions.key("execution.async-state.buffer-size")
+.intType()
+.defaultValue(1000)
+.withDescription(
+"The size of buffer under async state execution. 
Async state execution provides a buffer mechanism to reduce state access."
++ " When the number of state requests in 
the active buffer exceeds the batch size,"
++ " a batched state execution would be 
triggered. Larger batch sizes will bring higher end-to-end latency,"
++ " this option works with 
'execution.async-state.buffer-timeout' to control the frequency of 
triggering.");
+
+/**
+ * The timeout of buffer triggering in milliseconds. If the buffer has not 
reached the {@link
+ * #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a 
trigger will perform
+ * actively.
+ */
+@Experimental
+@Documentation.ExcludeFromDocumentation(
+"This is an experimental option, internal use only for now.")
+public static final ConfigOption ASYNC_STATE_BUFFER_TIMEOUT =

Review Comment:
   @jectpro7 The commits 0c177b...f985f1c are rebased from
   https://github.com/apache/flink/pull/24667. I will rebase this PR once
   https://github.com/apache/flink/pull/24667 is finished.
   
   > I suggest to use something like trigger interval here
   
   The buffer is not triggered at a fixed frequency, and `interval` implies a
   fixed rate, so I prefer to keep the name as is.
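
The naming point above can be illustrated with a minimal sketch. It assumes a hypothetical `TimeoutBufferSketch` class driven by a logical clock; it is not the AEC implementation, and the rule that the timer starts with the first buffered request is an assumption made for the example. The buffer triggers either when it is full or when the timeout expires, so the gaps between triggers are irregular rather than periodic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not Flink code): a buffer flushed by size or by a timeout. */
final class TimeoutBufferSketch {
    private final int batchSize;
    private final long timeoutMs;
    private final List<String> buffer = new ArrayList<>();
    private final List<Long> triggerTimes = new ArrayList<>();
    private long deadline = Long.MAX_VALUE; // when the pending timeout fires

    TimeoutBufferSketch(int batchSize, long timeoutMs) {
        this.batchSize = batchSize;
        this.timeoutMs = timeoutMs;
    }

    /** Adds a request at logical time {@code now}; a full buffer triggers at once. */
    void add(String request, long now) {
        advanceTo(now);
        if (buffer.isEmpty()) {
            // Assumption: the timer starts with the first buffered request.
            deadline = now + timeoutMs;
        }
        buffer.add(request);
        if (buffer.size() >= batchSize) {
            trigger(now);
        }
    }

    /** Moves logical time forward and fires the timeout if it is due. */
    void advanceTo(long now) {
        if (!buffer.isEmpty() && now >= deadline) {
            trigger(deadline);
        }
    }

    private void trigger(long at) {
        buffer.clear();
        triggerTimes.add(at);
        deadline = Long.MAX_VALUE; // no timer until the next request arrives
    }

    static List<Long> demo() {
        TimeoutBufferSketch b = new TimeoutBufferSketch(3, 100);
        b.add("a", 0);
        b.add("b", 10);
        b.add("c", 20);   // buffer is full: size-based trigger at t=20
        b.add("d", 50);   // timer armed again: deadline is t=150
        b.advanceTo(200); // timeout-based trigger at t=150
        return b.triggerTimes;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [20, 150]
    }
}
```

The two triggers are 130 ms apart even though the timeout is 100 ms, which is why a name that suggests a fixed rate would be misleading.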






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-24 Thread via GitHub


Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577730173


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public  StateFuture thenApply(Function fn) {
 return StateFutureUtils.completedFuture(r);
 } else {
 StateFutureImpl ret = makeNewStateFuture();
-completableFuture.thenAccept(
-(t) -> {
-callbackRunner.submit(
-() -> {
-ret.complete(fn.apply(t));
-callbackFinished();
-});
-});
+completableFuture
+.thenAccept(
+(t) -> {
+callbackRunner.submit(
+() -> {
+ret.complete(fn.apply(t));
+callbackFinished();
+});
+})
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(

Review Comment:
   Please note that the `exceptionHandler` here is for errors that the framework
   cannot recover from. Recoverable ones (such as a state access timeout) will be
   retried automatically in `StateExecutor`. In this layer, we only focus on the
   future-related parts.
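
A minimal sketch of the two layers described above, under stated assumptions: `accessWithRetry`, `runCallback`, and `FatalHandler` are hypothetical names, and the retry policy is invented for illustration, since the `StateExecutor` retry is not part of this PR. The executor layer retries a recoverable failure, while the future layer reports any callback failure to the handler without retrying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class TwoLayerErrorSketch {
    /** Hypothetical handler with the (message, exception) shape used in the diff. */
    interface FatalHandler {
        void handle(String message, Throwable exception);
    }

    /** Executor layer (assumed policy): retry a failing access up to maxAttempts times. */
    static <T> T accessWithRetry(Supplier<T> access, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return access.get();
            } catch (RuntimeException e) {
                last = e; // treated as recoverable: try again
            }
        }
        throw last; // retries exhausted
    }

    /** Future layer: run the callback and report any failure as non-recoverable. */
    static <T> void runCallback(T value, Consumer<T> callback, FatalHandler handler) {
        try {
            callback.accept(value);
        } catch (Throwable t) {
            handler.handle("Caught exception in callback.", t);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        int[] calls = {0};
        // The simulated state access fails twice, then succeeds.
        String value =
                accessWithRetry(
                        () -> {
                            if (++calls[0] < 3) {
                                throw new IllegalStateException("timeout");
                            }
                            return "v";
                        },
                        5);
        log.add("value=" + value + " after " + calls[0] + " attempts");
        runCallback(
                value,
                v -> {
                    throw new IllegalArgumentException("bad callback");
                },
                (msg, e) -> log.add("fatal: " + e.getMessage()));
        return log;
    }
}
```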






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-24 Thread via GitHub


Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577723817


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public  StateFuture thenApply(Function fn) {
 return StateFutureUtils.completedFuture(r);
 } else {
 StateFutureImpl ret = makeNewStateFuture();
-completableFuture.thenAccept(
-(t) -> {
-callbackRunner.submit(
-() -> {
-ret.complete(fn.apply(t));
-callbackFinished();
-});
-});
+completableFuture
+.thenAccept(
+(t) -> {
+callbackRunner.submit(
+() -> {
+ret.complete(fn.apply(t));
+callbackFinished();
+});
+})
+.exceptionally(

Review Comment:
   Should we use `exceptionally` here? IIUC, we won't call
   `completeExceptionally`, right?
   I was thinking the callback should be protected like this:
   ```
   completableFuture.thenAccept(
           (t) -> {
               try {
                   callbackRunner.submit(
                           () -> {
                               ret.complete(fn.apply(t));
                               callbackFinished();
                           });
               } catch (Throwable e) {
                   exceptionHandler.handleException(
                           "Caught exception when submitting StateFuture's callback.", e);
               }
           });
   ```
   Right?
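
To make the scope of the two guards concrete, here is a minimal sketch that uses the JDK's `java.util.concurrent.CompletableFuture` and a plain list as a stand-in for `callbackRunner` (both are assumptions; this is not `StateFutureImpl`). An `exceptionally` stage chained after `thenAccept` observes an exception thrown while the `thenAccept` lambda itself runs, for example when submission fails. It does not observe an exception thrown later inside the submitted callback, which surfaces wherever the runner executes it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class ExceptionallyScopeSketch {
    /** Unwraps the CompletionException that CompletableFuture adds around a stage failure. */
    private static String rootMessage(Throwable e) {
        Throwable root =
                (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
        return root.getMessage();
    }

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        List<Runnable> mailbox = new ArrayList<>(); // stand-in runner: queue now, run later

        // Case 1: the submission step itself throws inside thenAccept.
        CompletableFuture<Integer> first = new CompletableFuture<>();
        first.thenAccept(
                        t -> {
                            throw new IllegalStateException("submit failed");
                        })
                .exceptionally(
                        e -> {
                            seen.add("exceptionally: " + rootMessage(e));
                            return null;
                        });
        first.complete(1);

        // Case 2: submission succeeds; the callback throws when the runner executes it.
        CompletableFuture<Integer> second = new CompletableFuture<>();
        second.thenAccept(
                        t ->
                                mailbox.add(
                                        () -> {
                                            throw new IllegalStateException("callback failed");
                                        }))
                .exceptionally(
                        e -> {
                            seen.add("exceptionally: " + rootMessage(e));
                            return null;
                        });
        second.complete(2);
        for (Runnable callback : mailbox) {
            try {
                callback.run();
            } catch (RuntimeException e) {
                seen.add("runner: " + e.getMessage());
            }
        }
        return seen;
    }
}
```

In this sketch only the first failure reaches `exceptionally`; the second is seen by the code that drains the mailbox.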






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-24 Thread via GitHub


Zakelly commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577710395


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -46,13 +46,18 @@ public class StateFutureImpl implements InternalStateFuture {
 /** The callback runner. */
 protected final CallbackRunner callbackRunner;
 
-public StateFutureImpl(CallbackRunner callbackRunner) {
+protected final CallbackExceptionHandler exceptionHandler;
+
+public StateFutureImpl(
+CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
 this.completableFuture = new CompletableFuture<>();
 this.callbackRunner = callbackRunner;
+this.exceptionHandler = exceptionHandler;
 }
 
 @Override
-public  StateFuture thenApply(Function fn) {
+public  StateFuture thenApply(

Review Comment:
   Are you suggesting this?
   ```
   thenCompose((v) -> StateFutureUtils.completedFuture(fn.apply(v)));
   ```
   
   I'd suggest not doing so. We are considering checkpointing the user-provided
   callback function, and nested wrapping may make that more complex. We could
   consider this optimization if it doesn't affect that part.
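
For reference, the equivalence under discussion can be sketched with the JDK's `CompletableFuture` as a stand-in for `StateFuture` (an assumption; `applyViaCompose` is a hypothetical helper). Both forms yield the same value, but the compose form allocates an extra completed future and hides the user function inside another lambda, which is the nested wrapping mentioned above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ApplyViaComposeSketch {
    /** Expresses thenApply through thenCompose by wrapping the result in a completed future. */
    static <T, R> CompletableFuture<R> applyViaCompose(
            CompletableFuture<T> future, Function<T, R> fn) {
        return future.thenCompose(v -> CompletableFuture.<R>completedFuture(fn.apply(v)));
    }

    static int demo() {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<Integer> viaApply = source.thenApply(v -> v + 1);
        CompletableFuture<Integer> viaCompose = applyViaCompose(source, v -> v + 1);
        source.complete(41);
        // Both derived futures now hold 42.
        return viaApply.join() + viaCompose.join();
    }
}
```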






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-23 Thread via GitHub


jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1577181524


##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +109,83 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
+/** The executor service that schedules and calls the triggers of this 
task. */
+ScheduledExecutorService scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current trigger sequence number, used to distinguish different 
triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by one.
+ */
+AtomicLong currentTriggerSeq;
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
+CallbackExceptionHandler exceptionHandler,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
-this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
+this.callbackExceptionHandler = exceptionHandler;
+this.stateFutureFactory =
+new StateFutureFactory<>(this, mailboxExecutor, 
callbackExceptionHandler);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.currentTriggerSeq = new AtomicLong(0);
+
+// - initialize buffer timeout ---
+this.currentScheduledFuture = null;
+if (bufferTimeout > 0) {
+this.scheduledExecutor =
+new ScheduledThreadPoolExecutor(
+1, new 
ExecutorThreadFactory("AEC-timeout-scheduler"));

Review Comment:
   Is it possible to share the `ScheduledThreadPoolExecutor` across all AECs to
   reduce the overhead of thread context switching? It is only used for
   scheduling, and the task is non-blocking, so it should run very fast.
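
A minimal sketch of the sharing idea, assuming a hypothetical `Controller` class in place of the AEC and a plain daemon thread factory in place of `ExecutorThreadFactory`. A single scheduler thread serves every controller, and `setRemoveOnCancelPolicy(true)` keeps cancelled timeouts from piling up in the shared queue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class SharedSchedulerSketch {
    /** One scheduler thread shared by all controllers in the process (assumption). */
    private static final ScheduledThreadPoolExecutor SHARED = createShared();

    private static ScheduledThreadPoolExecutor createShared() {
        ScheduledThreadPoolExecutor executor =
                new ScheduledThreadPoolExecutor(
                        1,
                        r -> {
                            Thread t = new Thread(r, "shared-timeout-scheduler");
                            t.setDaemon(true); // do not keep the JVM alive
                            return t;
                        });
        executor.setRemoveOnCancelPolicy(true); // drop cancelled timeouts from the queue
        return executor;
    }

    /** Hypothetical stand-in for a controller that only needs a timeout signal. */
    static final class Controller {
        private final CountDownLatch fired = new CountDownLatch(1);

        ScheduledFuture<?> scheduleTimeout(long delayMs) {
            Runnable task = fired::countDown; // non-blocking, so the shared thread stays free
            return SHARED.schedule(task, delayMs, TimeUnit.MILLISECONDS);
        }

        boolean awaitFired(long timeoutMs) throws InterruptedException {
            return fired.await(timeoutMs, TimeUnit.MILLISECONDS);
        }

        boolean hasFired() {
            return fired.getCount() == 0;
        }
    }

    static boolean demo() {
        Controller a = new Controller();
        Controller b = new Controller();
        a.scheduleTimeout(10);
        b.scheduleTimeout(60_000).cancel(false); // cancelled before it is due
        try {
            return a.awaitFired(5_000) && !b.hasFired() && SHARED.getQueue().isEmpty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The trade-off is that a slow task from one controller would delay the timeouts of all others, so this only works if every scheduled task stays non-blocking.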






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-23 Thread via GitHub


jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1576541718


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -46,13 +46,18 @@ public class StateFutureImpl implements InternalStateFuture {
 /** The callback runner. */
 protected final CallbackRunner callbackRunner;
 
-public StateFutureImpl(CallbackRunner callbackRunner) {
+protected final CallbackExceptionHandler exceptionHandler;
+
+public StateFutureImpl(
+CallbackRunner callbackRunner, CallbackExceptionHandler exceptionHandler) {
 this.completableFuture = new CompletableFuture<>();
 this.callbackRunner = callbackRunner;
+this.exceptionHandler = exceptionHandler;
 }
 
 @Override
-public  StateFuture thenApply(Function fn) {
+public  StateFuture thenApply(

Review Comment:
   `thenApply` can also be simplified with `thenCompose` by wrapping the result
   in a `CompletedStateFuture`.



##
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/asyncprocessing/AbstractAsyncStateStreamOperator.java:
##
@@ -58,17 +60,35 @@ public abstract class AbstractAsyncStateStreamOperator 
extends AbstractStre
 
 private RecordContext currentProcessingContext;
 
+private Environment environment;
+
 /** Initialize necessary state components for {@link 
AbstractStreamOperator}. */
 @Override
 public void setup(
 StreamTask containingTask,
 StreamConfig config,
 Output> output) {
 super.setup(containingTask, config, output);
-// TODO: properly read config and setup
-final MailboxExecutor mailboxExecutor =
-containingTask.getEnvironment().getMainMailboxExecutor();
-this.asyncExecutionController = new 
AsyncExecutionController(mailboxExecutor, null);
+final Environment environment = containingTask.getEnvironment();
+final MailboxExecutor mailboxExecutor = 
environment.getMainMailboxExecutor();
+final int inFlightRecordsLimit =
+
environment.getExecutionConfig().getAsyncInflightRecordsLimit();
+final int asyncBufferSize = 
environment.getExecutionConfig().getAsyncStateBufferSize();
+final long asyncBufferTimeout =
+environment.getExecutionConfig().getAsyncStateBufferTimeout();
+// TODO: initial state executor and set state executor for aec
+this.asyncExecutionController =
+new AsyncExecutionController(
+mailboxExecutor,
+this::handleStateCallbackException,
+null,
+asyncBufferSize,
+asyncBufferTimeout,
+inFlightRecordsLimit);
+}
+
+private void handleStateCallbackException(String message, Throwable 
exception) {

Review Comment:
   Shall we make it `protected` so that users can customize it?



##
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##
@@ -94,29 +109,83 @@ public class AsyncExecutionController {
  */
 final AtomicInteger inFlightRecordNum;
 
-public AsyncExecutionController(MailboxExecutor mailboxExecutor, 
StateExecutor stateExecutor) {
-this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, 
DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-}
+/** The executor service that schedules and calls the triggers of this 
task. */
+ScheduledExecutorService scheduledExecutor;
+
+ScheduledFuture currentScheduledFuture;
+
+/**
+ * The current trigger sequence number, used to distinguish different 
triggers. Every time a
+ * trigger occurs, {@code currentTriggerSeq} increases by one.
+ */
+AtomicLong currentTriggerSeq;
 
 public AsyncExecutionController(
 MailboxExecutor mailboxExecutor,
+CallbackExceptionHandler exceptionHandler,
 StateExecutor stateExecutor,
 int batchSize,
+long bufferTimeout,
 int maxInFlightRecords) {
 this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
 this.mailboxExecutor = mailboxExecutor;
-this.stateFutureFactory = new StateFutureFactory<>(this, 
mailboxExecutor);
+this.callbackExceptionHandler = exceptionHandler;
+this.stateFutureFactory =
+new StateFutureFactory<>(this, mailboxExecutor, 
callbackExceptionHandler);
 this.stateExecutor = stateExecutor;
 this.batchSize = batchSize;
+this.bufferTimeout = bufferTimeout;
 this.maxInFlightRecordNum = maxInFlightRecords;
 this.stateRequestsBuffer = new StateRequestBuffer<>();
 this.inFlightRecordNum = new AtomicInteger(0);
+this.currentTriggerSeq = new AtomicLong(0);
+
+// - initialize buffer timeout 

Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-23 Thread via GitHub


jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1576511469


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public  StateFuture thenApply(Function fn) {
 return StateFutureUtils.completedFuture(r);
 } else {
 StateFutureImpl ret = makeNewStateFuture();
-completableFuture.thenAccept(
-(t) -> {
-callbackRunner.submit(
-() -> {
-ret.complete(fn.apply(t));
-callbackFinished();
-});
-});
+completableFuture
+.thenAccept(
+(t) -> {
+callbackRunner.submit(
+() -> {
+ret.complete(fn.apply(t));
+callbackFinished();
+});
+})
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(

Review Comment:
   Shall we allow the handler to return something (e.g. a default fallback
   value)? If the exception is recoverable, we can continue the operation. This
   behaviour would also match the `exceptionally` function.
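
The JDK behaviour referred to above can be sketched as follows, using `java.util.concurrent.CompletableFuture` rather than Flink's `StateFuture` (an assumption). `exceptionally` receives the failure and returns a fallback value, so the chain continues normally. The handler in this PR instead has a `(String message, Throwable exception)` shape and returns nothing.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

final class FallbackSketch {
    static int demo() {
        CompletableFuture<Integer> state = new CompletableFuture<>();
        CompletableFuture<Integer> recovered =
                state.thenApply(v -> v * 2)
                        // Replace any upstream failure with a default value.
                        .exceptionally(e -> -1);
        state.completeExceptionally(new IOException("state access failed"));
        return recovered.join(); // the fallback value, not an exception
    }
}
```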






Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-23 Thread via GitHub


jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1576446912


##
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##
@@ -61,23 +66,34 @@ public  StateFuture thenApply(Function fn) {
 return StateFutureUtils.completedFuture(r);
 } else {
 StateFutureImpl ret = makeNewStateFuture();
-completableFuture.thenAccept(
-(t) -> {
-callbackRunner.submit(
-() -> {
-ret.complete(fn.apply(t));
-callbackFinished();
-});
-});
+completableFuture
+.thenAccept(
+(t) -> {
+callbackRunner.submit(
+() -> {
+ret.complete(fn.apply(t));
+callbackFinished();
+});
+})
+.exceptionally(
+(e) -> {
+exceptionHandler.handleException(
+"Caught exception when submitting StateFuture's callback.",
+e);
+return null;
+});
 return ret;
 }
 } catch (Throwable e) {
-throw new FlinkRuntimeException("Error binding or executing callback", e);
+exceptionHandler.handleException(
+"Caught exception when processing completed StateFuture's callback.", e);
+return null;
 }
 }
 
 @Override
-public StateFuture thenAccept(Consumer action) {
+public StateFuture thenAccept(

Review Comment:
   `thenAccept` could be simplified to:
   ```
   return thenApply(
           (v) -> {
               action.accept(v);
               return null;
           });
   ```
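
A self-contained version of this suggestion, sketched against the JDK's `CompletableFuture` instead of `StateFutureImpl` (an assumption; `acceptViaApply` is a hypothetical helper). The consumer runs inside a function that returns `null`, so the result is a future of `Void`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class AcceptViaApplySketch {
    /** Implements thenAccept on top of thenApply by returning null from the function. */
    static <T> CompletableFuture<Void> acceptViaApply(
            CompletableFuture<T> future, Consumer<T> action) {
        return future.<Void>thenApply(
                v -> {
                    action.accept(v);
                    return null;
                });
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Void> done = acceptViaApply(source, seen::add);
        source.complete("value");
        done.join(); // completes once the consumer has run
        return seen;
    }
}
```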



##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -181,4 +182,62 @@ public class ExecutionOptions {
 + " operators. NOTE: It takes effect only 
in the BATCH runtime mode and requires sorted inputs"
 + SORT_INPUTS.key()
 + " to be enabled.");
+
+// - Async State Execution 
--
+
+/**
+ * The max limit of in-flight records number in async state execution, 
'in-flight' refers to the
+ * records that have entered the operator but have not yet been processed 
and emitted to the
+ * downstream. If the in-flight records number exceeds the limit, the 
newly records entering
+ * will be blocked until the in-flight records number drops below the 
limit.
+ */
+@Experimental
+@Documentation.ExcludeFromDocumentation(
+"This is an experimental option, internal use only for now.")
+public static final ConfigOption ASYNC_INFLIGHT_RECORDS_LIMIT =
+ConfigOptions.key("execution.async-state.in-flight-records-limit")
+.intType()
+.defaultValue(6000)
+.withDescription(
+"The max limit of in-flight records number in 
async state execution, 'in-flight' refers"
++ " to the records that have entered the 
operator but have not yet been processed and"
++ " emitted to the downstream. If the 
in-flight records number exceeds the limit,"
++ " the newly records entering will be 
blocked until the in-flight records number drops below the limit.");
+
+/**
+ * The size of buffer under async state execution. Async state execution 
provides a buffer
+ * mechanism to reduce state access. When the number of state requests in 
the buffer exceeds the
+ * batch size, a batched state execution would be triggered. Larger batch 
sizes will bring
+ * higher end-to-end latency, this option works with {@link 
#ASYNC_STATE_BUFFER_TIMEOUT} to
+ * control the frequency of triggering.
+ */
+@Experimental
+@Documentation.ExcludeFromDocumentation(
+"This is an experimental option, internal use only for now.")
+public static final ConfigOption ASYNC_STATE_BUFFER_SIZE =
+ConfigOptions.key("execution.async-state.buffer-size")
+.intType()
+.defaultValue(1000)
+.withDescription(
+"The size of buffer under async state execution. 
Async 

Re: [PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-22 Thread via GitHub


flinkbot commented on PR #24698:
URL: https://github.com/apache/flink/pull/24698#issuecomment-2069754200

   
   ## CI report:
   
   * 2b98fd70bb820730897b52e71931d182ebe2d638 UNKNOWN
   
   Bot commands
   
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35158][runtime] Error handling in StateFuture's callback [flink]

2024-04-22 Thread via GitHub


fredia opened a new pull request, #24698:
URL: https://github.com/apache/flink/pull/24698

   
   
   ## What is the purpose of the change
   
   This PR implements error handling in StateFuture's callback, making the
   job fail when any exception occurs in StateFuture's callback.
   
   
   ## Brief change log
   
   - Add exception handling to the implementation class of `StateFuture`.
   - Wire the exception in `StateFuture`'s callback to 
`environment.failExternally`.
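
The wiring in the change log can be sketched as follows. `Env`, `Handler`, and `Operator` are hypothetical stand-ins for Flink's `Environment`, `CallbackExceptionHandler`, and the async state operator, and wrapping the cause in a `RuntimeException` is an assumption, since the body of `handleStateCallbackException` is not shown in this thread.

```java
final class FailExternallySketch {
    /** Hypothetical one-method stand-in for the task environment. */
    interface Env {
        void failExternally(Throwable cause);
    }

    /** Same (message, exception) shape as the handler method in the diff. */
    interface Handler {
        void handleException(String message, Throwable exception);
    }

    /** Hypothetical operator that owns the environment and exposes its handler. */
    static final class Operator {
        private final Env environment;

        Operator(Env environment) {
            this.environment = environment;
        }

        private void handleStateCallbackException(String message, Throwable exception) {
            // Assumption: wrap the cause so the job failure carries the handler's message.
            environment.failExternally(new RuntimeException(message, exception));
        }

        Handler handler() {
            return this::handleStateCallbackException;
        }
    }

    static String demo() {
        Throwable[] reported = new Throwable[1];
        Operator operator = new Operator(cause -> reported[0] = cause);
        Handler handler = operator.handler();
        try {
            throw new IllegalStateException("boom"); // simulated callback failure
        } catch (Throwable t) {
            handler.handleException("Caught exception in StateFuture's callback.", t);
        }
        return reported[0].getMessage() + " <- " + reported[0].getCause().getMessage();
    }
}
```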
   
   
   ## Verifying this change
   
   
   This change added/updated tests and can be verified as follows:
   - `AsyncExecutionControllerTest#testException`
   - `ContextStateFutureImplTest`
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): ( no )
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? ( no)
 - If yes, how is the feature documented? (JavaDocs)
   

