This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch fhueske-FLINK-39436-Allow-late-data-in-PTFs
in repository https://gitbox.apache.org/repos/asf/flink.git
commit eba2f582ca59b15ac3f2ce3aad49d4255c59868f Author: Fabian Hueske <[email protected]> AuthorDate: Wed Apr 15 09:57:43 2026 +0200 [FLINK-39436] Allow late data in PTFs --- .../stream/ProcessTableFunctionTestPrograms.java | 29 ++++++++++++++-------- .../exec/stream/ProcessTableFunctionTestUtils.java | 8 ------ .../runtime/generated/ProcessTableRunner.java | 4 --- .../process/AbstractProcessTableOperator.java | 8 ++++++ .../process/WritableInternalTimeContext.java | 5 ---- 5 files changed, 27 insertions(+), 27 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java index e65dd8319f1..c22fb898162 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java @@ -1072,8 +1072,8 @@ public class ProcessTableFunctionTestPrograms { "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark null}, 1970-01-01T00:00:00.004Z]", "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null}, 1970-01-01T00:00:00.005Z]", "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 1970-01-01T00:00:00.006Z]", - "+I[Bob, {Timer timeout2 fired at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]", - "+I[Bob, {Clearing all timers at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]") + "+I[Bob, {Timer timeout2 fired at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]", + "+I[Bob, {Clearing all timers at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]") 
.build()) .runSql( "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") @@ -1112,7 +1112,7 @@ public class ProcessTableFunctionTestPrograms { public static final TableTestProgram PROCESS_LATE_EVENTS = TableTestProgram.of( "process-late-events", - "test that late events are dropped in both input and when registering timers") + "test that late events enter PTF (eval and timer registration)") .setupTemporarySystemFunction("f", LateTimersFunction.class) .setupTableSource(TIMED_SOURCE_LATE_EVENTS) .setupTableSink( @@ -1126,16 +1126,22 @@ public class ProcessTableFunctionTestPrograms { "+I[Alice, {Registering timer t for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", "+I[Alice, {Registering timer for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", "+I[Bob, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", "+I[Alice, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", "+I[Bob, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", "+I[Alice, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", "+I[Bob, {Registering timer t for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", - "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]") + "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer t fired at time 0 watermark 
99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer t for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer t for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer t fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]") .build()) .runSql( "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") @@ -1184,7 +1190,7 @@ public class ProcessTableFunctionTestPrograms { public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME = TableTestProgram.of( "process-optional-on-time", - "test optional time attribute, fire once for constant timer") + "test optional time attribute, re-fires timer for constant timer registration after time passed") .setupTemporarySystemFunction("f", OptionalOnTimeFunction.class) .setupTableSource(TIMED_SOURCE) .setupTableSink( @@ -1203,10 +1209,13 @@ public class ProcessTableFunctionTestPrograms { "+I[Bob, {Timer t fired at time 2 watermark 2}]", "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]", "+I[Bob, {Registering timer t for 2 at time null watermark 2}]", + "+I[Bob, {Timer t fired at time 2 watermark 3}]", "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]", "+I[Bob, {Registering timer t for 2 at time null watermark 3}]", + "+I[Bob, {Timer t fired at time 2 watermark 4}]", "+I[Bob, {Processing input row 
+I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 4}]") + "+I[Bob, {Registering timer t for 2 at time null watermark 4}]", + "+I[Bob, {Timer t fired at time 2 watermark 5}]") .build()) .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name)") .build(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index cab52688050..d0c246861f4 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -610,20 +610,12 @@ public class ProcessTableFunctionTestUtils { public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row r) { final TimeContext<Long> timeCtx = ctx.timeContext(Long.class); collectEvalEvent(timeCtx, r); - // all timers should be executed once - if (timeCtx.time() == 99998) { - // will never be fired because it's late - collectCreateTimer(timeCtx, "late", 1L); - } collectCreateTimer(timeCtx, "t", 0L); collectCreateTimer(timeCtx, 0L); } public void onTimer(OnTimerContext ctx) { - final TimeContext<Long> timeCtx = ctx.timeContext(Long.class); collectOnTimerEvent(ctx); - // will never be fired because it's late - collectCreateTimer(timeCtx, "again", timeCtx.time()); } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java index eda3a6c2442..8aa7f046382 100644 --- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java @@ -153,10 +153,6 @@ public abstract class ProcessTableRunner extends AbstractRichFunction { } public void processEval() throws Exception { - // Drop late events - if (rowtime != null && rowtime <= currentWatermark) { - return; - } processMethod(this::callEval); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java index 5432a826cc2..10695d385c2 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java @@ -154,6 +154,14 @@ public abstract class AbstractProcessTableOperator extends AbstractStreamOperato @Override public void onEventTime(InternalTimer<RowData, Object> timer) throws Exception { final Object namedTimer = timer.getNamespace(); + // Clean up the named timer state before calling onTimer() so that if the callback + // re-registers the same timer name, it sees a clean (null) state entry. Without cleanup, + // stale state from a previously fired timer would cause replaceNamedTimer() to delete an + // already-fired timer entry and then re-register it, potentially leading to unexpected + // repeated timer firings. + if (namedTimersMapState != null && namedTimer != VoidNamespace.INSTANCE) { + namedTimersMapState.remove((StringData) namedTimer); + } processTableRunner.ingestTimerEvent( timer.getKey(), namedTimer == VoidNamespace.INSTANCE ? 
null : (StringData) namedTimer, diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java index 5fd996fecaa..ce97c630c80 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java @@ -87,11 +87,6 @@ class WritableInternalTimeContext extends ReadableInternalTimeContext { } private void registerOnTimeInternal(@Nullable String name, long newTime) { - if (newTime <= unnamedTimerService.currentWatermark()) { - // Do not register timers for late events. - // Otherwise, the next watermark would trigger an onTimer() that emits late events. - return; - } if (name != null) { replaceNamedTimer(StringData.fromString(name), newTime); } else {
