This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch fhueske-FLINK-39436-Allow-late-data-in-PTFs
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 857e77c4ce7459c44b911941b2019e2e9025fbb4
Author: Fabian Hueske <[email protected]>
AuthorDate: Wed Apr 15 09:57:43 2026 +0200

    [FLINK-39436] Allow late data in PTFs
    
    Previously, late events (rowtime <= watermark) were silently dropped
    before reaching PTF eval(), and timer registrations for times <=
    watermark were also silently dropped. This change removes both
    restrictions:
    
    - ProcessTableRunner: remove the early-return guard in processEval()
      so that late events are passed to the PTF's eval() method.
    
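    - WritableInternalTimeContext: remove the watermark check in
      registerOnTimeInternal() so that timers can be registered for times
      at or before the current watermark. Such timers fire at the next
      watermark advance; a timer registered this way from within onTimer()
      fires during the same watermark advance that triggered onTimer().
      Consequently, onTimer() code that unconditionally re-registers a
      timer at or before the current watermark fires again and again within
      that advance, so such re-registrations must be guarded. The previous guard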
      also had an unintended side effect: any call to replaceNamedTimer()
      with a past time would delete the existing timer entry but then
      silently drop the new registration, leaving the named timer in a
      state where it appeared un-registered but the old timer was gone.
    
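    - AbstractProcessTableOperator: remove the fired named timer's entry
      from the named-timers map state before invoking onTimer(). Without
      this, entries for already-fired timers persisted until the same timer
      name was re-registered or the state was explicitly cleared, so stale
      entries accumulated.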
    
    Tests are updated to reflect the new semantics:
    
    - PROCESS_LATE_EVENTS: demonstrates that late events enter eval(),
      can register timers (including for past times), and that such timers
      fire at the next watermark advance. A past-time timer registered from
      within onTimer() ("alice-again") fires within the same advance.
    
    - PROCESS_ROW_LATE_EVENTS (new): verifies that late events also reach
      eval() of row-semantics PTFs.
    
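    - PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect
      that timer registrations for past times are no longer dropped.
      Previously, once the watermark had reached the requested time, the
      registration was silently discarded; now the timer fires at the next
      watermark advance. OptionalOnTimeFunction now registers its timer at
      the current watermark + 1 (or at 1 while the watermark is absent or
      negative), so the timer fires once per watermark advance.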
---
 .../stream/ProcessTableFunctionSemanticTests.java  |  1 +
 .../stream/ProcessTableFunctionTestPrograms.java   | 77 +++++++++++++++-------
 .../exec/stream/ProcessTableFunctionTestUtils.java | 26 +++++---
 .../runtime/generated/ProcessTableRunner.java      |  4 --
 .../process/AbstractProcessTableOperator.java      |  6 ++
 .../process/WritableInternalTimeContext.java       |  5 --
 6 files changed, 78 insertions(+), 41 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
index cace4ad26c5..6fce5b569df 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java
@@ -87,6 +87,7 @@ public class ProcessTableFunctionSemanticTests extends 
SemanticTestBase {
                 ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS,
                 ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS,
                 ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS,
+                ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS,
                 ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME,
                 
ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME,
                 ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index e65dd8319f1..2153481e15e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -48,6 +48,7 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateTimeFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoWithDefaultStateFunction;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RequiredTimeFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTableFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction;
@@ -1072,8 +1073,8 @@ public class ProcessTableFunctionTestPrograms {
                                             "+I[Bob, {Processing input row 
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark null}, 
1970-01-01T00:00:00.004Z]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null}, 
1970-01-01T00:00:00.005Z]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 
1970-01-01T00:00:00.006Z]",
-                                            "+I[Bob, {Timer timeout2 fired at 
time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]",
-                                            "+I[Bob, {Clearing all timers at 
time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]")
+                                            "+I[Bob, {Timer timeout2 fired at 
time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Clearing all timers at 
time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name, on_time => DESCRIPTOR(ts))")
@@ -1112,7 +1113,7 @@ public class ProcessTableFunctionTestPrograms {
     public static final TableTestProgram PROCESS_LATE_EVENTS =
             TableTestProgram.of(
                             "process-late-events",
-                            "test that late events are dropped in both input 
and when registering timers")
+                            "test that late events enter PTF (eval and timer 
registration)")
                     .setupTemporarySystemFunction("f", 
LateTimersFunction.class)
                     .setupTableSource(TIMED_SOURCE_LATE_EVENTS)
                     .setupTableSink(
@@ -1120,27 +1121,53 @@ public class ProcessTableFunctionTestPrograms {
                                     .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA)
                                     .consumedValues(
                                             "+I[Bob, {Processing input row 
+I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 
1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Registering timer t for 
0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Registering timer bob 
for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]",
                                             "+I[Bob, {Registering timer for 0 
at time 0 watermark null}, 1970-01-01T00:00:00Z]",
                                             "+I[Alice, {Processing input row 
+I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 
1970-01-01T00:00:00.001Z]",
-                                            "+I[Alice, {Registering timer t 
for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
-                                            "+I[Alice, {Registering timer for 
0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Registering timer 
alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
                                             "+I[Bob, {Timer null fired at time 
0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Registering timer again 
for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Timer null fired at 
time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Registering timer 
again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Timer t fired at time 0 
watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Registering timer again 
for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Timer t fired at time 
0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Registering timer 
again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 
0 watermark 0}, 1970-01-01T00:00:00Z]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0}, 
1970-01-01T00:01:39.999Z]",
-                                            "+I[Bob, {Registering timer t for 
0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
-                                            "+I[Bob, {Registering timer for 0 
at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]")
+                                            "+I[Bob, {Registering timer bob 
for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[Bob, {Registering timer for 0 
at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[Bob, {Timer null fired at time 
0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 
0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Alice, {Timer alice fired at 
time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Registering timer 
alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[Alice, {Timer alice-again 
fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 
1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Registering timer bob 
for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Registering timer for 0 
at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 
1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Registering timer bob 
for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Registering timer for 0 
at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Timer null fired at time 
0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer bob fired at time 
0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name, on_time => DESCRIPTOR(ts))")
                     .build();
 
+    public static final TableTestProgram PROCESS_ROW_LATE_EVENTS =
+            TableTestProgram.of(
+                            "process-row-late-events",
+                            "test that late events enter a row-level PTF")
+                    .setupTemporarySystemFunction("f", 
RequiredTimeFunction.class)
+                    .setupTableSource(TIMED_SOURCE_LATE_EVENTS)
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(TIMED_BASE_SINK_SCHEMA)
+                                    .consumedValues(
+                                            "+I[{+I[Bob, 1, 
1970-01-01T00:00:00Z]}, 1970-01-01T00:00:00Z]",
+                                            "+I[{+I[Alice, 1, 
1970-01-01T00:00:00.001Z]}, 1970-01-01T00:00:00.001Z]",
+                                            "+I[{+I[Bob, 2, 
1970-01-01T00:01:39.999Z]}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[{+I[Bob, 3, 
1970-01-01T00:00:00.003Z]}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[{+I[Bob, 4, 
1970-01-01T00:00:00.004Z]}, 1970-01-01T00:00:00.004Z]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM f(r => TABLE t, 
on_time => DESCRIPTOR(ts))")
+                    .build();
+
     public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME =
             TableTestProgram.of(
                             "process-scalar-args-time",
@@ -1184,7 +1211,7 @@ public class ProcessTableFunctionTestPrograms {
     public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME =
             TableTestProgram.of(
                             "process-optional-on-time",
-                            "test optional time attribute, fire once for 
constant timer")
+                            "test optional time attribute, re-fires timer for 
constant timer registration after time passed")
                     .setupTemporarySystemFunction("f", 
OptionalOnTimeFunction.class)
                     .setupTableSource(TIMED_SOURCE)
                     .setupTableSink(
@@ -1192,21 +1219,25 @@ public class ProcessTableFunctionTestPrograms {
                                     .addSchema(KEYED_BASE_SINK_SCHEMA)
                                     .consumedValues(
                                             "+I[Bob, {Processing input row 
+I[Bob, 1, 1970-01-01T00:00:00Z] at time null watermark null}]",
-                                            "+I[Bob, {Registering timer t for 
2 at time null watermark null}]",
+                                            "+I[Bob, {Registering timer t for 
1 at time null watermark null}]",
                                             "+I[Alice, {Processing input row 
+I[Alice, 1, 1970-01-01T00:00:00.001Z] at time null watermark -1}]",
-                                            "+I[Alice, {Registering timer t 
for 2 at time null watermark -1}]",
+                                            "+I[Alice, {Registering timer t 
for 1 at time null watermark -1}]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 2, 1970-01-01T00:00:00.002Z] at time null watermark 0}]",
-                                            "+I[Bob, {Registering timer t for 
2 at time null watermark 0}]",
+                                            "+I[Bob, {Registering timer t for 
1 at time null watermark 0}]",
+                                            "+I[Alice, {Timer t fired at time 
1 watermark 1}]",
+                                            "+I[Bob, {Timer t fired at time 1 
watermark 1}]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time null watermark 1}]",
                                             "+I[Bob, {Registering timer t for 
2 at time null watermark 1}]",
-                                            "+I[Alice, {Timer t fired at time 
2 watermark 2}]",
                                             "+I[Bob, {Timer t fired at time 2 
watermark 2}]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]",
-                                            "+I[Bob, {Registering timer t for 
2 at time null watermark 2}]",
+                                            "+I[Bob, {Registering timer t for 
3 at time null watermark 2}]",
+                                            "+I[Bob, {Timer t fired at time 3 
watermark 3}]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]",
-                                            "+I[Bob, {Registering timer t for 
2 at time null watermark 3}]",
+                                            "+I[Bob, {Registering timer t for 
4 at time null watermark 3}]",
+                                            "+I[Bob, {Timer t fired at time 4 
watermark 4}]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]",
-                                            "+I[Bob, {Registering timer t for 
2 at time null watermark 4}]")
+                                            "+I[Bob, {Registering timer t for 
5 at time null watermark 4}]",
+                                            "+I[Bob, {Timer t fired at time 5 
watermark 5}]")
                                     .build())
                     .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name)")
                     .build();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index cab52688050..57c15bdc444 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -610,20 +610,23 @@ public class ProcessTableFunctionTestUtils {
         public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, 
REQUIRE_ON_TIME}) Row r) {
             final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
             collectEvalEvent(timeCtx, r);
-            // all timers should be executed once
-            if (timeCtx.time() == 99998) {
-                // will never be fired because it's late
-                collectCreateTimer(timeCtx, "late", 1L);
+            if (r.getFieldAs("name").equals("Bob")) {
+                // Bob registers timers at 0
+                collectCreateTimer(timeCtx, "bob", 0L);
+                collectCreateTimer(timeCtx, 0L);
+            } else {
+                // Alice registers timer at 1
+                collectCreateTimer(timeCtx, "alice", 1L);
             }
-            collectCreateTimer(timeCtx, "t", 0L);
-            collectCreateTimer(timeCtx, 0L);
         }
 
         public void onTimer(OnTimerContext ctx) {
             final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
             collectOnTimerEvent(ctx);
-            // will never be fired because it's late
-            collectCreateTimer(timeCtx, "again", timeCtx.time());
+            if (ctx.currentTimer() != null && 
ctx.currentTimer().equals("alice")) {
+                // register a timer in the past, which should fire immediately
+                collectCreateTimer(timeCtx, "alice-again", 0);
+            }
         }
     }
 
@@ -664,7 +667,12 @@ public class ProcessTableFunctionTestUtils {
         public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) 
{
             final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
             collectEvalEvent(timeCtx, r);
-            collectCreateTimer(timeCtx, "t", 2);
+            Long wm = timeCtx.currentWatermark();
+            // Register at wm+1 to always target the immediate next watermark: 
the timer fires
+            // exactly once per watermark advance, and each new row 
re-registers the timer for the
+            // following watermark step, demonstrating repeated timer 
re-registration.
+            long timer = wm == null || wm < 0 ? 1 : wm + 1;
+            collectCreateTimer(timeCtx, "t", timer);
         }
 
         public void onTimer(OnTimerContext ctx) {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
index eda3a6c2442..8aa7f046382 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
@@ -153,10 +153,6 @@ public abstract class ProcessTableRunner extends 
AbstractRichFunction {
     }
 
     public void processEval() throws Exception {
-        // Drop late events
-        if (rowtime != null && rowtime <= currentWatermark) {
-            return;
-        }
         processMethod(this::callEval);
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index 5432a826cc2..21b33337efc 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -154,6 +154,12 @@ public abstract class AbstractProcessTableOperator extends 
AbstractStreamOperato
     @Override
     public void onEventTime(InternalTimer<RowData, Object> timer) throws 
Exception {
         final Object namedTimer = timer.getNamespace();
+        // Remove the fired timer's state entry immediately to prevent stale 
entries from
+        // accumulating. Without this, entries for fired timers would persist 
until the same
+        // timer name is re-registered or the state is explicitly cleared.
+        if (namedTimersMapState != null && namedTimer != 
VoidNamespace.INSTANCE) {
+            namedTimersMapState.remove((StringData) namedTimer);
+        }
         processTableRunner.ingestTimerEvent(
                 timer.getKey(),
                 namedTimer == VoidNamespace.INSTANCE ? null : (StringData) 
namedTimer,
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
index 5fd996fecaa..ce97c630c80 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
@@ -87,11 +87,6 @@ class WritableInternalTimeContext extends 
ReadableInternalTimeContext {
     }
 
     private void registerOnTimeInternal(@Nullable String name, long newTime) {
-        if (newTime <= unnamedTimerService.currentWatermark()) {
-            // Do not register timers for late events.
-            // Otherwise, the next watermark would trigger an onTimer() that 
emits late events.
-            return;
-        }
         if (name != null) {
             replaceNamedTimer(StringData.fromString(name), newTime);
         } else {

Reply via email to