This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 31daa9d9d0b4de6620de642a85a6bfd36384f619 Author: Piotr Nowojski <[email protected]> AuthorDate: Wed Mar 25 10:13:16 2026 +0100 [FLINK-39318][table] Fix interruptible timers are not working for TemporalSort --- .../operators/sort/BaseTemporalSortOperator.java | 1 + .../sort/BaseTemporalSortOperatorTest.java | 173 +++++++++++++++++++++ 2 files changed, 174 insertions(+) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java index e00dcb1ed5a..449b2a430a6 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperator.java @@ -46,6 +46,7 @@ abstract class BaseTemporalSortOperator extends AbstractStreamOperator<RowData> @Override public void open() throws Exception { + super.open(); InternalTimerService<VoidNamespace> internalTimerService = getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); timerService = new SimpleTimerService(internalTimerService); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperatorTest.java new file mode 100644 index 00000000000..65eaa89001e --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sort/BaseTemporalSortOperatorTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.sort; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.YieldingOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness; +import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.BigIntType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.table.utils.HandwrittenSelectorUtil; + +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that {@link BaseTemporalSortOperator} supports interruptible timers with unaligned + * checkpoints. + */ +class BaseTemporalSortOperatorTest { + + private static final InternalTypeInfo<RowData> ROW_TYPE = + InternalTypeInfo.ofFields(new BigIntType(), VarCharType.STRING_TYPE); + + private static final RowDataKeySelector KEY_SELECTOR = + HandwrittenSelectorUtil.getRowDataSelector(new int[] {0}, ROW_TYPE.toRowFieldTypes()); + + @Test + void testInterruptibleTimersWithWatermarks() throws Exception { + try (StreamTaskMailboxTestHarness<RowData> harness = createHarness()) { + // Register event-time timers at 4 distinct timestamps under the same key. + long key = 42L; + for (long ts = 1000L; ts <= 4000L; ts += 1000L) { + harness.processElement( + new StreamRecord<>(GenericRowData.of(key, StringData.fromString("v")), ts)); + } + harness.processAll(); + + // Advance watermark past all timers. + harness.processElement(new Watermark(5000L)); + + List<String> output = + harness.getOutput().stream() + .map(BaseTemporalSortOperatorTest::describeRecord) + .collect(Collectors.toList()); + // With interruptible timers, each timer fires and yields to the mailbox + // before the next timer fires. The mail record between fired records proves this. + assertThat(output) + .containsExactly( + firedDesc(1000L), + mailDesc(1000L), + firedDesc(2000L), + mailDesc(2000L), + firedDesc(3000L), + mailDesc(3000L), + firedDesc(4000L), + mailDesc(4000L), + watermarkDesc(5000L)); + } + } + + private StreamTaskMailboxTestHarness<RowData> createHarness() throws Exception { + return new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, ROW_TYPE) + .addJobConfig(CheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(1)) + .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED, true) + .addJobConfig(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true) + .setKeyType(KEY_SELECTOR.getProducedType()) + .addInput(ROW_TYPE, 1, KEY_SELECTOR) + .setupOperatorChain(SimpleOperatorFactory.of(new StubTemporalSortOperator())) + .name("sort") + .finishForSingletonOperatorChain(ROW_TYPE.createSerializer(null)) + .build(); + } + + private static String describeRecord(Object obj) { + if (obj instanceof Watermark) { + return watermarkDesc(((Watermark) obj).getTimestamp()); + } else if (obj instanceof StreamRecord) { + Object value = ((StreamRecord<?>) obj).getValue(); + if (value instanceof RowData) { + RowData row = (RowData) value; + return row.getString(1).toString(); + } + return value.toString(); + } + return obj.toString(); + } + + private static String firedDesc(long timestamp) { + return "fired-" + timestamp; + } + + private static String mailDesc(long timestamp) { + return "mail-" + timestamp; + } + + private static String watermarkDesc(long timestamp) { + return "Watermark@" + timestamp; + } + + /** + * A stub {@link BaseTemporalSortOperator} that registers event-time timers per key and emits + * labeled records when they fire, used to verify interruptible timer support. + */ + private static class StubTemporalSortOperator extends BaseTemporalSortOperator + implements YieldingOperator<RowData> { + + private transient @Nullable MailboxExecutor mailboxExecutor; + + @Override + public void setMailboxExecutor(MailboxExecutor mailboxExecutor) { + super.setMailboxExecutor(mailboxExecutor); + this.mailboxExecutor = mailboxExecutor; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + setCurrentKey(element.getValue()); + timerService.registerEventTimeTimer(element.getTimestamp()); + } + + @Override + public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) throws Exception { + long ts = timer.getTimestamp(); + long keyValue = timer.getKey().getLong(0); + mailboxExecutor.execute( + () -> + collector.collect( + GenericRowData.of( + keyValue, StringData.fromString(mailDesc(ts)))), + mailDesc(ts)); + collector.collect(GenericRowData.of(keyValue, StringData.fromString(firedDesc(ts)))); + } + + @Override + public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) {} + } +}
