This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 83496f49acde4cdd5c3e23ac59d7683f259f1ce9
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Mar 24 18:20:07 2026 +0100

    [FLINK-39318][table] Fix interruptible timers are not working for 
TemporalJoin
---
 ...seTwoInputStreamOperatorWithStateRetention.java |   1 +
 ...oInputStreamOperatorWithStateRetentionTest.java | 235 +++++++++++++++++++++
 2 files changed, 236 insertions(+)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
index ae4b8b0e6d1..86407e510ea 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetention.java
@@ -89,6 +89,7 @@ public abstract class 
BaseTwoInputStreamOperatorWithStateRetention
     @Override
     public void open() throws Exception {
         initializeTimerService();
+        super.open();
 
         if (stateCleaningEnabled) {
             ValueStateDescriptor<Long> cleanupStateDescriptor =
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetentionTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetentionTest.java
new file mode 100644
index 00000000000..c0e5f705dc9
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/temporal/BaseTwoInputStreamOperatorWithStateRetentionTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.join.temporal;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.YieldingOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import 
org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.utils.HandwrittenSelectorUtil;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that {@link BaseTwoInputStreamOperatorWithStateRetention} supports 
interruptible timers
+ * with unaligned checkpoints.
+ */
+class BaseTwoInputStreamOperatorWithStateRetentionTest {
+
+    private static final InternalTypeInfo<RowData> ROW_TYPE =
+            InternalTypeInfo.ofFields(new BigIntType(), 
VarCharType.STRING_TYPE);
+
+    private static final RowDataKeySelector KEY_SELECTOR =
+            HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, 
ROW_TYPE.toRowFieldTypes());
+
+    @Test
+    void testInterruptibleTimersWithWatermarks() throws Exception {
+        final Instant firstWindowEnd = Instant.ofEpochMilli(1000L);
+        final int numFirstWindowTimers = 2;
+        final Instant secondWindowEnd = Instant.ofEpochMilli(2000L);
+        final int numSecondWindowTimers = 2;
+
+        try (StreamTaskMailboxTestHarness<RowData> harness =
+                createHarness(
+                        new StubOperatorWithEventTimeTimers(0, 0)
+                                .withTimers(firstWindowEnd, 
numFirstWindowTimers)
+                                .withTimers(secondWindowEnd, 
numSecondWindowTimers))) {
+            harness.processElement(new StreamRecord<>(row(1L, "register")), 0);
+            harness.processAll();
+            // Advance watermarks on both inputs so the combined watermark 
progresses.
+            harness.processElement(asWatermark(firstWindowEnd), 0);
+            harness.processElement(asWatermark(firstWindowEnd), 1);
+            harness.processElement(asWatermark(secondWindowEnd), 0);
+            harness.processElement(asWatermark(secondWindowEnd), 1);
+
+            List<String> output =
+                    harness.getOutput().stream()
+                            
.map(BaseTwoInputStreamOperatorWithStateRetentionTest::describeRecord)
+                            .collect(Collectors.toList());
+            assertThat(output)
+                    .containsExactly(
+                            firedDesc(0L),
+                            mailDesc(0L),
+                            firedDesc(1L),
+                            mailDesc(1L),
+                            watermarkDesc(firstWindowEnd),
+                            firedDesc(0L),
+                            mailDesc(0L),
+                            firedDesc(1L),
+                            mailDesc(1L),
+                            watermarkDesc(secondWindowEnd));
+        }
+    }
+
+    private StreamTaskMailboxTestHarness<RowData> createHarness(
+            StubOperatorWithEventTimeTimers operator) throws Exception {
+        return new StreamTaskMailboxTestHarnessBuilder<>(
+                        (Environment env) -> new TwoInputStreamTask<>(env), 
ROW_TYPE)
+                .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, 
Duration.ofSeconds(1))
+                .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true)
+                
.addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true)
+                .setKeyType(KEY_SELECTOR.getProducedType())
+                .addInput(ROW_TYPE, 1, KEY_SELECTOR)
+                .addInput(ROW_TYPE, 1, KEY_SELECTOR)
+                .setupOperatorChain(SimpleOperatorFactory.of(operator))
+                .name("temporal-join")
+                
.finishForSingletonOperatorChain(ROW_TYPE.createSerializer(null))
+                .build();
+    }
+
+    private static RowData row(long key, String value) {
+        return GenericRowData.of(key, StringData.fromString(value));
+    }
+
+    private static Watermark asWatermark(Instant timestamp) {
+        return new Watermark(timestamp.toEpochMilli());
+    }
+
+    private static String describeRecord(Object obj) {
+        if (obj instanceof Watermark) {
+            return "Watermark@" + ((Watermark) obj).getTimestamp();
+        } else if (obj instanceof StreamRecord) {
+            Object value = ((StreamRecord<?>) obj).getValue();
+            if (value instanceof RowData) {
+                RowData row = (RowData) value;
+                return row.getLong(0) + "," + row.getString(1).toString();
+            }
+            return value.toString();
+        }
+        return obj.toString();
+    }
+
+    private static String firedDesc(long key) {
+        return key + ",fired";
+    }
+
+    private static String mailDesc(long key) {
+        return key + ",mail";
+    }
+
+    private static String watermarkDesc(Instant timestamp) {
+        return "Watermark@" + timestamp.toEpochMilli();
+    }
+
+    /**
+     * A stub {@link BaseTwoInputStreamOperatorWithStateRetention} that 
registers event-time timers
+     * and emits records when they fire, used to verify interruptible timer 
support.
+     */
+    private static class StubOperatorWithEventTimeTimers
+            extends BaseTwoInputStreamOperatorWithStateRetention
+            implements YieldingOperator<RowData> {
+
+        private final Map<Instant, Integer> timersToRegister;
+        private transient @Nullable MailboxExecutor mailboxExecutor;
+        private transient InternalTimerService<VoidNamespace> 
eventTimeTimerService;
+
+        StubOperatorWithEventTimeTimers(long minRetentionTime, long 
maxRetentionTime) {
+            this(minRetentionTime, maxRetentionTime, Collections.emptyMap());
+        }
+
+        StubOperatorWithEventTimeTimers(
+                long minRetentionTime,
+                long maxRetentionTime,
+                Map<Instant, Integer> timersToRegister) {
+            super(minRetentionTime, maxRetentionTime);
+            this.timersToRegister = timersToRegister;
+        }
+
+        @Override
+        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+            super.setMailboxExecutor(mailboxExecutor);
+            this.mailboxExecutor = mailboxExecutor;
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            eventTimeTimerService =
+                    getInternalTimerService("event-timers", 
VoidNamespaceSerializer.INSTANCE, this);
+        }
+
+        @Override
+        public void processElement1(StreamRecord<RowData> element) {
+            registerTimers();
+        }
+
+        @Override
+        public void processElement2(StreamRecord<RowData> element) {
+            registerTimers();
+        }
+
+        private void registerTimers() {
+            for (Map.Entry<Instant, Integer> entry : 
timersToRegister.entrySet()) {
+                for (int keyIdx = 0; keyIdx < entry.getValue(); keyIdx++) {
+                    setCurrentKey(GenericRowData.of((long) keyIdx, null));
+                    eventTimeTimerService.registerEventTimeTimer(
+                            VoidNamespace.INSTANCE, 
entry.getKey().toEpochMilli());
+                }
+            }
+        }
+
+        @Override
+        public void onEventTime(InternalTimer<Object, VoidNamespace> timer) 
throws Exception {
+            RowData key = (RowData) timer.getKey();
+            long keyValue = key.getLong(0);
+            mailboxExecutor.execute(
+                    () -> output.collect(new StreamRecord<>(row(keyValue, 
"mail"))),
+                    "mail-" + keyValue);
+            output.collect(new StreamRecord<>(row(keyValue, "fired")));
+        }
+
+        @Override
+        public void cleanupState(long time) {}
+
+        StubOperatorWithEventTimeTimers withTimers(Instant timestamp, int 
count) {
+            Map<Instant, Integer> copy = new HashMap<>(timersToRegister);
+            copy.put(timestamp, count);
+            return new StubOperatorWithEventTimeTimers(0, 0, copy);
+        }
+    }
+}

Reply via email to