This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch fhueske-FLINK-39437-Support-interruptible-timers-in-PTFs
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bc076574596faed334d1809f7d5a571842003176
Author: Fabian Hueske <[email protected]>
AuthorDate: Fri Apr 17 17:33:26 2026 +0200

    [FLINK-39437][table] Support interruptible timers in PTFs
    
    Override `useInterruptibleTimers()` in `AbstractProcessTableOperator` to
    return `true`, activating the `MailboxWatermarkProcessor` for PTF operators.
    This allows timer firing to be interrupted between mailbox iterations,
    improving throughput by not blocking mailbox processing during large timer
    batches.
    
    Also reorder `processWatermark()` to call `ingestWatermarkEvent()` before
    `super.processWatermark()`, ensuring all timer callbacks (including those
    deferred across mailbox iterations) see a consistent watermark in the runner.
    This matches the behavior of `WritableInternalTimeContext.currentWatermark()`,
    which reads from the timer service and already sees the new watermark before
    any timer fires.
---
 .../operators/process/AbstractProcessTableOperator.java     | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
index 5432a826cc2..55e1d734174 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/AbstractProcessTableOperator.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -145,10 +146,20 @@ public abstract class AbstractProcessTableOperator extends AbstractStreamOperato
         FunctionUtils.openFunction(processTableRunner, DefaultOpenContext.INSTANCE);
     }
 
+    @Override
+    public boolean useInterruptibleTimers(ReadableConfig config) {
+        return true;
+    }
+
     @Override
     public void processWatermark(Watermark mark) throws Exception {
-        super.processWatermark(mark);
+        // Update the runner's watermark before firing timers so that all timer callbacks
+        // (including those deferred across mailbox iterations by interruptible timers) see
+        // a consistent watermark. This matches WritableInternalTimeContext.currentWatermark(),
+        // which reads from the timer service and is also set to the new watermark before
+        // any timer fires.
         processTableRunner.ingestWatermarkEvent(mark.getTimestamp());
+        super.processWatermark(mark);
     }
 
     @Override

Reply via email to