Au-Miner commented on code in PR #27962:
URL: https://github.com/apache/flink/pull/27962#discussion_r3110972369
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java:
##########
@@ -1144,6 +1145,34 @@ public class ProcessTableFunctionTestPrograms {
"INSERT INTO sink SELECT * FROM f(r => TABLE t
PARTITION BY name, on_time => DESCRIPTOR(ts))")
.build();
+ public static final TableTestProgram PROCESS_CONSISTENT_WATERMARK_TIMERS =
+ TableTestProgram.of(
+ "process-consistent-watermark-timers",
+ "test that multiple named timers registered at the
same timestamp all see a consistent watermark")
+ .setupTemporarySystemFunction("f",
ConsistentWatermarkTimersFunction.class)
+ .setupTableSource(TIMED_SOURCE)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
+ .consumedValues(
+ "+I[Bob, {Processing input row
+I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null},
1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer timerA
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer timerB
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer timerC
for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Alice, {Processing input row
+I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1},
1970-01-01T00:00:00.001Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark 0},
1970-01-01T00:00:00.002Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 1},
1970-01-01T00:00:00.003Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 2},
1970-01-01T00:00:00.004Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark 3},
1970-01-01T00:00:00.005Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark 4},
1970-01-01T00:00:00.006Z]",
+ "+I[Bob, {Timer timerA fired at
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
+ "+I[Bob, {Timer timerB fired at
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
+ "+I[Bob, {Timer timerC fired at
time 5 watermark 5}, 1970-01-01T00:00:00.005Z]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM f(r => TABLE t
PARTITION BY name, on_time => DESCRIPTOR(ts))")
+ .build();
Review Comment:
Perhaps we need to create a dedicated HarnessTest to test whether
interruptions are still correct
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]