This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch fhueske-FLINK-39437-Support-interruptible-timers-in-PTFs
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc076574596faed334d1809f7d5a571842003176
Author: Fabian Hueske <[email protected]>
AuthorDate: Fri Apr 17 17:33:26 2026 +0200

    [FLINK-39437][table] Support interruptible timers in PTFs

    Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to return `true`, activating the `MailboxWatermarkProcessor` for PTF operators. This allows timer firing to be interrupted between mailbox iterations, improving throughput by not blocking mailbox processing during large timer batches.

    Also reorder `processWatermark()` to call `ingestWatermarkEvent()` before `super.processWatermark()`, ensuring all timer callbacks (including those deferred across mailbox iterations) see a consistent watermark in the runner. This matches the behavior of `WritableInternalTimeContext.currentWatermark()`, which reads from the timer service and already sees the new watermark before any timer fires.
---
 .../operators/process/AbstractProcessTableOperator.java | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index 5432a826cc2..55e1d734174 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -145,10 +146,20 @@ public abstract class AbstractProcessTableOperator extends AbstractStreamOperato
         FunctionUtils.openFunction(processTableRunner, DefaultOpenContext.INSTANCE);
     }
 
+    @Override
+    public boolean useInterruptibleTimers(ReadableConfig config) {
+        return true;
+    }
+
     @Override
     public void processWatermark(Watermark mark) throws Exception {
-        super.processWatermark(mark);
+        // Update the runner's watermark before firing timers so that all timer callbacks
+        // (including those deferred across mailbox iterations by interruptible timers) see
+        // a consistent watermark. This matches WritableInternalTimeContext.currentWatermark(),
+        // which reads from the timer service and is also set to the new watermark before
+        // any timer fires.
         processTableRunner.ingestWatermarkEvent(mark.getTimestamp());
+        super.processWatermark(mark);
     }
 
     @Override
