yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1569768663


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,19 +103,26 @@ public class AsyncExecutionController<K> {
     final AtomicInteger inFlightRecordNum;
 
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);

Review Comment:
   Now that #24657 has been merged, it would be good to verify that the newly 
introduced options actually pass their configured values into the AEC through 
operator setup.
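
   A rough sketch of the kind of check meant here. It uses stand-in classes 
only: `FakeController`, `setUp` and the `Map`-based configuration are 
hypothetical and are not the real Flink types. The timeout key and its default 
are assumptions, since that option is still under discussion in this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in sketch; none of these types are the real Flink classes. */
public final class AsyncConfigPropagationSketch {

    // Key strings as discussed in this PR; the timeout key name is not settled yet.
    static final String BUFFER_SIZE_KEY = "execution.async-mode.buffer-size";
    static final String BUFFER_TIMEOUT_KEY = "execution.async-mode.buffer-timeout";
    static final String IN_FLIGHT_LIMIT_KEY = "execution.async-mode.in-flight-records-limit";

    /** Hypothetical stand-in for the AEC: it only records its constructor arguments. */
    static final class FakeController {
        final int batchSize;
        final int bufferTimeout;
        final int maxInFlightRecords;

        FakeController(int batchSize, int bufferTimeout, int maxInFlightRecords) {
            this.batchSize = batchSize;
            this.bufferTimeout = bufferTimeout;
            this.maxInFlightRecords = maxInFlightRecords;
        }
    }

    /** Hypothetical stand-in for operator setup: reads each option or falls back to a default. */
    static FakeController setUp(Map<String, Integer> conf) {
        return new FakeController(
                conf.getOrDefault(BUFFER_SIZE_KEY, 1000), // default taken from this PR
                conf.getOrDefault(BUFFER_TIMEOUT_KEY, 1000), // assumed placeholder default
                conf.getOrDefault(IN_FLIGHT_LIMIT_KEY, 6000)); // default taken from this PR
    }

    public static void main(String[] args) {
        Map<String, Integer> conf = new HashMap<>();
        conf.put(BUFFER_SIZE_KEY, 5);
        conf.put(BUFFER_TIMEOUT_KEY, 200);
        conf.put(IN_FLIGHT_LIMIT_KEY, 42);

        FakeController aec = setUp(conf);
        // Non-default values make it obvious when a value silently fell back to its default.
        if (aec.batchSize != 5 || aec.bufferTimeout != 200 || aec.maxInFlightRecords != 42) {
            throw new AssertionError("configured values did not reach the controller");
        }
        System.out.println("configured values reached the controller");
    }
}
```

   The point is to set every option to a non-default value, so that a value 
dropped on the way to the controller cannot hide behind its default.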



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,73 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    /**
+     * A flag to enable or disable async mode related components when tasks initialize. As long as
+     * this option is enabled, the state access of Async state APIs will be executed asynchronously.
+     * Otherwise, the state access of Async state APIs will be executed synchronously. For Sync
+     * state APIs, the state access is always executed synchronously, enable this option would bring
+     * some overhead.
+     *
+     * <p>Note: This is an experimental feature(FLIP-425) under evaluation.
+     */
+    @Experimental
+    public static final ConfigOption<Boolean> ASYNC_STATE_ENABLED =
+            ConfigOptions.key("execution.async-mode.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "A flag to enable or disable async mode related components when tasks initialize."
+                                    + " As long as this option is enabled, the state access of Async state APIs will be executed asynchronously."
+                                    + " Otherwise, the state access of Async state APIs will be executed synchronously."
+                                    + " For Sync state APIs, the state access is always executed synchronously, enable this option would bring some overhead.\n"
+                                    + " Note: This is an experimental feature under evaluation.");
+
+    /**
+     * The max limit of in-flight records number in async execution mode, 'in-flight' refers to the
+     * records that have entered the operator but have not yet been processed and emitted to the
+     * downstream. If the in-flight records number exceeds the limit, the newly records entering
+     * will be blocked until the in-flight records number drops below the limit.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");
+
+    /**
+     * The timeout of buffer triggering in milliseconds. If the buffer has not reached the {@link
+     * #ASYNC_BUFFER_SIZE} within 'buffer-timeout' milliseconds, a trigger will perform actively.
+     */
+    @Experimental
+    public static final ConfigOption<Integer> ASYNC_BUFFER_TIMEOUT =
+            ConfigOptions.key("execution.async-state.buffer-timeout")

Review Comment:
   Got it. I agree that `async-mode` is better than `async-state` here, but the 
single word "buffer" might still not be specific enough. A new developer who is 
not familiar with FLIP-425 might assume this option configures a buffer for 
stream records, and then confuse it with `in-flight-records-limit`. A possibly 
better name that comes to mind is `execution.async-mode.state-buffer-timeout`. 
WDYT?



##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -1085,6 +1085,54 @@ public void setUseSnapshotCompression(boolean useSnapshotCompression) {
         configuration.set(ExecutionOptions.SNAPSHOT_COMPRESSION, useSnapshotCompression);
     }
 
+    // --------------------------------------------------------------------------------------------
+    //  Asynchronous execution configurations
+    // --------------------------------------------------------------------------------------------
+
+    @Internal

Review Comment:
   It seems this comment has not been resolved yet?



##########
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##########
@@ -181,4 +182,52 @@ public class ExecutionOptions {
                                     + " operators. NOTE: It takes effect only in the BATCH runtime mode and requires sorted inputs"
                                     + SORT_INPUTS.key()
                                     + " to be enabled.");
+
+    @Internal
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_INFLIGHT_RECORDS_LIMIT =
+            ConfigOptions.key("execution.async-mode.in-flight-records-limit")
+                    .intType()
+                    .defaultValue(6000)
+                    .withDescription(
+                            "The max limit of in-flight records number in async execution mode, 'in-flight' refers"
+                                    + " to the records that have entered the operator but have not yet been processed and"
+                                    + " emitted to the downstream. If the in-flight records number exceeds the limit,"
+                                    + " the newly records entering will be blocked until the in-flight records number drops below the limit.");
+
+    /**
+     * The size of buffer under async execution mode. Async execution mode provides a buffer
+     * mechanism to reduce state access. When the number of state requests in the buffer exceeds the
+     * batch size, a batched state execution would be triggered. Larger batch sizes will bring
+     * higher end-to-end latency, this option works with {@link #ASYNC_BUFFER_TIMEOUT} to control
+     * the frequency of triggering.
+     */
+    @Internal
+    @Documentation.ExcludeFromDocumentation(
+            "This is an experimental option, internal use only for now.")
+    public static final ConfigOption<Integer> ASYNC_BUFFER_SIZE =
+            ConfigOptions.key("execution.async-mode.buffer-size")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription(
+                            "The size of buffer under async execution mode. Async execution mode provides a buffer mechanism to reduce state access."
+                                    + " When the number of state requests in the active buffer exceeds the batch size,"
+                                    + " a batched state execution would be triggered. Larger batch sizes will bring higher end-to-end latency,"
+                                    + " this option works with 'execution.async-state.buffer-timeout' to control the frequency of triggering.");

Review Comment:
   "execution.async-state.buffer-timeout" should be changed to 
"execution.async-mode.buffer-timeout".
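
   One way to keep the description from drifting again, sketched with plain 
string constants. The class and constant names below are hypothetical; only the 
key strings come from this PR. The same file already builds a description from 
`SORT_INPUTS.key()`, so this follows an existing pattern. Note that 
`ASYNC_BUFFER_SIZE` is declared before `ASYNC_BUFFER_TIMEOUT`, so calling 
`ASYNC_BUFFER_TIMEOUT.key()` directly from the earlier initializer would be a 
forward reference; a shared key constant (or reordering the fields) avoids that.

```java
/** Hypothetical holder for shared key strings; not part of the PR as written. */
public final class AsyncOptionKeys {

    static final String BUFFER_SIZE_KEY = "execution.async-mode.buffer-size";
    static final String BUFFER_TIMEOUT_KEY = "execution.async-mode.buffer-timeout";

    // The description embeds the constant, so renaming the key updates the text as well.
    static final String BUFFER_SIZE_DESCRIPTION =
            "The size of buffer under async execution mode."
                    + " This option works with '"
                    + BUFFER_TIMEOUT_KEY
                    + "' to control the frequency of triggering.";

    private AsyncOptionKeys() {}

    public static void main(String[] args) {
        System.out.println(BUFFER_SIZE_DESCRIPTION);
    }
}
```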



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
