1996fanrui commented on code in PR #24941:
URL: https://github.com/apache/flink/pull/24941#discussion_r1642312751


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -190,4 +217,35 @@ public void close() throws Exception {
         FunctionUtils.closeFunction(watermarkGenerator);
         super.close();
     }
+
+    private boolean isIdlenessEnabled() {
+        return idleTimeout > 0;
+    }
+
+    @VisibleForTesting
+    static long calculateProcessingTimeTimerInterval(long watermarkInterval, long idleTimeout) {
+        if (watermarkInterval <= 0) {
+            return idleTimeout;
+        }
+        if (idleTimeout <= 0) {
+            return watermarkInterval;
+        }

Review Comment:
   Could we add a `checkArgument(watermarkInterval > 0 || idleTimeout > 0)` at the beginning of this method?
   
   IIUC, we expect `calculateProcessingTimeTimerInterval` to be called only when `watermarkInterval > 0 || idleTimeout > 0`, so the timerInterval should always be > 0. If both of them are 0, the timerInterval will be 0, which is unexpected.
   
   Currently, `calculateProcessingTimeTimerInterval` has only one caller in this PR, and that caller already checks `watermarkInterval > 0 || idleTimeout > 0`. But I'm afraid that future callers might call `calculateProcessingTimeTimerInterval` without checking `watermarkInterval > 0 || idleTimeout > 0`.
   
   Adding `checkArgument(watermarkInterval > 0 || idleTimeout > 0)` at the beginning of this method would guard against such unexpected calls in the future.
   
   WDYT?
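A minimal standalone sketch of the proposed guard, assuming nothing beyond what the hunk shows. `checkArgument` here is a local stand-in mirroring Flink's `org.apache.flink.util.Preconditions.checkArgument(boolean, Object)` so the snippet compiles without Flink on the classpath, and `TimerIntervalSketch` is a hypothetical class name. The branch where both values are positive is not visible in the reviewed hunk, so the `Math.min` there is a hypothetical placeholder, not the PR's actual logic.

```java
/** Standalone sketch of the proposed precondition (not the PR's actual code). */
class TimerIntervalSketch {

    // Local stand-in for Flink's Preconditions.checkArgument(boolean, Object).
    static void checkArgument(boolean condition, Object errorMessage) {
        if (!condition) {
            throw new IllegalArgumentException(String.valueOf(errorMessage));
        }
    }

    static long calculateProcessingTimeTimerInterval(long watermarkInterval, long idleTimeout) {
        // Fail fast instead of silently returning a non-positive timer interval.
        checkArgument(
                watermarkInterval > 0 || idleTimeout > 0,
                "At least one of watermarkInterval and idleTimeout must be positive.");
        if (watermarkInterval <= 0) {
            return idleTimeout;
        }
        if (idleTimeout <= 0) {
            return watermarkInterval;
        }
        // HYPOTHETICAL: the rule for combining two positive values is not shown
        // in the reviewed hunk; Math.min is only a placeholder.
        return Math.min(watermarkInterval, idleTimeout);
    }
}
```

With the guard in place, a call such as `calculateProcessingTimeTimerInterval(0, 0)` throws an `IllegalArgumentException` instead of returning a timer interval of 0.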



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -136,19 +154,28 @@ private void advanceWatermark() {
 
     @Override
     public void onProcessingTime(long timestamp) throws Exception {
-        advanceWatermark();
+        // timestamp and now can be off in case TM is heavily overloaded.
+        long now = getProcessingTimeService().getCurrentProcessingTime();
 
-        if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.ACTIVE)) {
-            final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
-            if (currentTime - lastRecordTime > idleTimeout) {
-                // mark the channel as idle to ignore watermarks from this channel
-                emitWatermarkStatus(WatermarkStatus.IDLE);
-            }
+        if (watermarkInterval > 0
+                && lastWatermarkPeriodicEmitTime + watermarkInterval <= timestamp) {
+            lastWatermarkPeriodicEmitTime = now;
+            advanceWatermark();
+        }
+        if (processedElements != lastIdleCheckProcessedElements) {
+            idleSince = now;
+        }
+        lastIdleCheckProcessedElements = processedElements;

Review Comment:
   ```suggestion
           if (processedElements != lastIdleCheckProcessedElements) {
               idleSince = now;
               lastIdleCheckProcessedElements = processedElements;
           }
   ```
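For illustration, a standalone sketch of the counter-based idleness check with the snapshot update moved inside the `if`, as suggested. Behavior is unchanged, since the unconditional assignment was a no-op whenever the counts were equal. Only the field names `processedElements`, `lastIdleCheckProcessedElements`, `idleSince` and `idleTimeout` come from the diff; the class `IdlenessCheckSketch` and its methods are hypothetical, and the `now - idleSince > idleTimeout` threshold is an assumption modeled on the old `currentTime - lastRecordTime > idleTimeout` comparison, because the new hunk does not show it.

```java
/** Standalone sketch of a counter-based idleness check (hypothetical class). */
class IdlenessCheckSketch {
    private final long idleTimeout;
    private long processedElements;
    private long lastIdleCheckProcessedElements;
    // Approximate processing time at which new input was last observed.
    private long idleSince;

    IdlenessCheckSketch(long idleTimeout, long startTime) {
        this.idleTimeout = idleTimeout;
        this.idleSince = startTime;
    }

    // Mirrors `processedElements++` in processElement().
    void onElement() {
        processedElements++;
    }

    // Mirrors the timer-side check with the suggested placement: the snapshot
    // is refreshed only when new elements arrived since the previous check.
    // HYPOTHETICAL: returns whether the input should be considered idle.
    boolean checkIdle(long now) {
        if (processedElements != lastIdleCheckProcessedElements) {
            idleSince = now;
            lastIdleCheckProcessedElements = processedElements;
        }
        return now - idleSince > idleTimeout;
    }
}
```

Because progress is only sampled on timer firings, `idleSince` records when the check first noticed new elements, not when they were actually processed.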



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -52,12 +53,22 @@ public class WatermarkAssignerOperator extends AbstractStreamOperator<RowData>
 
     private transient long watermarkInterval;
 
+    private transient long timerInterval;
+
     private transient long currentWatermark;
 
-    private transient long lastRecordTime;
+    // Last time watermark have been (periodically) emitted
+    private transient long lastWatermarkPeriodicEmitTime;
+
+    // Last time idleness status has been checked
+    private transient long idleSince;

Review Comment:
   The comment is fine, but the name is a bit ambiguous to me.
   
   Literally, `idleSince` suggests the time when this operator was marked IDLE, but it actually holds the approximate time when the latest data was processed.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -100,11 +114,15 @@ public void open() throws Exception {
 
     @Override
     public void processElement(StreamRecord<RowData> element) throws Exception {
-        if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.IDLE)) {
-            // mark the channel active
-            emitWatermarkStatus(WatermarkStatus.ACTIVE);
-            lastRecordTime = getProcessingTimeService().getCurrentProcessingTime();
+        processedElements++;
+
+        if (isIdlenessEnabled() && currentStatus.equals(WatermarkStatus.IDLE)) {
+            if (currentStatus.equals(WatermarkStatus.IDLE)) {
+                // mark the channel active
+                emitWatermarkStatus(WatermarkStatus.ACTIVE);
+            }
         }

Review Comment:
   ```suggestion
           if (isIdlenessEnabled() && currentStatus.equals(WatermarkStatus.IDLE)) {
               // mark the channel active
               emitWatermarkStatus(WatermarkStatus.ACTIVE);
           }
   ```
   
   It looks like the second `if` condition is not needed, since the first condition already checks `currentStatus.equals(WatermarkStatus.IDLE)`.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -136,19 +154,28 @@ private void advanceWatermark() {
 
     @Override
     public void onProcessingTime(long timestamp) throws Exception {
-        advanceWatermark();
+        // timestamp and now can be off in case TM is heavily overloaded.
+        long now = getProcessingTimeService().getCurrentProcessingTime();
 
-        if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.ACTIVE)) {
-            final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
-            if (currentTime - lastRecordTime > idleTimeout) {
-                // mark the channel as idle to ignore watermarks from this channel
-                emitWatermarkStatus(WatermarkStatus.IDLE);
-            }
+        if (watermarkInterval > 0
+                && lastWatermarkPeriodicEmitTime + watermarkInterval <= timestamp) {

Review Comment:
   As the comment says, `timestamp` and `now` may differ. I'm curious: why use `timestamp` instead of `now` here?
   
   If the timer fires late, `lastWatermarkPeriodicEmitTime + watermarkInterval` may be less than `now` but greater than `timestamp`. In that case we could already call `advanceWatermark()`, right?
   
   This means we can use `now` directly. The old code used `currentTime` as well.
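The late-timer case can be made concrete with a small standalone check. `EmitConditionSketch.shouldEmit` is a hypothetical helper that only isolates the condition from the diff (`watermarkInterval > 0 && lastWatermarkPeriodicEmitTime + watermarkInterval <= t`) so that `t = timestamp` and `t = now` can be compared; the numbers below are illustrative and not taken from the PR.

```java
/** Standalone illustration of the emit condition evaluated with `timestamp` vs. `now`. */
class EmitConditionSketch {
    // HYPOTHETICAL helper isolating the periodic-emit condition from onProcessingTime().
    static boolean shouldEmit(long watermarkInterval, long lastWatermarkPeriodicEmitTime, long t) {
        return watermarkInterval > 0 && lastWatermarkPeriodicEmitTime + watermarkInterval <= t;
    }
}
```

For example, with `watermarkInterval = 200` and `lastWatermarkPeriodicEmitTime = 1000`, a timer registered for `timestamp = 1150` that actually fires at `now = 1210` does not emit when checked against `timestamp` (1200 > 1150), but does emit when checked against `now` (1200 <= 1210). With `timestamp`, the watermark would wait for the next timer firing even though the interval has already elapsed.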



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.