This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 67acbe9c881 [FLINK-32023][API / DataStream] Add config execution.buffer-timeout.enabled to flush only when the output buffer is full.
67acbe9c881 is described below

commit 67acbe9c8814dd56262053d443ae2712e03d1cb0
Author: liujiangang <liujiang...@kuaishou.com>
AuthorDate: Wed May 10 15:46:05 2023 +0800

    [FLINK-32023][API / DataStream] Add config execution.buffer-timeout.enabled to flush only when the output buffer is full.
---
 .../generated/execution_configuration.html         | 10 +++-
 .../flink/configuration/ExecutionOptions.java      | 20 ++++++--
 .../environment/StreamExecutionEnvironment.java    | 15 ++++--
 .../api/StreamExecutionEnvironmentTest.java        | 54 ++++++++++++++++++++++
 4 files changed, 91 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/execution_configuration.html b/docs/layouts/shortcodes/generated/execution_configuration.html
index 8de9b1cef17..5407666376d 100644
--- a/docs/layouts/shortcodes/generated/execution_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_configuration.html
@@ -15,10 +15,16 @@
             <td>Defines how data is exchanged between tasks in batch 
'execution.runtime-mode' if the shuffling behavior has not been set explicitly 
for an individual exchange.<br />With pipelined exchanges, upstream and 
downstream tasks run simultaneously. In order to achieve lower latency, a 
result record is immediately sent to and processed by the downstream task. 
Thus, the receiver back-pressures the sender. The streaming mode always uses 
this exchange.<br />With blocking exchanges, u [...]
         </tr>
         <tr>
-            <td><h5>execution.buffer-timeout</h5></td>
+            <td><h5>execution.buffer-timeout.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If disabled, the config execution.buffer-timeout.interval will 
not take effect and the flushing will be triggered only when the output buffer 
is full thus maximizing throughput</td>
+        </tr>
+        <tr>
+            <td><h5>execution.buffer-timeout.interval</h5></td>
             <td style="word-wrap: break-word;">100 ms</td>
             <td>Duration</td>
-            <td>The maximum time frequency (milliseconds) for the flushing of 
the output buffers. By default the output buffers flush frequently to provide 
low latency and to aid smooth developer experience. Setting the parameter can 
result in three logical modes:<ul><li>A positive value triggers flushing 
periodically by that interval</li><li>0 triggers flushing after every record 
thus minimizing latency</li><li>-1 ms triggers flushing only when the output 
buffer is full thus maximizing  [...]
+            <td>The maximum time frequency (milliseconds) for the flushing of 
the output buffers. By default the output buffers flush frequently to provide 
low latency and to aid smooth developer experience. Setting the parameter can 
result in three logical modes:<ul><li>A positive value triggers flushing 
periodically by that interval</li><li>0 triggers flushing after every record 
thus minimizing latency</li><li>If the config execution.buffer-timeout.enabled 
is false, trigger flushing on [...]
         </tr>
         <tr>
             <td><h5>execution.checkpointing.snapshot-compression</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
index fae0429c544..2a5f0b61736 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java
@@ -92,10 +92,22 @@ public class ExecutionOptions {
                     .withDescription(
                             "Tells if we should use compression for the state 
snapshot data or not");
 
+    public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+            ConfigOptions.key("execution.buffer-timeout.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "If disabled, the config 
execution.buffer-timeout.interval will not take effect and the flushing will be 
triggered only when the output "
+                                                    + "buffer is full thus 
maximizing throughput")
+                                    .build());
+
     public static final ConfigOption<Duration> BUFFER_TIMEOUT =
-            ConfigOptions.key("execution.buffer-timeout")
+            ConfigOptions.key("execution.buffer-timeout.interval")
                     .durationType()
                     .defaultValue(Duration.ofMillis(100))
+                    .withDeprecatedKeys("execution.buffer-timeout")
                     .withDescription(
                             Description.builder()
                                     .text(
@@ -109,8 +121,10 @@ public class ExecutionOptions {
                                                     FLUSH_AFTER_EVERY_RECORD
                                                             + " triggers 
flushing after every record thus minimizing latency"),
                                             text(
-                                                    
DISABLED_NETWORK_BUFFER_TIMEOUT
-                                                            + " ms triggers 
flushing only when the output buffer is full thus maximizing "
+                                                    "If the config "
+                                                            + 
BUFFER_TIMEOUT_ENABLED.key()
+                                                            + " is false,"
+                                                            + " trigger 
flushing only when the output buffer is full thus maximizing "
                                                             + "throughput"))
                                     .build());
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c516b074133..955afff19a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -990,9 +990,6 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         configuration
                 .getOptional(PipelineOptions.OPERATOR_CHAINING)
                 .ifPresent(c -> this.isChainingEnabled = c);
-        configuration
-                .getOptional(ExecutionOptions.BUFFER_TIMEOUT)
-                .ifPresent(t -> this.setBufferTimeout(t.toMillis()));
         configuration
                 .getOptional(DeploymentOptions.JOB_LISTENERS)
                 .ifPresent(listeners -> registerCustomListeners(classLoader, 
listeners));
@@ -1052,6 +1049,8 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
                                         
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED,
                                         flag));
 
+        configBufferTimeout(configuration);
+
         config.configure(configuration, classLoader);
         checkpointCfg.configure(configuration);
     }
@@ -1077,6 +1076,16 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         }
     }
 
+    private void configBufferTimeout(ReadableConfig configuration) {
+        if (configuration.get(ExecutionOptions.BUFFER_TIMEOUT_ENABLED)) {
+            configuration
+                    .getOptional(ExecutionOptions.BUFFER_TIMEOUT)
+                    .ifPresent(t -> this.setBufferTimeout(t.toMillis()));
+        } else {
+            
this.setBufferTimeout(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT);
+        }
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Data stream creations
     // 
--------------------------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index a71a26cf54c..0d8836d8482 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -399,6 +400,59 @@ public class StreamExecutionEnvironmentTest {
         assertEquals(new GenericTypeInfo<>(Row.class), source2.getType());
     }
 
+    @Test
+    public void testBufferTimeoutByDefault() {
+        Configuration config = new Configuration();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        testBufferTimeout(config, env);
+    }
+
+    @Test
+    public void testBufferTimeoutEnabled() {
+        Configuration config = new Configuration();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, true);
+        testBufferTimeout(config, env);
+    }
+
+    @Test
+    public void testBufferTimeoutDisabled() {
+        Configuration config = new Configuration();
+        config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false);
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // The execution.buffer-timeout's default value 100ms will not take 
effect.
+        env.configure(config, this.getClass().getClassLoader());
+        assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT, 
env.getBufferTimeout());
+
+        // Setting execution.buffer-timeout's to 0ms will not take effect.
+        config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms");
+        env.configure(config, this.getClass().getClassLoader());
+        assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT, 
env.getBufferTimeout());
+
+        // Setting execution.buffer-timeout's to -1ms will not take effect.
+        config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms");
+        env.configure(config, this.getClass().getClassLoader());
+        assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT, 
env.getBufferTimeout());
+    }
+
+    private void testBufferTimeout(Configuration config, 
StreamExecutionEnvironment env) {
+        env.configure(config, this.getClass().getClassLoader());
+        assertEquals(
+                ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis(), 
env.getBufferTimeout());
+
+        config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms");
+        env.configure(config, this.getClass().getClassLoader());
+        assertEquals(0, env.getBufferTimeout());
+
+        try {
+            config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms");
+            env.configure(config, this.getClass().getClassLoader());
+            fail("exception expected");
+        } catch (IllegalArgumentException ignored) {
+        }
+    }
+
     /////////////////////////////////////////////////////////////
     // Utilities
     /////////////////////////////////////////////////////////////

Reply via email to