Copilot commented on code in PR #27962:
URL: https://github.com/apache/flink/pull/27962#discussion_r3102238088


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java:
##########
@@ -1144,6 +1145,34 @@ public class ProcessTableFunctionTestPrograms {
                             "INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name, on_time => DESCRIPTOR(ts))")
                     .build();
 
+    public static final TableTestProgram PROCESS_CONSISTENT_WATERMARK_TIMERS =
+            TableTestProgram.of(
+                            "process-consistent-watermark-timers",
+                            "test that multiple named timers registered at the 
same timestamp all see a consistent watermark")
+                    .setupTemporarySystemFunction("f", 
ConsistentWatermarkTimersFunction.class)
+                    .setupTableSource(TIMED_SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[Bob, {Processing input row 
+I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 
1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer timerA 
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer timerB 
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer timerC 
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Alice, {Processing input row 
+I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 
1970-01-01T00:00:00.001Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark 0}, 
1970-01-01T00:00:00.002Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 1}, 
1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 2}, 
1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark 3}, 
1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark 4}, 
1970-01-01T00:00:00.006Z]",
+                                            "+I[Bob, {Timer timerA fired at 
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Timer timerB fired at 
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Timer timerC fired at 
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name, on_time => DESCRIPTOR(ts))")
+                    .build();

Review Comment:
   The new semantic test program doesn't appear to enable unaligned checkpoints 
/ interruptible timers via configuration, so it likely won't exercise the 
MailboxWatermarkProcessor path that this PR is introducing for PTFs. Consider 
adding test setup config to explicitly enable 
`execution.checkpointing.unaligned` and 
`execution.checkpointing.unaligned.interruptible-timers.enabled` (and any other 
required checkpointing knobs) so the test will fail if interruptible-timer 
watermark consistency regresses.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java:
##########
@@ -662,6 +662,26 @@ public void onTimer(OnTimerContext ctx) {
         }
     }
 
+    /** Testing function for validating watermark consistency across 
same-timestamp timer callbacks. */
+    public static class ConsistentWatermarkTimersFunction extends 
AppendProcessTableFunctionBase {
+        public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, 
REQUIRE_ON_TIME}) Row r) {
+            final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
+            collectEvalEvent(timeCtx, r);
+            if (timeCtx.time() == 0) {
+                // Register multiple named timers at the same time to validate 
that all timer
+                // callbacks see a consistent watermark, even when interrupted 
across mailbox
+                // iterations.

Review Comment:
   This Javadoc/comment claims the callbacks are validated "even when 
interrupted across mailbox iterations", but the function itself only registers 
3 timers and the surrounding test program doesn't explicitly enable 
interruptible timers. Either adjust the wording to match what the test actually 
guarantees, or update the test program/environment to reliably force the 
interruptible-timer mailbox path so the comment stays accurate.
   ```suggestion
                   // callbacks for that timestamp see a consistent watermark.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to