1996fanrui commented on code in PR #24941:
URL: https://github.com/apache/flink/pull/24941#discussion_r1642312751
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -190,4 +217,35 @@ public void close() throws Exception {
FunctionUtils.closeFunction(watermarkGenerator);
super.close();
}
+
+ private boolean isIdlenessEnabled() {
+ return idleTimeout > 0;
+ }
+
+ @VisibleForTesting
+ static long calculateProcessingTimeTimerInterval(long watermarkInterval, long idleTimeout) {
+ if (watermarkInterval <= 0) {
+ return idleTimeout;
+ }
+ if (idleTimeout <= 0) {
+ return watermarkInterval;
+ }
Review Comment:
Could we add a `checkArgument(watermarkInterval > 0 || idleTimeout > 0)` at
the beginning of this method?
IIUC, `calculateProcessingTimeTimerInterval` is expected to be called only
when `watermarkInterval > 0 || idleTimeout > 0`, and the resulting
timerInterval should be > 0. If both of them are 0, the timerInterval will be
0, which is unexpected.
Currently, `calculateProcessingTimeTimerInterval` has only one caller in this
PR, and that caller already checks `watermarkInterval > 0 || idleTimeout > 0`.
But I'm afraid other callers may be added in the future that don't check
`watermarkInterval > 0 || idleTimeout > 0`.
Adding the `checkArgument` at the beginning of this method would guard against
such unexpected calls.
WDYT?
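To make the proposal concrete, here is a rough, standalone sketch. The class name `TimerIntervalSketch` and the inlined `checkArgument` helper are hypothetical stand-ins (in the PR it would presumably be Flink's `Preconditions.checkArgument`). The final `return` is only a placeholder, since the rest of the method is not visible in this hunk.

```java
public class TimerIntervalSketch {

    // Hypothetical stand-in for a precondition helper, inlined so the sketch compiles on its own.
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    static long calculateProcessingTimeTimerInterval(long watermarkInterval, long idleTimeout) {
        // Proposed guard: at least one of the two intervals must be enabled.
        checkArgument(
                watermarkInterval > 0 || idleTimeout > 0,
                "watermarkInterval or idleTimeout must be positive");
        if (watermarkInterval <= 0) {
            return idleTimeout;
        }
        if (idleTimeout <= 0) {
            return watermarkInterval;
        }
        // ASSUMPTION: the remainder of the real method is not shown in the hunk.
        // Taking the smaller interval is a placeholder, not the PR's logic.
        return Math.min(watermarkInterval, idleTimeout);
    }

    public static void main(String[] args) {
        System.out.println(calculateProcessingTimeTimerInterval(200, 0)); // prints 200
        System.out.println(calculateProcessingTimeTimerInterval(0, 5000)); // prints 5000
        try {
            calculateProcessingTimeTimerInterval(0, 0);
        } catch (IllegalArgumentException e) {
            // With the guard, a misuse fails fast instead of yielding a 0 interval.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With the guard in place, a future caller that passes two non-positive values gets an exception right away rather than a timer interval of 0.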
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -136,19 +154,28 @@ private void advanceWatermark() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
- advanceWatermark();
+ // timestamp and now can be off in case TM is heavily overloaded.
+ long now = getProcessingTimeService().getCurrentProcessingTime();
- if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.ACTIVE)) {
- final long currentTime =
getProcessingTimeService().getCurrentProcessingTime();
- if (currentTime - lastRecordTime > idleTimeout) {
- // mark the channel as idle to ignore watermarks from this
channel
- emitWatermarkStatus(WatermarkStatus.IDLE);
- }
+ if (watermarkInterval > 0
+ && lastWatermarkPeriodicEmitTime + watermarkInterval <= timestamp) {
+ lastWatermarkPeriodicEmitTime = now;
+ advanceWatermark();
+ }
+ if (processedElements != lastIdleCheckProcessedElements) {
+ idleSince = now;
+ }
+ lastIdleCheckProcessedElements = processedElements;
Review Comment:
```suggestion
if (processedElements != lastIdleCheckProcessedElements) {
idleSince = now;
lastIdleCheckProcessedElements = processedElements;
}
```
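As far as I can tell, this suggestion does not change behavior: when the two counters are equal, the unconditional assignment is a no-op. A small standalone sketch of both variants follows. The class `IdleCheckSketch` and its driver are hypothetical; only the three field names are taken from the PR.

```java
public class IdleCheckSketch {
    long processedElements;
    long lastIdleCheckProcessedElements;
    long idleSince;

    // Variant in the PR: the snapshot is refreshed on every timer firing.
    void checkAsInPr(long now) {
        if (processedElements != lastIdleCheckProcessedElements) {
            idleSince = now;
        }
        lastIdleCheckProcessedElements = processedElements;
    }

    // Suggested variant: the snapshot is refreshed only when the counter moved.
    void checkAsSuggested(long now) {
        if (processedElements != lastIdleCheckProcessedElements) {
            idleSince = now;
            lastIdleCheckProcessedElements = processedElements;
        }
    }

    public static void main(String[] args) {
        IdleCheckSketch pr = new IdleCheckSketch();
        IdleCheckSketch suggested = new IdleCheckSketch();
        // Hypothetical workload: number of elements arriving before each timer firing.
        long[] arrivals = {2, 0, 0, 1, 0};
        for (int i = 0; i < arrivals.length; i++) {
            long now = 100L * (i + 1);
            pr.processedElements += arrivals[i];
            suggested.processedElements += arrivals[i];
            pr.checkAsInPr(now);
            suggested.checkAsSuggested(now);
        }
        // Both variants should agree on every field.
        System.out.println(pr.idleSince + " " + suggested.idleSince);
        System.out.println(
                pr.lastIdleCheckProcessedElements + " " + suggested.lastIdleCheckProcessedElements);
    }
}
```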
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -52,12 +53,22 @@ public class WatermarkAssignerOperator extends
AbstractStreamOperator<RowData>
private transient long watermarkInterval;
+ private transient long timerInterval;
+
private transient long currentWatermark;
- private transient long lastRecordTime;
+ // Last time watermark have been (periodically) emitted
+ private transient long lastWatermarkPeriodicEmitTime;
+
+ // Last time idleness status has been checked
+ private transient long idleSince;
Review Comment:
The comment is fine, but the name is a bit ambiguous to me.
Read literally, `idleSince` means the time at which this operator was marked
as IDLE. But it actually holds the approximate time when the latest data was
processed.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -100,11 +114,15 @@ public void open() throws Exception {
@Override
public void processElement(StreamRecord<RowData> element) throws Exception
{
- if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.IDLE)) {
- // mark the channel active
- emitWatermarkStatus(WatermarkStatus.ACTIVE);
- lastRecordTime =
getProcessingTimeService().getCurrentProcessingTime();
+ processedElements++;
+
+ if (isIdlenessEnabled() && currentStatus.equals(WatermarkStatus.IDLE)) {
+ if (currentStatus.equals(WatermarkStatus.IDLE)) {
+ // mark the channel active
+ emitWatermarkStatus(WatermarkStatus.ACTIVE);
+ }
}
Review Comment:
```suggestion
if (isIdlenessEnabled() && currentStatus.equals(WatermarkStatus.IDLE)) {
// mark the channel active
emitWatermarkStatus(WatermarkStatus.ACTIVE);
}
```
It looks like the second `if` condition is not needed: the first condition
already checks `currentStatus.equals(WatermarkStatus.IDLE)`.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -136,19 +154,28 @@ private void advanceWatermark() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
- advanceWatermark();
+ // timestamp and now can be off in case TM is heavily overloaded.
+ long now = getProcessingTimeService().getCurrentProcessingTime();
- if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.ACTIVE)) {
- final long currentTime =
getProcessingTimeService().getCurrentProcessingTime();
- if (currentTime - lastRecordTime > idleTimeout) {
- // mark the channel as idle to ignore watermarks from this
channel
- emitWatermarkStatus(WatermarkStatus.IDLE);
- }
+ if (watermarkInterval > 0
+ && lastWatermarkPeriodicEmitTime + watermarkInterval <= timestamp) {
Review Comment:
As the comment says, `timestamp` and `now` may differ. I'm curious: why use
`timestamp` instead of `now` here?
If the timer fires late, `lastWatermarkPeriodicEmitTime + watermarkInterval`
may be less than `now` but greater than `timestamp`. In that case we could
still call `advanceWatermark`, right?
That means we can use `now` directly. The old code used `currentTime` as
well.
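A tiny standalone sketch of the scenario I have in mind. The class `TimerDelaySketch`, the helper `shouldEmit`, and the concrete numbers are all hypothetical; the condition itself mirrors the one in the hunk.

```java
public class TimerDelaySketch {

    // Same condition as in the hunk, with the reference time made a parameter.
    static boolean shouldEmit(long lastEmitTime, long watermarkInterval, long referenceTime) {
        return watermarkInterval > 0 && lastEmitTime + watermarkInterval <= referenceTime;
    }

    public static void main(String[] args) {
        long lastWatermarkPeriodicEmitTime = 1000; // set from `now` on a previous firing
        long watermarkInterval = 200;
        long timestamp = 1100; // time the timer was scheduled for
        long now = 1250; // actual wall-clock time, the timer fired late

        // 1000 + 200 = 1200 lies between timestamp and now.
        System.out.println(
                shouldEmit(lastWatermarkPeriodicEmitTime, watermarkInterval, timestamp)); // false
        System.out.println(
                shouldEmit(lastWatermarkPeriodicEmitTime, watermarkInterval, now)); // true
    }
}
```

Comparing against `timestamp` skips an emission that is already due by wall-clock time, while comparing against `now` does not.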