[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-23 Thread via GitHub


1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1203434702


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,43 @@ public class ExecutionOptions {
                     .withDescription(
                             "Tells if we should use compression for the state snapshot data or not");
 
+    public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+            ConfigOptions.key("execution.buffer-timeout.enabled")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "If disabled, the config execution.buffer-timeout will not take effect and the flushing will be triggered only when the output "
+                                                    + "buffer is full thus maximizing throughput")
+                                    .build());
+
+    public static final ConfigOption<Duration> BUFFER_TIMEOUT_INTERVAL =
+            ConfigOptions.key("execution.buffer-timeout.interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(100))
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The maximum time frequency (milliseconds) for the flushing of the output buffers. By default "
+                                                    + "the output buffers flush frequently to provide low latency and to aid smooth developer "
+                                                    + "experience. Setting the parameter can result in three logical modes:")
+                                    .list(
+                                            text(
+                                                    "A positive value triggers flushing periodically by that interval"),
+                                            text(
+                                                    FLUSH_AFTER_EVERY_RECORD
+                                                            + " triggers flushing after every record thus minimizing latency"),
+                                            text(
+                                                    "If the config "
+                                                            + BUFFER_TIMEOUT_ENABLED.key()
+                                                            + " is false,"
+                                                            + " trigger flushing only when the output buffer is full thus maximizing "
+                                                            + "throughput"))
+                                    .build());
+
+    /** @deprecated Use {@link #BUFFER_TIMEOUT_INTERVAL} instead. */
+    @Deprecated
     public static final ConfigOption<Duration> BUFFER_TIMEOUT =
             ConfigOptions.key("execution.buffer-timeout")

Review Comment:
   You don't need to create a new `ConfigOption`; take a look at this [PR](https://github.com/apache/flink/pull/20867/files#diff-7d4e333f055cb785073ea9bf5a53fcc7f8f527ba4230e153dd9e59212093aec4).
   
   https://github.com/apache/flink/assets/38427477/82328ecc-536f-43b9-a9cd-c3c18ce04a4a
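
One possible reading of this suggestion, as a rough sketch only: keep a single option, give it the new key, and let the old key act as a fallback (Flink's `ConfigOption` offers `withDeprecatedKeys` for this purpose). The class `DeprecatedKeyLookup` and its `resolve` method are hypothetical, and a plain Java map stands in for Flink's `Configuration`; the key names are the ones discussed in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration, not Flink code: one option, one current key,
// and zero or more deprecated keys that are consulted as fallbacks.
class DeprecatedKeyLookup {

    /** Returns the value under the current key, else under the first deprecated key that is set. */
    static Optional<String> resolve(
            Map<String, String> conf, String key, String... deprecatedKeys) {
        if (conf.containsKey(key)) {
            return Optional.of(conf.get(key));
        }
        for (String oldKey : deprecatedKeys) {
            if (conf.containsKey(oldKey)) {
                return Optional.of(conf.get(oldKey));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A user configuration that still uses the old key.
        Map<String, String> conf = new HashMap<>();
        conf.put("execution.buffer-timeout", "50 ms");

        System.out.println(
                resolve(conf, "execution.buffer-timeout.interval", "execution.buffer-timeout"));
    }
}
```

With this shape, existing configurations that set only the old key keep working, and no second option constant has to be introduced.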
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-16 Thread via GitHub


1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1195929472


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,17 @@ public class ExecutionOptions {
                     .withDescription(
                             "Tells if we should use compression for the state snapshot data or not");
 
+    public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+            ConfigOptions.key("execution.buffer-timeout.enabled")

Review Comment:
   Hi @Myracle, thanks for your feedback.
   
   I see many related option names that share a prefix but have different suffixes. From the user's point of view, a shared prefix also makes it clear that the options are related. That is why FLINK-29372, when it restricted option prefixes, also added suffixes to many option names that did not conform to the rule.
   
   So I prefer to keep the same prefix for these two options. To confirm that this fix is reasonable, I look forward to more feedback from the community.
   
   Hi @wanglijie95 @reswqa, would you mind taking a look at this PR when you have time? Thanks a lot.






[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-15 Thread via GitHub


1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1194556471


##
flink-core/src/main/java/org/apache/flink/configuration/ExecutionOptions.java:
##
@@ -92,6 +92,17 @@ public class ExecutionOptions {
                     .withDescription(
                             "Tells if we should use compression for the state snapshot data or not");
 
+    public static final ConfigOption<Boolean> BUFFER_TIMEOUT_ENABLED =
+            ConfigOptions.key("execution.buffer-timeout.enabled")

Review Comment:
   FLINK-29372 added the restriction that an option key cannot be a prefix of another option's key. However, `execution.buffer-timeout` is a prefix of `execution.buffer-timeout.enabled`, so the CI fails.
   
   Hi @zentol, could we update the option key for `BUFFER_TIMEOUT` and mark `"execution.buffer-timeout"` as the deprecated key? How about updating it to `execution.buffer-timeout.interval`? Or do you have any other suggestions? Looking forward to your opinion.
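
To make the conflict concrete, here is a minimal sketch of a prefix check over a set of option keys. It is not the actual FLINK-29372 check: the class `OptionKeyPrefixCheck` and its method are hypothetical, and it assumes that "prefix" means a dotted-path prefix (one key followed by `.` starts another key).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the rule described above, not the Flink test itself.
class OptionKeyPrefixCheck {

    /** Returns "shorter -> longer" entries for every key that is a dotted prefix of another key. */
    static List<String> findPrefixConflicts(List<String> keys) {
        List<String> conflicts = new ArrayList<>();
        for (String shorter : keys) {
            for (String longer : keys) {
                // Assumption: only a match at a '.' boundary counts as a prefix.
                if (longer.startsWith(shorter + ".")) {
                    conflicts.add(shorter + " -> " + longer);
                }
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        // The combination that fails: the old key is a prefix of the new one.
        System.out.println(
                findPrefixConflicts(
                        List.of("execution.buffer-timeout", "execution.buffer-timeout.enabled")));

        // The proposed rename: both keys share a prefix, but neither is a prefix of the other.
        System.out.println(
                findPrefixConflicts(
                        List.of(
                                "execution.buffer-timeout.interval",
                                "execution.buffer-timeout.enabled")));
    }
}
```

Under this assumption the first call reports one conflict and the second reports none, which is why renaming the key to `execution.buffer-timeout.interval` would satisfy the rule.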






[GitHub] [flink] 1996fanrui commented on a diff in pull request #22560: [FLINK-32023][API / DataStream] Support negative Duration for special usage, such as -1ms for execution.buffer-timeout

2023-05-10 Thread via GitHub


1996fanrui commented on code in PR #22560:
URL: https://github.com/apache/flink/pull/22560#discussion_r1189688192


##
flink-core/src/test/java/org/apache/flink/util/TimeUtilsTest.java:
##
@@ -118,6 +118,12 @@ public void testParseDurationTrim() {
         assertEquals(155L, TimeUtils.parseDuration("  155  ms   ").toMillis());
     }
 
+    @Test
+    public void testParseDurationWithNegativeNumber() {
+        assertEquals(-1L, TimeUtils.parseDuration("-1ms").toMillis());
+        assertEquals(-1L, TimeUtils.parseDuration("-1").toMillis());

Review Comment:
   Could you add some cases with multiple `-`?



##
flink-core/src/main/java/org/apache/flink/util/TimeUtils.java:
##
@@ -69,7 +69,8 @@ public static Duration parseDuration(String text) {
         int pos = 0;
 
         char current;
-        while (pos < len && (current = trimmed.charAt(pos)) >= '0' && current <= '9') {
+        // Negative number is supported for special usage, such as -1ms for execution.buffer-timeout
+        while (pos < len && ((current = trimmed.charAt(pos)) >= '0' && current <= '9' || current == '-')) {

Review Comment:
   We should check that `-` is accepted only as the first character, right?
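
A minimal sketch of what such a check could look like, written as a standalone class rather than as a patch to `TimeUtils`. The names `SignedNumberPrefix`, `endOfNumber` and `parseMillis` are hypothetical, and the sketch assumes only a bare number or the `ms` unit, which is far less than the real parser supports.

```java
// Hypothetical illustration: accept '-' only at index 0, then digits.
class SignedNumberPrefix {

    /** Returns the index just past the numeric prefix; a '-' is consumed only at index 0. */
    static int endOfNumber(String trimmed) {
        int pos = 0;
        if (pos < trimmed.length() && trimmed.charAt(pos) == '-') {
            pos++;
        }
        while (pos < trimmed.length()
                && trimmed.charAt(pos) >= '0'
                && trimmed.charAt(pos) <= '9') {
            pos++;
        }
        return pos;
    }

    /** Parses strings such as "-1ms" or "155 ms" into milliseconds (assumed units: none or "ms"). */
    static long parseMillis(String text) {
        String trimmed = text.trim();
        int end = endOfNumber(trimmed);
        String number = trimmed.substring(0, end);
        String unit = trimmed.substring(end).trim();

        // A lone "-" or an empty prefix is not a number, so "--1ms" is rejected here.
        if (number.isEmpty() || number.equals("-")) {
            throw new NumberFormatException("text does not start with a number: " + text);
        }
        // Anything left over must be a known unit, so "1-2ms" is rejected here.
        if (!unit.isEmpty() && !unit.equals("ms")) {
            throw new IllegalArgumentException("unsupported unit: " + unit);
        }
        return Long.parseLong(number);
    }

    public static void main(String[] args) {
        System.out.println(parseMillis("-1ms"));
        System.out.println(parseMillis("-1"));
    }
}
```

Because the sign is handled before the digit loop, inputs with a repeated or misplaced `-` never reach `Long.parseLong`, which also gives natural test cases for the "multiple `-`" request above.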


