This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f93cac36ff51e342a002126487484cc2d3d8a0c9
Author: David Moravek <d...@apache.org>
AuthorDate: Wed Nov 15 14:31:44 2023 +0100

    [FLINK-20217][task] Allow certain operators to yield to unaligned 
checkpoint in case timers are firing.
    
    Co-authored-by: Piotr Nowojski <piotr.nowoj...@gmail.com>
---
 .../flink/configuration/CheckpointingOptions.java  |   8 +
 .../api/environment/CheckpointConfig.java          |  13 ++
 .../flink/streaming/api/graph/StreamConfig.java    |   8 +
 .../api/graph/StreamingJobGraphGenerator.java      |   2 +
 .../api/operators/AbstractStreamOperator.java      |  57 ++++-
 .../api/operators/InternalTimeServiceManager.java  |  19 ++
 .../operators/InternalTimeServiceManagerImpl.java  |  11 +
 .../api/operators/InternalTimerServiceImpl.java    |  26 ++-
 .../api/operators/MailboxWatermarkProcessor.java   |  92 +++++++++
 .../api/operators/StreamOperatorFactoryUtil.java   |   2 +-
 .../BatchExecutionInternalTimeServiceManager.java  |   7 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |   9 +
 .../runtime/tasks/mailbox/MailboxProcessor.java    |  16 ++
 .../CheckpointConfigFromConfigurationTest.java     |   8 +
 .../operators/MailboxWatermarkProcessorTest.java   | 120 +++++++++++
 ...nalignedCheckpointsInterruptibleTimersTest.java | 230 +++++++++++++++++++++
 .../io/checkpointing/UnalignedCheckpointsTest.java |   4 +-
 .../runtime/operators/TableStreamOperator.java     |  11 +-
 .../streaming/util/TestStreamEnvironment.java      |   2 +
 19 files changed, 632 insertions(+), 13 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
index 10344a3d30c..ec7b82ecfd8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java
@@ -632,6 +632,14 @@ public class CheckpointingOptions {
                                             "Forces unaligned checkpoints, 
particularly allowing them for iterative jobs.")
                                     .build());
 
+    @Experimental
+    public static final ConfigOption<Boolean> 
ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS =
+            
ConfigOptions.key("execution.checkpointing.unaligned.interruptible-timers.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Allows unaligned checkpoints to skip timers that 
are currently being fired.");
+
     public static final ConfigOption<Boolean> 
ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH =
             
ConfigOptions.key("execution.checkpointing.checkpoints-after-tasks-finish.enabled")
                     .booleanType()
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
index c404212d587..8308dcaee2c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -656,6 +656,16 @@ public class CheckpointConfig implements 
java.io.Serializable {
         return configuration.get(CheckpointingOptions.ENABLE_UNALIGNED);
     }
 
+    @Experimental
+    public void enableUnalignedCheckpointsInterruptibleTimers(boolean enabled) 
{
+        
configuration.set(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, 
enabled);
+    }
+
+    @Experimental
+    public boolean isUnalignedCheckpointsInterruptibleTimersEnabled() {
+        return 
configuration.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS);
+    }
+
     /**
      * Only relevant if {@link #isUnalignedCheckpointsEnabled} is enabled.
      *
@@ -1065,6 +1075,9 @@ public class CheckpointConfig implements 
java.io.Serializable {
         configuration
                 .getOptional(CheckpointingOptions.ENABLE_UNALIGNED)
                 .ifPresent(this::enableUnalignedCheckpoints);
+        configuration
+                
.getOptional(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS)
+                
.ifPresent(this::enableUnalignedCheckpointsInterruptibleTimers);
         configuration
                 
.getOptional(StateRecoveryOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA)
                 .ifPresent(this::setCheckpointIdOfIgnoredInFlightData);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index eb98755d6b7..e56ffdbf10a 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -532,6 +532,14 @@ public class StreamConfig implements Serializable {
         return config.get(CheckpointingOptions.ENABLE_UNALIGNED, false);
     }
 
+    public void setUnalignedCheckpointsSplittableTimersEnabled(boolean 
enabled) {
+        
config.setBoolean(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, 
enabled);
+    }
+
+    public boolean isUnalignedCheckpointsSplittableTimersEnabled() {
+        return 
config.get(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS);
+    }
+
     public boolean isExactlyOnceCheckpointMode() {
         return getCheckpointMode() == CheckpointingMode.EXACTLY_ONCE;
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c8099a47913..cbe6513db5e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1142,6 +1142,8 @@ public class StreamingJobGraphGenerator {
                         streamGraph.isEnableCheckpointsAfterTasksFinish());
         config.setCheckpointMode(getCheckpointingMode(checkpointCfg));
         
config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
+        config.setUnalignedCheckpointsSplittableTimersEnabled(
+                
checkpointCfg.isUnalignedCheckpointsInterruptibleTimersEnabled());
         
config.setAlignedCheckpointTimeout(checkpointCfg.getAlignedCheckpointTimeout());
         
config.setMaxSubtasksPerChannelStateFile(checkpointCfg.getMaxSubtasksPerChannelStateFile());
         
config.setMaxConcurrentCheckpoints(checkpointCfg.getMaxConcurrentCheckpoints());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index ca82984af6a..e685d1e2b81 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.IndexedCombinedWatermarkStatus;
+import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
@@ -63,6 +64,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
@@ -93,6 +96,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 public abstract class AbstractStreamOperator<OUT>
         implements StreamOperator<OUT>,
                 SetupableStreamOperator<OUT>,
+                YieldingOperator<OUT>,
                 CheckpointedStreamOperator,
                 KeyContextHandler,
                 Serializable {
@@ -120,6 +124,10 @@ public abstract class AbstractStreamOperator<OUT>
     /** The runtime context for UDFs. */
     private transient StreamingRuntimeContext runtimeContext;
 
+    private transient @Nullable MailboxExecutor mailboxExecutor;
+
+    private transient @Nullable MailboxWatermarkProcessor watermarkProcessor;
+
     // ---------------- key/value state ------------------
 
     /**
@@ -311,6 +319,37 @@ public abstract class AbstractStreamOperator<OUT>
         return false;
     }
 
+    @Internal
+    @Override
+    public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+        this.mailboxExecutor = mailboxExecutor;
+    }
+
+    /**
+     * Can be overridden to disable splittable timers for this particular 
operator even if config
+     * option is enabled. By default, splittable timers are disabled.
+     *
+     * @return {@code true} if splittable timers should be used (subject to 
{@link
+     *     StreamConfig#isUnalignedCheckpointsEnabled()} and {@link
+     *     StreamConfig#isUnalignedCheckpointsSplittableTimersEnabled()}. 
{@code false} if
+     *     splittable timers should never be used.
+     */
+    @Internal
+    public boolean useSplittableTimers() {
+        return false;
+    }
+
+    @Internal
+    private boolean areSplittableTimersConfigured() {
+        return areSplittableTimersConfigured(config);
+    }
+
+    static boolean areSplittableTimersConfigured(StreamConfig config) {
+        return config.isCheckpointingEnabled()
+                && config.isUnalignedCheckpointsEnabled()
+                && config.isUnalignedCheckpointsSplittableTimersEnabled();
+    }
+
     /**
      * This method is called immediately before any elements are processed, it 
should contain the
      * operator's initialization logic, e.g. state initialization.
@@ -320,7 +359,15 @@ public abstract class AbstractStreamOperator<OUT>
      * @throws Exception An exception in this method causes the operator to 
fail.
      */
     @Override
-    public void open() throws Exception {}
+    public void open() throws Exception {
+        if (useSplittableTimers()
+                && areSplittableTimersConfigured()
+                && getTimeServiceManager().isPresent()) {
+            this.watermarkProcessor =
+                    new MailboxWatermarkProcessor(
+                            output, mailboxExecutor, 
getTimeServiceManager().get());
+        }
+    }
 
     @Override
     public void finish() throws Exception {}
@@ -618,6 +665,14 @@ public abstract class AbstractStreamOperator<OUT>
     }
 
     public void processWatermark(Watermark mark) throws Exception {
+        if (watermarkProcessor != null) {
+            watermarkProcessor.emitWatermarkInsideMailbox(mark);
+        } else {
+            emitWatermarkDirectly(mark);
+        }
+    }
+
+    private void emitWatermarkDirectly(Watermark mark) throws Exception {
         if (timeServiceManager != null) {
             timeServiceManager.advanceWatermark(mark);
         }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
index f078401c045..a1b89031f5e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java
@@ -40,6 +40,16 @@ import java.io.Serializable;
  */
 @Internal
 public interface InternalTimeServiceManager<K> {
+
+    /** Signals whether the watermark should continue advancing. */
+    @Internal
+    @FunctionalInterface
+    interface ShouldStopAdvancingFn {
+
+        /** @return {@code true} if firing timers should be interrupted. */
+        boolean test();
+    }
+
     /**
      * Creates an {@link InternalTimerService} for handling a group of timers 
identified by the
      * given {@code name}. The timers are scoped to a key and namespace.
@@ -73,6 +83,15 @@ public interface InternalTimeServiceManager<K> {
      */
     void advanceWatermark(Watermark watermark) throws Exception;
 
+    /**
+     * Try to {@link #advanceWatermark(Watermark)}, but if {@link 
ShouldStopAdvancingFn} returns
+     * {@code true}, stop the advancement and return as soon as possible.
+     *
+     * @return true if {@link Watermark} has been fully processed, false 
otherwise.
+     */
+    boolean tryAdvanceWatermark(Watermark watermark, ShouldStopAdvancingFn 
shouldStopAdvancingFn)
+            throws Exception;
+
     /**
      * Snapshots the timers to raw keyed state.
      *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
index 886801c35ce..6bdfe05f4b9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl.java
@@ -245,6 +245,17 @@ public class InternalTimeServiceManagerImpl<K> implements 
InternalTimeServiceMan
         }
     }
 
+    @Override
+    public boolean tryAdvanceWatermark(
+            Watermark watermark, ShouldStopAdvancingFn shouldStopAdvancingFn) 
throws Exception {
+        for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
+            if (!service.tryAdvanceWatermark(watermark.getTimestamp(), 
shouldStopAdvancingFn)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     //////////////////                         Fault Tolerance Methods         
                ///////////////////
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index fc57c625613..269ba493b18 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -307,18 +307,38 @@ public class InternalTimerServiceImpl<K, N> implements 
InternalTimerService<N> {
     }
 
     public void advanceWatermark(long time) throws Exception {
-        currentWatermark = time;
+        Preconditions.checkState(
+                tryAdvanceWatermark(
+                        time,
+                        () -> {
+                            // Never stop advancing.
+                            return false;
+                        }));
+    }
 
+    /**
+     * @return true if following watermarks can be processed immediately. 
False if the firing timers
+     *     should be interrupted as soon as possible.
+     */
+    public boolean tryAdvanceWatermark(
+            long time, InternalTimeServiceManager.ShouldStopAdvancingFn 
shouldStopAdvancingFn)
+            throws Exception {
+        currentWatermark = time;
         InternalTimer<K, N> timer;
-
+        boolean interrupted = false;
         while ((timer = eventTimeTimersQueue.peek()) != null
                 && timer.getTimestamp() <= time
-                && !cancellationContext.isCancelled()) {
+                && !cancellationContext.isCancelled()
+                && !interrupted) {
             keyContext.setCurrentKey(timer.getKey());
             eventTimeTimersQueue.poll();
             triggerTarget.onEventTime(timer);
             taskIOMetricGroup.getNumFiredTimers().inc();
+            // Check if we should stop advancing after at least one iteration 
to guarantee progress
+            // and prevent a potential starvation.
+            interrupted = shouldStopAdvancingFn.test();
         }
+        return !interrupted;
     }
 
     /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
new file mode 100644
index 00000000000..02cfd966c1c
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A helper class to let operators emit watermarks incrementally from mailbox. 
Instead of emitting
+ * all the watermarks at once in a single {@code processWatermark} call, if a 
mail in mailbox is
+ * present, the process of firing timers is interrupted and a continuation to 
finish it off later is
+ * scheduled via a mailbox mail.
+ *
+ * <p>Note that interrupting firing timers can change order of some 
invocations. It is possible that
+ * between firing timers, some records might be processed.
+ */
+@Internal
+public class MailboxWatermarkProcessor<OUT> {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(MailboxWatermarkProcessor.class);
+
+    private final Output<StreamRecord<OUT>> output;
+    private final MailboxExecutor mailboxExecutor;
+    private final InternalTimeServiceManager<?> internalTimeServiceManager;
+    /**
+     * Flag to indicate whether a progress watermark is scheduled in the 
mailbox. This is used to
+     * avoid duplicate scheduling in case we have multiple watermarks to 
process.
+     */
+    private boolean progressWatermarkScheduled = false;
+
+    private Watermark maxInputWatermark = Watermark.UNINITIALIZED;
+
+    public MailboxWatermarkProcessor(
+            Output<StreamRecord<OUT>> output,
+            MailboxExecutor mailboxExecutor,
+            InternalTimeServiceManager<?> internalTimeServiceManager) {
+        this.output = checkNotNull(output);
+        this.mailboxExecutor = checkNotNull(mailboxExecutor);
+        this.internalTimeServiceManager = 
checkNotNull(internalTimeServiceManager);
+    }
+
+    public void emitWatermarkInsideMailbox(Watermark mark) throws Exception {
+        maxInputWatermark =
+                new Watermark(Math.max(maxInputWatermark.getTimestamp(), 
mark.getTimestamp()));
+        emitWatermarkInsideMailbox();
+    }
+
+    private void emitWatermarkInsideMailbox() throws Exception {
+        // Try to progress min watermark as far as we can.
+        if (internalTimeServiceManager.tryAdvanceWatermark(
+                maxInputWatermark, mailboxExecutor::shouldInterrupt)) {
+            // In case output watermark has fully progressed emit it 
downstream.
+            output.emitWatermark(maxInputWatermark);
+        } else if (!progressWatermarkScheduled) {
+            progressWatermarkScheduled = true;
+            // We still have work to do, but we need to let other mails to be 
processed first.
+            mailboxExecutor.execute(
+                    () -> {
+                        progressWatermarkScheduled = false;
+                        emitWatermarkInsideMailbox();
+                    },
+                    "emitWatermarkInsideMailbox");
+        } else {
+            // We're not guaranteed that MailboxProcessor is going to process 
all mails before
+            // processing additional input, so the advanceWatermark could be 
called before the
+            // previous watermark is fully processed.
+            LOG.debug("emitWatermarkInsideMailbox is already scheduled, 
skipping.");
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
index 6f6ff1c2de2..016a7d8b302 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
@@ -88,7 +88,7 @@ public class StreamOperatorFactoryUtil {
                                         : processingTimeServiceFactory,
                                 operatorEventDispatcher));
         if (op instanceof YieldingOperator) {
-            ((YieldingOperator<?>) 
operatorFactory).setMailboxExecutor(mailboxExecutor);
+            ((YieldingOperator<?>) op).setMailboxExecutor(mailboxExecutor);
         }
         return new Tuple2<>(op, Optional.ofNullable(processingTimeService));
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
index 32da4c5b4ae..bc08bf635bf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionInternalTimeServiceManager.java
@@ -91,6 +91,13 @@ public class BatchExecutionInternalTimeServiceManager<K>
         }
     }
 
+    @Override
+    public boolean tryAdvanceWatermark(
+            Watermark watermark, ShouldStopAdvancingFn shouldStopAdvancingFn) {
+        advanceWatermark(watermark);
+        return true;
+    }
+
     @Override
     public void snapshotToRawKeyedState(
             KeyedStateCheckpointOutputStream context, String operatorName) 
throws Exception {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 6c70dc47039..e830b8a0343 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -954,6 +954,11 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         }
     }
 
+    @VisibleForTesting
+    public boolean runSingleMailboxLoop() throws Exception {
+        return mailboxProcessor.runSingleMailboxLoop();
+    }
+
     @VisibleForTesting
     public boolean runMailboxStep() throws Exception {
         return mailboxProcessor.runMailboxStep();
@@ -1126,6 +1131,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         return this.mailboxProcessor::getMailboxExecutor;
     }
 
+    public boolean hasMail() {
+        return mailboxProcessor.hasMail();
+    }
+
     private boolean taskIsAvailable() {
         return recordWriter.isAvailable()
                 && (changelogWriterAvailabilityProvider == null
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
index 3dae1cec3df..1a6f0694ed8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
@@ -239,6 +239,22 @@ public class MailboxProcessor implements Closeable {
         sendPoisonMail(() -> suspended = true);
     }
 
+    /**
+     * Execute a single (as small as possible) step of the mailbox.
+     *
+     * @return true if something was processed.
+     */
+    @VisibleForTesting
+    public boolean runSingleMailboxLoop() throws Exception {
+        suspended = !mailboxLoopRunning;
+        boolean processed = processMail(mailbox, true);
+        if (isDefaultActionAvailable() && isNextLoopPossible()) {
+            mailboxDefaultAction.runDefaultAction(new MailboxController(this));
+            processed = true;
+        }
+        return processed;
+    }
+
     /**
      * Execute a single (as small as possible) step of the mailbox.
      *
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
index 40c4131204e..d97ca4b9c30 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java
@@ -151,6 +151,14 @@ public class CheckpointConfigFromConfigurationTest {
                         
.viaSetter(CheckpointConfig::enableUnalignedCheckpoints)
                         
.getterVia(CheckpointConfig::isUnalignedCheckpointsEnabled)
                         .nonDefaultValue(true),
+                TestSpec.testValue(true)
+                        .whenSetFromFile(
+                                
"execution.checkpointing.unaligned.interruptible-timers.enabled",
+                                "true")
+                        
.viaSetter(CheckpointConfig::enableUnalignedCheckpointsInterruptibleTimers)
+                        .getterVia(
+                                
CheckpointConfig::isUnalignedCheckpointsInterruptibleTimersEnabled)
+                        .nonDefaultValue(true),
                 TestSpec.testValue(
                                 (CheckpointStorage)
                                         new FileSystemCheckpointStorage(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
new file mode 100644
index 00000000000..9fb9d348525
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/MailboxWatermarkProcessorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
+import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
+import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
+import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
+import org.apache.flink.streaming.util.CollectorOutput;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link MailboxWatermarkProcessor}. */
+class MailboxWatermarkProcessorTest {
+
+    @Test
+    void testEmitWatermarkInsideMailbox() throws Exception {
+        final List<StreamElement> emittedElements = new ArrayList<>();
+        final TaskMailboxImpl mailbox = new TaskMailboxImpl();
+        final InternalTimeServiceManager<?> timerService = new 
NoOpInternalTimeServiceManager();
+
+        final MailboxWatermarkProcessor<StreamRecord<String>> 
watermarkProcessor =
+                new MailboxWatermarkProcessor<>(
+                        new CollectorOutput<>(emittedElements),
+                        new MailboxExecutorImpl(mailbox, 0, 
StreamTaskActionExecutor.IMMEDIATE),
+                        timerService);
+        final List<Watermark> expectedOutput = new ArrayList<>();
+        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(1));
+        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(2));
+        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(3));
+        expectedOutput.add(new Watermark(1));
+        expectedOutput.add(new Watermark(2));
+        expectedOutput.add(new Watermark(3));
+
+        assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
+
+        mailbox.put(new Mail(() -> {}, TaskMailbox.MIN_PRIORITY, "checkpoint 
mail"));
+
+        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(4));
+        watermarkProcessor.emitWatermarkInsideMailbox(new Watermark(5));
+
+        assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
+
+        while (mailbox.hasMail()) {
+            mailbox.take(TaskMailbox.MIN_PRIORITY).run();
+        }
+        // Watermark(4) is processed together with Watermark(5)
+        expectedOutput.add(new Watermark(5));
+
+        assertThat(emittedElements).containsExactlyElementsOf(expectedOutput);
+    }
+
+    private static class NoOpInternalTimeServiceManager
+            implements InternalTimeServiceManager<Object> {
+        @Override
+        public <N> InternalTimerService<N> getInternalTimerService(
+                String name,
+                TypeSerializer<Object> keySerializer,
+                TypeSerializer<N> namespaceSerializer,
+                Triggerable<Object, N> triggerable) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public <N> InternalTimerService<N> getAsyncInternalTimerService(
+                String name,
+                TypeSerializer<Object> keySerializer,
+                TypeSerializer<N> namespaceSerializer,
+                Triggerable<Object, N> triggerable,
+                AsyncExecutionController<Object> asyncExecutionController) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void advanceWatermark(Watermark watermark) throws Exception {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean tryAdvanceWatermark(
+                Watermark watermark, ShouldStopAdvancingFn 
shouldStopAdvancingFn) throws Exception {
+            return !shouldStopAdvancingFn.test();
+        }
+
+        @Override
+        public void snapshotToRawKeyedState(
+                KeyedStateCheckpointOutputStream stateCheckpointOutputStream, 
String operatorName)
+                throws Exception {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
new file mode 100644
index 00000000000..a5e3712ec76
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsInterruptibleTimersTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io.checkpointing;
+
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.MailboxWatermarkProcessor;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.YieldingOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
+import 
org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests interaction between {@link
+ * org.apache.flink.configuration.CheckpointingOptions#ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS} and
+ * unaligned checkpoints.
+ */
+@ExtendWith(TestLoggerExtension.class)
+class UnalignedCheckpointsInterruptibleTimersTest {
+
+    /**
+     * Registers timers for keys {@code key-0} and {@code key-1} at two timestamps, feeds the two
+     * matching watermarks and verifies the exact output order: for every timer the record emitted
+     * directly from {@code onEventTime} is followed by the record emitted from the mail enqueued
+     * by {@code onEventTime}, and each watermark is emitted after all records of its timestamp.
+     */
+    @Test
+    void testSingleWatermarkHoldingOperatorInTheChain() throws Exception {
+        final Instant firstWindowEnd = Instant.ofEpochMilli(1000L);
+        final Instant secondWindowEnd = Instant.ofEpochMilli(2000L);
+        final int firstTimerCount = 2;
+        final int secondTimerCount = 2;
+
+        // Operator under test: the same number of timers at each of the two timestamps.
+        final MultipleTimersAtTheSameTimestamp operator =
+                new MultipleTimersAtTheSameTimestamp()
+                        .withTimers(firstWindowEnd, firstTimerCount)
+                        .withTimers(secondWindowEnd, secondTimerCount);
+
+        try (final StreamTaskMailboxTestHarness<String> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, Types.STRING)
+                        .modifyStreamConfig(
+                                UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig)
+                        .addInput(Types.STRING)
+                        .setupOperatorChain(SimpleOperatorFactory.of(operator))
+                        .name("first")
+                        .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+                        .build()) {
+            // The first element makes the operator register its timers.
+            testHarness.processElement(new StreamRecord<>("register timers"));
+            testHarness.processAll();
+
+            testHarness.processElement(asWatermark(firstWindowEnd));
+            testHarness.processElement(asWatermark(secondWindowEnd));
+
+            assertThat(testHarness.getOutput())
+                    .containsExactly(
+                            // timers of the first timestamp
+                            asFiredRecord("key-0"),
+                            asMailRecord("key-0"),
+                            asFiredRecord("key-1"),
+                            asMailRecord("key-1"),
+                            asWatermark(firstWindowEnd),
+                            // timers of the second timestamp
+                            asFiredRecord("key-0"),
+                            asMailRecord("key-0"),
+                            asFiredRecord("key-1"),
+                            asMailRecord("key-1"),
+                            asWatermark(secondWindowEnd));
+        }
+    }
+
+    /**
+     * Verifies that both watermarks are forwarded, in order, by an operator that has no timers
+     * configured. Auto-processing is disabled and the harness is stepped manually via {@code
+     * processSingleStep()} until two watermarks have been observed in the output.
+     */
+    @Test
+    void testWatermarkProgressWithNoTimers() throws Exception {
+        final Instant firstWindowEnd = Instant.ofEpochMilli(1000L);
+        final Instant secondWindowEnd = Instant.ofEpochMilli(2000L);
+
+        try (final StreamTaskMailboxTestHarness<String> testHarness =
+                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, Types.STRING)
+                        .modifyStreamConfig(
+                                UnalignedCheckpointsInterruptibleTimersTest::setupStreamConfig)
+                        .addInput(Types.STRING)
+                        .setupOperatorChain(
+                                SimpleOperatorFactory.of(new MultipleTimersAtTheSameTimestamp()))
+                        .name("first")
+                        .finishForSingletonOperatorChain(StringSerializer.INSTANCE)
+                        .build()) {
+            testHarness.setAutoProcess(false);
+            testHarness.processElement(new StreamRecord<>("impulse"));
+            testHarness.processAll();
+            testHarness.processElement(asWatermark(firstWindowEnd));
+            testHarness.processElement(asWatermark(secondWindowEnd));
+
+            final List<Watermark> observedWatermarks = new ArrayList<>();
+            do {
+                testHarness.processSingleStep();
+                // Drain everything emitted by this step, keeping only the watermarks.
+                for (Object emitted = testHarness.getOutput().poll();
+                        emitted != null;
+                        emitted = testHarness.getOutput().poll()) {
+                    if (emitted instanceof Watermark) {
+                        observedWatermarks.add((Watermark) emitted);
+                    }
+                }
+            } while (observedWatermarks.size() < 2);
+
+            assertThat(observedWatermarks)
+                    .containsExactly(asWatermark(firstWindowEnd), asWatermark(secondWindowEnd));
+        }
+    }
+
+    private static Watermark asWatermark(Instant timestamp) {
+        return new Watermark(timestamp.toEpochMilli());
+    }
+
+    /**
+     * Builds the record expected from a timer firing for {@code key}.
+     *
+     * @param key key of the fired timer
+     * @return a record whose value is {@code "fired-" + key}
+     */
+    private static StreamRecord<String> asFiredRecord(String key) {
+        // Diamond instead of the raw type: avoids an unchecked conversion to StreamRecord<String>.
+        return new StreamRecord<>("fired-" + key);
+    }
+
+    /**
+     * Builds the record expected from the mail enqueued when a timer fires for {@code key}.
+     *
+     * @param key key of the fired timer
+     * @return a record whose value is {@code "mail-" + key}
+     */
+    private static StreamRecord<String> asMailRecord(String key) {
+        // Diamond instead of the raw type: avoids an unchecked conversion to StreamRecord<String>.
+        return new StreamRecord<>("mail-" + key);
+    }
+
+    /**
+     * Configures {@code cfg} for the tests in this class: turns on unaligned checkpoints and the
+     * splittable-timers flag for unaligned checkpoints, and sets {@link StringSerializer} as the
+     * state key serializer.
+     */
+    private static void setupStreamConfig(StreamConfig cfg) {
+        cfg.setUnalignedCheckpointsEnabled(true);
+        cfg.setUnalignedCheckpointsSplittableTimersEnabled(true);
+        cfg.setStateKeySerializer(StringSerializer.INSTANCE);
+    }
+
+    /**
+     * Test operator that, for every input element, registers event-time timers at the configured
+     * timestamps and routes watermarks through a {@link MailboxWatermarkProcessor} when a time
+     * service manager is present.
+     *
+     * <p>Each fired event-time timer enqueues a mail emitting a {@code mail-<key>} record and
+     * directly emits a {@code fired-<key>} record.
+     */
+    private static class MultipleTimersAtTheSameTimestamp extends AbstractStreamOperator<String>
+            implements OneInputStreamOperator<String, String>,
+                    Triggerable<String, String>,
+                    YieldingOperator<String> {
+
+        /** Maps a timer timestamp to the number of keys a timer is registered for. */
+        private final Map<Instant, Integer> timersToRegister;
+
+        private transient @Nullable MailboxExecutor mailboxExecutor;
+        private transient @Nullable MailboxWatermarkProcessor watermarkProcessor;
+
+        /** Creates an operator that registers no timers. */
+        MultipleTimersAtTheSameTimestamp() {
+            this(Collections.emptyMap());
+        }
+
+        /** Creates an operator that registers timers according to {@code timersToRegister}. */
+        MultipleTimersAtTheSameTimestamp(Map<Instant, Integer> timersToRegister) {
+            this.timersToRegister = timersToRegister;
+        }
+
+        @Override
+        public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
+            this.mailboxExecutor = mailboxExecutor;
+        }
+
+        @Override
+        public void open() throws Exception {
+            super.open();
+            if (!getTimeServiceManager().isPresent()) {
+                // No time service manager: watermarks keep using the superclass path.
+                return;
+            }
+            watermarkProcessor =
+                    new MailboxWatermarkProcessor(
+                            output, mailboxExecutor, getTimeServiceManager().get());
+        }
+
+        /** Registers, per configured timestamp, one event-time timer for each key index. */
+        @Override
+        public void processElement(StreamRecord<String> element) {
+            if (timersToRegister.isEmpty()) {
+                return;
+            }
+            final InternalTimerService<String> timerService =
+                    getInternalTimerService("timers", StringSerializer.INSTANCE, this);
+            for (Map.Entry<Instant, Integer> registration : timersToRegister.entrySet()) {
+                final Instant fireAt = registration.getKey();
+                final int numKeys = registration.getValue();
+                for (int keyIdx = 0; keyIdx < numKeys; keyIdx++) {
+                    setCurrentKey(String.format("key-%d", keyIdx));
+                    timerService.registerEventTimeTimer(
+                            String.format("window-%s", fireAt), fireAt.toEpochMilli());
+                }
+            }
+        }
+
+        @Override
+        public void processWatermark(Watermark mark) throws Exception {
+            if (watermarkProcessor != null) {
+                watermarkProcessor.emitWatermarkInsideMailbox(mark);
+                return;
+            }
+            super.processWatermark(mark);
+        }
+
+        @Override
+        public void onEventTime(InternalTimer<String, String> timer) throws Exception {
+            // Enqueue the follow-up mail first, then emit the record for the fired timer.
+            final String mailDescription = "mail-" + timer.getKey();
+            mailboxExecutor.execute(
+                    () -> output.collect(asMailRecord(timer.getKey())), mailDescription);
+            output.collect(asFiredRecord(timer.getKey()));
+        }
+
+        @Override
+        public void onProcessingTime(InternalTimer<String, String> timer) throws Exception {}
+
+        /**
+         * Returns a new operator with the timers of this one plus {@code count} keys at {@code
+         * timestamp}; this instance is left unchanged.
+         */
+        MultipleTimersAtTheSameTimestamp withTimers(Instant timestamp, int count) {
+            final Map<Instant, Integer> extended = new HashMap<>(timersToRegister);
+            extended.put(timestamp, count);
+            return new MultipleTimersAtTheSameTimestamp(extended);
+        }
+    }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.java
index 82c06c851f8..53e59680a7d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/UnalignedCheckpointsTest.java
@@ -548,7 +548,7 @@ class UnalignedCheckpointsTest {
     }
 
     @Test
-    void testNotifyAbortCheckpointBeforeCanellingAsyncCheckpoint() throws 
Exception {
+    public void testNotifyAbortCheckpointBeforeCancellingAsyncCheckpoint() 
throws Exception {
         ValidateAsyncFutureNotCompleted handler = new 
ValidateAsyncFutureNotCompleted(1);
         inputGate = createInputGate(2, handler);
         handler.setInputGate(inputGate);
@@ -699,7 +699,7 @@ class UnalignedCheckpointsTest {
      * Tests {@link
      * 
SingleCheckpointBarrierHandler#processCancellationBarrier(CancelCheckpointMarker,
      * InputChannelInfo)} abort the current pending checkpoint triggered by 
{@link
-     * CheckpointBarrierHandler#processBarrier(CheckpointBarrier, 
InputChannelInfo)}.
+     * CheckpointBarrierHandler#processBarrier(CheckpointBarrier, 
InputChannelInfo, boolean)}.
      */
     @Test
     void testProcessCancellationBarrierAfterProcessBarrier() throws Exception {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
index b8717bb1ce9..d91a9b3102e 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/TableStreamOperator.java
@@ -52,12 +52,6 @@ public abstract class TableStreamOperator<OUT> extends 
AbstractStreamOperator<OU
         this.ctx = new ContextImpl(getProcessingTimeService());
     }
 
-    @Override
-    public void processWatermark(Watermark mark) throws Exception {
-        super.processWatermark(mark);
-        currentWatermark = mark.getTimestamp();
-    }
-
     /** Compute memory size from memory faction. */
     public long computeMemorySize() {
         final Environment environment = getContainingTask().getEnvironment();
@@ -72,6 +66,11 @@ public abstract class TableStreamOperator<OUT> extends 
AbstractStreamOperator<OU
                                         
environment.getUserCodeClassLoader().asClassLoader()));
     }
 
+    /**
+     * Stores the timestamp of {@code mark} in {@code currentWatermark} and then delegates to the
+     * superclass implementation.
+     */
+    @Override
+    public void processWatermark(Watermark mark) throws Exception {
+        // Update the cached watermark BEFORE delegating to the superclass.
+        // NOTE(review): presumably so that code triggered from within super.processWatermark
+        // already observes the new value — confirm against AbstractStreamOperator.
+        currentWatermark = mark.getTimestamp();
+        super.processWatermark(mark);
+    }
     /** Information available in an invocation of processElement. */
     protected class ContextImpl implements TimerService {
 
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index f233f80c36c..ec595609437 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -120,6 +120,8 @@ public class TestStreamEnvironment extends 
StreamExecutionEnvironment {
                     Duration.ofMillis(100),
                     Duration.ofSeconds(2));
             randomize(conf, CheckpointingOptions.CLEANER_PARALLEL_MODE, true, 
false);
+            randomize(
+                    conf, 
CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true, false);
             randomize(conf, ExecutionOptions.SNAPSHOT_COMPRESSION, true, 
false);
             if (!conf.contains(CheckpointingOptions.FILE_MERGING_ENABLED)) {
                 randomize(conf, CheckpointingOptions.FILE_MERGING_ENABLED, 
true);


Reply via email to