This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new be9702a8623 [hotfix] Improve test coverage of 
UnalignedCheckpointsInterruptibleTimersTest
be9702a8623 is described below

commit be9702a86231a1bf5aa3e4a07b5af26568eb968a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Wed Jan 7 17:35:12 2026 +0100

    [hotfix] Improve test coverage of 
UnalignedCheckpointsInterruptibleTimersTest
    
    Previous version was overloading more methods, and for example
    bugs in the orginal #open method would not be covered by the old
    version of this test. Bugs like for example logic of handling
    ConfigOptions.
---
 ...nalignedCheckpointsInterruptibleTimersTest.java | 25 +++++-----------------
 1 file changed, 5 insertions(+), 20 deletions(-)

diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
index b73021125ff..10650dcb319 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.MailboxWatermarkProcessor;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.Triggerable;
@@ -170,7 +169,6 @@ class UnalignedCheckpointsInterruptibleTimersTest {
 
         private final Map<Instant, Integer> timersToRegister;
         private transient @Nullable MailboxExecutor mailboxExecutor;
-        private transient @Nullable MailboxWatermarkProcessor 
watermarkProcessor;
 
         MultipleTimersAtTheSameTimestamp() {
             this(Collections.emptyMap());
@@ -181,18 +179,14 @@ class UnalignedCheckpointsInterruptibleTimersTest {
         }
 
         @Override
-        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
-            this.mailboxExecutor = mailboxExecutor;
+        public boolean useInterruptibleTimers() {
+            return true;
         }
 
         @Override
-        public void open() throws Exception {
-            super.open();
-            if (getTimeServiceManager().isPresent()) {
-                this.watermarkProcessor =
-                        new MailboxWatermarkProcessor(
-                                output, mailboxExecutor, 
getTimeServiceManager().get());
-            }
+        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+            super.setMailboxExecutor(mailboxExecutor);
+            this.mailboxExecutor = mailboxExecutor;
         }
 
         @Override
@@ -212,15 +206,6 @@ class UnalignedCheckpointsInterruptibleTimersTest {
             }
         }
 
-        @Override
-        public void processWatermark(Watermark mark) throws Exception {
-            if (watermarkProcessor == null) {
-                super.processWatermark(mark);
-            } else {
-                watermarkProcessor.emitWatermarkInsideMailbox(mark);
-            }
-        }
-
         @Override
         public void onEventTime(InternalTimer<String, String> timer) throws 
Exception {
             mailboxExecutor.execute(

Reply via email to