This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cd722033fb3 [FLINK-26050] Manually compact small SST files
cd722033fb3 is described below

commit cd722033fb326837a80a6233603d12ad176da15c
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Fri Feb 16 19:42:43 2024 +0100

    [FLINK-26050] Manually compact small SST files
    
    In some cases, the number of files produced by the RocksDB state backend
    grows indefinitely.
    This might cause task state info (TDD and checkpoint ACK) to
    exceed the RPC message size and fail recovery/checkpointing, in addition to
    leaving lots of small files behind.
    
    With this change, such files are merged in the background using the RocksDB API.
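    
    The merging itself boils down to RocksDB's compactFiles API, which the
    new Compactor class in this change wraps. Below is a minimal sketch of
    that call, assuming an already-open RocksDB instance, a column family
    handle, and the names of the small SST files to merge; the class name,
    method name and the 64 MiB output size limit are illustrative only and
    not part of this commit:
    
        import org.rocksdb.ColumnFamilyHandle;
        import org.rocksdb.CompactionJobInfo;
        import org.rocksdb.CompactionOptions;
        import org.rocksdb.RocksDB;
        import org.rocksdb.RocksDBException;
    
        import java.util.List;
    
        class CompactFilesSketch {
            // Merges the given small SST files of one column family into outputLevel.
            // db, columnFamily, files and outputLevel are assumed to be provided by the caller.
            static void mergeSmallFiles(
                    RocksDB db, ColumnFamilyHandle columnFamily, List<String> files, int outputLevel)
                    throws RocksDBException {
                try (CompactionOptions options =
                                new CompactionOptions().setOutputFileSizeLimit(64 * 1024 * 1024);
                        CompactionJobInfo jobInfo = new CompactionJobInfo()) {
                    // 0 = index of the first configured DB path, used for the compaction output
                    db.compactFiles(options, columnFamily, files, outputLevel, 0, jobInfo);
                }
            }
        }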
---
 .../state/api/runtime/SavepointEnvironment.java    |   6 +-
 .../flink/runtime/io/disk/iomanager/IOManager.java |  10 +-
 .../runtime/io/disk/iomanager/IOManagerAsync.java  |  31 ++-
 .../runtime/taskexecutor/TaskManagerServices.java  |   2 +-
 .../runtime/io/disk/iomanager/IOManagerTest.java   |   3 +-
 .../flink-statebackend-rocksdb/pom.xml             |   5 +
 .../state/EmbeddedRocksDBStateBackend.java         |  20 +-
 .../streaming/state/RocksDBKeyedStateBackend.java  |  14 +-
 .../state/RocksDBKeyedStateBackendBuilder.java     |  33 ++-
 .../state/RocksDBPriorityQueueSetFactory.java      |   7 +-
 .../state/sstmerge/ColumnFamilyLookup.java         |  77 ++++++
 .../state/sstmerge/CompactionScheduler.java        | 149 +++++++++++
 .../streaming/state/sstmerge/CompactionTask.java   |  73 ++++++
 .../state/sstmerge/CompactionTaskProducer.java     | 234 +++++++++++++++++
 .../state/sstmerge/CompactionTracker.java          | 131 ++++++++++
 .../streaming/state/sstmerge/Compactor.java        |  80 ++++++
 .../sstmerge/RocksDBManualCompactionConfig.java    | 167 +++++++++++++
 .../sstmerge/RocksDBManualCompactionManager.java   |  70 ++++++
 .../RocksDBManualCompactionManagerImpl.java        |  84 +++++++
 .../sstmerge/RocksDBManualCompactionOptions.java   |  86 +++++++
 .../state/EmbeddedRocksDBStateBackendTest.java     |  53 ++++
 .../state/sstmerge/CompactionSchedulerTest.java    |  50 ++++
 .../state/sstmerge/CompactionTaskProducerTest.java | 278 +++++++++++++++++++++
 .../ManuallyTriggeredScheduledExecutorService.java |   4 +
 24 files changed, 1652 insertions(+), 15 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
index 00f471f1a7b..42ffb7e6037 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java
@@ -68,6 +68,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.flink.util.concurrent.Executors;
 
 import java.util.Collections;
 import java.util.Map;
@@ -141,7 +142,10 @@ public class SavepointEnvironment implements Environment {
 
         this.registry = new KvStateRegistry().createTaskRegistry(jobID, vertexID);
         this.taskStateManager = new SavepointTaskStateManager(prioritizedOperatorSubtaskState);
-        this.ioManager = new IOManagerAsync(ConfigurationUtils.parseTempDirectories(configuration));
+        this.ioManager =
+                new IOManagerAsync(
+                        ConfigurationUtils.parseTempDirectories(configuration),
+                        Executors.newDirectExecutorService());
         this.memoryManager = MemoryManager.create(64 * 1024 * 1024, DEFAULT_PAGE_SIZE);
         this.sharedResources = new SharedResources();
         this.accumulatorRegistry = new AccumulatorRegistry(jobID, attemptID);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 9c1e9d9c245..d56b122f25c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -33,6 +33,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
@@ -44,6 +45,8 @@ public abstract class IOManager implements AutoCloseable {
 
     private final FileChannelManager fileChannelManager;
 
+    protected final ExecutorService executorService;
+
     // -------------------------------------------------------------------------
     //               Constructors / Destructors
     // -------------------------------------------------------------------------
@@ -53,7 +56,7 @@ public abstract class IOManager implements AutoCloseable {
      *
      * @param tempDirs The basic directories for files underlying anonymous channels.
      */
-    protected IOManager(String[] tempDirs) {
+    protected IOManager(String[] tempDirs, ExecutorService executorService) {
         this.fileChannelManager =
                 new FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);
         if (LOG.isInfoEnabled()) {
@@ -64,6 +67,7 @@ public abstract class IOManager implements AutoCloseable {
                             .map(File::getAbsolutePath)
                             .collect(Collectors.joining("\n\t")));
         }
+        this.executorService = executorService;
     }
 
     /** Removes all temporary files. */
@@ -226,4 +230,8 @@ public abstract class IOManager implements AutoCloseable {
      */
     public abstract BulkBlockChannelReader createBulkBlockChannelReader(
             ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException;
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index d1ce53e2fdb..52408bb5d14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -18,16 +18,19 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.concurrent.Executors;
 
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -53,8 +56,17 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
     // -------------------------------------------------------------------------
 
     /** Constructs a new asynchronous I/O manager, writing files to the system's temp directory. */
+    @VisibleForTesting
     public IOManagerAsync() {
-        this(EnvironmentInformation.getTemporaryFileDirectory());
+        this(
+                EnvironmentInformation.getTemporaryFileDirectory(),
+                Executors.newDirectExecutorService());
+    }
+
+    /** Constructs a new asynchronous I/O manager, writing files to the system's temp directory. */
+    @VisibleForTesting
+    public IOManagerAsync(ExecutorService executorService) {
+        this(EnvironmentInformation.getTemporaryFileDirectory(), executorService);
     }
 
     /**
@@ -62,8 +74,19 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
      *
      * @param tempDir The directory to write temporary files to.
      */
+    @VisibleForTesting
     public IOManagerAsync(String tempDir) {
-        this(new String[] {tempDir});
+        this(new String[] {tempDir}, Executors.newDirectExecutorService());
+    }
+
+    /**
+     * Constructs a new asynchronous I/O manager, writing file to the given directory.
+     *
+     * @param tempDir The directory to write temporary files to.
+     */
+    @VisibleForTesting
+    public IOManagerAsync(String tempDir, ExecutorService executorService) {
+        this(new String[] {tempDir}, executorService);
     }
 
     /**
@@ -72,8 +95,8 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
      *
      * @param tempDirs The directories to write temporary files to.
      */
-    public IOManagerAsync(String[] tempDirs) {
-        super(tempDirs);
+    public IOManagerAsync(String[] tempDirs, ExecutorService executorService) {
+        super(tempDirs, executorService);
 
         // start a write worker thread for each directory
         this.writers = new WriterThread[tempDirs.length];
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index 6398107ea48..e514123ee77 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -339,7 +339,7 @@ public class TaskManagerServices {
 
         // start the I/O manager, it will create some temp directories.
         final IOManager ioManager =
-                new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
+                new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths(), ioExecutor);
 
         final ShuffleEnvironment<?, ?> shuffleEnvironment =
                 createShuffleEnvironment(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
index 67a65d1f24a..f64d2dc4891 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.nio.file.Files;
 import java.util.List;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -84,7 +85,7 @@ class IOManagerTest {
     private static class TestIOManager extends IOManager {
 
         protected TestIOManager(String[] paths) {
-            super(paths);
+            super(paths, Executors.newSingleThreadScheduledExecutor());
         }
 
         @Override
diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
index 1a39fc84b7f..3186b968e54 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml
+++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml
@@ -66,6 +66,11 @@ under the License.
                        <version>6.20.3-ververica-2.0</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-shaded-guava</artifactId>
+               </dependency>
+
                <!-- test dependencies -->
 
                <dependency>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
index baca5f4a013..4dd4c26c8fc 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.RocksDBMemoryFactory;
+import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionConfig;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
@@ -46,6 +47,7 @@ import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.DynamicCodeLoadingException;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.MdcUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TernaryBoolean;
 
@@ -195,6 +197,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
     private RocksDBMemoryFactory rocksDBMemoryFactory;
     // ------------------------------------------------------------------------
 
+    private final RocksDBManualCompactionConfig manualCompactionConfig;
+
     /** Creates a new {@code EmbeddedRocksDBStateBackend} for storing local state. */
     public EmbeddedRocksDBStateBackend() {
         this(TernaryBoolean.UNDEFINED);
@@ -226,6 +230,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
         this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED;
         this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED;
         this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED;
+        this.manualCompactionConfig = null;
     }
 
     /**
@@ -337,6 +342,11 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                         config);
 
         this.rocksDBMemoryFactory = original.rocksDBMemoryFactory;
+
+        this.manualCompactionConfig =
+                original.manualCompactionConfig != null
+                        ? original.manualCompactionConfig
+                        : RocksDBManualCompactionConfig.from(config);
     }
 
     // ------------------------------------------------------------------------
@@ -510,7 +520,15 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke
                         .setIncrementalRestoreAsyncCompactAfterRescale(
                                 getIncrementalRestoreAsyncCompactAfterRescale())
                         .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode())
-                        .setRescalingUseDeleteFilesInRange(isRescalingUseDeleteFilesInRange());
+                        .setRescalingUseDeleteFilesInRange(isRescalingUseDeleteFilesInRange())
+                        .setIOExecutor(
+                                MdcUtils.scopeToJob(
+                                        jobId,
+                                        parameters.getEnv().getIOManager().getExecutorService()))
+                        .setManualCompactionConfig(
+                                manualCompactionConfig == null
+                                        ? RocksDBManualCompactionConfig.getDefault()
+                                        : manualCompactionConfig);
         return builder.build();
     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 8e531ecebf8..47225691b54 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -31,6 +31,7 @@ import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysAndNamesp
 import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksDBFullSnapshotResources;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
+import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataInputDeserializer;
@@ -160,6 +161,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                                     StateDescriptor.Type.REDUCING,
                                     (StateUpdateFactory) RocksDBReducingState::update))
                     .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+    private final RocksDBManualCompactionManager sstMergeManager;
 
     private interface StateCreateFactory {
         <K, N, SV, S extends State, IS extends S> IS createState(
@@ -290,7 +292,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
             InternalKeyContext<K> keyContext,
             @Nonnegative long writeBatchSize,
-            @Nullable CompletableFuture<Void> asyncCompactFuture) {
+            @Nullable CompletableFuture<Void> asyncCompactFuture,
+            RocksDBManualCompactionManager rocksDBManualCompactionManager) {
 
         super(
                 kvStateRegistry,
@@ -338,6 +341,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         } else {
             this.heapPriorityQueuesManager = null;
         }
+        this.sstMergeManager = rocksDBManualCompactionManager;
+        for (RocksDbKvStateInfo stateInfo : kvStateInformation.values()) {
+            this.sstMergeManager.register(stateInfo);
+        }
+        this.sstMergeManager.start();
     }
 
     @SuppressWarnings("unchecked")
@@ -446,6 +454,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
         }
         super.dispose();
 
+        IOUtils.closeQuietly(sstMergeManager);
+
         // This call will block until all clients that still acquire access to the RocksDB instance
         // have released it,
         // so that we cannot release the native resources while clients are still working with it in
@@ -685,6 +695,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
             newRocksStateInfo =
                     new RocksDbKvStateInfo(oldStateInfo.columnFamilyHandle, newMetaInfo);
             kvStateInformation.put(stateDesc.getName(), newRocksStateInfo);
+            sstMergeManager.register(newRocksStateInfo);
         } else {
             newMetaInfo =
                     new RegisteredKeyValueStateBackendMetaInfo<>(
@@ -711,6 +722,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
                     this.nativeMetricMonitor,
                     stateDesc.getName(),
                     newRocksStateInfo);
+            sstMergeManager.register(newRocksStateInfo);
         }
 
         StateSnapshotTransformFactory<SV> wrappedSnapshotTransformFactory =
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 226ec0ac36a..1a63950305d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -30,6 +30,8 @@ import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
 import org.apache.flink.contrib.streaming.state.snapshot.RocksNativeFullSnapshotStrategy;
+import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionConfig;
+import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager;
 import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.metrics.MetricGroup;
@@ -76,6 +78,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
@@ -83,6 +86,7 @@ import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOption
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Builder class for {@link RocksDBKeyedStateBackend} which handles all necessary initializations
@@ -138,6 +142,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
     private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue();
     private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing
     private RocksDBStateUploader injectRocksDBStateUploader; // for testing
+    private RocksDBManualCompactionConfig manualCompactionConfig =
+            RocksDBManualCompactionConfig.getDefault();
+    private ExecutorService ioExecutor;
 
     public RocksDBKeyedStateBackendBuilder(
             String operatorIdentifier,
@@ -298,6 +305,11 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         return this;
     }
 
+    RocksDBKeyedStateBackendBuilder<K> setIOExecutor(ExecutorService ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
     public static File getInstanceRocksDBPath(File instanceBasePath) {
         return new File(instanceBasePath, DB_INSTANCE_DIR_STRING);
     }
@@ -337,6 +349,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
         int keyGroupPrefixBytes =
                 CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
                         numberOfKeyGroups);
+        RocksDBManualCompactionManager manualCompactionManager;
 
         try {
             // Variables for snapshot strategy when incremental checkpoint is enabled
@@ -398,13 +411,16 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                             materializedSstFiles,
                             lastCompletedCheckpointId);
             // init priority queue factory
+            manualCompactionManager =
+                    RocksDBManualCompactionManager.create(db, manualCompactionConfig, ioExecutor);
             priorityQueueFactory =
                     initPriorityQueueFactory(
                             keyGroupPrefixBytes,
                             kvStateInformation,
                             db,
                             writeBatchWrapper,
-                            nativeMetricMonitor);
+                            nativeMetricMonitor,
+                            manualCompactionManager);
         } catch (Throwable e) {
             // Do clean up
             List<ColumnFamilyOptions> columnFamilyOptions =
@@ -472,7 +488,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                 ttlCompactFiltersManager,
                 keyContext,
                 writeBatchSize,
-                asyncCompactAfterRestoreFuture);
+                asyncCompactAfterRestoreFuture,
+                manualCompactionManager);
     }
 
     private RocksDBRestoreOperation getRocksDBRestoreOperation(
@@ -607,7 +624,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
             RocksDB db,
             RocksDBWriteBatchWrapper writeBatchWrapper,
-            RocksDBNativeMetricMonitor nativeMetricMonitor) {
+            RocksDBNativeMetricMonitor nativeMetricMonitor,
+            RocksDBManualCompactionManager manualCompactionManager) {
         PriorityQueueSetFactory priorityQueueFactory;
         switch (priorityQueueConfig.getPriorityQueueStateType()) {
             case HEAP:
@@ -626,7 +644,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
                                 nativeMetricMonitor,
                                 columnFamilyOptionsFactory,
                                 optionsContainer.getWriteBufferManagerCapacity(),
-                                priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize());
+                                priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize(),
+                                manualCompactionManager);
                 break;
             default:
                 throw new IllegalArgumentException(
@@ -648,4 +667,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken
             FileUtils.deleteDirectory(instanceBasePath);
         }
     }
+
+    public RocksDBKeyedStateBackendBuilder<K> setManualCompactionConfig(
+            RocksDBManualCompactionConfig manualCompactionConfig) {
+        this.manualCompactionConfig = checkNotNull(manualCompactionConfig);
+        return this;
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
index 23a1a6913f6..28bd3b0d1a5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.state.KeyExtractorFunction;
@@ -71,6 +72,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
     private final RocksDBNativeMetricMonitor nativeMetricMonitor;
     private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
     private final Long writeBufferManagerCapacity;
+    private final RocksDBManualCompactionManager manualCompactionManager;
 
     RocksDBPriorityQueueSetFactory(
             KeyGroupRange keyGroupRange,
@@ -83,7 +85,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
             RocksDBNativeMetricMonitor nativeMetricMonitor,
             Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
             Long writeBufferManagerCapacity,
-            int cacheSize) {
+            int cacheSize,
+            RocksDBManualCompactionManager manualCompactionManager) {
         this.keyGroupRange = keyGroupRange;
         this.keyGroupPrefixBytes = keyGroupPrefixBytes;
         this.numberOfKeyGroups = numberOfKeyGroups;
@@ -98,6 +101,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
         this.writeBufferManagerCapacity = writeBufferManagerCapacity;
         Preconditions.checkArgument(cacheSize > 0);
         this.cacheSize = cacheSize;
+        this.manualCompactionManager = manualCompactionManager;
     }
 
     @Nonnull
@@ -225,6 +229,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
                 kvStateInformation.put(stateName, stateInfo);
             }
         }
+        manualCompactionManager.register(stateInfo);
 
         return stateInfo;
     }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/ColumnFamilyLookup.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/ColumnFamilyLookup.java
new file mode 100644
index 00000000000..66c79c2b8f8
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/ColumnFamilyLookup.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Lookup helper for RocksDB column families by their name. */
+class ColumnFamilyLookup {
+    private final Map<Key, ColumnFamilyHandle> map;
+
+    public ColumnFamilyLookup() {
+        map = new ConcurrentHashMap<>();
+    }
+
+    @Nullable
+    public ColumnFamilyHandle get(byte[] name) {
+        return map.get(new Key(name));
+    }
+
+    public void add(ColumnFamilyHandle handle) {
+        try {
+            map.put(new Key(handle.getName()), handle);
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static class Key {
+        private final byte[] payload;
+
+        private Key(byte[] payload) {
+            this.payload = checkNotNull(payload);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Key key = (Key) o;
+            return Arrays.equals(payload, key.payload);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(payload);
+        }
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionScheduler.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionScheduler.java
new file mode 100644
index 00000000000..4c81e6e054d
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionScheduler.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Collections.emptyList;
+
+/**
+ * Schedules manual compactions of small disjoint SST files created by RocksDB. It does so
+ * periodically while maintaining {@link RocksDBManualCompactionOptions#MIN_INTERVAL} between
+ * compaction rounds, where each round is at most {@link
+ * RocksDBManualCompactionOptions#MAX_PARALLEL_COMPACTIONS}.
+ */
+class CompactionScheduler {
+    private static final Logger LOG = LoggerFactory.getLogger(CompactionScheduler.class);
+
+    private final ScheduledExecutorService scheduledExecutor;
+    private final ExecutorService ioExecutor;
+    private final long checkPeriodMs;
+    private final CompactionTracker tracker;
+    private final Compactor compactor;
+    private final CompactionTaskProducer taskProducer;
+    private final Object lock = new Object();
+    private boolean running = true;
+
+    public CompactionScheduler(
+            RocksDBManualCompactionConfig settings,
+            ExecutorService ioExecutor,
+            CompactionTaskProducer taskProducer,
+            Compactor compactor,
+            CompactionTracker tracker) {
+        this(
+                settings,
+                ioExecutor,
+                taskProducer,
+                compactor,
+                tracker,
+                Executors.newSingleThreadScheduledExecutor());
+    }
+
+    public CompactionScheduler(
+            RocksDBManualCompactionConfig settings,
+            ExecutorService ioExecutor,
+            CompactionTaskProducer taskProducer,
+            Compactor compactor,
+            CompactionTracker tracker,
+            ScheduledExecutorService scheduledExecutor) {
+        this.ioExecutor = ioExecutor;
+        this.scheduledExecutor = scheduledExecutor;
+        this.checkPeriodMs = settings.minInterval;
+        this.tracker = tracker;
+        this.compactor = compactor;
+        this.taskProducer = taskProducer;
+    }
+
+    public void start() {
+        scheduleScan();
+    }
+
+    public void stop() throws InterruptedException {
+        synchronized (lock) {
+            if (running) {
+                running = false;
+                scheduledExecutor.shutdownNow();
+            }
+        }
+        if (!scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+            LOG.warn("Unable to terminate scheduled tasks in 5s");
+        }
+    }
+
+    public void scheduleScan() {
+        synchronized (lock) {
+            if (running) {
+                LOG.trace("Schedule SST scan in {} ms", checkPeriodMs);
+                scheduledExecutor.schedule(
+                        () -> ioExecutor.execute(this::maybeScan),
+                        checkPeriodMs,
+                        TimeUnit.MILLISECONDS);
+            } else {
+                LOG.debug("Not scheduling next scan: shutting down");
+            }
+        }
+    }
+
+    public void maybeScan() {
+        LOG.trace("Starting SST scan");
+        if (tracker.haveManualCompactions() || tracker.isShuttingDown()) {
+            LOG.trace("Skip SST scan {}", tracker);
+            // nothing to do:
+            // previous compactions didn't finish yet
+            // the last one will reschedule this task
+            return;
+        }
+
+        final List<CompactionTask> targets = scan();
+        LOG.trace("SST scan resulted in targets {}", targets);
+        if (targets.isEmpty()) {
+            scheduleScan();
+            return;
+        }
+
+        for (CompactionTask target : targets) {
+            ioExecutor.execute(
+                    () ->
+                            tracker.runWithTracking(
+                                    target.columnFamilyHandle,
+                                    () ->
+                                            compactor.compact(
+                                                    target.columnFamilyHandle,
+                                                    target.level,
+                                                    target.files),
+                                    this::scheduleScan));
+        }
+    }
+
+    private List<CompactionTask> scan() {
+        try {
+            return taskProducer.produce();
+        } catch (Exception e) {
+            LOG.warn("Unable to scan for compaction targets", e);
+            return emptyList();
+        }
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTask.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTask.java
new file mode 100644
index 00000000000..5559be5cf46
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTask.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.rocksdb.ColumnFamilyHandle;
+
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Target for a single manual compaction call. */
+class CompactionTask {
+    final int level;
+    final List<String> files;
+    final ColumnFamilyHandle columnFamilyHandle;
+
+    CompactionTask(int level, List<String> files, ColumnFamilyHandle columnFamilyHandle) {
+        checkArgument(!files.isEmpty());
+        checkArgument(level >= 0);
+        this.level = level;
+        this.files = checkNotNull(files);
+        this.columnFamilyHandle = checkNotNull(columnFamilyHandle);
+    }
+
+    @Override
+    public String toString() {
+        return "CompactionTask{"
+                + "level="
+                + level
+                + ", files="
+                + files
+                + ", columnFamily="
+                + columnFamilyHandle
+                + '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        CompactionTask that = (CompactionTask) o;
+        return level == that.level
+                && Objects.equals(files, that.files)
+                && Objects.equals(columnFamilyHandle, that.columnFamilyHandle);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(level, files, columnFamilyHandle);
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducer.java
new file mode 100644
index 00000000000..d4ef5ddfda2
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducer.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.LiveFileMetaData;
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Checks live RocksDB SST files for merging (compacting) according to the {@link
+ * RocksDBManualCompactionOptions}. The result is a list of {@link CompactionTask}s.
+ *
+ * <p>SSTs from all the column families are scanned at once using {@link
+ * RocksDB#getLiveFilesMetaData()}. However, each task only contains SSTs of the same level of the
+ * same column family.
+ *
+ * <p>At most {@link RocksDBManualCompactionConfig#maxManualCompactions} tasks are chosen from the
+ * candidates with the highest number of files.
+ */
+class CompactionTaskProducer {
+    private static final Logger LOG = LoggerFactory.getLogger(CompactionTaskProducer.class);
+
+    private static final Comparator<SstFileMetaData> SST_COMPARATOR =
+            (o1, o2) -> {
+                Comparator<byte[]> cmp = UnsignedBytes.lexicographicalComparator();
+                int cfCmp = cmp.compare(o1.columnFamilyName(), o2.columnFamilyName());
+                if (cfCmp != 0) {
+                    return cfCmp;
+                } else {
+                    return cmp.compare(o1.smallestKey(), o2.smallestKey());
+                }
+            };
+
+    private final RocksDBManualCompactionConfig settings;
+    private final ColumnFamilyLookup columnFamilyLookup;
+    private final Supplier<List<SstFileMetaData>> metadataSupplier;
+
+    CompactionTaskProducer(
+            RocksDB db,
+            RocksDBManualCompactionConfig settings,
+            ColumnFamilyLookup columnFamilyLookup) {
+        this(
+                () -> SstFileMetaData.mapFrom(db.getLiveFilesMetaData()),
+                settings,
+                columnFamilyLookup);
+    }
+
+    CompactionTaskProducer(
+            Supplier<List<SstFileMetaData>> metadataSupplier,
+            RocksDBManualCompactionConfig settings,
+            ColumnFamilyLookup columnFamilyLookup) {
+        this.settings = settings;
+        this.columnFamilyLookup = columnFamilyLookup;
+        this.metadataSupplier = metadataSupplier;
+    }
+
+    public List<CompactionTask> produce() {
+
+        // get all CF files sorted by key range start (L1+)
+        List<SstFileMetaData> sstSortedByCfAndStartingKeys =
+                metadataSupplier.get().stream()
+                        .filter(l -> l.level() > 0) // let RocksDB deal with L0
+                        .sorted(SST_COMPARATOR)
+                        .collect(Collectors.toList());
+        LOG.trace("Input files: {}", sstSortedByCfAndStartingKeys.size());
+
+        List<CompactionTask> tasks = groupIntoTasks(sstSortedByCfAndStartingKeys);
+        tasks.sort(Comparator.<CompactionTask>comparingInt(t -> t.files.size()).reversed());
+        return tasks.subList(0, Math.min(tasks.size(), settings.maxManualCompactions));
+    }
+
+    private List<CompactionTask> groupIntoTasks(List<SstFileMetaData> files) {
+        // collect the files which won't be compacted by RocksDB
+        // and won't cause any problems if merged - i.e. don't overlap with any known files on
+        // the other levels
+        List<CompactionTask> tasks = new ArrayList<>();
+        List<SstFileMetaData> group = new ArrayList<>();
+        SstFileMetaData prevFile = null;
+        long compactionOutputSize = 0;
+
+        for (SstFileMetaData file : files) {
+            final boolean compact = shouldCompact(file);
+            final boolean newGroup =
+                    !compact || !sameGroup(file, prevFile, group, compactionOutputSize);
+            if (newGroup) {
+                createTask(group).ifPresent(tasks::add);
+                group.clear();
+                compactionOutputSize = 0;
+            }
+            if (compact) {
+                group.add(file);
+                compactionOutputSize += file.size();
+            }
+            LOG.trace(
+                    "Processed SST file: {}, level={}, cf: {}, being compacted={}, compact: {}, change group: {}, prev level={}",
+                    file.fileName(),
+                    file.level(),
+                    file.columnFamilyName(),
+                    file.beingCompacted(),
+                    compact,
+                    newGroup,
+                    prevFile == null ? -1 : prevFile.level());
+            prevFile = file;
+        }
+        createTask(group).ifPresent(tasks::add);
+        return tasks;
+    }
+
+    private Optional<CompactionTask> createTask(List<SstFileMetaData> compaction) {
+        if (compaction.size() < settings.minFilesToCompact) {
+            return Optional.empty();
+        }
+        SstFileMetaData head = compaction.iterator().next();
+        ColumnFamilyHandle cf = columnFamilyLookup.get(head.columnFamilyName());
+        if (cf == null) {
+            LOG.warn("Unknown column family: {}", head.columnFamilyName);
+            return Optional.empty();
+        }
+        List<String> fileNames =
+                compaction.stream().map(SstFileMetaData::fileName).collect(Collectors.toList());
+        return Optional.of(new CompactionTask(head.level(), fileNames, cf));
+    }
+
+    private boolean sameGroup(
+            SstFileMetaData file,
+            SstFileMetaData prevFile,
+            List<SstFileMetaData> group,
+            long compactionOutputSize) {
+        if (prevFile == null) {
+            return true;
+        }
+        return (file.level() == prevFile.level())
+                && Arrays.equals(file.columnFamilyName(), prevFile.columnFamilyName())
+                && compactionOutputSize + file.size() <= settings.maxOutputFileSize.getBytes()
+                && group.size() < settings.maxFilesToCompact;
+    }
+
+    private boolean shouldCompact(SstFileMetaData file) {
+        return file.size() <= settings.maxFileSizeToCompact.getBytes() && !file.beingCompacted();
+    }
+
+    static class SstFileMetaData {
+
+        private final byte[] columnFamilyName;
+        private final String fileName;
+        private final int level;
+        private final long size;
+        private final byte[] smallestKey;
+        private final boolean beingCompacted;
+
+        public SstFileMetaData(
+                byte[] columnFamilyName,
+                String fileName,
+                int level,
+                long size,
+                byte[] smallestKey,
+                boolean beingCompacted) {
+            this.columnFamilyName = columnFamilyName;
+            this.fileName = fileName;
+            this.level = level;
+            this.size = size;
+            this.smallestKey = smallestKey;
+            this.beingCompacted = beingCompacted;
+        }
+
+        public String fileName() {
+            return fileName;
+        }
+
+        public byte[] columnFamilyName() {
+            return columnFamilyName;
+        }
+
+        public int level() {
+            return level;
+        }
+
+        public long size() {
+            return size;
+        }
+
+        public byte[] smallestKey() {
+            return smallestKey;
+        }
+
+        public boolean beingCompacted() {
+            return beingCompacted;
+        }
+
+        static List<SstFileMetaData> mapFrom(List<LiveFileMetaData> list) {
+            return list.stream()
+                    .map(SstFileMetaData::fromLiveFileMetaData)
+                    .collect(Collectors.toList());
+        }
+
+        static SstFileMetaData fromLiveFileMetaData(LiveFileMetaData fileMetaData) {
+            return new SstFileMetaData(
+                    fileMetaData.columnFamilyName(),
+                    fileMetaData.fileName(),
+                    fileMetaData.level(),
+                    fileMetaData.size(),
+                    fileMetaData.smallestKey(),
+                    fileMetaData.beingCompacted());
+        }
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTracker.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTracker.java
new file mode 100644
index 00000000000..0b72ac6de11
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTracker.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.function.Function;
+
+/**
+ * Tracks the number of pending/running compactions (manual and automatic) and the DB status. Used
+ * concurrently by different compaction threads and by SST scanning threads.
+ */
+@ThreadSafe
+class CompactionTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(CompactionTracker.class);
+
+    private final Function<ColumnFamilyHandle, Long> runningAutoCompactions;
+    private final int maxManualCompactions;
+    private final int maxAutoCompactions;
+    private int pendingManualCompactions;
+    private int runningManualCompactions;
+    private boolean isShuttingDown;
+
+    public CompactionTracker(
+            RocksDBManualCompactionConfig settings,
+            Function<ColumnFamilyHandle, Long> runningAutoCompactions) {
+        this.maxManualCompactions = settings.maxManualCompactions;
+        this.maxAutoCompactions = settings.maxAutoCompactions;
+        this.runningAutoCompactions = runningAutoCompactions;
+        this.isShuttingDown = false;
+    }
+
+    private synchronized void complete() {
+        runningManualCompactions--;
+    }
+
+    private synchronized void cancel() {
+        pendingManualCompactions--;
+    }
+
+    private synchronized boolean tryStart(ColumnFamilyHandle cf) {
+        if (runningManualCompactions >= maxManualCompactions) {
+            return false;
+        }
+        if (isShuttingDown()) {
+            return false;
+        }
+        if (runningAutoCompactions.apply(cf) >= maxAutoCompactions) {
+            return false;
+        }
+        // all good
+        pendingManualCompactions--;
+        runningManualCompactions++;
+        return true;
+    }
+
+    private synchronized void runIfNoManualCompactions(Runnable runnable) {
+        if (!haveManualCompactions()) {
+            runnable.run();
+        }
+    }
+
+    public synchronized boolean haveManualCompactions() {
+        return runningManualCompactions > 0 || pendingManualCompactions > 0;
+    }
+
+    public synchronized boolean isShuttingDown() {
+        return isShuttingDown;
+    }
+
+    public synchronized void close() {
+        isShuttingDown = true;
+    }
+
+    @Override
+    public String toString() {
+        return "CompactionTracker{"
+                + "maxManualCompactions="
+                + maxManualCompactions
+                + ", maxAutoCompactions="
+                + maxAutoCompactions
+                + ", pendingManualCompactions="
+                + pendingManualCompactions
+                + ", runningManualCompactions="
+                + runningManualCompactions
+                + ", isShuttingDown="
+                + isShuttingDown
+                + '}';
+    }
+
+    void runWithTracking(
+            ColumnFamilyHandle columnFamily,
+            RunnableWithException compaction,
+            Runnable lastCompactionPostAction) {
+        if (tryStart(columnFamily)) {
+            try {
+                compaction.run();
+            } catch (Exception e) {
+                LOG.warn("Unable to compact {} (concurrent compaction?)", compaction, e);
+            }
+            complete();
+        } else {
+            // drop this task - new will be created with a fresh set of files
+            cancel();
+        }
+        // we were the last manual compaction - schedule the scan
+        runIfNoManualCompactions(lastCompactionPostAction);
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java
new file mode 100644
index 00000000000..8471182e759
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.CompactionJobInfo;
+import org.rocksdb.CompactionOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Compacts multiple RocksDB SST files using {@link 
RocksDB#compactFiles(CompactionOptions,
+ * ColumnFamilyHandle, List, int, int, CompactionJobInfo) 
RocksDB#compactFiles} into the last level.
+ * Usually this results in a single SST file, provided it doesn't exceed the RocksDB target
+ * output file size for that level.
+ */
+class Compactor {
+    private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
+    private static final int OUTPUT_PATH_ID = 0; // just use the first one
+
+    private final CompactionTarget db;
+    private final long targetOutputFileSize;
+
+    public Compactor(RocksDB db, long targetOutputFileSize) {
+        this(db::compactFiles, targetOutputFileSize);
+    }
+
+    public Compactor(CompactionTarget target, long targetOutputFileSize) {
+        this.db = target;
+        this.targetOutputFileSize = targetOutputFileSize;
+    }
+
+    void compact(ColumnFamilyHandle cfName, int level, List<String> files) 
throws RocksDBException {
+        int outputLevel = Math.min(level + 1, 
cfName.getDescriptor().getOptions().numLevels() - 1);
+        LOG.debug(
+                "Manually compacting {} files from level {} to {}: {}",
+                files.size(),
+                level,
+                outputLevel,
+                files);
+        try (CompactionOptions options =
+                        new 
CompactionOptions().setOutputFileSizeLimit(targetOutputFileSize);
+                CompactionJobInfo compactionJobInfo = new CompactionJobInfo()) 
{
+            db.compactFiles(options, cfName, files, outputLevel, 
OUTPUT_PATH_ID, compactionJobInfo);
+        }
+    }
+
+    public interface CompactionTarget {
+        void compactFiles(
+                CompactionOptions options,
+                ColumnFamilyHandle columnFamily,
+                List<String> inputFileNames,
+                int outputLevel,
+                int outputPathId,
+                CompactionJobInfo compactionJobInfo)
+                throws RocksDBException;
+
+        CompactionTarget NO_OP = (options, columnFamily, files, outputLevel, outputPathId, jobInfo) -> {};
+    }
+}
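
The CompactionTarget indirection above makes the class testable without a live RocksDB instance; a sketch of a recording stand-in (the names below are illustrative only):

    // Record which file batches would be compacted instead of calling RocksDB.
    List<List<String>> compactedBatches = new ArrayList<>();
    Compactor.CompactionTarget recording =
            (options, columnFamily, files, outputLevel, pathId, jobInfo) -> compactedBatches.add(files);
    Compactor compactor = new Compactor(recording, 64 * 1024 * 1024L);
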
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionConfig.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionConfig.java
new file mode 100644
index 00000000000..d4ac6403f6d
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionConfig.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+
+import java.io.Serializable;
+
+/** Configuration for {@link RocksDBManualCompactionManager}. */
+public class RocksDBManualCompactionConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    public final long minInterval; // minimum pause between compaction rounds, in milliseconds; non-positive disables manual compactions
+    public final int maxManualCompactions;
+    public final MemorySize maxFileSizeToCompact;
+    public final int minFilesToCompact; // in a single compaction
+    public final int maxFilesToCompact; // in a single compaction
+    public final MemorySize maxOutputFileSize;
+    public final int maxAutoCompactions;
+
+    public RocksDBManualCompactionConfig(
+            long periodMs,
+            int maxManualCompactions,
+            MemorySize maxFileSizeToCompact,
+            int minFilesToCompact,
+            int maxFilesToCompact,
+            MemorySize maxOutputFileSize,
+            int maxAutoCompactions) {
+        this.minInterval = periodMs;
+        this.maxManualCompactions = maxManualCompactions;
+        this.maxFileSizeToCompact = maxFileSizeToCompact;
+        this.minFilesToCompact = minFilesToCompact;
+        this.maxFilesToCompact = maxFilesToCompact;
+        this.maxOutputFileSize = maxOutputFileSize;
+        this.maxAutoCompactions = maxAutoCompactions;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static RocksDBManualCompactionConfig from(ReadableConfig config) {
+        return builder()
+                
.setMinInterval(config.get(RocksDBManualCompactionOptions.MIN_INTERVAL).toMillis())
+                .setMaxParallelCompactions(
+                        
config.get(RocksDBManualCompactionOptions.MAX_PARALLEL_COMPACTIONS))
+                .setMaxFileSizeToCompact(
+                        
config.get(RocksDBManualCompactionOptions.MAX_FILE_SIZE_TO_COMPACT))
+                .setMaxFilesToCompact(
+                        
config.get(RocksDBManualCompactionOptions.MAX_FILES_TO_COMPACT))
+                .setMinFilesToCompact(
+                        
config.get(RocksDBManualCompactionOptions.MIN_FILES_TO_COMPACT))
+                .setMaxOutputFileSize(
+                        
config.get(RocksDBManualCompactionOptions.MAX_OUTPUT_FILE_SIZE))
+                .setMaxAutoCompactions(
+                        
config.get(RocksDBManualCompactionOptions.MAX_AUTO_COMPACTIONS))
+                .build();
+    }
+
+    public static RocksDBManualCompactionConfig getDefault() {
+        return builder().build();
+    }
+
+    @Override
+    public String toString() {
+        return "RocksDBManualCompactionConfig{"
+                + "minInterval="
+                + minInterval
+                + ", maxManualCompactions="
+                + maxManualCompactions
+                + ", maxFileSizeToCompact="
+                + maxFileSizeToCompact
+                + ", minFilesToCompact="
+                + minFilesToCompact
+                + ", maxFilesToCompact="
+                + maxFilesToCompact
+                + ", maxOutputFileSize="
+                + maxOutputFileSize
+                + ", maxAutoCompactions="
+                + maxAutoCompactions
+                + '}';
+    }
+
+    /** Builder for {@link RocksDBManualCompactionConfig}. */
+    public static class Builder {
+        private long minInterval =
+                
RocksDBManualCompactionOptions.MIN_INTERVAL.defaultValue().toMillis();
+        private int maxParallelCompactions =
+                
RocksDBManualCompactionOptions.MAX_PARALLEL_COMPACTIONS.defaultValue();
+        private MemorySize maxFileSizeToCompact =
+                
RocksDBManualCompactionOptions.MAX_FILE_SIZE_TO_COMPACT.defaultValue();
+        private int minFilesToCompact =
+                
RocksDBManualCompactionOptions.MIN_FILES_TO_COMPACT.defaultValue();
+        private int maxFilesToCompact =
+                
RocksDBManualCompactionOptions.MAX_FILES_TO_COMPACT.defaultValue();
+        private MemorySize maxOutputFileSize =
+                
RocksDBManualCompactionOptions.MAX_OUTPUT_FILE_SIZE.defaultValue();
+        private int maxAutoCompactions =
+                
RocksDBManualCompactionOptions.MAX_AUTO_COMPACTIONS.defaultValue();
+
+        public Builder setMinInterval(long minInterval) {
+            this.minInterval = minInterval;
+            return this;
+        }
+
+        public Builder setMaxParallelCompactions(int maxParallelCompactions) {
+            this.maxParallelCompactions = maxParallelCompactions;
+            return this;
+        }
+
+        public Builder setMaxFileSizeToCompact(MemorySize 
maxFileSizeToCompact) {
+            this.maxFileSizeToCompact = maxFileSizeToCompact;
+            return this;
+        }
+
+        public Builder setMinFilesToCompact(int minFilesToCompact) {
+            this.minFilesToCompact = minFilesToCompact;
+            return this;
+        }
+
+        public Builder setMaxFilesToCompact(int maxFilesToCompact) {
+            this.maxFilesToCompact = maxFilesToCompact;
+            return this;
+        }
+
+        public Builder setMaxOutputFileSize(MemorySize maxOutputFileSize) {
+            this.maxOutputFileSize = maxOutputFileSize;
+            return this;
+        }
+
+        public Builder setMaxAutoCompactions(int maxAutoCompactions) {
+            this.maxAutoCompactions = maxAutoCompactions;
+            return this;
+        }
+
+        public RocksDBManualCompactionConfig build() {
+            return new RocksDBManualCompactionConfig(
+                    minInterval,
+                    maxParallelCompactions,
+                    maxFileSizeToCompact,
+                    minFilesToCompact,
+                    maxFilesToCompact,
+                    maxOutputFileSize,
+                    maxAutoCompactions);
+        }
+    }
+}
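
A short sketch of assembling these settings programmatically through the builder; the concrete values are illustrative, not recommendations:

    RocksDBManualCompactionConfig settings =
            RocksDBManualCompactionConfig.builder()
                    .setMinInterval(10_000) // scan for small files at most every 10 seconds
                    .setMaxFileSizeToCompact(MemorySize.parse("1mb")) // only pick up files below this size
                    .setMaxOutputFileSize(MemorySize.parse("64mb")) // target size of the merged output file
                    .build();
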
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManager.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManager.java
new file mode 100644
index 00000000000..6eac9e55615
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManager.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Manages compactions of small and disjoint RocksDB SST files that otherwise 
would not be merged to
+ * reduce write amplification.
+ *
+ * <p>Such files are usually small and are inlined into the checkpoint metadata, which might
+ * lead to exceeding the RPC message size on checkpoint ACK or recovery.
+ *
+ * <p>This class manages compactions of one or more Column Families of a 
single RocksDB instance.
+ *
+ * <p>Note that "manual" means that the compactions are <b>requested</b> 
manually (by Flink), but
+ * they are still executed by RocksDB.
+ */
+public interface RocksDBManualCompactionManager extends AutoCloseable {
+    Logger LOG = LoggerFactory.getLogger(RocksDBManualCompactionManager.class);
+
+    static RocksDBManualCompactionManager create(
+            RocksDB db, RocksDBManualCompactionConfig settings, 
ExecutorService ioExecutor) {
+        LOG.info("Creating RocksDBManualCompactionManager with settings: {}", 
settings);
+        return settings.minInterval <= 0
+                ? NO_OP
+                : new RocksDBManualCompactionManagerImpl(db, settings, 
ioExecutor);
+    }
+
+    RocksDBManualCompactionManager NO_OP =
+            new RocksDBManualCompactionManager() {
+                @Override
+                public void 
register(RocksDBKeyedStateBackend.RocksDbKvStateInfo stateInfo) {}
+
+                @Override
+                public void close() {}
+
+                @Override
+                public void start() {}
+            };
+
+    void register(RocksDBKeyedStateBackend.RocksDbKvStateInfo stateInfo);
+
+    @Override
+    void close() throws Exception;
+
+    void start();
+}
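
The intended lifecycle, sketched from the interface above; db, ioExecutor and stateInfo are assumed to be supplied by the RocksDB keyed state backend builder:

    // A non-positive min-interval would yield the NO_OP manager, so set one explicitly here.
    RocksDBManualCompactionConfig settings =
            RocksDBManualCompactionConfig.builder().setMinInterval(10_000).build();
    RocksDBManualCompactionManager manager =
            RocksDBManualCompactionManager.create(db, settings, ioExecutor);
    manager.register(stateInfo); // once per column family / registered state
    manager.start();             // starts the periodic SST scan and compaction loop
    // ... on backend disposal:
    manager.close();
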
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManagerImpl.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManagerImpl.java
new file mode 100644
index 00000000000..ebbae150a70
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManagerImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.contrib.streaming.state.RocksDBProperty;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+
+/** Default implementation of {@link RocksDBManualCompactionManager}. */
+class RocksDBManualCompactionManagerImpl implements 
RocksDBManualCompactionManager {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(RocksDBManualCompactionManagerImpl.class);
+
+    private final ColumnFamilyLookup lookup;
+    private final CompactionScheduler scheduler;
+    private final CompactionTracker tracker;
+
+    public RocksDBManualCompactionManagerImpl(
+            RocksDB db, RocksDBManualCompactionConfig settings, 
ExecutorService ioExecutor) {
+        this.lookup = new ColumnFamilyLookup();
+        this.tracker = new CompactionTracker(settings, cf -> 
getNumAutoCompactions(db, cf));
+        this.scheduler =
+                new CompactionScheduler(
+                        settings,
+                        ioExecutor,
+                        new CompactionTaskProducer(db, settings, lookup),
+                        new Compactor(db, 
settings.maxOutputFileSize.getBytes()),
+                        tracker);
+    }
+
+    @Override
+    public void start() {
+        scheduler.start();
+    }
+
+    @Override
+    public void register(RocksDbKvStateInfo stateInfo) {
+        LOG.debug("Register state for manual compactions: '{}'", 
stateInfo.metaInfo.getName());
+        lookup.add(stateInfo.columnFamilyHandle);
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("Stopping RocksDBManualCompactionManager");
+        tracker.close();
+        try {
+            scheduler.stop();
+        } catch (Exception e) {
+            LOG.warn("Unable to stop compaction scheduler {}", scheduler, e);
+        }
+    }
+
+    private static long getNumAutoCompactions(RocksDB db, ColumnFamilyHandle 
columnFamily) {
+        try {
+            return db.getLongProperty(
+                    columnFamily, 
RocksDBProperty.NumRunningCompactions.getRocksDBProperty());
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java
new file mode 100644
index 00000000000..a48c63bf6f5
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+
+import java.time.Duration;
+
+/** Configuration options for manual compaction for the RocksDB backend. */
+public class RocksDBManualCompactionOptions {
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Duration> MIN_INTERVAL =
+            
ConfigOptions.key("state.backend.rocksdb.manual-compaction.min-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(0))
+                    .withDescription(
+                            "The minimum interval between manual compactions. 
Zero disables manual compactions");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Integer> MAX_PARALLEL_COMPACTIONS =
+            
ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-parallel-compactions")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The maximum number of manual compactions to 
start."
+                                    + "Note that only one of them can run at a 
time as of v8.10.0; all the others will be waiting");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<MemorySize> MAX_FILE_SIZE_TO_COMPACT =
+            
ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-file-size-to-compact")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("50k"))
+                    .withDescription("The maximum size of individual input 
files");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Integer> MIN_FILES_TO_COMPACT =
+            
ConfigOptions.key("state.backend.rocksdb.manual-compaction.min-files-to-compact")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The minimum number of input files to compact 
together in a single compaction run");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Integer> MAX_FILES_TO_COMPACT =
+            
ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-files-to-compact")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription(
+                            "The maximum number of input files to compact 
together in a single compaction run");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<MemorySize> MAX_OUTPUT_FILE_SIZE =
+            
ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-output-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64Mb"))
+                    .withDescription("The maximum output file size");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Integer> MAX_AUTO_COMPACTIONS =
+            
ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-auto-compactions")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription(
+                            "The maximum number of automatic compactions 
running for manual compaction to start."
+                                    + "If the actual number is higher, manual 
compaction won't be started to avoid delaying automatic ones.");
+}
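
The same settings can be supplied through the regular Flink configuration; a sketch assuming a programmatic Configuration object (the keys are the ones defined above):

    Configuration config = new Configuration();
    // A non-zero interval enables manual compactions (see MIN_INTERVAL above).
    config.set(RocksDBManualCompactionOptions.MIN_INTERVAL, Duration.ofMinutes(1));
    config.set(RocksDBManualCompactionOptions.MAX_FILE_SIZE_TO_COMPACT, MemorySize.parse("50k"));
    RocksDBManualCompactionConfig settings = RocksDBManualCompactionConfig.from(config);
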
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index 41773e4be28..d58f388ab44 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -29,11 +29,13 @@ import 
org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
 import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
@@ -82,6 +84,7 @@ import org.rocksdb.Snapshot;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -202,6 +205,7 @@ public class EmbeddedRocksDBStateBackendTest
         configuration.set(
                 RocksDBOptions.TIMER_SERVICE_FACTORY,
                 EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
+        configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, 
Duration.ofMillis(1));
         return configuration;
     }
 
@@ -707,6 +711,55 @@ public class EmbeddedRocksDBStateBackendTest
         
assertEquals(testConfig.getOptional(option).orElse(!option.defaultValue()), 
value);
     }
 
+    /** Test that most of the non-overlapping small SST files are eventually 
merged. */
+    @TestTemplate
+    public void testSmallFilesCompaction() throws Exception {
+        ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", 
String.class);
+        SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
+        CheckpointStreamFactory streamFactory = createStreamFactory();
+
+        final KeyGroupRange range = KeyGroupRange.of(0, 49);
+        double expectedNumSstFiles = range.getNumberOfKeyGroups() * .5;
+        final CheckpointableKeyedStateBackend<Integer> backend =
+                createKeyedBackend(
+                        IntSerializer.INSTANCE,
+                        range.getEndKeyGroup() - range.getStartKeyGroup() + 1,
+                        range,
+                        env);
+        try {
+            ValueState<String> state =
+                    backend.getPartitionedState(
+                            VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
+
+            for (int i = range.getStartKeyGroup(); i < range.getEndKeyGroup(); 
i++) {
+                backend.setCurrentKey(i);
+                state.update(Integer.toString(i));
+                // snapshot to force flushing memtables to disk and create a 
new SST file
+                runSnapshot(
+                        backend.snapshot(
+                                i, // checkpoint id
+                                i, // timestamp
+                                streamFactory,
+                                
CheckpointOptions.forCheckpointWithDefaultLocation()),
+                        sharedStateRegistry);
+            }
+
+            // expect SST files under dbPath: job_123_op_456/db/*.sst
+            File sstPath = new File(dbPath).listFiles()[0].listFiles()[0];
+
+            int length = sstPath.listFiles((dir, name) -> name.endsWith(".sst")).length;
+            assertThat(length)
+                    .withFailMessage("actual: " + length + ", expected: " + expectedNumSstFiles)
+                    .isLessThanOrEqualTo((int) expectedNumSstFiles);
+
+        } finally {
+            IOUtils.closeQuietly(backend);
+            backend.dispose();
+        }
+        // allow some time for any lingering background compaction to fail hard by calling the already-closed db
+        Thread.sleep(100);
+    }
+
     private void runStateUpdates() throws Exception {
         for (int i = 50; i < 150; ++i) {
             if (i % 10 == 0) {
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionSchedulerTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionSchedulerTest.java
new file mode 100644
index 00000000000..d5281f60a31
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionSchedulerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+class CompactionSchedulerTest {
+    @Test
+    void testClose() throws InterruptedException {
+        RocksDBManualCompactionConfig config = 
RocksDBManualCompactionConfig.getDefault();
+        ManuallyTriggeredScheduledExecutorService scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        ManuallyTriggeredScheduledExecutorService ioExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        CompactionScheduler compactionScheduler =
+                new CompactionScheduler(
+                        config,
+                        ioExecutor,
+                        new CompactionTaskProducer(
+                                Collections::emptyList, config, new 
ColumnFamilyLookup()),
+                        new Compactor(Compactor.CompactionTarget.NO_OP, 1L),
+                        new CompactionTracker(config, ign -> 0L),
+                        scheduledExecutor);
+        compactionScheduler.start();
+        scheduledExecutor.triggerScheduledTasks();
+        compactionScheduler.stop();
+        ioExecutor.triggerAll(); // should not fail e.g. because 
compactionScheduler was stopped
+        ioExecutor.shutdown();
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducerTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducerTest.java
new file mode 100644
index 00000000000..200547a9fb6
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducerTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.contrib.streaming.state.RocksDBExtension;
+import 
org.apache.flink.contrib.streaming.state.sstmerge.CompactionTaskProducer.SstFileMetaData;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** {@link CompactionTaskProducer} test. */
+class CompactionTaskProducerTest {
+    @RegisterExtension public RocksDBExtension rocksDBExtension = new 
RocksDBExtension();
+    private static final Random RANDOM = new Random();
+
+    private ColumnFamilyLookup defaultCfLookup;
+
+    @BeforeEach
+    public void beforeEach() {
+        defaultCfLookup = new ColumnFamilyLookup();
+        defaultCfLookup.add(rocksDBExtension.getDefaultColumnFamily());
+    }
+
+    @Test
+    void testEmpty() {
+        assertThat(produce(configBuilder().build())).isEmpty();
+    }
+
+    @Test
+    void testSingleFile() {
+        assertThat(produce(configBuilder().build(), 
sstBuilder().build())).isNotEmpty();
+    }
+
+    @Test
+    void testMinFilesToCompact() {
+        assertThat(produce(configBuilder().setMinFilesToCompact(2).build(), 
sstBuilder().build()))
+                .isEmpty();
+    }
+
+    @Test
+    void testMaxFilesToCompact() {
+        assertThat(
+                        produce(
+                                
configBuilder().setMaxFilesToCompact(1).build(),
+                                sstBuilder().build(),
+                                sstBuilder().build()))
+                .hasSize(2);
+    }
+
+    @Test
+    void testMaxParallelCompactions() {
+        assertThat(
+                        produce(
+                                configBuilder()
+                                        .setMaxFilesToCompact(1)
+                                        .setMaxParallelCompactions(2)
+                                        .build(),
+                                sstBuilder().build(),
+                                sstBuilder().build(),
+                                sstBuilder().build(),
+                                sstBuilder().build()))
+                .hasSize(2);
+    }
+
+    @Test
+    void testMaxFileSizeToCompact() {
+        assertThat(
+                        produce(
+                                configBuilder().setMaxFileSizeToCompact(new 
MemorySize(1)).build(),
+                                sstBuilder().build()))
+                .isEmpty();
+    }
+
+    @Test
+    void testMaxOutputFileSize() {
+        final int numFiles = 5;
+        final int fileSize = 10;
+        final long totalSize = fileSize * numFiles;
+        assertThat(
+                        produce(
+                                configBuilder()
+                                        .setMaxOutputFileSize(new 
MemorySize(totalSize - 1))
+                                        .build(),
+                                buildSstFiles(1, fileSize, numFiles)))
+                .hasSize(2);
+    }
+
+    @Test
+    void testGrouping() {
+        int level = 3;
+        SstFileMetaData[] files = buildSstFiles(level, 1024, 10);
+        assertThat(produce(configBuilder().build(), files))
+                .hasSameElementsAs(singletonList(createTask(level, files)));
+    }
+
+    @Test
+    void testGroupingWithGap() {
+        SstFileMetaData sst0 = 
sstBuilder().setLevel(1).setSmallestKey("0".getBytes()).build();
+        SstFileMetaData sst1 = 
sstBuilder().setLevel(1).setSmallestKey("1".getBytes()).build();
+        SstFileMetaData sst2 =
+                sstBuilder()
+                        .setLevel(2)
+                        .setSmallestKey("2".getBytes())
+                        .setBeingCompacted(true)
+                        .build();
+        SstFileMetaData sst3 = 
sstBuilder().setLevel(3).setSmallestKey("3".getBytes()).build();
+        assertThat(produce(configBuilder().build(), sst0, sst1, sst2, sst3))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                createTask(sst0.level(), sst0, sst1),
+                                createTask(sst3.level(), sst3)));
+    }
+
+    @Test
+    void testNotGroupingOnDifferentLevels() {
+        SstFileMetaData sst1 = sstBuilder().setLevel(1).build();
+        SstFileMetaData sst2 = sstBuilder().setLevel(2).build();
+        assertThat(produce(configBuilder().build(), sst1, sst2)).hasSize(2);
+    }
+
+    @Test
+    void testSkipBeingCompacted() {
+        assertThat(produce(configBuilder().build(), 
sstBuilder().setBeingCompacted(true).build()))
+                .isEmpty();
+    }
+
+    @Test
+    void testSkipZeroLevel() {
+        assertThat(produce(configBuilder().build(), 
sstBuilder().setLevel(0).build())).isEmpty();
+    }
+
+    @Test
+    void testNotGroupingDifferentColumnFamilies() {
+        ColumnFamilyHandle cf1 = rocksDBExtension.createNewColumnFamily("cf1");
+        defaultCfLookup.add(cf1);
+        ColumnFamilyHandle cf2 = rocksDBExtension.createNewColumnFamily("cf2");
+        defaultCfLookup.add(cf2);
+        assertThat(
+                        produce(
+                                configBuilder().build(),
+                                sstBuilder().setColumnFamily(cf1).build(),
+                                sstBuilder().setColumnFamily(cf2).build()))
+                .hasSize(2);
+    }
+
+    //////////////////
+    // utility methods
+
+    static class SstFileMetaDataBuilder {
+        private byte[] columnFamilyName;
+        private String fileName;
+        private int level;
+        private long size;
+        private byte[] smallestKey;
+        private boolean beingCompacted;
+
+        public SstFileMetaDataBuilder(ColumnFamilyHandle columnFamily) {
+            try {
+                this.columnFamilyName = columnFamily.getName();
+            } catch (RocksDBException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public SstFileMetaDataBuilder setColumnFamily(ColumnFamilyHandle 
columnFamily) {
+            try {
+                this.columnFamilyName = columnFamily.getName();
+            } catch (RocksDBException e) {
+                throw new RuntimeException(e);
+            }
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setFileName(String fileName) {
+            this.fileName = fileName;
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setLevel(int level) {
+            this.level = level;
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setSize(long size) {
+            this.size = size;
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setSmallestKey(byte[] smallestKey) {
+            this.smallestKey = smallestKey;
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setBeingCompacted(boolean 
beingCompacted) {
+            this.beingCompacted = beingCompacted;
+            return this;
+        }
+
+        public SstFileMetaData build() {
+            return new SstFileMetaData(
+                    columnFamilyName, fileName, level, size, smallestKey, 
beingCompacted);
+        }
+    }
+
+    private List<CompactionTask> produce(
+            RocksDBManualCompactionConfig config, SstFileMetaData... sst) {
+        return new CompactionTaskProducer(() -> Arrays.asList(sst), config, 
defaultCfLookup)
+                .produce();
+    }
+
+    private static RocksDBManualCompactionConfig.Builder configBuilder() {
+        return RocksDBManualCompactionConfig.builder()
+                .setMaxFilesToCompact(Integer.MAX_VALUE)
+                .setMaxAutoCompactions(Integer.MAX_VALUE)
+                .setMaxParallelCompactions(Integer.MAX_VALUE)
+                .setMaxOutputFileSize(MemorySize.MAX_VALUE)
+                .setMinFilesToCompact(1)
+                .setMinInterval(1L);
+    }
+
+    private SstFileMetaDataBuilder sstBuilder() {
+        byte[] bytes = new byte[128];
+        RANDOM.nextBytes(bytes);
+        return new 
SstFileMetaDataBuilder(rocksDBExtension.getDefaultColumnFamily())
+                .setFileName(RANDOM.nextInt() + ".sst")
+                .setLevel(1)
+                .setSize(4)
+                .setSmallestKey(bytes);
+    }
+
+    private SstFileMetaData[] buildSstFiles(int level, int fileSize, int 
numFiles) {
+        return IntStream.range(0, numFiles)
+                .mapToObj(
+                        i ->
+                                sstBuilder()
+                                        .setSmallestKey(new byte[] {(byte) i})
+                                        .setLevel(level)
+                                        .setSize(fileSize)
+                                        .build())
+                .toArray(SstFileMetaData[]::new);
+    }
+
+    private CompactionTask createTask(int level, SstFileMetaData... files) {
+        return new CompactionTask(
+                level,
+                
Arrays.stream(files).map(SstFileMetaData::fileName).collect(Collectors.toList()),
+                rocksDBExtension.getDefaultColumnFamily());
+    }
+}
diff --git 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
index 638b2f316bd..cc0d0bc3ed2 100644
--- 
a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
+++ 
b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -349,6 +350,9 @@ public class ManuallyTriggeredScheduledExecutorService 
implements ScheduledExecu
         final ScheduledTask<V> scheduledTask =
                 new ScheduledTask<>(callable, unit.convert(delay, 
TimeUnit.MILLISECONDS));
 
+        if (shutdown) {
+            throw new RejectedExecutionException();
+        }
         nonPeriodicScheduledTasks.offer(scheduledTask);
 
         return scheduledTask;
