This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch fhueske-FLINK-39436-Allow-late-data-in-PTFs
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eba2f582ca59b15ac3f2ce3aad49d4255c59868f
Author: Fabian Hueske <[email protected]>
AuthorDate: Wed Apr 15 09:57:43 2026 +0200

    [FLINK-39436] Allow late data in PTFs
---
 .../stream/ProcessTableFunctionTestPrograms.java   | 29 ++++++++++++++--------
 .../exec/stream/ProcessTableFunctionTestUtils.java |  8 ------
 .../runtime/generated/ProcessTableRunner.java      |  4 ---
 .../process/AbstractProcessTableOperator.java      |  8 ++++++
 .../process/WritableInternalTimeContext.java       |  5 ----
 5 files changed, 27 insertions(+), 27 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
index e65dd8319f1..c22fb898162 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java
@@ -1072,8 +1072,8 @@ public class ProcessTableFunctionTestPrograms {
                                             "+I[Bob, {Processing input row 
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark null}, 
1970-01-01T00:00:00.004Z]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null}, 
1970-01-01T00:00:00.005Z]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 
1970-01-01T00:00:00.006Z]",
-                                            "+I[Bob, {Timer timeout2 fired at 
time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]",
-                                            "+I[Bob, {Clearing all timers at 
time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]")
+                                            "+I[Bob, {Timer timeout2 fired at 
time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]",
+                                            "+I[Bob, {Clearing all timers at 
time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name, on_time => DESCRIPTOR(ts))")
@@ -1112,7 +1112,7 @@ public class ProcessTableFunctionTestPrograms {
     public static final TableTestProgram PROCESS_LATE_EVENTS =
             TableTestProgram.of(
                             "process-late-events",
-                            "test that late events are dropped in both input 
and when registering timers")
+                            "test that late events enter PTF (eval and timer 
registration)")
                     .setupTemporarySystemFunction("f", 
LateTimersFunction.class)
                     .setupTableSource(TIMED_SOURCE_LATE_EVENTS)
                     .setupTableSink(
@@ -1126,16 +1126,22 @@ public class ProcessTableFunctionTestPrograms {
                                             "+I[Alice, {Registering timer t 
for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
                                             "+I[Alice, {Registering timer for 
0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]",
                                             "+I[Bob, {Timer null fired at time 
0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Registering timer again 
for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
                                             "+I[Alice, {Timer null fired at 
time 0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Registering timer 
again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
                                             "+I[Bob, {Timer t fired at time 0 
watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Bob, {Registering timer again 
for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
                                             "+I[Alice, {Timer t fired at time 
0 watermark 0}, 1970-01-01T00:00:00Z]",
-                                            "+I[Alice, {Registering timer 
again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0}, 
1970-01-01T00:01:39.999Z]",
                                             "+I[Bob, {Registering timer t for 
0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
-                                            "+I[Bob, {Registering timer for 0 
at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]")
+                                            "+I[Bob, {Registering timer for 0 
at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]",
+                                            "+I[Bob, {Timer null fired at time 
0 watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer t fired at time 0 
watermark 99998}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 
1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Registering timer t for 
0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Registering timer for 0 
at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]",
+                                            "+I[Bob, {Processing input row 
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 
1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Registering timer t for 
0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Registering timer for 0 
at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]",
+                                            "+I[Bob, {Timer null fired at time 
0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]",
+                                            "+I[Bob, {Timer t fired at time 0 
watermark 9223372036854775807}, 1970-01-01T00:00:00Z]")
                                     .build())
                     .runSql(
                             "INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name, on_time => DESCRIPTOR(ts))")
@@ -1184,7 +1190,7 @@ public class ProcessTableFunctionTestPrograms {
     public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME =
             TableTestProgram.of(
                             "process-optional-on-time",
-                            "test optional time attribute, fire once for 
constant timer")
+                            "test optional time attribute, re-fires timer for 
constant timer registration after time passed")
                     .setupTemporarySystemFunction("f", 
OptionalOnTimeFunction.class)
                     .setupTableSource(TIMED_SOURCE)
                     .setupTableSink(
@@ -1203,10 +1209,13 @@ public class ProcessTableFunctionTestPrograms {
                                             "+I[Bob, {Timer t fired at time 2 
watermark 2}]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]",
                                             "+I[Bob, {Registering timer t for 
2 at time null watermark 2}]",
+                                            "+I[Bob, {Timer t fired at time 2 
watermark 3}]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]",
                                             "+I[Bob, {Registering timer t for 
2 at time null watermark 3}]",
+                                            "+I[Bob, {Timer t fired at time 2 
watermark 4}]",
                                             "+I[Bob, {Processing input row 
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]",
-                                            "+I[Bob, {Registering timer t for 
2 at time null watermark 4}]")
+                                            "+I[Bob, {Registering timer t for 
2 at time null watermark 4}]",
+                                            "+I[Bob, {Timer t fired at time 2 
watermark 5}]")
                                     .build())
                     .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t 
PARTITION BY name)")
                     .build();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
index cab52688050..d0c246861f4 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java
@@ -610,20 +610,12 @@ public class ProcessTableFunctionTestUtils {
         public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, 
REQUIRE_ON_TIME}) Row r) {
             final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
             collectEvalEvent(timeCtx, r);
-            // all timers should be executed once
-            if (timeCtx.time() == 99998) {
-                // will never be fired because it's late
-                collectCreateTimer(timeCtx, "late", 1L);
-            }
             collectCreateTimer(timeCtx, "t", 0L);
             collectCreateTimer(timeCtx, 0L);
         }
 
         public void onTimer(OnTimerContext ctx) {
-            final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
             collectOnTimerEvent(ctx);
-            // will never be fired because it's late
-            collectCreateTimer(timeCtx, "again", timeCtx.time());
         }
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
index eda3a6c2442..8aa7f046382 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
@@ -153,10 +153,6 @@ public abstract class ProcessTableRunner extends 
AbstractRichFunction {
     }
 
     public void processEval() throws Exception {
-        // Drop late events
-        if (rowtime != null && rowtime <= currentWatermark) {
-            return;
-        }
         processMethod(this::callEval);
     }
 
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index 5432a826cc2..10695d385c2 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -154,6 +154,14 @@ public abstract class AbstractProcessTableOperator extends 
AbstractStreamOperato
     @Override
     public void onEventTime(InternalTimer<RowData, Object> timer) throws 
Exception {
         final Object namedTimer = timer.getNamespace();
+        // Clean up the named timer state before calling onTimer() so that if 
the callback
+        // re-registers the same timer name, it sees a clean (null) state 
entry. Without cleanup,
+        // stale state from a previously fired timer would cause 
replaceNamedTimer() to delete an
+        // already-fired timer entry and then re-register it, potentially 
leading to unexpected
+        // repeated timer firings.
+        if (namedTimersMapState != null && namedTimer != 
VoidNamespace.INSTANCE) {
+            namedTimersMapState.remove((StringData) namedTimer);
+        }
         processTableRunner.ingestTimerEvent(
                 timer.getKey(),
                 namedTimer == VoidNamespace.INSTANCE ? null : (StringData) 
namedTimer,
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
index 5fd996fecaa..ce97c630c80 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java
@@ -87,11 +87,6 @@ class WritableInternalTimeContext extends 
ReadableInternalTimeContext {
     }
 
     private void registerOnTimeInternal(@Nullable String name, long newTime) {
-        if (newTime <= unnamedTimerService.currentWatermark()) {
-            // Do not register timers for late events.
-            // Otherwise, the next watermark would trigger an onTimer() that 
emits late events.
-            return;
-        }
         if (name != null) {
             replaceNamedTimer(StringData.fromString(name), newTime);
         } else {

Reply via email to