This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.17 by this push: new 67acbe9c881 [FLINK-32023][API / DataStream] Add config execution.buffer-timeout.enabled to flush only when the output buffer is full. 67acbe9c881 is described below commit 67acbe9c8814dd56262053d443ae2712e03d1cb0 Author: liujiangang <liujiang...@kuaishou.com> AuthorDate: Wed May 10 15:46:05 2023 +0800 [FLINK-32023][API / DataStream] Add config execution.buffer-timeout.enabled to flush only when the output buffer is full. --- .../generated/execution_configuration.html | 10 +++- .../flink/configuration/ExecutionOptions.java | 20 ++++++-- .../environment/StreamExecutionEnvironment.java | 15 ++++-- .../api/StreamExecutionEnvironmentTest.java | 54 ++++++++++++++++++++++ 4 files changed, 91 insertions(+), 8 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_configuration.html b/docs/layouts/shortcodes/generated/execution_configuration.html index 8de9b1cef17..5407666376d 100644 --- a/docs/layouts/shortcodes/generated/execution_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_configuration.html @@ -15,10 +15,16 @@ <td>Defines how data is exchanged between tasks in batch 'execution.runtime-mode' if the shuffling behavior has not been set explicitly for an individual exchange.<br />With pipelined exchanges, upstream and downstream tasks run simultaneously. In order to achieve lower latency, a result record is immediately sent to and processed by the downstream task. Thus, the receiver back-pressures the sender. The streaming mode always uses this exchange.<br />With blocking exchanges, u [...] </tr> <tr> - <td><h5>execution.buffer-timeout</h5></td> + <td><h5>execution.buffer-timeout.enabled</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>If disabled, the config execution.buffer-timeout.interval will not take effect and the flushing will be triggered only when the output buffer is full thus maximizing throughput</td> + </tr> + <tr> + <td><h5>execution.buffer-timeout.interval</h5></td> <td style="word-wrap: break-word;">100 ms</td> <td>Duration</td> - <td>The maximum time frequency (milliseconds) for the flushing of the output buffers. By default the output buffers flush frequently to provide low latency and to aid smooth developer experience. Setting the parameter can result in three logical modes:<ul><li>A positive value triggers flushing periodically by that interval</li><li>0 triggers flushing after every record thus minimizing latency</li><li>-1 ms triggers flushing only when the output buffer is full thus maximizing [...] + <td>The maximum time frequency (milliseconds) for the flushing of the output buffers. By default the output buffers flush frequently to provide low latency and to aid smooth developer experience. Setting the parameter can result in three logical modes:<ul><li>A positive value triggers flushing periodically by that interval</li><li>0 triggers flushing after every record thus minimizing latency</li><li>If the config execution.buffer-timeout.enabled is false, trigger flushing on [...] </tr> <tr> <td><h5>execution.checkpointing.snapshot-compression</h5></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java index fae0429c544..2a5f0b61736 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java @@ -92,10 +92,22 @@ public class ExecutionOptions { .withDescription( "Tells if we should use compression for the state snapshot data or not"); + public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED = + ConfigOptions.key("execution.buffer-timeout.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + Description.builder() + .text( + "If disabled, the config execution.buffer-timeout.interval will not take effect and the flushing will be triggered only when the output " + + "buffer is full thus maximizing throughput") + .build()); + public static final ConfigOption<Duration> BUFFER_TIMEOUT = - ConfigOptions.key("execution.buffer-timeout") + ConfigOptions.key("execution.buffer-timeout.interval") .durationType() .defaultValue(Duration.ofMillis(100)) + .withDeprecatedKeys("execution.buffer-timeout") .withDescription( Description.builder() .text( @@ -109,8 +121,10 @@ public class ExecutionOptions { FLUSH_AFTER_EVERY_RECORD + " triggers flushing after every record thus minimizing latency"), text( - DISABLED_NETWORK_BUFFER_TIMEOUT - + " ms triggers flushing only when the output buffer is full thus maximizing " + "If the config " + + BUFFER_TIMEOUT_ENABLED.key() + + " is false," + + " trigger flushing only when the output buffer is full thus maximizing " + "throughput")) .build()); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index c516b074133..955afff19a6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -990,9 +990,6 @@ public class StreamExecutionEnvironment implements AutoCloseable { configuration .getOptional(PipelineOptions.OPERATOR_CHAINING) .ifPresent(c -> this.isChainingEnabled = c); - configuration - .getOptional(ExecutionOptions.BUFFER_TIMEOUT) - .ifPresent(t -> this.setBufferTimeout(t.toMillis())); configuration .getOptional(DeploymentOptions.JOB_LISTENERS) .ifPresent(listeners -> registerCustomListeners(classLoader, listeners)); @@ -1052,6 +1049,8 @@ public class StreamExecutionEnvironment implements AutoCloseable { BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, flag)); + configBufferTimeout(configuration); + config.configure(configuration, classLoader); checkpointCfg.configure(configuration); } @@ -1077,6 +1076,16 @@ public class StreamExecutionEnvironment implements AutoCloseable { } } + private void configBufferTimeout(ReadableConfig configuration) { + if (configuration.get(ExecutionOptions.BUFFER_TIMEOUT_ENABLED)) { + configuration + .getOptional(ExecutionOptions.BUFFER_TIMEOUT) + .ifPresent(t -> this.setBufferTimeout(t.toMillis())); + } else { + this.setBufferTimeout(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT); + } + } + // -------------------------------------------------------------------------------------------- // Data stream creations // -------------------------------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index a71a26cf54c..0d8836d8482 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.streaming.api.datastream.DataStream; @@ -399,6 +400,59 @@ public class StreamExecutionEnvironmentTest { assertEquals(new GenericTypeInfo<>(Row.class), source2.getType()); } + @Test + public void testBufferTimeoutByDefault() { + Configuration config = new Configuration(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + testBufferTimeout(config, env); + } + + @Test + public void testBufferTimeoutEnabled() { + Configuration config = new Configuration(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, true); + testBufferTimeout(config, env); + } + + @Test + public void testBufferTimeoutDisabled() { + Configuration config = new Configuration(); + config.set(ExecutionOptions.BUFFER_TIMEOUT_ENABLED, false); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // The execution.buffer-timeout's default value 100ms will not take effect. + env.configure(config, this.getClass().getClassLoader()); + assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT, env.getBufferTimeout()); + + // Setting execution.buffer-timeout's to 0ms will not take effect. + config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms"); + env.configure(config, this.getClass().getClassLoader()); + assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT, env.getBufferTimeout()); + + // Setting execution.buffer-timeout's to -1ms will not take effect. + config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms"); + env.configure(config, this.getClass().getClassLoader()); + assertEquals(ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT, env.getBufferTimeout()); + } + + private void testBufferTimeout(Configuration config, StreamExecutionEnvironment env) { + env.configure(config, this.getClass().getClassLoader()); + assertEquals( + ExecutionOptions.BUFFER_TIMEOUT.defaultValue().toMillis(), env.getBufferTimeout()); + + config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "0ms"); + env.configure(config, this.getClass().getClassLoader()); + assertEquals(0, env.getBufferTimeout()); + + try { + config.setString(ExecutionOptions.BUFFER_TIMEOUT.key(), "-1ms"); + env.configure(config, this.getClass().getClassLoader()); + fail("exception expected"); + } catch (IllegalArgumentException ignored) { + } + } + ///////////////////////////////////////////////////////////// // Utilities /////////////////////////////////////////////////////////////