This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 717ed1e815768dbfc7aea4f3f2033fb6b72a5462
Author: Fabian Hueske <[email protected]>
AuthorDate: Tue Apr 21 21:28:14 2026 +0200

    [FLINK-39436][table] Allow late data in PTFs (#27935)
    
    Previously, late events (rowtime <= watermark) were silently dropped
    before reaching PTF eval(), and timer registrations for times <=
    watermark were also silently dropped. This change removes both
    restrictions:
    
    - ProcessTableRunner: remove the early-return guard in processEval()
      so that late events are passed to the PTF's eval() method.
    
    - WritableInternalTimeContext: remove the watermark check in
      registerOnTimeInternal() so that timers can be registered for past
      times. Such timers fire at the next watermark advance when
      registered from within eval(), and immediately after the current
      timer finishes when registered from within onTimer(). The previous
      guard also had an unintended side effect: any call to
      replaceNamedTimer() with a past time would delete the existing
      timer entry but then silently drop the new registration, leaving
      no timer registered under that name.
    
    - AbstractProcessTableOperator: remove the fired named timer's state
      entry before invoking onTimer() to prevent stale entries from
      accumulating in the named timers map state.
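
    The replaceNamedTimer() side effect described above can be illustrated
    with a small, self-contained toy model. This is only a sketch under
    stated assumptions: the class NamedTimerSketch and its methods
    replaceWithGuard() and replaceWithoutGuard() are hypothetical names
    invented for illustration and are not Flink's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of named-timer replacement; NOT Flink's actual implementation. */
class NamedTimerSketch {
    // Hypothetical stand-in for the named timers map state: timer name -> time.
    final Map<String, Long> namedTimers = new HashMap<>();
    long watermark = 10L;

    /** Old behavior as described above: delete first, then drop past-time registrations. */
    void replaceWithGuard(String name, long time) {
        namedTimers.remove(name);
        if (time <= watermark) {
            return; // new registration dropped, but the old entry is already gone
        }
        namedTimers.put(name, time);
    }

    /** New behavior: a past-time registration is kept and replaces the old entry. */
    void replaceWithoutGuard(String name, long time) {
        namedTimers.remove(name);
        namedTimers.put(name, time);
    }

    public static void main(String[] args) {
        NamedTimerSketch before = new NamedTimerSketch();
        before.namedTimers.put("timeout", 20L);
        before.replaceWithGuard("timeout", 5L);
        System.out.println(before.namedTimers.containsKey("timeout")); // false

        NamedTimerSketch after = new NamedTimerSketch();
        after.namedTimers.put("timeout", 20L);
        after.replaceWithoutGuard("timeout", 5L);
        System.out.println(after.namedTimers.get("timeout")); // 5
    }
}
```

    With the guard, the name "timeout" ends up with no timer at all; without
    it, the name maps to the past time 5 and stays registered.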
    
    Tests are updated to reflect the new semantics:
    
    - PROCESS_LATE_EVENTS: demonstrates that late events enter eval(),
      can register timers (including for past times), and that such timers
      fire at the next watermark advance, or right after the current timer
      finishes when registered from within onTimer().
    
    - PROCESS_ROW_LATE_EVENTS (new): verifies the same for row-semantics
      PTFs.
    
    - PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect
      that timer registrations for past times are no longer dropped.
      Previously, once the watermark passed the registered time, the timer
      was silently discarded; now it fires immediately.
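
    The firing rules for past-time timers can be sketched with a toy timer
    queue. This is a minimal illustration, not Flink's timer service: the
    class LateTimerSketch and its methods register(), advanceWatermark()
    and demo() are hypothetical. It shows a timer registered for a past
    time firing at the next watermark advance, and a past-time timer
    registered from the timer callback firing right after the current one.
    The callback re-registers only conditionally, since the documentation
    added in this commit notes that unconditional re-registration from
    onTimer() causes an infinite loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

/** Toy model of event-time timers; NOT Flink's actual implementation. */
class LateTimerSketch {
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    final List<String> log = new ArrayList<>();

    /** Registers a timer. Past times are accepted; nothing fires until the watermark advances. */
    void register(long time) {
        if (!timers.contains(time)) {
            timers.add(time);
        }
    }

    /** Advances the watermark and fires every timer whose time is <= the new watermark. */
    void advanceWatermark(long watermark, LongConsumer onTimer) {
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            long time = timers.poll();
            log.add("fired " + time + " at watermark " + watermark);
            // A past-time timer registered in the callback is picked up by the next iteration.
            onTimer.accept(time);
        }
    }

    static List<String> demo() {
        LateTimerSketch s = new LateTimerSketch();
        // Guarded re-registration: only the timer at 5 registers a follow-up, so the loop ends.
        LongConsumer onTimer = time -> {
            if (time == 5L) {
                s.register(0L);
            }
        };
        s.advanceWatermark(10L, onTimer); // no timers registered yet
        s.register(5L);                   // past-time registration: 5 <= watermark 10
        s.advanceWatermark(11L, onTimer); // fires 5, then 0 right afterwards
        return s.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

    Dropping the time == 5L condition in the callback would make the loop in
    advanceWatermark() never terminate, which is the hazard the new
    documentation warns about.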
---
 docs/content.zh/docs/dev/table/functions/ptfs.md   |  12 +
 docs/content/docs/dev/table/functions/ptfs.md      |  12 +
 .../table/functions/ProcessTableFunction.java      |  25 ++
 .../stream/ProcessTableFunctionRestoreTests.java   |   3 +-
 .../stream/ProcessTableFunctionSemanticTests.java  |   1 +
 .../stream/ProcessTableFunctionTestPrograms.java   | 126 ++++++--
 .../exec/stream/ProcessTableFunctionTestUtils.java |  26 +-
 .../plan/process-late-events-restore.json          | 320 +++++++++++++++++++++
 .../savepoint/_metadata                            | Bin 0 -> 15523 bytes
 .../runtime/generated/ProcessTableRunner.java      |   4 -
 .../process/AbstractProcessTableOperator.java      |   9 +-
 .../process/WritableInternalTimeContext.java       |   5 -
 12 files changed, 500 insertions(+), 43 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/functions/ptfs.md b/docs/content.zh/docs/dev/table/functions/ptfs.md
index 9a077a6473c..fc89ab76f6c 100644
--- a/docs/content.zh/docs/dev/table/functions/ptfs.md
+++ b/docs/content.zh/docs/dev/table/functions/ptfs.md
@@ -1021,6 +1021,18 @@ class TimerFunction extends ProcessTableFunction<String> {
 {{< /tab >}}
 {{< /tabs >}}
 
+### Handling of Late Records
+
+A late record is a record with a time attribute value that is less than or equal to the current
+watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If
+the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is
+the same for PTFs with row and set semantics.
+
+Registering a timer for a time that is less than or equal to the current watermark is allowed.
+If registered from within `eval()`, the timer fires on the next watermark advance. If registered
+from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that
+unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop.
+
 ### Efficiency and Design Principles
 
 Registering too many timers might affect performance. An ever-growing timer state can happen
diff --git a/docs/content/docs/dev/table/functions/ptfs.md b/docs/content/docs/dev/table/functions/ptfs.md
index 1b919dbc0d3..0b392c56d11 100644
--- a/docs/content/docs/dev/table/functions/ptfs.md
+++ b/docs/content/docs/dev/table/functions/ptfs.md
@@ -1022,6 +1022,18 @@ class TimerFunction extends ProcessTableFunction<String> {
 {{< /tab >}}
 {{< /tabs >}}
 
+### Handling of Late Records
+
+A late record is a record with a time attribute value that is less than or equal to the current
+watermark. PTFs handle late records just like non-late records by calling the `eval()` method. If
+the `on_time` argument is specified, the late timestamp is preserved in the output. This behavior is
+the same for PTFs with row and set semantics.
+
+Registering a timer for a time that is less than or equal to the current watermark is allowed.
+If registered from within `eval()`, the timer fires on the next watermark advance. If registered
+from within `onTimer()`, the timer fires immediately after the current timer finishes. Note that
+unconditionally re-registering a past-time timer from within `onTimer()` causes an infinite loop.
+
 ### Efficiency and Design Principles
 
 Registering too many timers might affect performance. An ever-growing timer state can happen
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
index bec4bcc8593..19e5bdee501 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java
@@ -403,6 +403,19 @@ import java.time.LocalDateTime;
  * }
  * }</pre>
  *
+ * <h2>Handling of Late Records</h2>
+ *
+ * <p>A late record is a record with a time attribute value that is less than or equal to the
+ * current watermark. PTFs handle late records just like non-late records by calling the {@code
+ * eval()} method. If the {@code on_time} argument is specified, the late timestamp is preserved in
+ * the output. This behavior is the same for PTFs with row and set semantics.
+ *
+ * <p>Registering a timer for a time that is less than or equal to the current watermark is allowed.
+ * If registered from within {@code eval()}, the timer fires on the next watermark advance. If
+ * registered from within {@code onTimer()}, the timer fires immediately after the current timer
+ * finishes. Note that unconditionally re-registering a past-time timer from within {@code
+ * onTimer()} causes an infinite loop.
+ *
  * <h2>Efficiency and Design Principles</h2>
  *
  * <p>Registering too many timers might affect performance. An ever-growing timer state can happen
@@ -660,6 +673,12 @@ public abstract class ProcessTableFunction<T> extends UserDefinedFunction {
          * timer only fires if a watermark was received from all inputs and the timestamp is smaller
          * or equal to the minimum of all received watermarks.
          *
+         * <p>If the timestamp of the registered timer is already less than or equal to the current
+         * watermark, the timer fires on the next watermark advance if registered from within {@code
+         * eval()}, or immediately after the current timer finishes if registered from within {@code
+         * onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code
+         * onTimer()} causes an infinite loop.
+         *
          * <p>Timers can be named for distinguishing them in the {@code onTimer()} method.
          * Registering a timer under the same name twice will replace an existing timer.
          *
@@ -680,6 +699,12 @@ public abstract class ProcessTableFunction<T> extends UserDefinedFunction {
          * timer only fires if a watermark was received from all inputs and the timestamp is smaller
          * or equal to the minimum of all received watermarks.
          *
+         * <p>If the timestamp of the registered timer is already less than or equal to the current
+         * watermark, the timer fires on the next watermark advance if registered from within {@code
+         * eval()}, or immediately after the current timer finishes if registered from within {@code
+         * onTimer()}. Note that unconditionally re-registering a past-time timer from within {@code
+         * onTimer()} causes an infinite loop.
+         *
          * <p>Only one timer can be registered for a given time.
          *
          * <p>Note: Because only PTFs taking set semantic tables support state, and timers are a
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java
index d95c4a7dd73..03ef42de0e1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionRestoreTests.java
@@ -40,6 +40,7 @@ public class ProcessTableFunctionRestoreTests extends RestoreTestBase {
                 ProcessTableFunctionTestPrograms.PROCESS_MAP_STATE_RESTORE,
                 ProcessTableFunctionTestPrograms.PROCESS_UPDATING_OUTPUT_UPSERT_RESTORE,
                 ProcessTableFunctionTestPrograms.PROCESS_ORDER_BY_RESTORE,
-                ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE);
+                ProcessTableFunctionTestPrograms.PROCESS_MULTI_INPUT_ORDER_BY_RESTORE,
+                ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS_RESTORE);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index 7903b93bc5d..aa547037a13 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -87,6 +87,7 @@ public class ProcessTableFunctionSemanticTests extends SemanticTestBase {
                 ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS,
                 ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS,
                 ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS,
+                ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS,
                 ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME,
                 ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME,
                 ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index 7248ff1755a..062de5c2416 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateTimeFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoWithDefaultStateFunction;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RequiredTimeFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTableFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction;
@@ -1075,8 +1076,8 @@ public class ProcessTableFunctionTestPrograms {
                                             "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark null}, 1970-01-01T00:00:00.004Z]",
                                             "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null}, 1970-01-01T00:00:00.005Z]",
                                             "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 1970-01-01T00:00:00.006Z]",
-                                            "+I[Bob, {Timer timeout2 fired at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]",
-                                            "+I[Bob, {Clearing all timers at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]")
+                                            "+I[Bob, {Timer timeout2 fired at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Clearing all timers at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
@@ -1115,7 +1116,7 @@ public class ProcessTableFunctionTestPrograms {
     public static final TableTestProgram PROCESS_LATE_EVENTS =
             TableTestProgram.of(
                             "process-late-events",
-                            "test that late events are dropped in both input and when registering timers")
+                            "test that late events enter PTF (eval and timer registration)")
                     .setupTemporarySystemFunction("f", LateTimersFunction.class)
                     .setupTableSource(TIMED_SOURCE_LATE_EVENTS)
                     .setupTableSink(
@@ -1123,27 +1124,102 @@ public class ProcessTableFunctionTestPrograms {
                                     .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
                                     .consumedValues(
                                             "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Registering timer t for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer bob for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
                                             "+I[Bob, {Registering timer for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
                                             "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
-                                            "+I[Alice, {Registering timer t for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
-                                            "+I[Alice, {Registering timer for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Registering timer alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
                                             "+I[Bob, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
                                             "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
-                                            "+I[Bob, {Registering timer t for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
-                                            "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]")
+                                            "+I[Bob, {Registering timer bob for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Alice, {Timer alice fired at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Registering timer alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Timer alice-again fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Registering timer bob for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Registering timer for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Registering timer bob for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
                     .build();
 
+    public static final TableTestProgram PROCESS_LATE_EVENTS_RESTORE =
+            TableTestProgram.of(
+                            "process-late-events-restore",
+                            "test that late events and their past-time timers work correctly after restore")
+                    .setupTemporarySystemFunction("f", LateTimersFunction.class)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(TIMED_SOURCE_SCHEMA)
+                                    .producedBeforeRestore(
+                                            Row.of("Bob", 1, Instant.ofEpochMilli(0)),
+                                            Row.of("Alice", 1, Instant.ofEpochMilli(1)))
+                                    .producedAfterRestore(
+                                            Row.of("Bob", 2, Instant.ofEpochMilli(99999)),
+                                            Row.of("Bob", 3, Instant.ofEpochMilli(3)),
+                                            Row.of("Bob", 4, Instant.ofEpochMilli(4)))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
+                                    .consumedBeforeRestore(
+                                            "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer bob for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Registering timer alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Bob, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]")
+                                    .consumedAfterRestore(
+                                            "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[Bob, {Registering timer bob for 0 at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[Bob, {Registering timer for 0 at time 99999 watermark null}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Alice, {Timer alice fired at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Registering timer alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Timer alice-again fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Registering timer bob for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Registering timer for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Registering timer bob for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))")
+                    .build();
+
+    public static final TableTestProgram PROCESS_ROW_LATE_EVENTS =
+            TableTestProgram.of(
+                            "process-row-late-events",
+                            "test that late events enter a PTF with row semantics")
+                    .setupTemporarySystemFunction("f", RequiredTimeFunction.class)
+                    .setupTableSource(TIMED_SOURCE_LATE_EVENTS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(TIMED_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[{+I[Bob, 1, 1970-01-01T00:00:00Z]}, 1970-01-01T00:00:00Z]",
+                                            "+I[{+I[Alice, 1, 1970-01-01T00:00:00.001Z]}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[{+I[Bob, 2, 1970-01-01T00:01:39.999Z]}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[{+I[Bob, 3, 1970-01-01T00:00:00.003Z]}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[{+I[Bob, 4, 1970-01-01T00:00:00.004Z]}, 1970-01-01T00:00:00.004Z]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM f(r => TABLE t, on_time => DESCRIPTOR(ts))")
+                    .build();
+
     public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME =
             TableTestProgram.of(
                             "process-scalar-args-time",
@@ -1187,7 +1263,7 @@ public class ProcessTableFunctionTestPrograms {
     public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME =
             TableTestProgram.of(
                             "process-optional-on-time",
-                            "test optional time attribute, fire once for constant timer")
+                            "test optional time attribute, re-fires timer for constant timer registration after time passed")
                     .setupTemporarySystemFunction("f", OptionalOnTimeFunction.class)
                     .setupTableSource(TIMED_SOURCE)
                     .setupTableSink(
@@ -1195,21 +1271,25 @@ public class ProcessTableFunctionTestPrograms {
                                     .addSchema(KEYED_BASE_SINK_SCHEMA)
                                     .consumedValues(
                                             "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time null watermark null}]",
-                                            "+I[Bob, {Registering timer t for 2 at time null watermark null}]",
+                                            "+I[Bob, {Registering timer t for 1 at time null watermark null}]",
                                             "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time null watermark -1}]",
-                                            "+I[Alice, {Registering timer t for 2 at time null watermark -1}]",
+                                            "+I[Alice, {Registering timer t for 1 at time null watermark -1}]",
                                             "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time null watermark 0}]",
-                                            "+I[Bob, {Registering timer t for 2 at time null watermark 0}]",
+                                            "+I[Bob, {Registering timer t for 1 at time null watermark 0}]",
+                                            "+I[Alice, {Timer t fired at time 1 watermark 1}]",
+                                            "+I[Bob, {Timer t fired at time 1 watermark 1}]",
                                             "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time null watermark 1}]",
                                             "+I[Bob, {Registering timer t for 2 at time null watermark 1}]",
-                                            "+I[Alice, {Timer t fired at time 2 watermark 2}]",
                                             "+I[Bob, {Timer t fired at time 2 watermark 2}]",
                                             "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]",
-                                            "+I[Bob, {Registering timer t for 2 at time null watermark 2}]",
+                                            "+I[Bob, {Registering timer t for 3 at time null watermark 2}]",
+                                            "+I[Bob, {Timer t fired at time 3 watermark 3}]",
                                             "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]",
-                                            "+I[Bob, {Registering timer t for 2 at time null watermark 3}]",
+                                            "+I[Bob, {Registering timer t for 4 at time null watermark 3}]",
+                                            "+I[Bob, {Timer t fired at time 4 watermark 4}]",
                                             "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]",
-                                            "+I[Bob, {Registering timer t for 2 at time null watermark 4}]")
+                                            "+I[Bob, {Registering timer t for 5 at time null watermark 4}]",
+                                            "+I[Bob, {Timer t fired at time 5 watermark 5}]")
                                     .build())
                     .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name)")
                     .build();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index be6902a0b17..495293c9244 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -645,20 +645,23 @@ public class ProcessTableFunctionTestUtils {
         public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, 
REQUIRE_ON_TIME}) Row r) {
             final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
             collectEvalEvent(timeCtx, r);
-            // all timers should be executed once
-            if (timeCtx.time() == 99998) {
-                // will never be fired because it's late
-                collectCreateTimer(timeCtx, "late", 1L);
+            if (r.getFieldAs("name").equals("Bob")) {
+                // Bob registers timers at 0
+                collectCreateTimer(timeCtx, "bob", 0L);
+                collectCreateTimer(timeCtx, 0L);
+            } else {
+                // Alice registers timer at 1
+                collectCreateTimer(timeCtx, "alice", 1L);
             }
-            collectCreateTimer(timeCtx, "t", 0L);
-            collectCreateTimer(timeCtx, 0L);
         }
 
         public void onTimer(OnTimerContext ctx) {
             final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
             collectOnTimerEvent(ctx);
-            // will never be fired because it's late
-            collectCreateTimer(timeCtx, "again", timeCtx.time());
+            if (ctx.currentTimer() != null && ctx.currentTimer().equals("alice")) {
+                // register a timer in the past, which should fire immediately
+                collectCreateTimer(timeCtx, "alice-again", 0);
+            }
         }
     }
 
@@ -699,7 +702,12 @@ public class ProcessTableFunctionTestUtils {
         public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) {
             final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
             collectEvalEvent(timeCtx, r);
-            collectCreateTimer(timeCtx, "t", 2);
+            Long wm = timeCtx.currentWatermark();
+            // Register at wm+1 to always target the immediate next watermark: the timer fires
+            // exactly once per watermark advance, and each new row re-registers the timer for the
+            // following watermark step, demonstrating repeated timer re-registration.
+            long timer = wm == null || wm < 0 ? 1 : wm + 1;
+            collectCreateTimer(timeCtx, "t", timer);
         }
 
         public void onTimer(OnTimerContext ctx) {
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
new file mode 100644
index 00000000000..78ec15a7a7e
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/plan/process-late-events-restore.json
@@ -0,0 +1,320 @@
+{
+  "flinkVersion" : "2.3",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_2",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`t`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "score",
+              "dataType" : "INT"
+            }, {
+              "name" : "ts",
+              "dataType" : {
+                "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+                "precision" : 3,
+                "kind" : "ROWTIME"
+              }
+            } ],
+            "watermarkSpecs" : [ {
+              "rowtimeAttribute" : "ts",
+              "expression" : {
+                "rexNode" : {
+                  "kind" : "CALL",
+                  "syntax" : "BINARY",
+                  "internalName" : "$-$1",
+                  "operands" : [ {
+                    "kind" : "INPUT_REF",
+                    "inputIndex" : 2,
+                    "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+                  }, {
+                    "kind" : "LITERAL",
+                    "value" : "1",
+                    "type" : "INTERVAL SECOND(6) NOT NULL"
+                  } ],
+                  "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+                },
+                "serializableString" : "`ts` - INTERVAL '0.001' SECOND"
+              }
+            } ]
+          }
+        }
+      },
+      "abilities" : [ {
+        "type" : "WatermarkPushDown",
+        "watermarkExpr" : {
+          "kind" : "CALL",
+          "syntax" : "BINARY",
+          "internalName" : "$-$1",
+          "operands" : [ {
+            "kind" : "INPUT_REF",
+            "inputIndex" : 2,
+            "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+          }, {
+            "kind" : "LITERAL",
+            "value" : "1",
+            "type" : "INTERVAL SECOND(6) NOT NULL"
+          } ],
+          "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+        },
+        "rowtimeExpr" : {
+          "kind" : "INPUT_REF",
+          "inputIndex" : 2,
+          "type" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+        },
+        "idleTimeoutMillis" : -1,
+        "producedType" : {
+          "type" : "ROW",
+          "nullable" : false,
+          "fields" : [ {
+            "name" : "name",
+            "fieldType" : "VARCHAR(2147483647)"
+          }, {
+            "name" : "score",
+            "fieldType" : "INT"
+          }, {
+            "name" : "ts",
+            "fieldType" : {
+              "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+              "precision" : 3,
+              "kind" : "ROWTIME"
+            }
+          } ]
+        },
+        "watermarkParams" : {
+          "emitStrategy" : "ON_EVENT",
+          "alignGroupName" : null,
+          "alignMaxDrift" : "PT0S",
+          "alignUpdateInterval" : "PT1S",
+          "sourceIdleTimeout" : -1
+        }
+      } ]
+    },
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "score",
+        "fieldType" : "INT"
+      }, {
+        "name" : "ts",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "TableSourceScan(table=[[default_catalog, default_database, t, watermark=[-(ts, 1:INTERVAL SECOND)], watermarkEmitStrategy=[on-event]]], fields=[name, score, ts])"
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-exchange_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "HASH",
+        "keys" : [ 0 ]
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "score",
+        "fieldType" : "INT"
+      }, {
+        "name" : "ts",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Exchange(distribution=[hash[name]])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-process-table-function_1",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "out",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "ProcessTableFunction(invocation=[f(TABLE(#0) PARTITION BY($0), DESCRIPTOR(_UTF-16LE'ts'), DEFAULT())], uid=[f], select=[name,out,rowtime], rowType=[RecordType(VARCHAR(2147483647) name, VARCHAR(2147483647) out, TIMESTAMP_LTZ(3) *ROWTIME* rowtime)])",
+    "uid" : "f",
+    "functionCall" : {
+      "kind" : "CALL",
+      "systemName" : "f",
+      "operands" : [ {
+        "kind" : "TABLE_ARG_CALL",
+        "inputIndex" : 0,
+        "partitionKeys" : [ 0 ],
+        "orderKeys" : [ ],
+        "orderDirections" : [ ],
+        "type" : {
+          "type" : "ROW",
+          "nullable" : false,
+          "fields" : [ {
+            "name" : "name",
+            "fieldType" : "VARCHAR(2147483647)"
+          }, {
+            "name" : "score",
+            "fieldType" : "INT"
+          }, {
+            "name" : "ts",
+            "fieldType" : {
+              "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+              "precision" : 3,
+              "kind" : "ROWTIME"
+            }
+          } ]
+        }
+      }, {
+        "kind" : "CALL",
+        "internalName" : "$DESCRIPTOR$1",
+        "operands" : [ {
+          "kind" : "LITERAL",
+          "value" : "ts",
+          "type" : "CHAR(2) NOT NULL"
+        } ],
+        "type" : "DESCRIPTOR NOT NULL"
+      }, {
+        "kind" : "CALL",
+        "syntax" : "SPECIAL",
+        "internalName" : "$DEFAULT$1",
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : {
+        "type" : "ROW",
+        "nullable" : false,
+        "fields" : [ {
+          "name" : "name",
+          "fieldType" : "VARCHAR(2147483647)"
+        }, {
+          "name" : "out",
+          "fieldType" : "VARCHAR(2147483647)"
+        }, {
+          "name" : "rowtime",
+          "fieldType" : {
+            "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+            "nullable" : false,
+            "precision" : 3,
+            "kind" : "ROWTIME"
+          }
+        } ]
+      }
+    },
+    "inputChangelogModes" : [ [ "INSERT" ] ],
+    "outputChangelogMode" : [ "INSERT" ]
+  }, {
+    "id" : 4,
+    "type" : "stream-exec-sink_2",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.rowtime-inserter" : "ENABLED",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "out",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "rowtime",
+              "dataType" : "TIMESTAMP(3) WITH LOCAL TIME ZONE"
+            } ]
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT" ],
+    "upsertMaterializeStrategy" : "ADAPTIVE",
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : {
+      "type" : "ROW",
+      "fields" : [ {
+        "name" : "name",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "out",
+        "fieldType" : "VARCHAR(2147483647)"
+      }, {
+        "name" : "rowtime",
+        "fieldType" : {
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
+          "nullable" : false,
+          "precision" : 3,
+          "kind" : "ROWTIME"
+        }
+      } ]
+    },
+    "description" : "Sink(table=[default_catalog.default_database.sink], fields=[name, out, rowtime])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 3,
+    "target" : 4,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata
new file mode 100644
index 00000000000..90af4c35308
Binary files /dev/null and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/process-late-events-restore/savepoint/_metadata differ
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
index 318dd4ae0d3..b90a201649e 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
@@ -160,10 +160,6 @@ public abstract class ProcessTableRunner extends AbstractRichFunction {
     }
 
     public void processEval() throws Exception {
-        // Drop late events
-        if (rowtime != null && rowtime <= currentWatermark) {
-            return;
-        }
         processMethod(this::callEval);
     }
 
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index 93eee5e8e4f..cf3a6149af6 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -155,9 +155,16 @@ public abstract class AbstractProcessTableOperator extends AbstractStreamOperato
     @Override
     public void onEventTime(InternalTimer<RowData, Object> timer) throws Exception {
         final Object namedTimer = timer.getNamespace();
+        boolean isNamedTimer = namedTimer != VoidNamespace.INSTANCE;
+        // Remove the fired timer's state entry immediately to prevent stale entries from
+        // accumulating. Without this, entries for fired timers would persist until the same
+        // timer name is re-registered or the state is explicitly cleared.
+        if (isNamedTimer) {
+            namedTimersMapState.remove((StringData) namedTimer);
+        }
         processTableRunner.ingestTimerEvent(
                 timer.getKey(),
-                namedTimer == VoidNamespace.INSTANCE ? null : (StringData) namedTimer,
+                isNamedTimer ? (StringData) namedTimer : null,
                 timer.getTimestamp());
         processTableRunner.processOnTimer();
     }
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
index 5fd996fecaa..ce97c630c80 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
@@ -87,11 +87,6 @@ class WritableInternalTimeContext extends ReadableInternalTimeContext {
     }
 
     private void registerOnTimeInternal(@Nullable String name, long newTime) {
-        if (newTime <= unnamedTimerService.currentWatermark()) {
-            // Do not register timers for late events.
-            // Otherwise, the next watermark would trigger an onTimer() that emits late events.
-            return;
-        }
         if (name != null) {
             replaceNamedTimer(StringData.fromString(name), newTime);
         } else {

