Copilot commented on code in PR #27962:
URL: https://github.com/apache/flink/pull/27962#discussion_r3102238088
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java:
##########
@@ -1144,6 +1145,34 @@ public class ProcessTableFunctionTestPrograms {
"INSERT INTO sink SELECT * FROM f(r => TABLE t
PARTITION BY name, on_time => DESCRIPTOR(ts))")
.build();
+ public static final TableTestProgram PROCESS_CONSISTENT_WATERMARK_TIMERS =
+ TableTestProgram.of(
+ "process-consistent-watermark-timers",
+ "test that multiple named timers registered at the
same timestamp all see a consistent watermark")
+ .setupTemporarySystemFunction("f",
ConsistentWatermarkTimersFunction.class)
+ .setupTableSource(TIMED_SOURCE)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
+ .consumedValues(
+ "+I[Bob, {Processing input row
+I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null},
1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer timerA
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer timerB
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer timerC
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Alice, {Processing input row
+I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1},
1970-01-01T00:00:00.001Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark 0},
1970-01-01T00:00:00.002Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 1},
1970-01-01T00:00:00.003Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 2},
1970-01-01T00:00:00.004Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark 3},
1970-01-01T00:00:00.005Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark 4},
1970-01-01T00:00:00.006Z]",
+ "+I[Bob, {Timer timerA fired at
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
+ "+I[Bob, {Timer timerB fired at
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
+ "+I[Bob, {Timer timerC fired at
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM f(r => TABLE t
PARTITION BY name, on_time => DESCRIPTOR(ts))")
+ .build();
Review Comment:
The new semantic test program doesn't appear to enable unaligned checkpoints
or interruptible timers via configuration, so it likely won't exercise the
MailboxWatermarkProcessor path that this PR introduces for PTFs. Consider
adding test setup config that explicitly enables
`execution.checkpointing.unaligned` and
`execution.checkpointing.unaligned.interruptible-timers.enabled` (plus any other
required checkpointing options), so that the test fails if interruptible-timer
watermark consistency regresses.
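As a rough illustration of the configuration this comment asks for, the sketch
below collects the two option keys in a plain map. The keys are the ones named
above; the class name `InterruptibleTimerSettings`, the `settings()` helper, and
the `"true"` values are assumptions made for illustration and are not part of
the PR. How these settings would be wired into the test program is left open
here, and other checkpointing options may also be required.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not part of the PR): gathers the options named in the review. */
final class InterruptibleTimerSettings {
    private InterruptibleTimerSettings() {}

    /** Keys are quoted from the review comment; the "true" values are an assumption. */
    static Map<String, String> settings() {
        // LinkedHashMap keeps insertion order so the printed output is stable.
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("execution.checkpointing.unaligned", "true");
        settings.put(
                "execution.checkpointing.unaligned.interruptible-timers.enabled", "true");
        return settings;
    }

    public static void main(String[] args) {
        // Print each key/value pair as it might appear in a configuration file.
        settings().forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

Keeping the keys in one place makes it easier to see which settings the test
depends on if the interruptible-timer path is later enabled for this program.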
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java:
##########
@@ -662,6 +662,26 @@ public void onTimer(OnTimerContext ctx) {
}
}
+ /** Testing function for validating watermark consistency across
same-timestamp timer callbacks. */
+ public static class ConsistentWatermarkTimersFunction extends
AppendProcessTableFunctionBase {
+ public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE,
REQUIRE_ON_TIME}) Row r) {
+ final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
+ collectEvalEvent(timeCtx, r);
+ if (timeCtx.time() == 0) {
+ // Register multiple named timers at the same time to validate
that all timer
+ // callbacks see a consistent watermark, even when interrupted
across mailbox
+ // iterations.
Review Comment:
This inline comment claims the callbacks are validated "even when
interrupted across mailbox iterations", but the function itself only registers
three timers and the surrounding test program doesn't explicitly enable
interruptible timers. Either adjust the wording to match what the test actually
guarantees, or update the test program/environment to reliably force the
interruptible-timer mailbox path so that the comment stays accurate.
```suggestion
// callbacks for that timestamp see a consistent watermark.
```