wuchong commented on a change in pull request #13392:
URL: https://github.com/apache/flink/pull/13392#discussion_r490117998



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
##########
@@ -198,9 +198,10 @@ object WindowEmitStrategy {
 
   // It is a experimental config, will may be removed later.
   @Experimental
-  val TABLE_EXEC_EMIT_LATE_FIRE_DELAY: ConfigOption[String] =
+  val TABLE_EXEC_EMIT_LATE_FIRE_DELAY: ConfigOption[Duration] =
   key("table.exec.emit.late-fire.delay")
-      .noDefaultValue
+      .durationType()
+      .defaultValue(Duration.ofMillis(-1))

Review comment:
       We can use `.noDefaultValue()` to keep the same behavior as before. 

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
##########
@@ -60,9 +60,10 @@
        // ------------------------------------------------------------------------
 
        @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
-       public static final ConfigOption<String> TABLE_EXEC_SOURCE_IDLE_TIMEOUT =
+       public static final ConfigOption<Duration> TABLE_EXEC_SOURCE_IDLE_TIMEOUT =
                key("table.exec.source.idle-timeout")
-                       .defaultValue("-1 ms")
+                       .durationType()
+                       .defaultValue(Duration.ofMillis(0))
                        .withDescription("When a source do not receive any elements for the timeout time, " +
                                "it will be marked as temporarily idle. This allows downstream " +
                                "tasks to advance their watermarks without the need to wait for " +

Review comment:
       The default value is now 0 ms. It would be better to state in the description what the default value means; otherwise it is confusing. For example:
   
   > Default value is 0, which means detecting source idleness is not enabled. 
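
A minimal sketch of the convention the suggested description spells out, using only the JDK. The class and method names (`SourceIdleTimeout`, `idleDetectionEnabled`) are hypothetical and not part of Flink; the only thing taken from the diff is the zero default.

```java
import java.time.Duration;

final class SourceIdleTimeout {
    // Mirrors the proposed default in the diff: Duration.ofMillis(0).
    static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofMillis(0);

    // Assumption: a zero (or negative) timeout means idleness detection is off,
    // as the suggested description states for the default value.
    static boolean idleDetectionEnabled(Duration timeout) {
        return !timeout.isZero() && !timeout.isNegative();
    }

    public static void main(String[] args) {
        System.out.println(idleDetectionEnabled(DEFAULT_IDLE_TIMEOUT));
        System.out.println(idleDetectionEnabled(Duration.ofSeconds(30)));
    }
}
```

Without the sentence in the description, a reader cannot tell whether `0` means "disabled" or "mark idle immediately", which is why the wording matters here.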

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/optimize/StreamCommonSubGraphBasedOptimizer.scala
##########
@@ -61,8 +60,8 @@ class StreamCommonSubGraphBasedOptimizer(planner: StreamPlanner)
 
       val miniBatchInterval: MiniBatchInterval = if (config.getConfiguration.getBoolean(
         ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED)) {
-        val miniBatchLatency = getMillisecondFromConfigDuration(config,
-          ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY)
+        val miniBatchLatency =config.getConfiguration.get(

Review comment:
       ```suggestion
           val miniBatchLatency = config.getConfiguration.get(
   ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowEmitStrategy.scala
##########
@@ -179,9 +178,10 @@ object WindowEmitStrategy {
 
   // It is a experimental config, will may be removed later.
   @Experimental
-  val TABLE_EXEC_EMIT_EARLY_FIRE_DELAY: ConfigOption[String] =
+  val TABLE_EXEC_EMIT_EARLY_FIRE_DELAY: ConfigOption[Duration] =
   key("table.exec.emit.early-fire.delay")
-      .noDefaultValue
+      .durationType()
+      .defaultValue(Duration.ofMillis(-1))

Review comment:
       A value of `-1ms` is not allowed for the Duration type. We can use `.noDefaultValue()` to keep the same behavior as before. You can use `tableConfig.getConfiguration.getOptional(TABLE_EXEC_EMIT_EARLY_FIRE_DELAY).isPresent` to check whether it is set. 
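
To illustrate the pattern, here is a JDK-only sketch of replacing a negative sentinel with an "unset" check. The `Map`-backed `getOptional` helper and the `EmitDelayLookup` class are hypothetical stand-ins, not Flink code; in the PR the equivalent lookup would be the `getOptional(...)` call mentioned above.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class EmitDelayLookup {
    static final String EARLY_FIRE_DELAY_KEY = "table.exec.emit.early-fire.delay";

    // Hypothetical stand-in for a config lookup: empty when the key was never set.
    static Optional<Duration> getOptional(Map<String, Duration> conf, String key) {
        return Optional.ofNullable(conf.get(key));
    }

    // Reports the setting without relying on a -1 sentinel default.
    static String describe(Map<String, Duration> conf) {
        return getOptional(conf, EARLY_FIRE_DELAY_KEY)
            .map(d -> "early-fire delay = " + d.toMillis() + " ms")
            .orElse("early-fire delay not set");
    }

    public static void main(String[] args) {
        Map<String, Duration> conf = new HashMap<>();
        System.out.println(describe(conf));
        conf.put(EARLY_FIRE_DELAY_KEY, Duration.ofSeconds(5));
        System.out.println(describe(conf));
    }
}
```

With no default value, "not configured" is represented by absence rather than by a magic duration, so callers branch on presence instead of comparing against `-1`.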





