fhueske commented on code in PR #27962:
URL: https://github.com/apache/flink/pull/27962#discussion_r3112454468


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java:
##########
@@ -1144,6 +1145,34 @@ public class ProcessTableFunctionTestPrograms {
                             "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
                     .build();
 
+    public static final TableTestProgram PROCESS_CONSISTENT_WATERMARK_TIMERS =
+            TableTestProgram.of(
+                            "process-consistent-watermark-timers",
+                            "test that multiple named timers registered at the same timestamp all see a consistent watermark")
+                    .setupTemporarySystemFunction("f", ConsistentWatermarkTimersFunction.class)
+                    .setupTableSource(TIMED_SOURCE)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer timerA for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer timerB for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer timerC for 5 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time 2 watermark 0}, 1970-01-01T00:00:00.002Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 1}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 2}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark 3}, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark 4}, 1970-01-01T00:00:00.006Z]",
+                                            "+I[Bob, {Timer timerA fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Timer timerB fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Timer timerC fired at time 5 watermark 5}, 1970-01-01T00:00:00.005Z]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
+                    .build();

Review Comment:
   I've added `ProcessSetTableOperatorInterruptibleTimersTest` to assert the 
call order of timers.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

