This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5106d15e83d [FLINK-39437][table] Support interruptible timers in PTFs
(#27962)
5106d15e83d is described below
commit 5106d15e83d4497824c33b71fd0021846bd1ec09
Author: Fabian Hueske <[email protected]>
AuthorDate: Wed Apr 22 15:49:37 2026 +0200
[FLINK-39437][table] Support interruptible timers in PTFs (#27962)
Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to
return `true`, activating the `MailboxWatermarkProcessor` for PTF operators.
This allows timer firing to be interrupted between mailbox iterations,
improving throughput by not blocking mailbox processing during large timer
batches.
Also removes `currentWatermark` from `ProcessTableRunner` and redirects
reads to `AbstractStreamOperatorV2.combinedWatermark`.
Also adds a test (`ProcessSetTableOperatorInterruptibleTimersTest`) to assert
correct behavior of the PTF framework when timer processing is interrupted.
---
.../runtime/generated/ProcessTableRunner.java | 9 -
.../process/AbstractProcessTableOperator.java | 10 +-
.../operators/process/ProcessRowTableOperator.java | 13 +-
...essSetTableOperatorInterruptibleTimersTest.java | 352 +++++++++++++++++++++
4 files changed, 371 insertions(+), 13 deletions(-)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
index b90a201649e..5477fc4523f 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/ProcessTableRunner.java
@@ -65,7 +65,6 @@ public abstract class ProcessTableRunner extends
AbstractRichFunction {
// Current time
private long tableWatermark = Long.MIN_VALUE;
- private long currentWatermark = Long.MIN_VALUE;
private @Nullable Long rowtime;
private @Nullable StringData timerName;
@@ -131,10 +130,6 @@ public abstract class ProcessTableRunner extends
AbstractRichFunction {
tableWatermark = Long.MIN_VALUE;
}
- public void ingestCurrentWatermarkEvent(long watermark) {
- currentWatermark = watermark;
- }
-
public void clearAllState() {
Arrays.fill(stateCleared, true);
}
@@ -143,10 +138,6 @@ public abstract class ProcessTableRunner extends
AbstractRichFunction {
stateCleared[statePos] = true;
}
- public long getCurrentWatermark() {
- return currentWatermark;
- }
-
public long getTableWatermark() {
return tableWatermark;
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index cf3a6149af6..d3b58b5c886 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
@@ -145,11 +146,14 @@ public abstract class AbstractProcessTableOperator
extends AbstractStreamOperato
FunctionUtils.openFunction(processTableRunner,
DefaultOpenContext.INSTANCE);
}
+ @Override
+ public final boolean useInterruptibleTimers(ReadableConfig config) {
+ return true;
+ }
+
@Override
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
- // TODO this line has issues with interruptible timers, see FLINK-39437
- processTableRunner.ingestCurrentWatermarkEvent(mark.getTimestamp());
}
@Override
@@ -220,7 +224,7 @@ public abstract class AbstractProcessTableOperator extends
AbstractStreamOperato
internalTimeContext.setTime(
processTableRunner.getTableWatermark(),
- processTableRunner.getCurrentWatermark(),
+ combinedWatermark.getCombinedWatermark(),
processTableRunner.getTime());
return (TimeContext<TimeType>)
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
index 1964b2d2376..80528c1e63f 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/ProcessRowTableOperator.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.runtime.operators.process;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
@@ -82,11 +83,21 @@ public class ProcessRowTableOperator extends
AbstractProcessTableOperator
0,
element.getValue(),
inputSemantics.timeColumn(),
- processTableRunner.getCurrentWatermark());
+ combinedWatermark.getCombinedWatermark());
}
processTableRunner.processEval();
}
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ // Advance combinedWatermark before super.processWatermark(mark):
super.processWatermark()
+ // fires due timers, which read the watermark via timeContext(). V2's
OneInputStreamOperator
+ // watermark path doesn't update combinedWatermark itself — only
reportWatermark()
+ // (multi-input) does — so we do it here.
+ combinedWatermark.updateWatermark(0, mark.getTimestamp());
+ super.processWatermark(mark);
+ }
+
@Override
public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws
Exception {
super.processWatermarkStatus(watermarkStatus, 1);
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
new file mode 100644
index 00000000000..be390ab5f55
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.process;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializerConfigImpl;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import
org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.ProcessTableFunction.TimeContext;
+import org.apache.flink.table.runtime.generated.HashFunction;
+import org.apache.flink.table.runtime.generated.ProcessTableRunner;
+import org.apache.flink.table.runtime.generated.RecordComparator;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that {@link ProcessSetTableOperator} drives {@link
ProcessTableRunner} correctly when user
+ * timers are fired under unaligned checkpoints, both with and without
interruptible timers.
+ *
+ * <p>A minimal {@link ProcessTableRunner} is used: {@link
ProcessTableRunner#callEval()} emits a
+ * "record-$ts@wm$wm" row via {@code evalCollector} and registers an unnamed
event-time timer at the
+ * row's event time (plus an additional named timer for the row at ts=1000);
{@link
+ * ProcessTableRunner#callOnTimer()} emits a "fired-[name-]$ts@wm$wm" row via
{@code
+ * onTimerCollector}. The operator is wrapped so that after each timer firing
a non-deferrable
+ * "mail-[name-]$ts" mailbox mail is scheduled — the PTF user API does not
expose the mailbox, so
+ * this is the only way to observe externally whether the mailbox is drained
between timer firings.
+ *
+ * <p>With interruptible timers enabled, the mailbox is drained between
successive timer firings, so
+ * each "fired" row is immediately followed by its "mail" row. With
interruptible timers disabled,
+ * all timers in a batch fire first (and the watermark is emitted downstream)
before any of the
+ * scheduled mails are processed. The watermark values on "record" and "fired"
rows also validate
+ * that {@link TimeContext#currentWatermark()} advances consistently across
input processing and
+ * timer firing.
+ */
+class ProcessSetTableOperatorInterruptibleTimersTest {
+
+ private static final DataType INPUT_TYPE =
+ DataTypes.ROW(
+ DataTypes.FIELD("k", DataTypes.BIGINT()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)));
+
+ // Output layout is driven by PassPartitionKeysCollector /
PassAllCollector in
+ // AbstractProcessTableOperator: [partition-key, PTF-emitted label,
rowtime].
+ private static final DataType OUTPUT_TYPE =
+ DataTypes.ROW(
+ DataTypes.FIELD("k", DataTypes.BIGINT()),
+ DataTypes.FIELD("label", DataTypes.STRING()),
+ DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)));
+
+ private static final InternalTypeInfo<RowData> INPUT_TYPE_INFO =
+ InternalTypeInfo.of(INPUT_TYPE.getLogicalType());
+
+ private static final InternalTypeInfo<RowData> OUTPUT_TYPE_INFO =
+ InternalTypeInfo.of(OUTPUT_TYPE.getLogicalType());
+
+ private static final RowDataKeySelector KEY_SELECTOR =
+ HandwrittenSelectorUtil.getRowDataSelector(
+ new int[] {0}, INPUT_TYPE_INFO.toRowFieldTypes());
+
+ private static final long KEY = 42L;
+
+ private static final String NAMED_TIMER = "n";
+
+ @ParameterizedTest(name = "interruptibleTimers={0}")
+ @ValueSource(booleans = {true, false})
+ void testTimersThroughProcessTableRunner(boolean interruptibleTimers)
throws Exception {
+ try (StreamTaskMailboxTestHarness<RowData> harness =
createHarness(interruptibleTimers)) {
+ // Each input row registers one event-time timer at its own
timestamp. The row at
+ // ts=1000 additionally registers a named timer at the same
timestamp, so the first
+ // batch contains three unnamed firings and one named firing at
identical timestamps
+ // (ts=1000) — exercising both timer services advancing together.
+ for (long ts = 1000L; ts <= 3000L; ts += 1000L) {
+ harness.processElement(
+ new StreamRecord<>(
+ GenericRowData.of(KEY,
TimestampData.fromEpochMillis(ts)), ts));
+ }
+ // Advance watermark past registered timers (first batch: three
unnamed + one named).
+ harness.processElement(new Watermark(5000L));
+
+ // One more input row followed by a second watermark advance
(second batch: one
+ // unnamed timer only).
+ harness.processElement(
+ new StreamRecord<>(
+ GenericRowData.of(KEY,
TimestampData.fromEpochMillis(6000L)), 6000L));
+ harness.processElement(new Watermark(7000L));
+
+ List<String> labels =
+ harness.getOutput().stream()
+
.map(ProcessSetTableOperatorInterruptibleTimersTest::describe)
+ .collect(Collectors.toList());
+
+ // Records emitted from callEval carry the watermark visible
during eval, which is null
+ // before any watermark has arrived and 5000 for the record
processed between the two
+ // watermarks.
+ if (interruptibleTimers) {
+ // With interruptible timers, the mailbox is drained between
successive timer
+ // firings, so each "fired-[name-]$ts" is immediately followed
by its matching
+ // "mail-[name-]$ts". The watermark is only emitted downstream
once all firings in
+ // the batch have completed.
+ assertThat(labels)
+ .containsExactly(
+ recordLabel(1000L, null),
+ recordLabel(2000L, null),
+ recordLabel(3000L, null),
+ firedLabel(null, 1000L, 5000L),
+ mailLabel(null, 1000L),
+ firedLabel(null, 2000L, 5000L),
+ mailLabel(null, 2000L),
+ firedLabel(null, 3000L, 5000L),
+ mailLabel(null, 3000L),
+ firedLabel(NAMED_TIMER, 1000L, 5000L),
+ mailLabel(NAMED_TIMER, 1000L),
+ watermarkLabel(5000L),
+ recordLabel(6000L, 5000L),
+ firedLabel(null, 6000L, 7000L),
+ mailLabel(null, 6000L),
+ watermarkLabel(7000L));
+ } else {
+ // Without interruptible timers, all timers in a batch fire
synchronously (emitting
+ // their "fired" rows) before the watermark is emitted
downstream. Only afterwards
+ // does the mailbox drain the scheduled "mail" rows in FIFO
order.
+ assertThat(labels)
+ .containsExactly(
+ recordLabel(1000L, null),
+ recordLabel(2000L, null),
+ recordLabel(3000L, null),
+ firedLabel(null, 1000L, 5000L),
+ firedLabel(null, 2000L, 5000L),
+ firedLabel(null, 3000L, 5000L),
+ firedLabel(NAMED_TIMER, 1000L, 5000L),
+ watermarkLabel(5000L),
+ mailLabel(null, 1000L),
+ mailLabel(null, 2000L),
+ mailLabel(null, 3000L),
+ mailLabel(NAMED_TIMER, 1000L),
+ recordLabel(6000L, 5000L),
+ firedLabel(null, 6000L, 7000L),
+ watermarkLabel(7000L),
+ mailLabel(null, 6000L));
+ }
+ }
+ }
+
+ private StreamTaskMailboxTestHarness<RowData> createHarness(boolean
interruptibleTimers)
+ throws Exception {
+ return new StreamTaskMailboxTestHarnessBuilder<>(
+ MultipleInputStreamTask::new, OUTPUT_TYPE_INFO)
+ .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofSeconds(1))
+ .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true)
+ .addJobConfig(
+
CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS,
+ interruptibleTimers)
+ .setKeyType(KEY_SELECTOR.getProducedType())
+ .addInput(INPUT_TYPE_INFO, 1, KEY_SELECTOR)
+ .setupOperatorChain(new TestOperatorFactory())
+ .name("ptf")
+ .finishForSingletonOperatorChain(
+ OUTPUT_TYPE_INFO.createSerializer(new
SerializerConfigImpl()))
+ .build();
+ }
+
+ private static String describe(Object obj) {
+ if (obj instanceof Watermark) {
+ return watermarkLabel(((Watermark) obj).getTimestamp());
+ } else if (obj instanceof StreamRecord) {
+ Object value = ((StreamRecord<?>) obj).getValue();
+ if (value instanceof RowData) {
+ // Field 1 is the PTF-emitted label (see OUTPUT_TYPE); fields
0 and 2 are the
+ // pass-through partition key and rowtime.
+ return ((RowData) value).getString(1).toString();
+ }
+ return value.toString();
+ }
+ return obj.toString();
+ }
+
+ private static String recordLabel(long ts, @Nullable Long watermark) {
+ return "record-" + ts + "@wm" + (watermark == null ? "null" :
watermark);
+ }
+
+ private static String firedLabel(@Nullable String name, long ts, long
watermark) {
+ return "fired-" + (name == null ? "" : name + "-") + ts + "@wm" +
watermark;
+ }
+
+ private static String mailLabel(@Nullable String name, long ts) {
+ return "mail-" + (name == null ? "" : name + "-") + ts;
+ }
+
+ private static String watermarkLabel(long ts) {
+ return "Watermark@" + ts;
+ }
+
+ private static RuntimeTableSemantics tableSemantics() {
+ return new RuntimeTableSemantics(
+ "r",
+ 0,
+ INPUT_TYPE,
+ new int[] {0},
+ new int[0],
+ new RuntimeTableSemantics.SortDirection[0],
+ RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()),
+ /* passColumnsThrough */ false,
+ /* hasSetSemantics */ true,
+ /* timeColumn */ 1);
+ }
+
+ //
--------------------------------------------------------------------------------------------
+ // Factory and operator
+ //
--------------------------------------------------------------------------------------------
+
+ private static class TestOperatorFactory extends
AbstractStreamOperatorFactory<RowData> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StreamOperator<RowData>> T createStreamOperator(
+ StreamOperatorParameters<RowData> parameters) {
+ return (T) new TestProcessSetTableOperator(parameters);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Class<? extends StreamOperator>
getStreamOperatorClass(ClassLoader classLoader) {
+ return TestProcessSetTableOperator.class;
+ }
+ }
+
+ /**
+ * {@link ProcessSetTableOperator} with a test-only override of {@link
#onEventTime} that
+ * schedules a non-deferrable "mail-[name-]$ts" emission after each timer
firing. The PTF user
+ * API does not expose the mailbox; this test-only shortcut is the only
way to observe whether
+ * interruption happens between timer firings driven by {@link
+ * ProcessTableRunner#callOnTimer()}.
+ */
+ private static class TestProcessSetTableOperator extends
ProcessSetTableOperator {
+
+ private final MailboxExecutor mailboxExecutor;
+
+ TestProcessSetTableOperator(StreamOperatorParameters<RowData>
parameters) {
+ super(
+ parameters,
+ List.of(tableSemantics()),
+ List.of(),
+ // No ORDER BY on the input, so the comparator slot is
null (the operator
+ // short-circuits sort-buffer setup when orderByColumns is
empty). The array
+ // size must still match the number of inputs.
+ new RecordComparator[] {null},
+ new TestProcessTableRunner(),
+ new HashFunction[0],
+ new RecordEqualiser[0],
+ RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()),
+ List.of());
+ this.mailboxExecutor = parameters.getMailboxExecutor();
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<RowData, Object> timer) throws
Exception {
+ // Runs the full PTF path: ingestTimerEvent -> processOnTimer ->
callOnTimer, emitting
+ // "fired-[name-]$ts" via onTimerCollector.
+ super.onEventTime(timer);
+ long ts = timer.getTimestamp();
+ long keyValue = timer.getKey().getLong(0);
+ final Object ns = timer.getNamespace();
+ final String name = (ns instanceof StringData) ? ns.toString() :
null;
+ mailboxExecutor.execute(
+ () ->
+ output.collect(
+ new StreamRecord<>(
+ GenericRowData.of(
+ keyValue,
+
StringData.fromString(mailLabel(name, ts)),
+
TimestampData.fromEpochMillis(ts)))),
+ mailLabel(name, ts));
+ }
+ }
+
+ /**
+ * Minimal {@link ProcessTableRunner}: {@link #callEval} emits a
"record-$ts@wm$wm" row and
+ * registers an unnamed event-time timer at the input row's event time
(plus a named timer for
+ * the row at ts=1000); {@link #callOnTimer} emits a
"fired-[name-]$ts@wm$wm" row.
+ */
+ private static class TestProcessTableRunner extends ProcessTableRunner {
+
+ @Override
+ public void callEval() {
+ final TimeContext<Long> tc = runnerContext.timeContext(Long.class);
+ final long ts = Objects.requireNonNull(tc.time(), "input row event
time");
+ final Long wm = tc.currentWatermark();
+ tc.registerOnTime(ts);
+ if (ts == 1000L) {
+ tc.registerOnTime(NAMED_TIMER, ts);
+ }
+
evalCollector.collect(GenericRowData.of(StringData.fromString(recordLabel(ts,
wm))));
+ }
+
+ @Override
+ public void callOnTimer() {
+ final TimeContext<Long> tc =
runnerOnTimerContext.timeContext(Long.class);
+ final long ts = Objects.requireNonNull(tc.time(), "timer fire
time");
+ final long wm =
+ Objects.requireNonNull(tc.currentWatermark(), "watermark
at timer fire");
+ final String name = runnerOnTimerContext.currentTimer();
+ onTimerCollector.collect(
+ GenericRowData.of(StringData.fromString(firedLabel(name,
ts, wm))));
+ }
+ }
+}