fhueske commented on code in PR #27962:
URL: https://github.com/apache/flink/pull/27962#discussion_r3116891742


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java:
##########
@@ -145,11 +146,17 @@ public void open() throws Exception {
         FunctionUtils.openFunction(processTableRunner, DefaultOpenContext.INSTANCE);
     }
 
+    @Override
+    public final boolean useInterruptibleTimers(ReadableConfig config) {
+        return true;
+    }
+
     @Override
     public void processWatermark(Watermark mark) throws Exception {
-        super.processWatermark(mark);
-        // TODO this line has issues with interruptible timers, see FLINK-39437
+        // Update the runner's watermark before firing timers to keep it consistent with the
+        // timer service watermark, which is also advanced before any timer fires.
         processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp());

Review Comment:
   Before processing any timers, `MailboxPartialWatermarkProcessor`, 
`MailboxWatermarkProcessor`, and `InternalTimerServiceImpl` update their 
internal watermarks to the new watermark. So the operator's internal state is 
already at the new watermark before all timers have been processed. Only the WM 
emission is delayed until all timers are handled.
   
   I think this makes sense, also in the context of late data handling, 
because the operator has already received the WM (all following records with a 
ts < wm are de facto late) and started to operate on it. The first triggered 
timers might already clean up state and/or emit data. So even before all timers 
have been processed, data can (and IMO should) be treated as late. 
   Imagine a case with two timers, one for key A and one for key B. A's timer 
has fired and cleaned up A's state. Then there's an interrupt with a late record 
for key A. A's data is gone, so the record cannot be processed correctly. 
Receiving a late record for B might be OK because the data is still present, 
but that record is late as well.
   
   Flipping the calls here just makes it more obvious that the internal WM 
state is updated before the timers are called. Even with interrupted timers, 
the `ingestCurrentWatermarkEvent()` call would happen before any intermediate 
record is processed.
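
   To make the two-timer case above concrete, here is a minimal, 
self-contained sketch. It is a toy model and not Flink code: the class 
`InterruptibleTimerSketch` and all of its methods are hypothetical names, the 
"interrupt" is simulated with a simple timer budget, and lateness uses the 
`ts < wm` condition from this comment. It only illustrates the ordering: the 
internal watermark advances first, timers fire afterwards, and the watermark 
is emitted once no due timer remains.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of an operator with interruptible timers (hypothetical, not Flink API). */
class InterruptibleTimerSketch {
    private long currentWatermark = Long.MIN_VALUE; // operator-internal watermark
    private long emittedWatermark = Long.MIN_VALUE; // watermark forwarded downstream
    private final TreeMap<String, Long> timers = new TreeMap<>(); // key -> timer timestamp
    private final Map<String, List<Long>> state = new HashMap<>(); // key -> buffered timestamps
    final List<String> log = new ArrayList<>();

    /** Buffers a record and registers a timer, unless the record is late (ts < wm). */
    void processRecord(String key, long ts, long timerTs) {
        if (ts < currentWatermark) {
            String stateInfo = state.containsKey(key) ? "present" : "gone";
            log.add("late " + key + "@" + ts + " state=" + stateInfo);
            return;
        }
        state.computeIfAbsent(key, k -> new ArrayList<>()).add(ts);
        timers.put(key, timerTs);
    }

    /** Step 1: the internal watermark is advanced before any timer fires. */
    void advanceWatermark(long wm) {
        currentWatermark = wm;
    }

    /**
     * Step 2: fires at most {@code budget} due timers, each cleaning up its key's state.
     * Returns false if it stopped early (simulated interrupt); otherwise emits the watermark.
     */
    boolean fireDueTimers(int budget) {
        Iterator<Map.Entry<String, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> timer = it.next();
            if (timer.getValue() > currentWatermark) {
                continue; // not due yet
            }
            if (budget == 0) {
                return false; // interrupted while due timers remain
            }
            budget--;
            state.remove(timer.getKey());
            log.add("fired " + timer.getKey());
            it.remove();
        }
        emittedWatermark = currentWatermark; // emission happens only after all due timers
        return true;
    }

    long emittedWatermark() {
        return emittedWatermark;
    }

    public static void main(String[] args) {
        InterruptibleTimerSketch op = new InterruptibleTimerSketch();
        op.processRecord("A", 10, 20);
        op.processRecord("B", 12, 20);
        op.advanceWatermark(20);
        op.fireDueTimers(1);           // fires A's timer, then is "interrupted"
        op.processRecord("A", 15, 20); // late, and A's state is already gone
        op.processRecord("B", 15, 20); // late, although B's state is still present
        op.fireDueTimers(1);           // fires B's timer and emits the watermark
        op.log.forEach(System.out::println);
    }
}
```

   In this model, both records that arrive during the interrupt are 
classified as late because the internal watermark is already at 20, even 
though the watermark has not been emitted yet. The log reads `fired A`, 
`late A@15 state=gone`, `late B@15 state=present`, `fired B`.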


