spuru9 commented on code in PR #27962:
URL: https://github.com/apache/flink/pull/27962#discussion_r3102686262


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java:
##########
@@ -662,6 +662,26 @@ public void onTimer(OnTimerContext ctx) {
         }
     }
 
+    /** Testing function for validating watermark consistency across same-timestamp timer callbacks. */
+    public static class ConsistentWatermarkTimersFunction extends AppendProcessTableFunctionBase {
+        public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row r) {
+            final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
+            collectEvalEvent(timeCtx, r);
+            if (timeCtx.time() == 0) {
+                // Register multiple named timers at the same time to validate that all timer
+                // callbacks see a consistent watermark, even when interrupted across mailbox

Review Comment:
   As mentioned in the earlier Copilot comment, the mailbox is interrupted only 
when all three of the following conditions in AbstractStreamOperatorV2 hold:
   ```
   if (useInterruptibleTimers(runtimeContext.getJobConfiguration())
                   && areInterruptibleTimersConfigured()
                   && getTimeServiceManager().isPresent()) {
   ```
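
   To make the point concrete, here is a minimal standalone sketch of that 
   guard as a plain predicate. The class, method, and parameter names are 
   hypothetical stand-ins for the three checks quoted above, not Flink APIs. 
   It only illustrates that a single false or absent input disables 
   interruption.

   ```java
   import java.util.Optional;

   /** Hypothetical model of the three-way guard; not Flink's actual class. */
   final class InterruptibleTimerGate {

       /**
        * Returns true only when all three inputs hold, mirroring the shape of
        * the quoted condition. Each parameter stands in for one of the checks.
        */
       static boolean timersMayBeInterrupted(
               boolean enabledInJobConfig,      // stand-in for useInterruptibleTimers(...)
               boolean configuredOnOperator,    // stand-in for areInterruptibleTimersConfigured()
               Optional<?> timeServiceManager)  // stand-in for getTimeServiceManager()
       {
           return enabledInJobConfig
                   && configuredOnOperator
                   && timeServiceManager.isPresent();
       }

       public static void main(String[] args) {
           // All three conditions satisfied.
           System.out.println(timersMayBeInterrupted(true, true, Optional.of("tsm")));
           // Any single missing condition disables interruption.
           System.out.println(timersMayBeInterrupted(false, true, Optional.of("tsm")));
           System.out.println(timersMayBeInterrupted(true, false, Optional.of("tsm")));
           System.out.println(timersMayBeInterrupted(true, true, Optional.empty()));
       }
   }
   ```

   If the test setup leaves any one of these unsatisfied, the timers would 
   presumably fire without interruption, so the "interrupted across mailbox" 
   path described in the code comment would not be exercised.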



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
