This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch release-2.3 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 717ed1e815768dbfc7aea4f3f2033fb6b72a5462 Author: Fabian Hueske <[email protected]> AuthorDate: Tue Apr 21 21:28:14 2026 +0200 [FLINK-39436][table] Allow late data in PTFs (#27935) Previously, late events (rowtime <= watermark) were silently dropped before reaching PTF eval(), and timer registrations for times <= watermark were also silently dropped. This change removes both restrictions: - ProcessTableRunner: remove the early-return guard in processEval() so that late events are passed to the PTF's eval() method. - WritableInternalTimeContext: remove the watermark check in registerOnTimeInternal() so that timers can be registered for past times. Such timers fire on the next watermark advance when registered from within eval(), and immediately after the current timer finishes (within the same watermark advance) when registered from within onTimer(). The previous guard also had an unintended side effect: any call to replaceNamedTimer() with a past time would delete the existing timer entry but then silently drop the new registration, leaving the named timer in a state where it appeared unregistered but the old timer was gone. - AbstractProcessTableOperator: remove the fired named timer's state entry before invoking onTimer() to prevent stale entries from accumulating in the named timers map state. Tests are updated to reflect the new semantics: - PROCESS_LATE_EVENTS: demonstrates that late events enter eval(), can register timers (including for past times), and that such timers fire at the next watermark advance (or within the same advance when registered from within onTimer()). - PROCESS_LATE_EVENTS_RESTORE (new): verifies the same behavior across a savepoint restore. - PROCESS_ROW_LATE_EVENTS (new): verifies that late events also enter eval() for row-semantics PTFs. - PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect that timer registrations for past times are no longer dropped. Previously, once the watermark passed the registered time, the timer was silently discarded; now it fires on the next watermark advance. 
--- docs/content.zh/docs/dev/table/functions/ptfs.md | 12 + docs/content/docs/dev/table/functions/ptfs.md | 12 + .../table/functions/ProcessTableFunction.java | 25 ++ .../stream/ProcessTableFunctionRestoreTests.java | 3 +- .../stream/ProcessTableFunctionSemanticTests.java | 1 + .../stream/ProcessTableFunctionTestPrograms.java | 126 ++++++-- .../exec/stream/ProcessTableFunctionTestUtils.java | 26 +- .../plan/process-late-events-restore.json | 320 +++++++++++++++++++++ .../savepoint/_metadata | Bin 0 -> 15523 bytes .../runtime/generated/ProcessTableRunner.java | 4 - .../process/AbstractProcessTableOperator.java | 9 +- .../process/WritableInternalTimeContext.java | 5 - 12 files changed, 500 insertions(+), 43 deletions(-) diff --git a/docs/content.zh/docs/dev/table/functions/ptfs.md b/docs/content.zh/docs/dev/table/functions/ptfs.md index 9a077a6473c..fc89ab76f6c 100644 --- a/docs/content.zh/docs/dev/table/functions/ptfs.md +++ b/docs/content.zh/docs/dev/table/functions/ptfs.md @@ -1021,6 +1021,18 @@ class TimerFunction extends ProcessTableFunction<String> { {{< /tab >}} {{< /tabs >}} +### Handling of Late Records + +A late record is a record with a time attribute value that is less than or equal to the current +watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If +the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is +the same for PTFs with row and set semantics. + +Registering a timer for a time that is less than or equal to the current watermark is allowed. +If registered from within `eval()`, the timer fires on the next watermark advance. If registered +from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that +unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop. + ### Efficiency and Design Principles Registering too many timers might affect performance. 
An ever-growing timer state can happen diff --git a/docs/content/docs/dev/table/functions/ptfs.md b/docs/content/docs/dev/table/functions/ptfs.md index 1b919dbc0d3..0b392c56d11 100644 --- a/docs/content/docs/dev/table/functions/ptfs.md +++ b/docs/content/docs/dev/table/functions/ptfs.md @@ -1022,6 +1022,18 @@ class TimerFunction extends ProcessTableFunction<String> { {{< /tab >}} {{< /tabs >}} +### Handling of Late Records + +A late record is a record with a time attribute value that is less than or equal to the current +watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If +the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is +the same for PTFs with row and set semantics. + +Registering a timer for a time that is less than or equal to the current watermark is allowed. +If registered from within `eval()`, the timer fires on the next watermark advance. If registered +from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that +unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop. + ### Efficiency and Design Principles Registering too many timers might affect performance. 
An ever-growing timer state can happen diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java index bec4bcc8593..19e5bdee501 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java @@ -403,6 +403,19 @@ import java.time.LocalDateTime; * } * }</pre> * + * <h2>Handling of Late Records</h2> + * + * <p>A late record is a record with a time attribute value that is less than or equal to the + * current watermark. PTFs handle late records just like non-late records by calling the {@code + * eval()} method. If the {@code on_time} argument is specified, the late timestamp is preserved in + * the output. This behavior is the same for PTFs with row and set semantics. + * + * <p>Registering a timer for a time that is less than or equal to the current watermark is allowed. + * If registered from within {@code eval()}, the timer fires on the next watermark advance. If + * registered from within {@code onTimer()}, the timer fires immediately after the current timer + * finishes. Note that unconditionally re-registering a past-time timer from within {@code + * onTimer()} causes an infinite loop. + * * <h2>Efficiency and Design Principles</h2> * * <p>Registering too many timers might affect performance. An ever-growing timer state can happen @@ -660,6 +673,12 @@ public abstract class ProcessTableFunction<T> extends UserDefinedFunction { * timer only fires if a watermark was received from all inputs and the timestamp is smaller * or equal to the minimum of all received watermarks. 
* + * <p>If the timestamp of the registered timer is already less than or equal to the current + * watermark, the timer fires on the next watermark advance if registered from within {@code + * eval()}, or immediately after the current timer finishes if registered from within {@code + * onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code + * onTimer()} causes an infinite loop. + * * <p>Timers can be named for distinguishing them in the {@code onTimer()} method. * Registering a timer under the same name twice will replace an existing timer. * @@ -680,6 +699,12 @@ public abstract class ProcessTableFunction<T> extends UserDefinedFunction { * timer only fires if a watermark was received from all inputs and the timestamp is smaller * or equal to the minimum of all received watermarks. * + * <p>If the timestamp of the registered timer is already less than or equal to the current + * watermark, the timer fires on the next watermark advance if registered from within {@code + * eval()}, or immediately after the current timer finishes if registered from within {@code + * onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code + * onTimer()} causes an infinite loop. + * * <p>Only one timer can be registered for a given time. 
* * <p>Note: Because only PTFs taking set semantic tables support state, and timers are a diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java index d95c4a7dd73..03ef42de0e1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java @@ -40,6 +40,7 @@ public class ProcessTableFunctionRestoreTests extends RestoreTestBase { ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE_RESTORE, ProcessTableFunctionTestPrograms.PROCESS_UPDATING_OUTPUT_UPSERT_RESTORE, ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY_RESTORE, - ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE); + ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE, + ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS_RESTORE); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java index 7903b93bc5d..aa547037a13 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java @@ -87,6 +87,7 @@ public class ProcessTableFunctionSemanticTests extends SemanticTestBase { ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS, 
ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS, ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS, + ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS, ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME, ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME, ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java index 7248ff1755a..062de5c2416 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java @@ -50,6 +50,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateTimeFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoWithDefaultStateFunction; +import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RequiredTimeFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTableFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction; @@ -1075,8 +1076,8 @@ public class ProcessTableFunctionTestPrograms { "+I[Bob, {Processing input row +I[Bob, 
4, 1970-01-01T00:00:00.004Z] at time 4 watermark null}, 1970-01-01T00:00:00.004Z]", "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null}, 1970-01-01T00:00:00.005Z]", "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 1970-01-01T00:00:00.006Z]", - "+I[Bob, {Timer timeout2 fired at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]", - "+I[Bob, {Clearing all timers at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]") + "+I[Bob, {Timer timeout2 fired at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]", + "+I[Bob, {Clearing all timers at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]") .build()) .runSql( "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") @@ -1115,7 +1116,7 @@ public class ProcessTableFunctionTestPrograms { public static final TableTestProgram PROCESS_LATE_EVENTS = TableTestProgram.of( "process-late-events", - "test that late events are dropped in both input and when registering timers") + "test that late events enter PTF (eval and timer registration)") .setupTemporarySystemFunction("f", LateTimersFunction.class) .setupTableSource(TIMED_SOURCE_LATE_EVENTS) .setupTableSink( @@ -1123,27 +1124,102 @@ public class ProcessTableFunctionTestPrograms { .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA) .consumedValues( "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer t for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer bob for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", "+I[Bob, {Registering timer for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", - "+I[Alice, {Registering timer t for 0 at time 1 watermark -1}, 
1970-01-01T00:00:00.001Z]", - "+I[Alice, {Registering timer for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", "+I[Bob, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", - "+I[Bob, {Registering timer t for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", - "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]") + "+I[Bob, {Registering timer bob for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Alice, {Timer alice fired at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Timer alice-again fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 
99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer bob for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer bob for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]") .build()) .runSql( "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") .build(); + public static final TableTestProgram PROCESS_LATE_EVENTS_RESTORE = + TableTestProgram.of( + "process-late-events-restore", + "test that late events and their past-time timers work correctly after restore") + .setupTemporarySystemFunction("f", LateTimersFunction.class) + .setupTableSource( + SourceTestStep.newBuilder("t") + .addSchema(TIMED_SOURCE_SCHEMA) + .producedBeforeRestore( + Row.of("Bob", 1, Instant.ofEpochMilli(0)), + Row.of("Alice", 1, Instant.ofEpochMilli(1))) + .producedAfterRestore( + Row.of("Bob", 2, Instant.ofEpochMilli(99999)), + Row.of("Bob", 3, Instant.ofEpochMilli(3)), + Row.of("Bob", 4, Instant.ofEpochMilli(4))) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA) + .consumedBeforeRestore( + "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer bob for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at 
time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", + "+I[Bob, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]") + .consumedAfterRestore( + "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Registering timer bob for 0 at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Registering timer for 0 at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Alice, {Timer alice fired at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Timer alice-again fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer bob for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer bob for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY 
name, on_time => DESCRIPTOR(ts))") + .build(); + + public static final TableTestProgram PROCESS_ROW_LATE_EVENTS = + TableTestProgram.of( + "process-row-late-events", + "test that late events enter a PTF with row semantics") + .setupTemporarySystemFunction("f", RequiredTimeFunction.class) + .setupTableSource(TIMED_SOURCE_LATE_EVENTS) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema(TIMED_BASE_SINK_SCHEMA) + .consumedValues( + "+I[{+I[Bob, 1, 1970-01-01T00:00:00Z]}, 1970-01-01T00:00:00Z]", + "+I[{+I[Alice, 1, 1970-01-01T00:00:00.001Z]}, 1970-01-01T00:00:00.001Z]", + "+I[{+I[Bob, 2, 1970-01-01T00:01:39.999Z]}, 1970-01-01T00:01:39.999Z]", + "+I[{+I[Bob, 3, 1970-01-01T00:00:00.003Z]}, 1970-01-01T00:00:00.003Z]", + "+I[{+I[Bob, 4, 1970-01-01T00:00:00.004Z]}, 1970-01-01T00:00:00.004Z]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM f(r => TABLE t, on_time => DESCRIPTOR(ts))") + .build(); + public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME = TableTestProgram.of( "process-scalar-args-time", @@ -1187,7 +1263,7 @@ public class ProcessTableFunctionTestPrograms { public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME = TableTestProgram.of( "process-optional-on-time", - "test optional time attribute, fire once for constant timer") + "test optional time attribute, re-fires timer for constant timer registration after time passed") .setupTemporarySystemFunction("f", OptionalOnTimeFunction.class) .setupTableSource(TIMED_SOURCE) .setupTableSink( @@ -1195,21 +1271,25 @@ public class ProcessTableFunctionTestPrograms { .addSchema(KEYED_BASE_SINK_SCHEMA) .consumedValues( "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time null watermark null}]", - "+I[Bob, {Registering timer t for 2 at time null watermark null}]", + "+I[Bob, {Registering timer t for 1 at time null watermark null}]", "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time null watermark -1}]", - "+I[Alice, {Registering timer t for 2 
at time null watermark -1}]", + "+I[Alice, {Registering timer t for 1 at time null watermark -1}]", "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time null watermark 0}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 0}]", + "+I[Bob, {Registering timer t for 1 at time null watermark 0}]", + "+I[Alice, {Timer t fired at time 1 watermark 1}]", + "+I[Bob, {Timer t fired at time 1 watermark 1}]", "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time null watermark 1}]", "+I[Bob, {Registering timer t for 2 at time null watermark 1}]", - "+I[Alice, {Timer t fired at time 2 watermark 2}]", "+I[Bob, {Timer t fired at time 2 watermark 2}]", "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 2}]", + "+I[Bob, {Registering timer t for 3 at time null watermark 2}]", + "+I[Bob, {Timer t fired at time 3 watermark 3}]", "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 3}]", + "+I[Bob, {Registering timer t for 4 at time null watermark 3}]", + "+I[Bob, {Timer t fired at time 4 watermark 4}]", "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 4}]") + "+I[Bob, {Registering timer t for 5 at time null watermark 4}]", + "+I[Bob, {Timer t fired at time 5 watermark 5}]") .build()) .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name)") .build(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index be6902a0b17..495293c9244 100644 --- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -645,20 +645,23 @@ public class ProcessTableFunctionTestUtils { public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row r) { final TimeContext<Long> timeCtx = ctx.timeContext(Long.class); collectEvalEvent(timeCtx, r); - // all timers should be executed once - if (timeCtx.time() == 99998) { - // will never be fired because it's late - collectCreateTimer(timeCtx, "late", 1L); + if (r.getFieldAs("name").equals("Bob")) { + // Bob registers timers at 0 + collectCreateTimer(timeCtx, "bob", 0L); + collectCreateTimer(timeCtx, 0L); + } else { + // Alice registers timer at 1 + collectCreateTimer(timeCtx, "alice", 1L); } - collectCreateTimer(timeCtx, "t", 0L); - collectCreateTimer(timeCtx, 0L); } public void onTimer(OnTimerContext ctx) { final TimeContext<Long> timeCtx = ctx.timeContext(Long.class); collectOnTimerEvent(ctx); - // will never be fired because it's late - collectCreateTimer(timeCtx, "again", timeCtx.time()); + if (ctx.currentTimer() != null && ctx.currentTimer().equals("alice")) { + // register a timer in the past, which should fire immediately + collectCreateTimer(timeCtx, "alice-again", 0); + } } } @@ -699,7 +702,12 @@ public class ProcessTableFunctionTestUtils { public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) { final TimeContext<Long> timeCtx = ctx.timeContext(Long.class); collectEvalEvent(timeCtx, r); - collectCreateTimer(timeCtx, "t", 2); + Long wm = timeCtx.currentWatermark(); + // Register at wm+1 to always target the immediate next watermark: the timer fires + // exactly once per watermark advance, and each new row re-registers the timer for the + // following watermark step, demonstrating repeated timer re-registration. 
+ long timer = wm == null || wm < 0 ? 1 : wm + 1; + collectCreateTimer(timeCtx, "t", timer); } public void onTimer(OnTimerContext ctx) { diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json new file mode 100644 index 00000000000..78ec15a7a7e --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json @@ -0,0 +1,320 @@ +{ + "flinkVersion" : "2.3", + "nodes" : [ { + "id" : 1, + "type" : "stream-exec-table-source-scan_2", + "scanTableSource" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`t`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "dataType" : "INT" + }, { + "name" : "ts", + "dataType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ], + "watermarkSpecs" : [ { + "rowtimeAttribute" : "ts", + "expression" : { + "rexNode" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, { + "kind" : "LITERAL", + "value" : "1", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, + "serializableString" : "`ts` - INTERVAL '0.001' SECOND" + } + } ] + } + } + }, + "abilities" : [ { + "type" : "WatermarkPushDown", + "watermarkExpr" : { + "kind" : "CALL", + "syntax" : "BINARY", + "internalName" : "$-$1", + "operands" : [ { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, { 
+ "kind" : "LITERAL", + "value" : "1", + "type" : "INTERVAL SECOND(6) NOT NULL" + } ], + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, + "rowtimeExpr" : { + "kind" : "INPUT_REF", + "inputIndex" : 2, + "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + }, + "idleTimeoutMillis" : -1, + "producedType" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "fieldType" : "INT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "watermarkParams" : { + "emitStrategy" : "ON_EVENT", + "alignGroupName" : null, + "alignMaxDrift" : "PT0S", + "alignUpdateInterval" : "PT1S", + "sourceIdleTimeout" : -1 + } + } ] + }, + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "fieldType" : "INT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "TableSourceScan(table=[[default_catalog, default_database, t, watermark=[-(ts, 1:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[name, score, ts])" + }, { + "id" : 2, + "type" : "stream-exec-exchange_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "HASH", + "keys" : [ 0 ] + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "fieldType" : "INT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Exchange(distribution=[hash[name]])" + }, { + "id" : 3, + "type" : "stream-exec-process-table-function_1", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + 
"damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "out", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'ts'), DEFAULT())], uid=[f], select=[name,out,rowtime], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) out, TIMESTAMP_LTZ(3) *ROWTIME* rowtime)])", + "uid" : "f", + "functionCall" : { + "kind" : "CALL", + "systemName" : "f", + "operands" : [ { + "kind" : "TABLE_ARG_CALL", + "inputIndex" : 0, + "partitionKeys" : [ 0 ], + "orderKeys" : [ ], + "orderDirections" : [ ], + "type" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "score", + "fieldType" : "INT" + }, { + "name" : "ts", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + } + }, { + "kind" : "CALL", + "internalName" : "$DESCRIPTOR$1", + "operands" : [ { + "kind" : "LITERAL", + "value" : "ts", + "type" : "CHAR(2) NOT NULL" + } ], + "type" : "DESCRIPTOR NOT NULL" + }, { + "kind" : "CALL", + "syntax" : "SPECIAL", + "internalName" : "$DEFAULT$1", + "type" : "VARCHAR(2147483647)" + } ], + "type" : { + "type" : "ROW", + "nullable" : false, + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "out", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + } + }, + "inputChangelogModes" : [ [ "INSERT" ] ], + "outputChangelogMode" : [ "INSERT" ] + }, { + "id" : 4, + "type" : 
"stream-exec-sink_2", + "configuration" : { + "table.exec.sink.keyed-shuffle" : "AUTO", + "table.exec.sink.not-null-enforcer" : "ERROR", + "table.exec.sink.rowtime-inserter" : "ENABLED", + "table.exec.sink.type-length-enforcer" : "IGNORE", + "table.exec.sink.upsert-materialize" : "AUTO" + }, + "dynamicTableSink" : { + "table" : { + "identifier" : "`default_catalog`.`default_database`.`sink`", + "resolvedTable" : { + "schema" : { + "columns" : [ { + "name" : "name", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "out", + "dataType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE" + } ] + } + } + } + }, + "inputChangelogMode" : [ "INSERT" ], + "upsertMaterializeStrategy" : "ADAPTIVE", + "inputProperties" : [ { + "requiredDistribution" : { + "type" : "UNKNOWN" + }, + "damBehavior" : "PIPELINED", + "priority" : 0 + } ], + "outputType" : { + "type" : "ROW", + "fields" : [ { + "name" : "name", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "out", + "fieldType" : "VARCHAR(2147483647)" + }, { + "name" : "rowtime", + "fieldType" : { + "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", + "nullable" : false, + "precision" : 3, + "kind" : "ROWTIME" + } + } ] + }, + "description" : "Sink(table=[default_catalog.default_database.sink], fields=[name, out, rowtime])" + } ], + "edges" : [ { + "source" : 1, + "target" : 2, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 2, + "target" : 3, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + }, { + "source" : 3, + "target" : 4, + "shuffle" : { + "type" : "FORWARD" + }, + "shuffleMode" : "PIPELINED" + } ] +} \ No newline at end of file diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata new file mode 100644 index 00000000000..90af4c35308 Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java index 318dd4ae0d3..b90a201649e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java @@ -160,10 +160,6 @@ public abstract class ProcessTableRunner extends AbstractRichFunction { } public void processEval() throws Exception { - // Drop late events - if (rowtime != null && rowtime <= currentWatermark) { - return; - } processMethod(this::callEval); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java index 93eee5e8e4f..cf3a6149af6 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java @@ -155,9 +155,16 @@ public abstract class AbstractProcessTableOperator extends AbstractStreamOperato @Override public void onEventTime(InternalTimer<RowData, Object> timer) throws Exception { final Object namedTimer = timer.getNamespace(); + boolean 
isNamedTimer = namedTimer != VoidNamespace.INSTANCE; + // Remove the fired timer's state entry immediately to prevent stale entries from + // accumulating. Without this, entries for fired timers would persist until the same + // timer name is re-registered or the state is explicitly cleared. + if (isNamedTimer) { + namedTimersMapState.remove((StringData) namedTimer); + } processTableRunner.ingestTimerEvent( timer.getKey(), - namedTimer == VoidNamespace.INSTANCE ? null : (StringData) namedTimer, + isNamedTimer ? (StringData) namedTimer : null, timer.getTimestamp()); processTableRunner.processOnTimer(); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java index 5fd996fecaa..ce97c630c80 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java @@ -87,11 +87,6 @@ class WritableInternalTimeContext extends ReadableInternalTimeContext { } private void registerOnTimeInternal(@Nullable String name, long newTime) { - if (newTime <= unnamedTimerService.currentWatermark()) { - // Do not register timers for late events. - // Otherwise, the next watermark would trigger an onTimer() that emits late events. - return; - } if (name != null) { replaceNamedTimer(StringData.fromString(name), newTime); } else {
