This is an automated email from the ASF dual-hosted git repository. fhueske pushed a commit to branch fhueske-FLINK-39436-Allow-late-data-in-PTFs in repository https://gitbox.apache.org/repos/asf/flink.git
commit 857e77c4ce7459c44b911941b2019e2e9025fbb4 Author: Fabian Hueske <[email protected]> AuthorDate: Wed Apr 15 09:57:43 2026 +0200 [FLINK-39436] Allow late data in PTFs Previously, late events (rowtime <= watermark) were silently dropped before reaching PTF eval(), and timer registrations for times <= watermark were also silently dropped. This change removes both restrictions: - ProcessTableRunner: remove the early-return guard in processEval() so that late events are passed to the PTF's eval() method. - WritableInternalTimeContext: remove the watermark check in registerOnTimeInternal() so that timers can be registered for past times. Such timers fire at the next watermark advance; a past-time timer registered from within onTimer() fires during the watermark advance that is currently being processed. The previous guard also had an unintended side effect: any call to replaceNamedTimer() with a past time would delete the existing timer entry but then silently drop the new registration, leaving the named timer in a state where it appeared un-registered but the old timer was gone. - AbstractProcessTableOperator: remove the fired named timer's state entry before invoking onTimer() to prevent stale entries from accumulating in the named-timers map state. Tests are updated to reflect the new semantics: - PROCESS_LATE_EVENTS: demonstrates that late events enter eval(), can register timers (including for past times), and that such timers fire at the next watermark advance (or, when registered from within onTimer(), during the current one). - PROCESS_ROW_LATE_EVENTS (new): verifies that late events also reach eval() of row-semantics PTFs. - PROCESS_OPTIONAL_ON_TIME / PROCESS_NAMED_TIMERS: updated to reflect that timer registrations for past times are no longer dropped. Previously, once the watermark had passed the requested time, the registration was silently discarded; now the timer fires at the next watermark advance. 
--- .../stream/ProcessTableFunctionSemanticTests.java | 1 + .../stream/ProcessTableFunctionTestPrograms.java | 77 +++++++++++++++------- .../exec/stream/ProcessTableFunctionTestUtils.java | 26 +++++--- .../runtime/generated/ProcessTableRunner.java | 4 -- .../process/AbstractProcessTableOperator.java | 6 ++ .../process/WritableInternalTimeContext.java | 5 -- 6 files changed, 78 insertions(+), 41 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java index cace4ad26c5..6fce5b569df 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionSemanticTests.java @@ -87,6 +87,7 @@ public class ProcessTableFunctionSemanticTests extends SemanticTestBase { ProcessTableFunctionTestPrograms.PROCESS_NAMED_TIMERS, ProcessTableFunctionTestPrograms.PROCESS_UNNAMED_TIMERS, ProcessTableFunctionTestPrograms.PROCESS_LATE_EVENTS, + ProcessTableFunctionTestPrograms.PROCESS_ROW_LATE_EVENTS, ProcessTableFunctionTestPrograms.PROCESS_SCALAR_ARGS_TIME, ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_PARTITION_BY_TIME, ProcessTableFunctionTestPrograms.PROCESS_OPTIONAL_ON_TIME, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java index e65dd8319f1..2153481e15e 100644 --- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestPrograms.java @@ -48,6 +48,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctio import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoStateTimeFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.PojoWithDefaultStateFunction; +import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RequiredTimeFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTableFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.RowSemanticTablePassThroughFunction; import org.apache.flink.table.planner.plan.nodes.exec.stream.ProcessTableFunctionTestUtils.ScalarArgsFunction; @@ -1072,8 +1073,8 @@ public class ProcessTableFunctionTestPrograms { "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark null}, 1970-01-01T00:00:00.004Z]", "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time 5 watermark null}, 1970-01-01T00:00:00.005Z]", "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time 6 watermark null}, 1970-01-01T00:00:00.006Z]", - "+I[Bob, {Timer timeout2 fired at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]", - "+I[Bob, {Clearing all timers at time 2 watermark 9223372036854775807}, 1970-01-01T00:00:00.002Z]") + "+I[Bob, {Timer timeout2 fired at time 5 watermark 9223372036854775807}, 1970-01-01T00:00:00.005Z]", + "+I[Bob, {Clearing all timers at time 5 watermark 
9223372036854775807}, 1970-01-01T00:00:00.005Z]") .build()) .runSql( "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") @@ -1112,7 +1113,7 @@ public class ProcessTableFunctionTestPrograms { public static final TableTestProgram PROCESS_LATE_EVENTS = TableTestProgram.of( "process-late-events", - "test that late events are dropped in both input and when registering timers") + "test that late events enter PTF (eval and timer registration)") .setupTemporarySystemFunction("f", LateTimersFunction.class) .setupTableSource(TIMED_SOURCE_LATE_EVENTS) .setupTableSink( @@ -1120,27 +1121,53 @@ public class ProcessTableFunctionTestPrograms { .addSchema(KEYED_TIMED_BASE_SINK_SCHEMA) .consumedValues( "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time 0 watermark null}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer t for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Registering timer bob for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", "+I[Bob, {Registering timer for 0 at time 0 watermark null}, 1970-01-01T00:00:00Z]", "+I[Alice, {Processing input row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", - "+I[Alice, {Registering timer t for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", - "+I[Alice, {Registering timer for 0 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice for 1 at time 1 watermark -1}, 1970-01-01T00:00:00.001Z]", "+I[Bob, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Timer null fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Bob, {Registering timer again for 0 at time 0 watermark 0}, 
1970-01-01T00:00:00Z]", - "+I[Alice, {Timer t fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", - "+I[Alice, {Registering timer again for 0 at time 0 watermark 0}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 0}, 1970-01-01T00:00:00Z]", "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:01:39.999Z] at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", - "+I[Bob, {Registering timer t for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", - "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]") + "+I[Bob, {Registering timer bob for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Registering timer for 0 at time 99999 watermark 0}, 1970-01-01T00:01:39.999Z]", + "+I[Bob, {Timer null fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Alice, {Timer alice fired at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Registering timer alice-again for 0 at time 1 watermark 99998}, 1970-01-01T00:00:00.001Z]", + "+I[Alice, {Timer alice-again fired at time 0 watermark 99998}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer bob for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Registering timer for 0 at time 3 watermark 99998}, 1970-01-01T00:00:00.003Z]", + "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer bob for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Registering timer for 0 at time 4 watermark 99998}, 1970-01-01T00:00:00.004Z]", + "+I[Bob, {Timer null fired at time 0 watermark 9223372036854775807}, 1970-01-01T00:00:00Z]", + "+I[Bob, {Timer bob fired at time 0 watermark 
9223372036854775807}, 1970-01-01T00:00:00Z]") .build()) .runSql( "INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name, on_time => DESCRIPTOR(ts))") .build(); + public static final TableTestProgram PROCESS_ROW_LATE_EVENTS = + TableTestProgram.of( + "process-row-late-events", + "test that late events enter a row-level PTF") + .setupTemporarySystemFunction("f", RequiredTimeFunction.class) + .setupTableSource(TIMED_SOURCE_LATE_EVENTS) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema(TIMED_BASE_SINK_SCHEMA) + .consumedValues( + "+I[{+I[Bob, 1, 1970-01-01T00:00:00Z]}, 1970-01-01T00:00:00Z]", + "+I[{+I[Alice, 1, 1970-01-01T00:00:00.001Z]}, 1970-01-01T00:00:00.001Z]", + "+I[{+I[Bob, 2, 1970-01-01T00:01:39.999Z]}, 1970-01-01T00:01:39.999Z]", + "+I[{+I[Bob, 3, 1970-01-01T00:00:00.003Z]}, 1970-01-01T00:00:00.003Z]", + "+I[{+I[Bob, 4, 1970-01-01T00:00:00.004Z]}, 1970-01-01T00:00:00.004Z]") + .build()) + .runSql( + "INSERT INTO sink SELECT * FROM f(r => TABLE t, on_time => DESCRIPTOR(ts))") + .build(); + public static final TableTestProgram PROCESS_SCALAR_ARGS_TIME = TableTestProgram.of( "process-scalar-args-time", @@ -1184,7 +1211,7 @@ public class ProcessTableFunctionTestPrograms { public static final TableTestProgram PROCESS_OPTIONAL_ON_TIME = TableTestProgram.of( "process-optional-on-time", - "test optional time attribute, fire once for constant timer") + "test optional time attribute, re-fires timer for constant timer registration after time passed") .setupTemporarySystemFunction("f", OptionalOnTimeFunction.class) .setupTableSource(TIMED_SOURCE) .setupTableSink( @@ -1192,21 +1219,25 @@ public class ProcessTableFunctionTestPrograms { .addSchema(KEYED_BASE_SINK_SCHEMA) .consumedValues( "+I[Bob, {Processing input row +I[Bob, 1, 1970-01-01T00:00:00Z] at time null watermark null}]", - "+I[Bob, {Registering timer t for 2 at time null watermark null}]", + "+I[Bob, {Registering timer t for 1 at time null watermark null}]", "+I[Alice, {Processing input 
row +I[Alice, 1, 1970-01-01T00:00:00.001Z] at time null watermark -1}]", - "+I[Alice, {Registering timer t for 2 at time null watermark -1}]", + "+I[Alice, {Registering timer t for 1 at time null watermark -1}]", "+I[Bob, {Processing input row +I[Bob, 2, 1970-01-01T00:00:00.002Z] at time null watermark 0}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 0}]", + "+I[Bob, {Registering timer t for 1 at time null watermark 0}]", + "+I[Alice, {Timer t fired at time 1 watermark 1}]", + "+I[Bob, {Timer t fired at time 1 watermark 1}]", "+I[Bob, {Processing input row +I[Bob, 3, 1970-01-01T00:00:00.003Z] at time null watermark 1}]", "+I[Bob, {Registering timer t for 2 at time null watermark 1}]", - "+I[Alice, {Timer t fired at time 2 watermark 2}]", "+I[Bob, {Timer t fired at time 2 watermark 2}]", "+I[Bob, {Processing input row +I[Bob, 4, 1970-01-01T00:00:00.004Z] at time null watermark 2}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 2}]", + "+I[Bob, {Registering timer t for 3 at time null watermark 2}]", + "+I[Bob, {Timer t fired at time 3 watermark 3}]", "+I[Bob, {Processing input row +I[Bob, 5, 1970-01-01T00:00:00.005Z] at time null watermark 3}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 3}]", + "+I[Bob, {Registering timer t for 4 at time null watermark 3}]", + "+I[Bob, {Timer t fired at time 4 watermark 4}]", "+I[Bob, {Processing input row +I[Bob, 6, 1970-01-01T00:00:00.006Z] at time null watermark 4}]", - "+I[Bob, {Registering timer t for 2 at time null watermark 4}]") + "+I[Bob, {Registering timer t for 5 at time null watermark 4}]", + "+I[Bob, {Timer t fired at time 5 watermark 5}]") .build()) .runSql("INSERT INTO sink SELECT * FROM f(r => TABLE t PARTITION BY name)") .build(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java index cab52688050..57c15bdc444 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java @@ -610,20 +610,23 @@ public class ProcessTableFunctionTestUtils { public void eval(Context ctx, @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row r) { final TimeContext<Long> timeCtx = ctx.timeContext(Long.class); collectEvalEvent(timeCtx, r); - // all timers should be executed once - if (timeCtx.time() == 99998) { - // will never be fired because it's late - collectCreateTimer(timeCtx, "late", 1L); + if (r.getFieldAs("name").equals("Bob")) { + // Bob registers timers at 0 + collectCreateTimer(timeCtx, "bob", 0L); + collectCreateTimer(timeCtx, 0L); + } else { + // Alice registers timer at 1 + collectCreateTimer(timeCtx, "alice", 1L); } - collectCreateTimer(timeCtx, "t", 0L); - collectCreateTimer(timeCtx, 0L); } public void onTimer(OnTimerContext ctx) { final TimeContext<Long> timeCtx = ctx.timeContext(Long.class); collectOnTimerEvent(ctx); - // will never be fired because it's late - collectCreateTimer(timeCtx, "again", timeCtx.time()); + if (ctx.currentTimer() != null && ctx.currentTimer().equals("alice")) { + // register a timer in the past, which should fire immediately + collectCreateTimer(timeCtx, "alice-again", 0); + } } } @@ -664,7 +667,12 @@ public class ProcessTableFunctionTestUtils { public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) { final TimeContext<Long> timeCtx = ctx.timeContext(Long.class); collectEvalEvent(timeCtx, r); - collectCreateTimer(timeCtx, "t", 2); + Long wm = timeCtx.currentWatermark(); + // Register at wm+1 to always target the immediate next watermark: the 
timer fires + // exactly once per watermark advance, and each new row re-registers the timer for the + // following watermark step, demonstrating repeated timer re-registration. + long timer = wm == null || wm < 0 ? 1 : wm + 1; + collectCreateTimer(timeCtx, "t", timer); } public void onTimer(OnTimerContext ctx) { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java index eda3a6c2442..8aa7f046382 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java @@ -153,10 +153,6 @@ public abstract class ProcessTableRunner extends AbstractRichFunction { } public void processEval() throws Exception { - // Drop late events - if (rowtime != null && rowtime <= currentWatermark) { - return; - } processMethod(this::callEval); } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java index 5432a826cc2..21b33337efc 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java @@ -154,6 +154,12 @@ public abstract class AbstractProcessTableOperator extends AbstractStreamOperato @Override public void onEventTime(InternalTimer<RowData, Object> timer) throws Exception { final Object namedTimer = timer.getNamespace(); + // Remove the fired timer's state entry immediately to prevent stale entries from + // 
accumulating. Without this, entries for fired timers would persist until the same + // timer name is re-registered or the state is explicitly cleared. + if (namedTimersMapState != null && namedTimer != VoidNamespace.INSTANCE) { + namedTimersMapState.remove((StringData) namedTimer); + } processTableRunner.ingestTimerEvent( timer.getKey(), namedTimer == VoidNamespace.INSTANCE ? null : (StringData) namedTimer, diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java index 5fd996fecaa..ce97c630c80 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/WritableInternalTimeContext.java @@ -87,11 +87,6 @@ class WritableInternalTimeContext extends ReadableInternalTimeContext { } private void registerOnTimeInternal(@Nullable String name, long newTime) { - if (newTime <= unnamedTimerService.currentWatermark()) { - // Do not register timers for late events. - // Otherwise, the next watermark would trigger an onTimer() that emits late events. - return; - } if (name != null) { replaceNamedTimer(StringData.fromString(name), newTime); } else {
