jectpro7 commented on code in PR #24698:
URL: https://github.com/apache/flink/pull/24698#discussion_r1576446912


##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -61,23 +66,34 @@ public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
                 return StateFutureUtils.completedFuture(r);
             } else {
                 StateFutureImpl<U> ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.complete(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
+                completableFuture
+                        .thenAccept(
+                                (t) -> {
+                                    callbackRunner.submit(
+                                            () -> {
+                                                ret.complete(fn.apply(t));
+                                                callbackFinished();
+                                            });
+                                })
+                        .exceptionally(
+                                (e) -> {
+                                    exceptionHandler.handleException(
+                                            "Caught exception when submitting StateFuture's callback.",
+                                            e);
+                                    return null;
+                                });
                 return ret;
             }
         } catch (Throwable e) {
-            throw new FlinkRuntimeException("Error binding or executing callback", e);
+            exceptionHandler.handleException(
+                    "Caught exception when processing completed StateFuture's callback.", e);
+            return null;
         }
     }
 
     @Override
-    public StateFuture<Void> thenAccept(Consumer<? super T> action) {
+    public StateFuture<Void> thenAccept(

Review Comment:
   `thenAccept` could be simplified to:
   ```
   return thenApply((v) -> {
      action.accept(v);
      return null;
   });
   ```
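
To make the suggestion above concrete, here is a minimal, self-contained sketch of the delegation. `MiniFuture` is a hypothetical stand-in that wraps `CompletableFuture`; it is not Flink's `StateFutureImpl`, and it omits the callback runner and exception handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for StateFutureImpl; only the delegation pattern matters here.
class MiniFuture<T> {
    private final CompletableFuture<T> inner;

    MiniFuture(CompletableFuture<T> inner) {
        this.inner = inner;
    }

    // In the real class, callback submission and exception handling would live here.
    <U> MiniFuture<U> thenApply(Function<? super T, ? extends U> fn) {
        return new MiniFuture<>(inner.thenApply(fn));
    }

    // thenAccept reuses thenApply, so that plumbing is written only once.
    MiniFuture<Void> thenAccept(Consumer<? super T> action) {
        return thenApply(
                v -> {
                    action.accept(v);
                    return null;
                });
    }

    // Blocks until the wrapped future completes and returns its value.
    T join() {
        return inner.join();
    }
}
```

With this shape, any change to how `thenApply` handles failures would apply to `thenAccept` as well, which seems to be the point of the simplification.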



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,62 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    // ------------------------- Async State Execution --------------------------
+
+    /**
+     * The max limit of in-flight records number in async state execution, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-state.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async state execution, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async state execution. Async state execution provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_STATE_BUFFER_TIMEOUT} to
+     * control the frequency of triggering.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_STATE_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-state.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async state execution. Async state execution provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_STATE_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform
+     * actively.
+     */
+    @Experimental
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Long> ASYNC_STATE_BUFFER_TIMEOUT =

Review Comment:
   Hi @fredia, the name `timeout` really confused me: it sounds like an error
   condition. I had to read FLIP-424 twice before I understood what it means.
   I suggest using something like `trigger interval` here.
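
To illustrate why `trigger interval` may describe the behaviour better, here is a rough sketch of a buffer that fires either when it is full or when the interval has elapsed. `TriggeringBuffer` and its method names are hypothetical and do not come from the PR. The sketch also assumes that the buffer fires once it reaches the configured size and that the interval restarts after every trigger; the Javadoc above leaves both details open.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer illustrating "size or interval" triggering; not Flink's implementation.
class TriggeringBuffer<T> {
    private final int bufferSize;
    private final long triggerIntervalMs;
    private final List<T> pending = new ArrayList<>();
    private long lastTriggerMs;

    TriggeringBuffer(int bufferSize, long triggerIntervalMs, long nowMs) {
        this.bufferSize = bufferSize;
        this.triggerIntervalMs = triggerIntervalMs;
        this.lastTriggerMs = nowMs;
    }

    // Adds a request; returns the drained batch if the buffer is full, else an empty list.
    List<T> add(T request, long nowMs) {
        pending.add(request);
        return pending.size() >= bufferSize ? drain(nowMs) : new ArrayList<>();
    }

    // Called periodically; drains a partially filled buffer once the interval has elapsed.
    // Nothing fails here: the elapsed interval is a normal trigger, not an error.
    List<T> onTick(long nowMs) {
        boolean due = nowMs - lastTriggerMs >= triggerIntervalMs;
        return due && !pending.isEmpty() ? drain(nowMs) : new ArrayList<>();
    }

    // Hands out the pending requests and restarts the interval (an assumption of this sketch).
    private List<T> drain(long nowMs) {
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        lastTriggerMs = nowMs;
        return batch;
    }
}
```

In this reading the elapsed time only causes an early batch and never an exception, which is the distinction the naming should convey.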



##########
flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java:
##########
@@ -61,23 +66,34 @@ public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
                 return StateFutureUtils.completedFuture(r);
             } else {
                 StateFutureImpl<U> ret = makeNewStateFuture();
-                completableFuture.thenAccept(
-                        (t) -> {
-                            callbackRunner.submit(
-                                    () -> {
-                                        ret.complete(fn.apply(t));
-                                        callbackFinished();
-                                    });
-                        });
+                completableFuture
+                        .thenAccept(
+                                (t) -> {
+                                    callbackRunner.submit(
+                                            () -> {
+                                                ret.complete(fn.apply(t));
+                                                callbackFinished();
+                                            });
+                                })
+                        .exceptionally(
+                                (e) -> {
+                                    exceptionHandler.handleException(

Review Comment:
   Shall we allow the handler to return a value? If the exception is
   recoverable, we could continue the operation.
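
As a sketch of what a value-returning handler could look like, the example below uses plain `CompletableFuture.exceptionally`. `RecoveringHandler` and `RecoverySketch` are hypothetical names for illustration and are not part of this PR or of Flink's API.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical handler shape: returns a fallback value for recoverable
// failures and rethrows for everything else.
interface RecoveringHandler<T> {
    T handle(String message, Throwable cause);
}

class RecoverySketch {
    // Attaches the handler so that a returned fallback completes the stage normally,
    // while a rethrown exception keeps the stage failed.
    static <T> CompletableFuture<T> withRecovery(
            CompletableFuture<T> future, RecoveringHandler<T> handler) {
        return future.exceptionally(
                e -> handler.handle("Caught exception when submitting StateFuture's callback.", e));
    }
}
```

A handler that returns a value lets downstream callbacks continue with that value, whereas a handler that throws leaves the resulting future completed exceptionally.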



