fhueske commented on code in PR #27962:
URL: https://github.com/apache/flink/pull/27962#discussion_r3116891742
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java:
##########
@@ -145,11 +146,17 @@ public void open() throws Exception {
FunctionUtils.openFunction(processTableRunner, DefaultOpenContext.INSTANCE);
}
+ @Override
+ public final boolean useInterruptibleTimers(ReadableConfig config) {
+ return true;
+ }
+
@Override
public void processWatermark(Watermark mark) throws Exception {
- super.processWatermark(mark);
- // TODO this line has issues with interruptible timers, see FLINK-39437
+ // Update the runner's watermark before firing timers to keep it consistent with the
+ // timer service watermark, which is also advanced before any timer fires.
processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp());
Review Comment:
Before processing any timers, `MailboxPartialWatermarkProcessor`,
`MailboxWatermarkProcessor` and `InternalTimerServiceImpl` update their
internal watermarks to the new watermark. So the operator's internal state is
already at the new watermark before all timers have been processed. Only the WM
emission is delayed until all timers are handled.
I think this makes sense, also in the context of late data handling,
because the operator has already received the WM (all following records with a
ts < wm are de facto late) and started to operate on it. The first triggered
timers might already clean up state and/or emit data. So even before all timers
have been processed, data can (and IMO should) be treated as late.
Imagine a case with two timers, one for key A and one for key B. A's timer has
fired and cleaned up A's state. Then there's an interrupt with a late record for
key A. A's state is gone, so the record cannot be processed correctly. Receiving
a late record for B might be OK because its state is still present, but it is
also late.
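To make the A/B scenario concrete, here is a minimal, self-contained sketch. It is a toy model, not Flink code: the class `InterruptibleTimerSketch` and its methods are hypothetical names made up for illustration, and the "interrupt" is simply simulated by processing records between two timer firings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the scenario above; none of these names are Flink APIs. */
public class InterruptibleTimerSketch {

    // Operator-internal watermark (hypothetical stand-in for the runner's watermark).
    private long currentWatermark = Long.MIN_VALUE;
    // Per-key state that a timer cleans up when it fires.
    private final Map<String, Long> state = new HashMap<>();
    private final List<String> log = new ArrayList<>();

    private void processRecord(String key, long ts) {
        if (ts < currentWatermark) {
            // The record is late relative to the already-advanced internal watermark.
            log.add("late " + key + "@" + ts + " statePresent=" + state.containsKey(key));
        } else {
            state.merge(key, 1L, Long::sum);
            log.add("on-time " + key + "@" + ts);
        }
    }

    private void fireTimer(String key) {
        state.remove(key);
        log.add("timer fired, state cleared for " + key);
    }

    /** Runs the two-timer scenario and returns the ordered event log. */
    public static List<String> simulate() {
        InterruptibleTimerSketch op = new InterruptibleTimerSketch();
        op.processRecord("A", 5);
        op.processRecord("B", 6);
        Deque<String> pendingTimers = new ArrayDeque<>(List.of("A", "B"));

        // Watermark 10 arrives: the internal watermark is advanced first.
        op.currentWatermark = 10;
        op.log.add("internal watermark advanced to 10");

        op.fireTimer(pendingTimers.poll()); // timer for A fires

        // Simulated interrupt: records are processed between the two timers.
        op.processRecord("A", 7);
        op.processRecord("B", 8);

        op.fireTimer(pendingTimers.poll()); // timer for B fires

        // Only now, with all timers handled, is the watermark emitted downstream.
        op.log.add("watermark 10 emitted downstream");
        return op.log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this sketch both interrupting records are classified as late because the internal watermark is already at 10, but only the record for B still finds its state.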
Flipping the calls here just makes it more obvious that the internal WM
state is updated before the timers are called. Even with interrupted timers,
the `ingestCurrentWatermarkEvent()` call would happen before an intermediate
record is processed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]