[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout
1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1203434702

## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,43 @@ public class ExecutionOptions {
                     .withDescription(
                             "Tells if we should use compression for the state snapshot data or not");
+public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+        ConfigOptions.key("execution.buffer-timeout.enabled")
+                .booleanType()
+                .defaultValue(true)
+                .withDescription(
+                        Description.builder()
+                                .text(
+                                        "If disabled, the config execution.buffer-timeout will not take effect and the flushing will be triggered only when the output "
+                                                + "buffer is full thus maximizing throughput")
+                                .build());
+
+public static final ConfigOption<Duration> BUFFER_TIMEOUT_INTERVAL =
+        ConfigOptions.key("execution.buffer-timeout.interval")
+                .durationType()
+                .defaultValue(Duration.ofMillis(100))
+                .withDescription(
+                        Description.builder()
+                                .text(
+                                        "The maximum time frequency (milliseconds) for the flushing of the output buffers. By default "
+                                                + "the output buffers flush frequently to provide low latency and to aid smooth developer "
+                                                + "experience. Setting the parameter can result in three logical modes:")
+                                .list(
+                                        text(
+                                                "A positive value triggers flushing periodically by that interval"),
+                                        text(
+                                                FLUSH_AFTER_EVERY_RECORD
+                                                        + " triggers flushing after every record thus minimizing latency"),
+                                        text(
+                                                "If the config "
+                                                        + BUFFER_TIMEOUT_ENABLED.key()
+                                                        + " is false,"
+                                                        + " trigger flushing only when the output buffer is full thus maximizing "
+                                                        + "throughput"))
+                                .build());
+
+/** @deprecated Use {@link #BUFFER_TIMEOUT_INTERVAL} instead. */
+@Deprecated
 public static final ConfigOption<Duration> BUFFER_TIMEOUT =
         ConfigOptions.key("execution.buffer-timeout")

Review Comment:
There is no need to create a new `ConfigOption`; you can take a look at this [PR](https://github.com/apache/flink/pull/20867/files#diff-7d4e333f055cb785073ea9bf5a53fcc7f8f527ba4230e153dd9e59212093aec4).
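The three logical modes listed in the option description can be sketched as a small decision function. This is only an illustration under stated assumptions: `BufferFlushModes`, `Mode`, and `resolve` are hypothetical names that do not exist in Flink, the sketch assumes that `FLUSH_AFTER_EVERY_RECORD` corresponds to a zero interval, and rejecting negative intervals is a choice made here rather than something the diff states.

```java
import java.time.Duration;

/** Hypothetical helper, not part of Flink: maps the two settings onto a flushing mode. */
final class BufferFlushModes {

    enum Mode { PERIODIC, AFTER_EVERY_RECORD, ONLY_WHEN_FULL }

    // Assumption: a zero interval stands for "flush after every record".
    static final Duration FLUSH_AFTER_EVERY_RECORD = Duration.ZERO;

    static Mode resolve(boolean timeoutEnabled, Duration interval) {
        if (!timeoutEnabled) {
            // The interval is ignored; flushing happens only when the buffer is full.
            return Mode.ONLY_WHEN_FULL;
        }
        if (interval.isNegative()) {
            // Assumption of this sketch: with an explicit switch, -1ms is no longer needed.
            throw new IllegalArgumentException("interval must not be negative: " + interval);
        }
        return interval.equals(FLUSH_AFTER_EVERY_RECORD) ? Mode.AFTER_EVERY_RECORD : Mode.PERIODIC;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, Duration.ofMillis(100))); // prints PERIODIC
        System.out.println(resolve(true, Duration.ZERO));          // prints AFTER_EVERY_RECORD
        System.out.println(resolve(false, Duration.ofMillis(100))); // prints ONLY_WHEN_FULL
    }
}
```

With a separate boolean switch, the interval never has to carry a sentinel value such as `-1ms`, which is the trade-off this review thread is weighing.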
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout
1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1195929472

## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,17 @@ public class ExecutionOptions {
                     .withDescription(
                             "Tells if we should use compression for the state snapshot data or not");
+public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+        ConfigOptions.key("execution.buffer-timeout.enabled")

Review Comment:
Hi @Myracle, thanks for your feedback.

I see many related option names that share the same prefix but have different suffixes. From the user's point of view, a shared prefix also makes it clear that the options are related. That is why FLINK-29372, when it restricted option prefixes, also added suffixes to many option names that did not conform to the rule.

So I prefer to keep the same prefix for these two options. To make sure this fix is reasonable, I look forward to more feedback from the community.

Hi @wanglijie95 @reswqa, would you mind taking a look at this PR when you have time? Thanks a lot.
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout
1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1194556471

## flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,17 @@ public class ExecutionOptions {
                     .withDescription(
                             "Tells if we should use compression for the state snapshot data or not");
+public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+        ConfigOptions.key("execution.buffer-timeout.enabled")

Review Comment:
FLINK-29372 added the restriction that an option key cannot be a prefix of another option's key. However, `execution.buffer-timeout` is a prefix of `execution.buffer-timeout.enabled`, so the CI fails.

Hi @zentol, could we update the option key for `BUFFER_TIMEOUT` and mark `"execution.buffer-timeout"` as a deprecated key? How about changing it to `execution.buffer-timeout.interval`? Or do you have any other suggestions? Looking forward to your opinion.
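To make the prefix restriction concrete, here is a minimal, self-contained sketch of such a check. It is not the actual test introduced by FLINK-29372; `OptionKeyPrefixCheck` and `findPrefixConflicts` are hypothetical names, and the sketch assumes that "prefix" means one key followed by a `.` and further segments.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical check, not Flink code: reports keys that are a dotted prefix of another key. */
final class OptionKeyPrefixCheck {

    static List<String> findPrefixConflicts(List<String> keys) {
        List<String> conflicts = new ArrayList<>();
        for (String shorter : keys) {
            for (String longer : keys) {
                // "a.b" conflicts with "a.b.c", but not with "a.bc" or with itself.
                if (longer.startsWith(shorter + ".")) {
                    conflicts.add(shorter + " is a prefix of " + longer);
                }
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        // The combination from the PR: one conflict is reported.
        System.out.println(findPrefixConflicts(
                List.of("execution.buffer-timeout", "execution.buffer-timeout.enabled")));
        // The proposed renaming: no key is a prefix of the other.
        System.out.println(findPrefixConflicts(
                List.of("execution.buffer-timeout.interval", "execution.buffer-timeout.enabled")));
    }
}
```

Under this assumption, renaming the existing key to `execution.buffer-timeout.interval` removes the conflict because both keys then end in distinct leaf segments.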
[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout
1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1189688192

## flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java:
##
@@ -118,6 +118,12 @@ public void testParseDurationTrim() {
         assertEquals(155L, TimeUtils.parseDuration(" 155 ms ").toMillis());
     }
+@Test
+public void testParseDurationWithNegativeNumber() {
+    assertEquals(-1L, TimeUtils.parseDuration("-1ms").toMillis());
+    assertEquals(-1L, TimeUtils.parseDuration("-1").toMillis());

Review Comment:
Could you add some cases with multiple `-`?

## flink-core/src/main/java/org/apache/flink/util/TimeUtils.java:
##
@@ -69,7 +69,8 @@ public static Duration parseDuration(String text) {
         int pos = 0;
         char current;
-while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+// Negative number is supported for special usage, such as -1ms for execution.buffer-timeout
+while (pos < len && ((current = trimmed.charAt(pos)) >= '0' && current <= '9' || current == '-')) {

Review Comment:
We should check that `-` is only accepted as the first character, right?
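One way to address both comments is to consume an optional `-` before the digit loop instead of accepting it inside the loop. The following is a simplified, self-contained sketch and not the real `TimeUtils.parseDuration`: `SignedDurationParser` and `parseMillis` are hypothetical names, and the sketch only understands a missing unit or `ms`.

```java
import java.time.Duration;

/** Hypothetical, simplified parser: a '-' is accepted only as the very first character. */
final class SignedDurationParser {

    static Duration parseMillis(String text) {
        String trimmed = text.trim();
        int len = trimmed.length();
        int pos = 0;

        // Accept at most one sign, and only at position 0.
        if (pos < len && trimmed.charAt(pos) == '-') {
            pos++;
        }

        int digitsStart = pos;
        while (pos < len && trimmed.charAt(pos) >= '0' && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        if (pos == digitsStart) {
            // Covers "", "-", "--1ms" and "ms": no digit directly after the optional sign.
            throw new NumberFormatException("text does not start with a number: " + text);
        }

        long value = Long.parseLong(trimmed.substring(0, pos));
        String unit = trimmed.substring(pos).trim();

        // Assumption of this sketch: only "ms" or no unit (treated as milliseconds).
        if (unit.isEmpty() || unit.equals("ms")) {
            return Duration.ofMillis(value);
        }
        throw new IllegalArgumentException("unsupported unit in this sketch: " + unit);
    }

    public static void main(String[] args) {
        System.out.println(parseMillis("-1ms").toMillis()); // prints -1
        System.out.println(parseMillis(" 155 ms ").toMillis()); // prints 155
    }
}
```

Inputs such as `--1ms` or `1-1ms` are rejected here, which corresponds to the extra test cases with multiple `-` requested above.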