This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 93732565369 [FLINK-39436][table] Allow late data in PTFs (#27935)
93732565369 is described below
commit 937325653695881e8737816ba40b46fa0819dd21
Author: Fabian Hueske
AuthorDate: Tue Apr 21 21:28:14 2026 +0200
[FLINK-39436][table] Allow late data in PTFs (#27935)
Previously, late events (rowtime <= watermark) were silently dropped
before reaching PTF eval(), and timer registrations for times <=
watermark were also silently dropped. This change removes both
restrictions:
- ProcessTableRunner: remove the early-return guard in processEval()
so that late events are passed to the PTF's eval() method.
- WritableInternalTimeContext: remove the watermark check in
registerOnTimeInternal() so that timers can be registered for past
times. Such timers fire at the next watermark advance when registered
from within eval(), or immediately after the current timer finishes
when registered from within onTimer(). The previous guard also had an
unintended side effect: any call to replaceNamedTimer() with a past
time would delete the existing timer entry but then silently drop the
new registration, leaving the name without any timer at all.
- AbstractProcessTableOperator: remove the fired named timer's state
entry before invoking onTimer() to prevent stale entries from
accumulating in the named timers map state.
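
The replaceNamedTimer() side effect described above can be reproduced with a
toy model. This is only a sketch under stated assumptions: NamedTimerModel,
its dropPastTimers flag, and replaceWithPastTime() are hypothetical names
invented for illustration and are not part of Flink; the real logic lives in
WritableInternalTimeContext.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of a named-timer store; not Flink's implementation. */
final class NamedTimerModel {
    private final Map<String, Long> namedTimers = new HashMap<>();
    // true models the old watermark guard, false models the behavior after this change
    private final boolean dropPastTimers;
    private long watermark = Long.MIN_VALUE;

    NamedTimerModel(boolean dropPastTimers) {
        this.dropPastTimers = dropPastTimers;
    }

    void advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
    }

    /** Replaces the timer registered under the given name. */
    void replaceNamedTimer(String name, long time) {
        // Step 1: the existing entry is always deleted.
        namedTimers.remove(name);
        // Step 2: with the old guard, a past-time registration is dropped here,
        // so the name ends up with no timer at all.
        if (dropPastTimers && time <= watermark) {
            return;
        }
        namedTimers.put(name, time);
    }

    Long registeredTime(String name) {
        return namedTimers.get(name);
    }

    /** Registers a future timer, advances the watermark, then replaces it with a past time. */
    static Long replaceWithPastTime(boolean dropPastTimers) {
        NamedTimerModel model = new NamedTimerModel(dropPastTimers);
        model.replaceNamedTimer("timeout", 10L); // future timer
        model.advanceWatermark(5L);
        model.replaceNamedTimer("timeout", 3L); // replacement for a past time
        return model.registeredTime("timeout");
    }
}
```

In this model, replaceWithPastTime(true) returns null (the old timer is gone
and the new one was dropped), while replaceWithPastTime(false) returns 3.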
Tests are updated to reflect the new semantics:
- PROCESS_LATE_EVENTS: demonstrates that late events enter eval(),
can register timers (including for past times), and that such timers
fire immediately at the next watermark advance.
- PROCESS_ROW_LATE_EVENTS (new): verifies the same for row-semantics
PTFs.
- PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect
that timer registrations for past times are no longer dropped.
Previously, once the watermark passed the registered time, the timer
was silently discarded; now it fires immediately.
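
The firing order exercised by these tests can be approximated with a small
simulation. It is a sketch only: PastTimerModel and its methods are
hypothetical, handle a single key, and do not mirror the code in
AbstractProcessTableOperator. It assumes that due timers are drained in a
loop on each watermark advance, which is one way to obtain the documented
"fires immediately after the current timer finishes" behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Hypothetical single-key timer model; not Flink's operator. */
final class PastTimerModel {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private final List<String> log = new ArrayList<>();
    private long watermark = Long.MIN_VALUE;

    /** Accepts the registration even if time <= watermark (no guard). */
    void register(long time) {
        timers.add(time);
    }

    /**
     * Advances the watermark and fires all due timers. The callback may register
     * further timers; a past-time one is already due and fires in the same loop.
     */
    void advanceWatermark(long newWatermark, BiConsumer<PastTimerModel, Long> onTimer) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            long time = timers.poll();
            log.add("fired " + time + " at watermark " + watermark);
            onTimer.accept(this, time);
        }
    }

    List<String> log() {
        return log;
    }

    static List<String> demo() {
        PastTimerModel model = new PastTimerModel();
        model.advanceWatermark(10L, (m, t) -> {});
        // Late registration as if from eval(): nothing fires until the next advance.
        model.register(3L);
        model.advanceWatermark(11L, (m, t) -> {
            // Conditional re-registration for a past time, as if from onTimer().
            if (t == 3L) {
                m.register(0L);
            }
        });
        return model.log();
    }
}
```

demo() yields "fired 3 at watermark 11" followed by "fired 0 at watermark 11",
which matches the shape of the alice / alice-again sequence in
PROCESS_LATE_EVENTS. Dropping the t == 3L condition would make the loop in
this model never terminate, which is the infinite-loop hazard the
documentation warns about.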
---
docs/content.zh/docs/dev/table/functions/ptfs.md | 12 +
docs/content/docs/dev/table/functions/ptfs.md | 12 +
.../table/functions/ProcessTableFunction.java | 25 ++
.../stream/ProcessTableFunctionRestoreTests.java | 3 +-
.../stream/ProcessTableFunctionSemanticTests.java | 1 +
.../stream/ProcessTableFunctionTestPrograms.java | 126 ++++++--
.../exec/stream/ProcessTableFunctionTestUtils.java | 26 +-
.../plan/process-late-events-restore.json | 320 +++++++++++++++++++++
.../savepoint/_metadata | Bin 0 -> 15523 bytes
.../runtime/generated/ProcessTableRunner.java | 4 -
.../process/AbstractProcessTableOperator.java | 9 +-
.../process/WritableInternalTimeContext.java | 5 -
12 files changed, 500 insertions(+), 43 deletions(-)
diff --git a/docs/content.zh/docs/dev/table/functions/ptfs.md
b/docs/content.zh/docs/dev/table/functions/ptfs.md
index 9a077a6473c..fc89ab76f6c 100644
--- a/docs/content.zh/docs/dev/table/functions/ptfs.md
+++ b/docs/content.zh/docs/dev/table/functions/ptfs.md
@@ -1021,6 +1021,18 @@ class TimerFunction extends ProcessTableFunction<String>
{
{{< /tab >}}
{{< /tabs >}}
+### Handling of Late Records
+
+A late record is a record with a time attribute value that is less than or equal to the current
+watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If
+the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is
+the same for PTFs with row and set semantics.
+
+Registering a timer for a time that is less than or equal to the current watermark is allowed.
+If registered from within `eval()`, the timer fires on the next watermark advance. If registered
+from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that
+unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop.
+
### Efficiency and Design Principles
Registering too many timers might affect performance. An ever-growing timer
state can happen
diff --git a/docs/content/docs/dev/table/functions/ptfs.md
b/docs/content/docs/dev/table/functions/ptfs.md
index 1b919dbc0d3..0b392c56d11 100644
--- a/docs/content/docs/dev/table/functions/ptfs.md
+++ b/docs/content/docs/dev/table/functions/ptfs.md
@@ -1022,6 +1022,18 @@ class TimerFunction extends ProcessTableFunction<String>
{
{{< /tab >}}
{{< /tabs >}}
+### Handling of Late Records
+
+A late record is a record with a time attribute value that is less than or equal to the current
+watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If
+the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is
+the same for PTFs with row and set semantics.
+
+Registering a timer for a time that is less than or equal to the current watermark is allowed.
+If registered from within `eval()`, the timer fires on the next watermark advance. If registered
+from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that
+unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop.
+
### Efficiency and Design Principles
Registering too many timers might affect performance. An ever-growing timer
state can happen
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
index bec4bcc8593..19e5bdee501 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
@@ -403,6 +403,19 @@ import java.time.LocalDateTime;
* }
* }</pre>
*
+ * <h2>Handling of Late Records</h2>
+ *
+ * <p>A late record is a record with a time attribute value that is less than or equal to the
+ * current watermark. PTFs handle late records just like non-late records by calling the {@code
+ * eval()} method. If the {@code on_time} argument is specified, the late timestamp is preserved in
+ * the output. This behavior is the same for PTFs with row and set semantics.
+ *
+ * <p>Registering a timer for a time that is less than or equal to the current watermark is allowed.
+ * If registered from within {@code eval()}, the timer fires on the next watermark advance. If
+ * registered from within {@code onTimer()}, the timer fires immediately after the current timer
+ * finishes. Note that unconditionally re-registering a past-time timer from within {@code
+ * onTimer()} causes an infinite loop.
+ *
* <h2>Efficiency and Design Principles</h2>
*
* <p>Registering too many timers might affect performance. An ever-growing
timer state can happen
@@ -660,6 +673,12 @@ public abstract class ProcessTableFunction<T> extends
UserDefinedFunction {
* timer only fires if a watermark was received from all inputs and
the timestamp is smaller
* or equal to the minimum of all received watermarks.
*
+ * <p>If the timestamp of the registered timer is already less than or equal to the current
+ * watermark, the timer fires on the next watermark advance if registered from within {@code
+ * eval()}, or immediately after the current timer finishes if registered from within {@code
+ * onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code
+ * onTimer()} causes an infinite loop.
+ *
* <p>Timers can be named for distinguishing them in the {@code
onTimer()} method.
* Registering a timer under the same name twice will replace an
existing timer.
*
@@ -680,6 +699,12 @@ public abstract class ProcessTableFunction<T> extends
UserDefinedFunction {
* timer only fires if a watermark was received from all inputs and
the timestamp is smaller
* or equal to the minimum of all received watermarks.
*
+ * <p>If the timestamp of the registered timer is already less than or equal to the current
+ * watermark, the timer fires on the next watermark advance if registered from within {@code
+ * eval()}, or immediately after the current timer finishes if registered from within {@code
+ * onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code
+ * onTimer()} causes an infinite loop.
+ *
* <p>Only one timer can be registered for a given time.
*
* <p>Note: Because only PTFs taking set semantic tables support
state, and timers are a
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java
index d95c4a7dd73..03ef42de0e1 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java
@@ -40,6 +40,7 @@ public class ProcessTableFunctionRestoreTests extends
RestoreTestBase {
ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE_RESTORE,
ProcessTableFunctionTestPrograms.PROCESS_UPDATING_OUTPUT_UPSERT_RESTORE,
ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY_RESTORE,
-
ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE);
+
ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE,
+ ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS_RESTORE);
}
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index 7903b93bc5d..aa547037a13 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -87,6 +87,7 @@ public class ProcessTableFunctionSemanticTests extends
SemanticTestBase {
ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS,
ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS,
+ ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS,
ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME,
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME,
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 7248ff1755a..062de5c2416 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -50,6 +50,7 @@ import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateTimeFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoWithDefaultStateFunction;
+import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RequiredTimeFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTableFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction;
import
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction;
@@ -1075,8 +1076,8 @@ public class ProcessTableFunctionTestPrograms {
"+I[Bob, {Processing input row
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark null},
1970-01-01T00:00:00.004Z]",
"+I[Bob, {Processing input row
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null},
1970-01-01T00:00:00.005Z]",
"+I[Bob, {Processing input row
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null},
1970-01-01T00:00:00.006Z]",
- "+I[Bob, {Timer timeout2 fired at
time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]",
- "+I[Bob, {Clearing all timers at
time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]")
+ "+I[Bob, {Timer timeout2 fired at
time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]",
+ "+I[Bob, {Clearing all timers at
time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM f(r => TABLE t
PARTITION BY name, on_time => DESCRIPTOR(ts))")
@@ -1115,7 +1116,7 @@ public class ProcessTableFunctionTestPrograms {
public static final TableTestProgram PROCESS_LATE_EVENTS =
TableTestProgram.of(
"process-late-events",
- "test that late events are dropped in both input
and when registering timers")
+ "test that late events enter PTF (eval and timer
registration)")
.setupTemporarySystemFunction("f",
LateTimersFunction.class)
.setupTableSource(TIMED_SOURCE_LATE_EVENTS)
.setupTableSink(
@@ -1123,27 +1124,102 @@ public class ProcessTableFunctionTestPrograms {
.addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
.consumedValues(
"+I[Bob, {Processing input row
+I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null},
1970-01-01T00:00:00Z]",
- "+I[Bob, {Registering timer t for
0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer bob
for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Registering timer for 0
at time 0 watermark null}, 1970-01-01T00:00:00Z]",
"+I[Alice, {Processing input row
+I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1},
1970-01-01T00:00:00.001Z]",
- "+I[Alice, {Registering timer t
for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
- "+I[Alice, {Registering timer for
0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
+ "+I[Alice, {Registering timer
alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
"+I[Bob, {Timer null fired at time
0 watermark 0}, 1970-01-01T00:00:00Z]",
- "+I[Bob, {Registering timer again
for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
- "+I[Alice, {Timer null fired at
time 0 watermark 0}, 1970-01-01T00:00:00Z]",
- "+I[Alice, {Registering timer
again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
- "+I[Bob, {Timer t fired at time 0
watermark 0}, 1970-01-01T00:00:00Z]",
- "+I[Bob, {Registering timer again
for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
- "+I[Alice, {Timer t fired at time
0 watermark 0}, 1970-01-01T00:00:00Z]",
- "+I[Alice, {Registering timer
again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Timer bob fired at time
0 watermark 0}, 1970-01-01T00:00:00Z]",
"+I[Bob, {Processing input row
+I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0},
1970-01-01T00:01:39.999Z]",
- "+I[Bob, {Registering timer t for
0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
- "+I[Bob, {Registering timer for 0
at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]")
+ "+I[Bob, {Registering timer bob
for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
+ "+I[Bob, {Registering timer for 0
at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
+ "+I[Bob, {Timer null fired at time
0 watermark 99998}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Timer bob fired at time
0 watermark 99998}, 1970-01-01T00:00:00Z]",
+ "+I[Alice, {Timer alice fired at
time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+ "+I[Alice, {Registering timer
alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+ "+I[Alice, {Timer alice-again
fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998},
1970-01-01T00:00:00.003Z]",
+ "+I[Bob, {Registering timer bob
for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+ "+I[Bob, {Registering timer for 0
at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998},
1970-01-01T00:00:00.004Z]",
+ "+I[Bob, {Registering timer bob
for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+ "+I[Bob, {Registering timer for 0
at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+ "+I[Bob, {Timer null fired at time
0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Timer bob fired at time
0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]")
.build())
.runSql(
"INSERT INTO sink SELECT * FROM f(r => TABLE t
PARTITION BY name, on_time => DESCRIPTOR(ts))")
.build();
+ public static final TableTestProgram PROCESS_LATE_EVENTS_RESTORE =
+ TableTestProgram.of(
+ "process-late-events-restore",
+ "test that late events and their past-time timers
work correctly after restore")
+ .setupTemporarySystemFunction("f",
LateTimersFunction.class)
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(TIMED_SOURCE_SCHEMA)
+ .producedBeforeRestore(
+ Row.of("Bob", 1,
Instant.ofEpochMilli(0)),
+ Row.of("Alice", 1,
Instant.ofEpochMilli(1)))
+ .producedAfterRestore(
+ Row.of("Bob", 2,
Instant.ofEpochMilli(99999)),
+ Row.of("Bob", 3,
Instant.ofEpochMilli(3)),
+ Row.of("Bob", 4,
Instant.ofEpochMilli(4)))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
+ .consumedBeforeRestore(
+ "+I[Bob, {Processing input row
+I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null},
1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer bob
for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Registering timer for 0
at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+ "+I[Alice, {Processing input row
+I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1},
1970-01-01T00:00:00.001Z]",
+ "+I[Alice, {Registering timer
alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
+ "+I[Bob, {Timer null fired at time
0 watermark 0}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Timer bob fired at time
0 watermark 0}, 1970-01-01T00:00:00Z]")
+ .consumedAfterRestore(
+ "+I[Bob, {Processing input row
+I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark null},
1970-01-01T00:01:39.999Z]",
+ "+I[Bob, {Registering timer bob
for 0 at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]",
+ "+I[Bob, {Registering timer for 0
at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]",
+ "+I[Bob, {Timer null fired at time
0 watermark 99998}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Timer bob fired at time
0 watermark 99998}, 1970-01-01T00:00:00Z]",
+ "+I[Alice, {Timer alice fired at
time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+ "+I[Alice, {Registering timer
alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+ "+I[Alice, {Timer alice-again
fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998},
1970-01-01T00:00:00.003Z]",
+ "+I[Bob, {Registering timer bob
for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+ "+I[Bob, {Registering timer for 0
at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+ "+I[Bob, {Processing input row
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998},
1970-01-01T00:00:00.004Z]",
+ "+I[Bob, {Registering timer bob
for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+ "+I[Bob, {Registering timer for 0
at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+ "+I[Bob, {Timer null fired at time
0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]",
+ "+I[Bob, {Timer bob fired at time
0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM f(r => TABLE t
PARTITION BY name, on_time => DESCRIPTOR(ts))")
+ .build();
+
+ public static final TableTestProgram PROCESS_ROW_LATE_EVENTS =
+ TableTestProgram.of(
+ "process-row-late-events",
+ "test that late events enter a PTF with row
semantics")
+ .setupTemporarySystemFunction("f",
RequiredTimeFunction.class)
+ .setupTableSource(TIMED_SOURCE_LATE_EVENTS)
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(TIMED_BASE_SINK_SCHEMA)
+ .consumedValues(
+ "+I[{+I[Bob, 1,
1970-01-01T00:00:00Z]}, 1970-01-01T00:00:00Z]",
+ "+I[{+I[Alice, 1,
1970-01-01T00:00:00.001Z]}, 1970-01-01T00:00:00.001Z]",
+ "+I[{+I[Bob, 2,
1970-01-01T00:01:39.999Z]}, 1970-01-01T00:01:39.999Z]",
+ "+I[{+I[Bob, 3,
1970-01-01T00:00:00.003Z]}, 1970-01-01T00:00:00.003Z]",
+ "+I[{+I[Bob, 4,
1970-01-01T00:00:00.004Z]}, 1970-01-01T00:00:00.004Z]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM f(r => TABLE t,
on_time => DESCRIPTOR(ts))")
+ .build();
+
public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME =
TableTestProgram.of(
"process-scalar-args-time",
@@ -1187,7 +1263,7 @@ public class ProcessTableFunctionTestPrograms {
public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME =
TableTestProgram.of(
"process-optional-on-time",
- "test optional time attribute, fire once for
constant timer")
+ "test optional time attribute, re-fires timer for
constant timer registration after time passed")
.setupTemporarySystemFunction("f",
OptionalOnTimeFunction.class)
.setupTableSource(TIMED_SOURCE)
.setupTableSink(
@@ -1195,21 +1271,25 @@ public class ProcessTableFunctionTestPrograms {
.addSchema(KEYED_BASE_SINK_SCHEMA)
.consumedValues(
"+I[Bob, {Processing input row
+I[Bob, 1, 1970-01-01T00:00:00Z] at time null watermark null}]",
- "+I[Bob, {Registering timer t for
2 at time null watermark null}]",
+ "+I[Bob, {Registering timer t for
1 at time null watermark null}]",
"+I[Alice, {Processing input row
+I[Alice, 1, 1970-01-01T00:00:00.001Z] at time null watermark -1}]",
- "+I[Alice, {Registering timer t
for 2 at time null watermark -1}]",
+ "+I[Alice, {Registering timer t
for 1 at time null watermark -1}]",
"+I[Bob, {Processing input row
+I[Bob, 2, 1970-01-01T00:00:00.002Z] at time null watermark 0}]",
- "+I[Bob, {Registering timer t for
2 at time null watermark 0}]",
+ "+I[Bob, {Registering timer t for
1 at time null watermark 0}]",
+ "+I[Alice, {Timer t fired at time
1 watermark 1}]",
+ "+I[Bob, {Timer t fired at time 1
watermark 1}]",
"+I[Bob, {Processing input row
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time null watermark 1}]",
"+I[Bob, {Registering timer t for
2 at time null watermark 1}]",
- "+I[Alice, {Timer t fired at time
2 watermark 2}]",
"+I[Bob, {Timer t fired at time 2
watermark 2}]",
"+I[Bob, {Processing input row
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]",
- "+I[Bob, {Registering timer t for
2 at time null watermark 2}]",
+ "+I[Bob, {Registering timer t for
3 at time null watermark 2}]",
+ "+I[Bob, {Timer t fired at time 3
watermark 3}]",
"+I[Bob, {Processing input row
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]",
- "+I[Bob, {Registering timer t for
2 at time null watermark 3}]",
+ "+I[Bob, {Registering timer t for
4 at time null watermark 3}]",
+ "+I[Bob, {Timer t fired at time 4
watermark 4}]",
"+I[Bob, {Processing input row
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]",
- "+I[Bob, {Registering timer t for
2 at time null watermark 4}]")
+ "+I[Bob, {Registering timer t for
5 at time null watermark 4}]",
+ "+I[Bob, {Timer t fired at time 5
watermark 5}]")
.build())
.runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t
PARTITION BY name)")
.build();
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index be6902a0b17..495293c9244 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -645,20 +645,23 @@ public class ProcessTableFunctionTestUtils {
public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE,
REQUIRE_ON_TIME}) Row r) {
final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
collectEvalEvent(timeCtx, r);
- // all timers should be executed once
- if (timeCtx.time() == 99998) {
- // will never be fired because it's late
- collectCreateTimer(timeCtx, "late", 1L);
+ if (r.getFieldAs("name").equals("Bob")) {
+ // Bob registers timers at 0
+ collectCreateTimer(timeCtx, "bob", 0L);
+ collectCreateTimer(timeCtx, 0L);
+ } else {
+ // Alice registers timer at 1
+ collectCreateTimer(timeCtx, "alice", 1L);
}
- collectCreateTimer(timeCtx, "t", 0L);
- collectCreateTimer(timeCtx, 0L);
}
public void onTimer(OnTimerContext ctx) {
final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
collectOnTimerEvent(ctx);
- // will never be fired because it's late
- collectCreateTimer(timeCtx, "again", timeCtx.time());
+ if (ctx.currentTimer() != null &&
ctx.currentTimer().equals("alice")) {
+ // register a timer in the past, which should fire immediately
+ collectCreateTimer(timeCtx, "alice-again", 0);
+ }
}
}
@@ -699,7 +702,12 @@ public class ProcessTableFunctionTestUtils {
public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r)
{
final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
collectEvalEvent(timeCtx, r);
- collectCreateTimer(timeCtx, "t", 2);
+ Long wm = timeCtx.currentWatermark();
+ // Register at wm+1 to always target the immediate next watermark:
the timer fires
+ // exactly once per watermark advance, and each new row
re-registers the timer for the
+ // following watermark step, demonstrating repeated timer
re-registration.
+ long timer = wm == null || wm < 0 ? 1 : wm + 1;
+ collectCreateTimer(timeCtx, "t", timer);
}
public void onTimer(OnTimerContext ctx) {
diff --git
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
new file mode 100644
index 00000000000..8d22a24a156
--- /dev/null
+++
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
@@ -0,0 +1,320 @@
+{
+ "flinkVersion" : "2.4",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_2",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`t`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "dataType" : "INT"
+ }, {
+ "name" : "ts",
+ "dataType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ],
+ "watermarkSpecs" : [ {
+ "rowtimeAttribute" : "ts",
+ "expression" : {
+ "rexNode" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+ },
+ "serializableString" : "`ts` - INTERVAL '0.001' SECOND"
+ }
+ } ]
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "WatermarkPushDown",
+ "watermarkExpr" : {
+ "kind" : "CALL",
+ "syntax" : "BINARY",
+ "internalName" : "$-$1",
+ "operands" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+ }, {
+ "kind" : "LITERAL",
+ "value" : "1",
+ "type" : "INTERVAL SECOND(6) NOT NULL"
+ } ],
+ "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+ },
+ "rowtimeExpr" : {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 2,
+ "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+ },
+ "idleTimeoutMillis" : -1,
+ "producedType" : {
+ "type" : "ROW",
+ "nullable" : false,
+ "fields" : [ {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "fieldType" : "INT"
+ }, {
+ "name" : "ts",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "watermarkParams" : {
+ "emitStrategy" : "ON_EVENT",
+ "alignGroupName" : null,
+ "alignMaxDrift" : "PT0S",
+ "alignUpdateInterval" : "PT1S",
+ "sourceIdleTimeout" : -1
+ }
+ } ]
+ },
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "fieldType" : "INT"
+ }, {
+ "name" : "ts",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "TableSourceScan(table=[[default_catalog,
default_database, t, watermark=[-(ts, 1:INTERVAL SECOND)],
watermarkEmitStrategy=[on-event]]], fields=[name, score, ts])"
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "HASH",
+ "keys" : [ 0 ]
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "fieldType" : "INT"
+ }, {
+ "name" : "ts",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "Exchange(distribution=[hash[name]])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-process-table-function_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "out",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION
BY($0), DESCRIPTOR(_UTF-16LE'ts'), DEFAULT())], uid=[f],
select=[name,out,rowtime], rowType=[RecordType(VARCHAR(2147483647) name,
VARCHAR(2147483647) out, TIMESTAMP_LTZ(3) *ROWTIME* rowtime)])",
+ "uid" : "f",
+ "functionCall" : {
+ "kind" : "CALL",
+ "systemName" : "f",
+ "operands" : [ {
+ "kind" : "TABLE_ARG_CALL",
+ "inputIndex" : 0,
+ "partitionKeys" : [ 0 ],
+ "orderKeys" : [ ],
+ "orderDirections" : [ ],
+ "type" : {
+ "type" : "ROW",
+ "nullable" : false,
+ "fields" : [ {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "score",
+ "fieldType" : "INT"
+ }, {
+ "name" : "ts",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ }
+ }, {
+ "kind" : "CALL",
+ "internalName" : "$DESCRIPTOR$1",
+ "operands" : [ {
+ "kind" : "LITERAL",
+ "value" : "ts",
+ "type" : "CHAR(2) NOT NULL"
+ } ],
+ "type" : "DESCRIPTOR NOT NULL"
+ }, {
+ "kind" : "CALL",
+ "syntax" : "SPECIAL",
+ "internalName" : "$DEFAULT$1",
+ "type" : "VARCHAR(2147483647)"
+ } ],
+ "type" : {
+ "type" : "ROW",
+ "nullable" : false,
+ "fields" : [ {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "out",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ }
+ },
+ "inputChangelogModes" : [ [ "INSERT" ] ],
+ "outputChangelogMode" : [ "INSERT" ]
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-sink_2",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`sink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "name",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "out",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+ } ]
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "upsertMaterializeStrategy" : "ADAPTIVE",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : {
+ "type" : "ROW",
+ "fields" : [ {
+ "name" : "name",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "out",
+ "fieldType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "rowtime",
+ "fieldType" : {
+ "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+ "nullable" : false,
+ "precision" : 3,
+ "kind" : "ROWTIME"
+ }
+ } ]
+ },
+ "description" : "Sink(table=[default_catalog.default_database.sink], fields=[name, out, rowtime])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata
new file mode 100644
index 00000000000..90af4c35308
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata differ
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
index 318dd4ae0d3..b90a201649e 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
@@ -160,10 +160,6 @@ public abstract class ProcessTableRunner extends AbstractRichFunction {
}
public void processEval() throws Exception {
- // Drop late events
- if (rowtime != null && rowtime <= currentWatermark) {
- return;
- }
processMethod(this::callEval);
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index 93eee5e8e4f..cf3a6149af6 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -155,9 +155,16 @@ public abstract class AbstractProcessTableOperator extends AbstractStreamOperato
@Override
public void onEventTime(InternalTimer<RowData, Object> timer) throws Exception {
final Object namedTimer = timer.getNamespace();
+ boolean isNamedTimer = namedTimer != VoidNamespace.INSTANCE;
+ // Remove the fired timer's state entry immediately to prevent stale entries from
+ // accumulating. Without this, entries for fired timers would persist until the same
+ // timer name is re-registered or the state is explicitly cleared.
+ if (isNamedTimer) {
+ namedTimersMapState.remove((StringData) namedTimer);
+ }
processTableRunner.ingestTimerEvent(
timer.getKey(),
- namedTimer == VoidNamespace.INSTANCE ? null : (StringData) namedTimer,
+ isNamedTimer ? (StringData) namedTimer : null,
timer.getTimestamp());
processTableRunner.processOnTimer();
}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
index 5fd996fecaa..ce97c630c80 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
@@ -87,11 +87,6 @@ class WritableInternalTimeContext extends ReadableInternalTimeContext {
}
private void registerOnTimeInternal(@Nullable String name, long newTime) {
- if (newTime <= unnamedTimerService.currentWatermark()) {
- // Do not register timers for late events.
- // Otherwise, the next watermark would trigger an onTimer() that emits late events.
- return;
- }
if (name != null) {
replaceNamedTimer(StringData.fromString(name), newTime);
} else {