This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new cd722033fb3  [FLINK-26050] Manually compact small SST files
cd722033fb3 is described below

commit cd722033fb326837a80a6233603d12ad176da15c
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Fri Feb 16 19:42:43 2024 +0100

    [FLINK-26050] Manually compact small SST files

    In some cases, the number of files produced by the RocksDB state backend
    grows indefinitely. Besides leaving many small files behind, this might
    cause task state info (TDD and checkpoint ACK) to exceed the RPC message
    size and fail recovery or checkpointing. With this change, such files
    are merged in the background using the RocksDB API.
---
 .../state/api/runtime/SavepointEnvironment.java | 6 +-
 .../flink/runtime/io/disk/iomanager/IOManager.java | 10 +-
 .../runtime/io/disk/iomanager/IOManagerAsync.java | 31 ++-
 .../runtime/taskexecutor/TaskManagerServices.java | 2 +-
 .../runtime/io/disk/iomanager/IOManagerTest.java | 3 +-
 .../flink-statebackend-rocksdb/pom.xml | 5 +
 .../state/EmbeddedRocksDBStateBackend.java | 20 +-
 .../streaming/state/RocksDBKeyedStateBackend.java | 14 +-
 .../state/RocksDBKeyedStateBackendBuilder.java | 33 ++-
 .../state/RocksDBPriorityQueueSetFactory.java | 7 +-
 .../state/sstmerge/ColumnFamilyLookup.java | 77 ++++++
 .../state/sstmerge/CompactionScheduler.java | 149 +++++++++++
 .../streaming/state/sstmerge/CompactionTask.java | 73 ++++++
 .../state/sstmerge/CompactionTaskProducer.java | 234 +++++++++++++++++
 .../state/sstmerge/CompactionTracker.java | 131 ++++++++++
 .../streaming/state/sstmerge/Compactor.java | 80 ++++++
 .../sstmerge/RocksDBManualCompactionConfig.java | 167 +++++++++++++
 .../sstmerge/RocksDBManualCompactionManager.java | 70 ++++++
 .../RocksDBManualCompactionManagerImpl.java | 84 +++++++
 .../sstmerge/RocksDBManualCompactionOptions.java | 86 +++++++
 .../state/EmbeddedRocksDBStateBackendTest.java | 53 ++++
 .../state/sstmerge/CompactionSchedulerTest.java | 50 ++++
 .../state/sstmerge/CompactionTaskProducerTest.java | 278 +++++++++++++++++++++
 .../ManuallyTriggeredScheduledExecutorService.java | 4 +
 24 files changed, 1652 insertions(+), 15 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java index 00f471f1a7b..42ffb7e6037 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointEnvironment.java @@ -68,6 +68,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.UserCodeClassLoader; +import org.apache.flink.util.concurrent.Executors; import java.util.Collections; import java.util.Map; @@ -141,7 +142,10 @@ public class SavepointEnvironment implements Environment { this.registry = new KvStateRegistry().createTaskRegistry(jobID, vertexID); this.taskStateManager = new SavepointTaskStateManager(prioritizedOperatorSubtaskState); - this.ioManager = new IOManagerAsync(ConfigurationUtils.parseTempDirectories(configuration)); + this.ioManager = + new IOManagerAsync( + ConfigurationUtils.parseTempDirectories(configuration), + Executors.newDirectExecutorService()); this.memoryManager = MemoryManager.create(64 * 1024 * 1024, DEFAULT_PAGE_SIZE);
this.sharedResources = new SharedResources(); this.accumulatorRegistry = new AccumulatorRegistry(jobID, attemptID); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java index 9c1e9d9c245..d56b122f25c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java @@ -33,6 +33,7 @@ import java.io.File; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.Collectors; @@ -44,6 +45,8 @@ public abstract class IOManager implements AutoCloseable { private final FileChannelManager fileChannelManager; + protected final ExecutorService executorService; + // ------------------------------------------------------------------------- // Constructors / Destructors // ------------------------------------------------------------------------- @@ -53,7 +56,7 @@ * * @param tempDirs The basic directories for files underlying anonymous channels. */ - protected IOManager(String[] tempDirs) { + protected IOManager(String[] tempDirs, ExecutorService executorService) { this.fileChannelManager = new FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX); if (LOG.isInfoEnabled()) { @@ -64,6 +67,7 @@ .map(File::getAbsolutePath) .collect(Collectors.joining("\n\t"))); } + this.executorService = executorService; } /** Removes all temporary files. */ @@ -226,4 +230,8 @@ */ public abstract BulkBlockChannelReader createBulkBlockChannelReader( ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException; + + public ExecutorService getExecutorService() { + return executorService; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index d1ce53e2fdb..52408bb5d14 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -18,16 +18,19 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.util.IOUtils; import org.apache.flink.util.ShutdownHookUtil; +import org.apache.flink.util.concurrent.Executors; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; @@ -53,8 +56,17 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle // ------------------------------------------------------------------------- /** Constructs a new asynchronous I/O manager, writing files to the system's temp directory.
*/ + @VisibleForTesting public IOManagerAsync() { - this(EnvironmentInformation.getTemporaryFileDirectory()); + this( + EnvironmentInformation.getTemporaryFileDirectory(), + Executors.newDirectExecutorService()); + } + + /** Constructs a new asynchronous I/O manager, writing files to the system's temp directory. */ + @VisibleForTesting + public IOManagerAsync(ExecutorService executorService) { + this(EnvironmentInformation.getTemporaryFileDirectory(), executorService); + } /** @@ -62,8 +74,19 @@ * * @param tempDir The directory to write temporary files to. */ + @VisibleForTesting public IOManagerAsync(String tempDir) { - this(new String[] {tempDir}); + this(new String[] {tempDir}, Executors.newDirectExecutorService()); + } + + /** + * Constructs a new asynchronous I/O manager, writing files to the given directory. + * + * @param tempDir The directory to write temporary files to. + */ + @VisibleForTesting + public IOManagerAsync(String tempDir, ExecutorService executorService) { + this(new String[] {tempDir}, executorService); + } /** @@ -72,8 +95,8 @@ * * @param tempDirs The directories to write temporary files to. */ - public IOManagerAsync(String[] tempDirs) { - super(tempDirs); + public IOManagerAsync(String[] tempDirs, ExecutorService executorService) { + super(tempDirs, executorService); // start a write worker thread for each directory this.writers = new WriterThread[tempDirs.length]; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index 6398107ea48..e514123ee77 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -339,7 +339,7 @@ public class TaskManagerServices { // start the I/O manager, it will create some temp directories.
final IOManager ioManager = - new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths()); + new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths(), ioExecutor); final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java index 67a65d1f24a..f64d2dc4891 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.nio.file.Files; import java.util.List; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import static org.assertj.core.api.Assertions.assertThat; @@ -84,7 +85,7 @@ class IOManagerTest { private static class TestIOManager extends IOManager { protected TestIOManager(String[] paths) { - super(paths); + super(paths, Executors.newSingleThreadScheduledExecutor()); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/pom.xml b/flink-state-backends/flink-statebackend-rocksdb/pom.xml index 1a39fc84b7f..3186b968e54 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/pom.xml +++ b/flink-state-backends/flink-statebackend-rocksdb/pom.xml @@ -66,6 +66,11 @@ under the License. <version>6.20.3-ververica-2.0</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-guava</artifactId> + </dependency> + <!-- test dependencies --> <dependency> diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java index baca5f4a013..4dd4c26c8fc 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackend.java @@ -30,6 +30,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.description.InlineElement; import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils.RocksDBMemoryFactory; +import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionConfig; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; @@ -46,6 +47,7 @@ import org.apache.flink.util.AbstractID; import org.apache.flink.util.DynamicCodeLoadingException; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.MdcUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TernaryBoolean; @@ -195,6 +197,8 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke private RocksDBMemoryFactory rocksDBMemoryFactory; // ------------------------------------------------------------------------ + private final RocksDBManualCompactionConfig manualCompactionConfig; + /** Creates a new {@code 
EmbeddedRocksDBStateBackend} for storing local state. */ public EmbeddedRocksDBStateBackend() { this(TernaryBoolean.UNDEFINED); @@ -226,6 +230,7 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke this.useIngestDbRestoreMode = TernaryBoolean.UNDEFINED; this.incrementalRestoreAsyncCompactAfterRescale = TernaryBoolean.UNDEFINED; this.rescalingUseDeleteFilesInRange = TernaryBoolean.UNDEFINED; + this.manualCompactionConfig = null; } /** @@ -337,6 +342,11 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke config); this.rocksDBMemoryFactory = original.rocksDBMemoryFactory; + + this.manualCompactionConfig = + original.manualCompactionConfig != null + ? original.manualCompactionConfig + : RocksDBManualCompactionConfig.from(config); } // ------------------------------------------------------------------------ @@ -510,7 +520,15 @@ public class EmbeddedRocksDBStateBackend extends AbstractManagedMemoryStateBacke .setIncrementalRestoreAsyncCompactAfterRescale( getIncrementalRestoreAsyncCompactAfterRescale()) .setUseIngestDbRestoreMode(getUseIngestDbRestoreMode()) - .setRescalingUseDeleteFilesInRange(isRescalingUseDeleteFilesInRange()); + .setRescalingUseDeleteFilesInRange(isRescalingUseDeleteFilesInRange()) + .setIOExecutor( + MdcUtils.scopeToJob( + jobId, + parameters.getEnv().getIOManager().getExecutorService())) + .setManualCompactionConfig( + manualCompactionConfig == null + ? RocksDBManualCompactionConfig.getDefault() + : manualCompactionConfig); return builder.build(); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 8e531ecebf8..47225691b54 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -31,6 +31,7 @@ import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysAndNamesp import org.apache.flink.contrib.streaming.state.iterator.RocksStateKeysIterator; import org.apache.flink.contrib.streaming.state.snapshot.RocksDBFullSnapshotResources; import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase; +import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager; import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.memory.DataInputDeserializer; @@ -160,6 +161,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { StateDescriptor.Type.REDUCING, (StateUpdateFactory) RocksDBReducingState::update)) .collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + private final RocksDBManualCompactionManager sstMergeManager; private interface StateCreateFactory { <K, N, SV, S extends State, IS extends S> IS createState( @@ -290,7 +292,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { RocksDbTtlCompactFiltersManager ttlCompactFiltersManager, InternalKeyContext<K> keyContext, @Nonnegative long writeBatchSize, - @Nullable CompletableFuture<Void> asyncCompactFuture) { + @Nullable CompletableFuture<Void> asyncCompactFuture, + 
RocksDBManualCompactionManager rocksDBManualCompactionManager) { super( kvStateRegistry, @@ -338,6 +341,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } else { this.heapPriorityQueuesManager = null; } + this.sstMergeManager = rocksDBManualCompactionManager; + for (RocksDbKvStateInfo stateInfo : kvStateInformation.values()) { + this.sstMergeManager.register(stateInfo); + } + this.sstMergeManager.start(); } @SuppressWarnings("unchecked") @@ -446,6 +454,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { } super.dispose(); + IOUtils.closeQuietly(sstMergeManager); + // This call will block until all clients that still acquire access to the RocksDB instance // have released it, // so that we cannot release the native resources while clients are still working with it in @@ -685,6 +695,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { newRocksStateInfo = new RocksDbKvStateInfo(oldStateInfo.columnFamilyHandle, newMetaInfo); kvStateInformation.put(stateDesc.getName(), newRocksStateInfo); + sstMergeManager.register(newRocksStateInfo); } else { newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>( @@ -711,6 +722,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> { this.nativeMetricMonitor, stateDesc.getName(), newRocksStateInfo); + sstMergeManager.register(newRocksStateInfo); } StateSnapshotTransformFactory<SV> wrappedSnapshotTransformFactory = diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 226ec0ac36a..1a63950305d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -30,6 +30,8 @@ import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult; import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase; import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy; import org.apache.flink.contrib.streaming.state.snapshot.RocksNativeFullSnapshotStrategy; +import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionConfig; +import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager; import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.metrics.MetricGroup; @@ -76,6 +78,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.function.Function; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE; @@ -83,6 +86,7 @@ import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOption import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING; import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE; import static 
org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; /** * Builder class for {@link RocksDBKeyedStateBackend} which handles all necessary initializations @@ -138,6 +142,9 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken private boolean useIngestDbRestoreMode = USE_INGEST_DB_RESTORE_MODE.defaultValue(); private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing private RocksDBStateUploader injectRocksDBStateUploader; // for testing + private RocksDBManualCompactionConfig manualCompactionConfig = + RocksDBManualCompactionConfig.getDefault(); + private ExecutorService ioExecutor; public RocksDBKeyedStateBackendBuilder( String operatorIdentifier, @@ -298,6 +305,11 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken return this; } + RocksDBKeyedStateBackendBuilder<K> setIOExecutor(ExecutorService ioExecutor) { + this.ioExecutor = ioExecutor; + return this; + } + public static File getInstanceRocksDBPath(File instanceBasePath) { return new File(instanceBasePath, DB_INSTANCE_DIR_STRING); } @@ -337,6 +349,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken int keyGroupPrefixBytes = CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix( numberOfKeyGroups); + RocksDBManualCompactionManager manualCompactionManager; try { // Variables for snapshot strategy when incremental checkpoint is enabled @@ -398,13 +411,16 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken materializedSstFiles, lastCompletedCheckpointId); // init priority queue factory + manualCompactionManager = + RocksDBManualCompactionManager.create(db, manualCompactionConfig, ioExecutor); priorityQueueFactory = initPriorityQueueFactory( keyGroupPrefixBytes, kvStateInformation, db, writeBatchWrapper, - nativeMetricMonitor); + nativeMetricMonitor, + manualCompactionManager); } catch (Throwable e) { // Do clean up List<ColumnFamilyOptions> columnFamilyOptions = @@ -472,7 +488,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken ttlCompactFiltersManager, keyContext, writeBatchSize, - asyncCompactAfterRestoreFuture); + asyncCompactAfterRestoreFuture, + manualCompactionManager); } private RocksDBRestoreOperation getRocksDBRestoreOperation( @@ -607,7 +624,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation, RocksDB db, RocksDBWriteBatchWrapper writeBatchWrapper, - RocksDBNativeMetricMonitor nativeMetricMonitor) { + RocksDBNativeMetricMonitor nativeMetricMonitor, + RocksDBManualCompactionManager manualCompactionManager) { PriorityQueueSetFactory priorityQueueFactory; switch (priorityQueueConfig.getPriorityQueueStateType()) { case HEAP: @@ -626,7 +644,8 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken nativeMetricMonitor, columnFamilyOptionsFactory, optionsContainer.getWriteBufferManagerCapacity(), - priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize()); + priorityQueueConfig.getRocksDBPriorityQueueSetCacheSize(), + manualCompactionManager); break; default: throw new IllegalArgumentException( @@ -648,4 +667,10 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken FileUtils.deleteDirectory(instanceBasePath); } } + + public RocksDBKeyedStateBackendBuilder<K> setManualCompactionConfig( + RocksDBManualCompactionConfig 
manualCompactionConfig) { + this.manualCompactionConfig = checkNotNull(manualCompactionConfig); + return this; + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java index 23a1a6913f6..28bd3b0d1a5 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBPriorityQueueSetFactory.java @@ -20,6 +20,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionManager; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.runtime.state.KeyExtractorFunction; @@ -71,6 +72,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { private final RocksDBNativeMetricMonitor nativeMetricMonitor; private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory; private final Long writeBufferManagerCapacity; + private final RocksDBManualCompactionManager manualCompactionManager; RocksDBPriorityQueueSetFactory( KeyGroupRange keyGroupRange, @@ -83,7 +85,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { RocksDBNativeMetricMonitor nativeMetricMonitor, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, Long writeBufferManagerCapacity, - int cacheSize) { + int cacheSize, + RocksDBManualCompactionManager manualCompactionManager) { this.keyGroupRange = keyGroupRange; this.keyGroupPrefixBytes = keyGroupPrefixBytes; this.numberOfKeyGroups = numberOfKeyGroups; @@ -98,6 +101,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { this.writeBufferManagerCapacity = writeBufferManagerCapacity; Preconditions.checkArgument(cacheSize > 0); this.cacheSize = cacheSize; + this.manualCompactionManager = manualCompactionManager; } @Nonnull @@ -225,6 +229,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory { kvStateInformation.put(stateName, stateInfo); } } + manualCompactionManager.register(stateInfo); return stateInfo; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/ColumnFamilyLookup.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/ColumnFamilyLookup.java new file mode 100644 index 00000000000..66c79c2b8f8 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/ColumnFamilyLookup.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.sstmerge; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Lookup helper for RocksDB column families by their name. */ +class ColumnFamilyLookup { + private final Map<Key, ColumnFamilyHandle> map; + + public ColumnFamilyLookup() { + map = new ConcurrentHashMap<>(); + } + + @Nullable + public ColumnFamilyHandle get(byte[] name) { + return map.get(new Key(name)); + } + + public void add(ColumnFamilyHandle handle) { + try { + map.put(new Key(handle.getName()), handle); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + + private static class Key { + private final byte[] payload; + + private Key(byte[] payload) { + this.payload = checkNotNull(payload); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Key key = (Key) o; + return Arrays.equals(payload, key.payload); + } + + @Override + public int hashCode() { + return Arrays.hashCode(payload); + } + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionScheduler.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionScheduler.java new file mode 100644 index 00000000000..4c81e6e054d --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionScheduler.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.contrib.streaming.state.sstmerge; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyList; + +/** + * Schedules manual compactions of small disjoint SST files created by RocksDB. It does so + * periodically while maintaining {@link RocksDBManualCompactionOptions#MIN_INTERVAL} between + * compaction rounds, where each round is at most {@link + * RocksDBManualCompactionOptions#MAX_PARALLEL_COMPACTIONS}. + */ +class CompactionScheduler { + private static final Logger LOG = LoggerFactory.getLogger(CompactionScheduler.class); + + private final ScheduledExecutorService scheduledExecutor; + private final ExecutorService ioExecutor; + private final long checkPeriodMs; + private final CompactionTracker tracker; + private final Compactor compactor; + private final CompactionTaskProducer taskProducer; + private final Object lock = new Object(); + private boolean running = true; + + public CompactionScheduler( + RocksDBManualCompactionConfig settings, + ExecutorService ioExecutor, + CompactionTaskProducer taskProducer, + Compactor compactor, + CompactionTracker tracker) { + this( + settings, + ioExecutor, + taskProducer, + compactor, + tracker, + Executors.newSingleThreadScheduledExecutor()); + } + + public CompactionScheduler( + RocksDBManualCompactionConfig settings, + ExecutorService ioExecutor, + CompactionTaskProducer taskProducer, + Compactor compactor, + CompactionTracker tracker, + ScheduledExecutorService scheduledExecutor) { + this.ioExecutor = ioExecutor; + this.scheduledExecutor = scheduledExecutor; + this.checkPeriodMs = settings.minInterval; + this.tracker = tracker; + this.compactor = compactor; + this.taskProducer = taskProducer; + } + + public void start() { + scheduleScan(); + } + + public void stop() throws InterruptedException { + synchronized (lock) { + if (running) { + running = false; + scheduledExecutor.shutdownNow(); + } + } + if (!scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + LOG.warn("Unable to terminate scheduled tasks in 5s"); + } + } + + public void scheduleScan() { + synchronized (lock) { + if (running) { + LOG.trace("Schedule SST scan in {} ms", checkPeriodMs); + scheduledExecutor.schedule( + () -> ioExecutor.execute(this::maybeScan), + checkPeriodMs, + TimeUnit.MILLISECONDS); + } else { + LOG.debug("Not scheduling next scan: shutting down"); + } + } + } + + public void maybeScan() { + LOG.trace("Starting SST scan"); + if (tracker.haveManualCompactions() || tracker.isShuttingDown()) { + LOG.trace("Skip SST scan {}", tracker); + // nothing to do: + // previous compactions didn't finish yet + // the last one will reschedule this task + return; + } + + final List<CompactionTask> targets = scan(); + LOG.trace("SST scan resulted in targets {}", targets); + if (targets.isEmpty()) { + scheduleScan(); + return; + } + + for (CompactionTask target : targets) { + ioExecutor.execute( + () -> + tracker.runWithTracking( + target.columnFamilyHandle, + () -> + compactor.compact( + target.columnFamilyHandle, + target.level, + target.files), + this::scheduleScan)); + } + } + + private List<CompactionTask> scan() { + try { + return taskProducer.produce(); + } catch (Exception e) { + LOG.warn("Unable to scan for compaction targets", e); + return emptyList(); + } + } +} diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTask.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTask.java new file mode 100644 index 00000000000..5559be5cf46 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTask.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.sstmerge; + +import org.rocksdb.ColumnFamilyHandle; + +import java.util.List; +import java.util.Objects; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Target for a single manual compaction call. */ +class CompactionTask { + final int level; + final List<String> files; + final ColumnFamilyHandle columnFamilyHandle; + + CompactionTask(int level, List<String> files, ColumnFamilyHandle columnFamilyHandle) { + checkArgument(!files.isEmpty()); + checkArgument(level >= 0); + this.level = level; + this.files = checkNotNull(files); + this.columnFamilyHandle = checkNotNull(columnFamilyHandle); + } + + @Override + public String toString() { + return "CompactionTask{" + + "level=" + + level + + ", files=" + + files + + ", columnFamily=" + + columnFamilyHandle + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CompactionTask that = (CompactionTask) o; + return level == that.level + && Objects.equals(files, that.files) + && Objects.equals(columnFamilyHandle, that.columnFamilyHandle); + } + + @Override + public int hashCode() { + return Objects.hash(level, files, columnFamilyHandle); + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducer.java new file mode 100644 index 00000000000..d4ef5ddfda2 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducer.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.sstmerge; + +import org.apache.flink.shaded.guava31.com.google.common.primitives.UnsignedBytes; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.LiveFileMetaData; +import org.rocksdb.RocksDB; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * Checks live RocksDB SST files for merging (compacting) according to the {@link + * RocksDBManualCompactionOptions}. The result is a list of {@link CompactionTask}s. + * + * <p>SSTs from all the column families are scanned at once using {@link + * RocksDB#getLiveFilesMetaData()}. However, each task only contains SSTs of the same level of the + * same column family. + * + * <p>At most {@link RocksDBManualCompactionConfig#maxManualCompactions} tasks are chosen from the + * candidates with the highest number of files. + */ +class CompactionTaskProducer { + private static final Logger LOG = LoggerFactory.getLogger(CompactionTaskProducer.class); + + private static final Comparator<SstFileMetaData> SST_COMPARATOR = + (o1, o2) -> { + Comparator<byte[]> cmp = UnsignedBytes.lexicographicalComparator(); + int cfCmp = cmp.compare(o1.columnFamilyName(), o2.columnFamilyName()); + if (cfCmp != 0) { + return cfCmp; + } else { + return cmp.compare(o1.smallestKey(), o2.smallestKey()); + } + }; + + private final RocksDBManualCompactionConfig settings; + private final ColumnFamilyLookup columnFamilyLookup; + private final Supplier<List<SstFileMetaData>> metadataSupplier; + + CompactionTaskProducer( + RocksDB db, + RocksDBManualCompactionConfig settings, + ColumnFamilyLookup columnFamilyLookup) { + this( + () -> SstFileMetaData.mapFrom(db.getLiveFilesMetaData()), + settings, + columnFamilyLookup); + } + + CompactionTaskProducer( + Supplier<List<SstFileMetaData>> metadataSupplier, + RocksDBManualCompactionConfig settings, + ColumnFamilyLookup columnFamilyLookup) { + this.settings = settings; + this.columnFamilyLookup = columnFamilyLookup; + this.metadataSupplier = metadataSupplier; + } + + public List<CompactionTask> produce() { + + // get all CF files sorted by key range start (L1+) + List<SstFileMetaData> sstSortedByCfAndStartingKeys = + metadataSupplier.get().stream() + .filter(l -> l.level() > 0) // let RocksDB deal with L0 + .sorted(SST_COMPARATOR) + .collect(Collectors.toList()); + LOG.trace("Input files: {}", sstSortedByCfAndStartingKeys.size()); + + List<CompactionTask> tasks = groupIntoTasks(sstSortedByCfAndStartingKeys); + tasks.sort(Comparator.<CompactionTask>comparingInt(t -> t.files.size()).reversed()); + return tasks.subList(0, Math.min(tasks.size(), settings.maxManualCompactions)); + } + + private List<CompactionTask> groupIntoTasks(List<SstFileMetaData> files) { + // collect the files 
which won't be compacted by RocksDB + // and won't cause any problems if merged - i.e. don't overlap with any known files on + // the other levels + List<CompactionTask> tasks = new ArrayList<>(); + List<SstFileMetaData> group = new ArrayList<>(); + SstFileMetaData prevFile = null; + long compactionOutputSize = 0; + + for (SstFileMetaData file : files) { + final boolean compact = shouldCompact(file); + final boolean newGroup = + !compact || !sameGroup(file, prevFile, group, compactionOutputSize); + if (newGroup) { + createTask(group).ifPresent(tasks::add); + group.clear(); + compactionOutputSize = 0; + } + if (compact) { + group.add(file); + compactionOutputSize += file.size(); + } + LOG.trace( + "Processed SST file: {}, level={}, cf: {}, being compacted={}, compact: {}, change group: {}, prev level={}", + file.fileName(), + file.level(), + file.columnFamilyName(), + file.beingCompacted(), + compact, + newGroup, + prevFile == null ? -1 : prevFile.level()); + prevFile = file; + } + createTask(group).ifPresent(tasks::add); + return tasks; + } + + private Optional<CompactionTask> createTask(List<SstFileMetaData> compaction) { + if (compaction.size() < settings.minFilesToCompact) { + return Optional.empty(); + } + SstFileMetaData head = compaction.iterator().next(); + ColumnFamilyHandle cf = columnFamilyLookup.get(head.columnFamilyName()); + if (cf == null) { + LOG.warn("Unknown column family: {}", head.columnFamilyName); + return Optional.empty(); + } + List<String> fileNames = + compaction.stream().map(SstFileMetaData::fileName).collect(Collectors.toList()); + return Optional.of(new CompactionTask(head.level(), fileNames, cf)); + } + + private boolean sameGroup( + SstFileMetaData file, + SstFileMetaData prevFile, + List<SstFileMetaData> group, + long compactionOutputSize) { + if (prevFile == null) { + return true; + } + return (file.level() == prevFile.level()) + && Arrays.equals(file.columnFamilyName(), prevFile.columnFamilyName()) + && compactionOutputSize + file.size() <= settings.maxOutputFileSize.getBytes() + && group.size() < settings.maxFilesToCompact; + } + + private boolean shouldCompact(SstFileMetaData file) { + return file.size() <= settings.maxFileSizeToCompact.getBytes() && !file.beingCompacted(); + } + + static class SstFileMetaData { + + private final byte[] columnFamilyName; + private final String fileName; + private final int level; + private final long size; + private final byte[] smallestKey; + private final boolean beingCompacted; + + public SstFileMetaData( + byte[] columnFamilyName, + String fileName, + int level, + long size, + byte[] smallestKey, + boolean beingCompacted) { + this.columnFamilyName = columnFamilyName; + this.fileName = fileName; + this.level = level; + this.size = size; + this.smallestKey = smallestKey; + this.beingCompacted = beingCompacted; + } + + public String fileName() { + return fileName; + } + + public byte[] columnFamilyName() { + return columnFamilyName; + } + + public int level() { + return level; + } + + public long size() { + return size; + } + + public byte[] smallestKey() { + return smallestKey; + } + + public boolean beingCompacted() { + return beingCompacted; + } + + static List<SstFileMetaData> mapFrom(List<LiveFileMetaData> list) { + return list.stream() + .map(SstFileMetaData::fromLiveFileMetaData) + .collect(Collectors.toList()); + } + + static SstFileMetaData fromLiveFileMetaData(LiveFileMetaData fileMetaData) { + return new SstFileMetaData( + fileMetaData.columnFamilyName(), + fileMetaData.fileName(), + fileMetaData.level(), + 
fileMetaData.size(), + fileMetaData.smallestKey(), + fileMetaData.beingCompacted()); + } + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTracker.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTracker.java new file mode 100644 index 00000000000..0b72ac6de11 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTracker.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.sstmerge; + +import org.apache.flink.util.function.RunnableWithException; + +import org.rocksdb.ColumnFamilyHandle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.ThreadSafe; + +import java.util.function.Function; + +/** + * Tracks the number of pending/running compactions (manual and automatic) and the DB status. Used + * concurrently by different compaction threads and by SST scanning threads. 
+ */ +@ThreadSafe +class CompactionTracker { + private static final Logger LOG = LoggerFactory.getLogger(CompactionTracker.class); + + private final Function<ColumnFamilyHandle, Long> runningAutoCompactions; + private final int maxManualCompactions; + private final int maxAutoCompactions; + private int pendingManualCompactions; + private int runningManualCompactions; + private boolean isShuttingDown; + + public CompactionTracker( + RocksDBManualCompactionConfig settings, + Function<ColumnFamilyHandle, Long> runningAutoCompactions) { + this.maxManualCompactions = settings.maxManualCompactions; + this.maxAutoCompactions = settings.maxAutoCompactions; + this.runningAutoCompactions = runningAutoCompactions; + this.isShuttingDown = false; + } + + private synchronized void complete() { + runningManualCompactions--; + } + + private synchronized void cancel() { + pendingManualCompactions--; + } + + private synchronized boolean tryStart(ColumnFamilyHandle cf) { + if (runningManualCompactions >= maxManualCompactions) { + return false; + } + if (isShuttingDown()) { + return false; + } + if (runningAutoCompactions.apply(cf) >= maxAutoCompactions) { + return false; + } + // all good + pendingManualCompactions--; + runningManualCompactions++; + return true; + } + + private synchronized void runIfNoManualCompactions(Runnable runnable) { + if (!haveManualCompactions()) { + runnable.run(); + } + } + + public synchronized boolean haveManualCompactions() { + return runningManualCompactions > 0 || pendingManualCompactions > 0; + } + + public synchronized boolean isShuttingDown() { + return isShuttingDown; + } + + public synchronized void close() { + isShuttingDown = true; + } + + @Override + public String toString() { + return "CompactionTracker{" + + "maxManualCompactions=" + + maxManualCompactions + + ", maxAutoCompactions=" + + maxAutoCompactions + + ", pendingManualCompactions=" + + pendingManualCompactions + + ", runningManualCompactions=" + + runningManualCompactions + + ", isShuttingDown=" + + isShuttingDown + + '}'; + } + + void runWithTracking( + ColumnFamilyHandle columnFamily, + RunnableWithException compaction, + Runnable lastCompactionPostAction) { + if (tryStart(columnFamily)) { + try { + compaction.run(); + } catch (Exception e) { + LOG.warn("Unable to compact {} (concurrent compaction?)", compaction, e); + } + complete(); + } else { + // drop this task - new will be created with a fresh set of files + cancel(); + } + // we were the last manual compaction - schedule the scan + runIfNoManualCompactions(lastCompactionPostAction); + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java new file mode 100644 index 00000000000..8471182e759 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/Compactor.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.sstmerge; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.CompactionJobInfo; +import org.rocksdb.CompactionOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * Compacts multiple RocksDB SST files using {@link RocksDB#compactFiles(CompactionOptions, + * ColumnFamilyHandle, List, int, int, CompactionJobInfo) RocksDB#compactFiles} into the last level. + * Usually this results in a single SST file if it doesn't exceed RocksDB target output file size + * for that level. + */ +class Compactor { + private static final Logger LOG = LoggerFactory.getLogger(Compactor.class); + private static final int OUTPUT_PATH_ID = 0; // just use the first one + + private final CompactionTarget db; + private final long targetOutputFileSize; + + public Compactor(RocksDB db, long targetOutputFileSize) { + this(db::compactFiles, targetOutputFileSize); + } + + public Compactor(CompactionTarget target, long targetOutputFileSize) { + this.db = target; + this.targetOutputFileSize = targetOutputFileSize; + } + + void compact(ColumnFamilyHandle cfName, int level, List<String> files) throws RocksDBException { + int outputLevel = Math.min(level + 1, cfName.getDescriptor().getOptions().numLevels() - 1); + LOG.debug( + "Manually compacting {} files from level {} to {}: {}", + files.size(), + level, + outputLevel, + files); + try (CompactionOptions options = + new CompactionOptions().setOutputFileSizeLimit(targetOutputFileSize); + CompactionJobInfo compactionJobInfo = new CompactionJobInfo()) { + db.compactFiles(options, cfName, files, outputLevel, OUTPUT_PATH_ID, compactionJobInfo); + } + } + + public interface CompactionTarget { + void compactFiles( + CompactionOptions var1, + ColumnFamilyHandle var2, + List<String> var3, + int var4, + int var5, + CompactionJobInfo var6) + throws RocksDBException; + + CompactionTarget NO_OP = (var1, var2, var3, var4, var5, var6) -> {}; + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionConfig.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionConfig.java new file mode 100644 index 00000000000..d4ac6403f6d --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionConfig.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.contrib.streaming.state.sstmerge; + +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.ReadableConfig; + +import java.io.Serializable; + +/** Configuration for {@link RocksDBManualCompactionManager}. */ +public class RocksDBManualCompactionConfig implements Serializable { + private static final long serialVersionUID = 1L; + + public final long minInterval; // this small value is for testing, prod=10_000 ? + public final int maxManualCompactions; + public final MemorySize maxFileSizeToCompact; + public final int minFilesToCompact; // in a single compaction + public final int maxFilesToCompact; // in a single compaction + public final MemorySize maxOutputFileSize; + public final int maxAutoCompactions; + + public RocksDBManualCompactionConfig( + long periodMs, + int maxManualCompactions, + MemorySize maxFileSizeToCompact, + int minFilesToCompact, + int maxFilesToCompact, + MemorySize maxOutputFileSize, + int maxAutoCompactions) { + this.minInterval = periodMs; + this.maxManualCompactions = maxManualCompactions; + this.maxFileSizeToCompact = maxFileSizeToCompact; + this.minFilesToCompact = minFilesToCompact; + this.maxFilesToCompact = maxFilesToCompact; + this.maxOutputFileSize = maxOutputFileSize; + this.maxAutoCompactions = maxAutoCompactions; + } + + public static Builder builder() { + return new Builder(); + } + + public static RocksDBManualCompactionConfig from(ReadableConfig config) { + return builder() + .setMinInterval(config.get(RocksDBManualCompactionOptions.MIN_INTERVAL).toMillis()) + .setMaxParallelCompactions( + config.get(RocksDBManualCompactionOptions.MAX_PARALLEL_COMPACTIONS)) + .setMaxFileSizeToCompact( + config.get(RocksDBManualCompactionOptions.MAX_FILE_SIZE_TO_COMPACT)) + .setMaxFilesToCompact( + config.get(RocksDBManualCompactionOptions.MAX_FILES_TO_COMPACT)) + .setMinFilesToCompact( + config.get(RocksDBManualCompactionOptions.MIN_FILES_TO_COMPACT)) + .setMaxOutputFileSize( + config.get(RocksDBManualCompactionOptions.MAX_OUTPUT_FILE_SIZE)) + .setMaxAutoCompactions( + config.get(RocksDBManualCompactionOptions.MAX_AUTO_COMPACTIONS)) + .build(); + } + + public static RocksDBManualCompactionConfig getDefault() { + return builder().build(); + } + + @Override + public String toString() { + return "RocksDBManualCompactionConfig{" + + "minInterval=" + + minInterval + + ", maxManualCompactions=" + + maxManualCompactions + + ", maxFileSizeToCompact=" + + maxFileSizeToCompact + + ", minFilesToCompact=" + + minFilesToCompact + + ", maxFilesToCompact=" + + maxFilesToCompact + + ", maxOutputFileSize=" + + maxOutputFileSize + + ", maxAutoCompactions=" + + maxAutoCompactions + + '}'; + } + + /** + * Builder for {@link + * org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionConfig}. 
+ */ + public static class Builder { + private long minInterval = + RocksDBManualCompactionOptions.MIN_INTERVAL.defaultValue().toMillis(); + private int maxParallelCompactions = + RocksDBManualCompactionOptions.MAX_PARALLEL_COMPACTIONS.defaultValue(); + private MemorySize maxFileSizeToCompact = + RocksDBManualCompactionOptions.MAX_FILE_SIZE_TO_COMPACT.defaultValue(); + private int minFilesToCompact = + RocksDBManualCompactionOptions.MIN_FILES_TO_COMPACT.defaultValue(); + private int maxFilesToCompact = + RocksDBManualCompactionOptions.MAX_FILES_TO_COMPACT.defaultValue(); + private MemorySize maxOutputFileSize = + RocksDBManualCompactionOptions.MAX_OUTPUT_FILE_SIZE.defaultValue(); + private int maxAutoCompactions = + RocksDBManualCompactionOptions.MAX_AUTO_COMPACTIONS.defaultValue(); + + public Builder setMinInterval(long minInterval) { + this.minInterval = minInterval; + return this; + } + + public Builder setMaxParallelCompactions(int maxParallelCompactions) { + this.maxParallelCompactions = maxParallelCompactions; + return this; + } + + public Builder setMaxFileSizeToCompact(MemorySize maxFileSizeToCompact) { + this.maxFileSizeToCompact = maxFileSizeToCompact; + return this; + } + + public Builder setMinFilesToCompact(int minFilesToCompact) { + this.minFilesToCompact = minFilesToCompact; + return this; + } + + public Builder setMaxFilesToCompact(int maxFilesToCompact) { + this.maxFilesToCompact = maxFilesToCompact; + return this; + } + + public Builder setMaxOutputFileSize(MemorySize maxOutputFileSize) { + this.maxOutputFileSize = maxOutputFileSize; + return this; + } + + public Builder setMaxAutoCompactions(int maxAutoCompactions) { + this.maxAutoCompactions = maxAutoCompactions; + return this; + } + + public RocksDBManualCompactionConfig build() { + return new RocksDBManualCompactionConfig( + minInterval, + maxParallelCompactions, + maxFileSizeToCompact, + minFilesToCompact, + maxFilesToCompact, + maxOutputFileSize, + maxAutoCompactions); + } + } +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManager.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManager.java new file mode 100644 index 00000000000..6eac9e55615 --- /dev/null +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManager.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+
+import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Manages compactions of small and disjoint RocksDB SST files that RocksDB itself would otherwise
+ * not merge (RocksDB avoids such merges to limit write amplification).
+ *
+ * <p>Such files are usually small and get inlined into the checkpoint metadata, which might lead
+ * to exceeding the RPC message size on checkpoint ACK or during recovery.
+ *
+ * <p>This class manages compactions of one or more column families of a single RocksDB instance.
+ *
+ * <p>Note that "manual" means that the compactions are <b>requested</b> manually (by Flink), but
+ * they are still executed by RocksDB.
+ */
+public interface RocksDBManualCompactionManager extends AutoCloseable {
+    Logger LOG = LoggerFactory.getLogger(RocksDBManualCompactionManager.class);
+
+    static RocksDBManualCompactionManager create(
+            RocksDB db, RocksDBManualCompactionConfig settings, ExecutorService ioExecutor) {
+        LOG.info("Creating RocksDBManualCompactionManager with settings: {}", settings);
+        return settings.minInterval <= 0
+                ? NO_OP
+                : new RocksDBManualCompactionManagerImpl(db, settings, ioExecutor);
+    }
+
+    RocksDBManualCompactionManager NO_OP =
+            new RocksDBManualCompactionManager() {
+                @Override
+                public void register(RocksDBKeyedStateBackend.RocksDbKvStateInfo stateInfo) {}
+
+                @Override
+                public void close() {}
+
+                @Override
+                public void start() {}
+            };
+
+    void register(RocksDBKeyedStateBackend.RocksDbKvStateInfo stateInfo);
+
+    @Override
+    void close() throws Exception;
+
+    void start();
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManagerImpl.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManagerImpl.java
new file mode 100644
index 00000000000..ebbae150a70
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionManagerImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
+import org.apache.flink.contrib.streaming.state.RocksDBProperty;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutorService;
+
+/** Default implementation of {@link RocksDBManualCompactionManager}. */
+class RocksDBManualCompactionManagerImpl implements RocksDBManualCompactionManager {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(RocksDBManualCompactionManagerImpl.class);
+
+    private final ColumnFamilyLookup lookup;
+    private final CompactionScheduler scheduler;
+    private final CompactionTracker tracker;
+
+    public RocksDBManualCompactionManagerImpl(
+            RocksDB db, RocksDBManualCompactionConfig settings, ExecutorService ioExecutor) {
+        this.lookup = new ColumnFamilyLookup();
+        this.tracker = new CompactionTracker(settings, cf -> getNumAutoCompactions(db, cf));
+        this.scheduler =
+                new CompactionScheduler(
+                        settings,
+                        ioExecutor,
+                        new CompactionTaskProducer(db, settings, lookup),
+                        new Compactor(db, settings.maxOutputFileSize.getBytes()),
+                        tracker);
+    }
+
+    @Override
+    public void start() {
+        scheduler.start();
+    }
+
+    @Override
+    public void register(RocksDbKvStateInfo stateInfo) {
+        LOG.debug("Register state for manual compactions: '{}'", stateInfo.metaInfo.getName());
+        lookup.add(stateInfo.columnFamilyHandle);
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("Stopping RocksDBManualCompactionManager");
+        tracker.close();
+        try {
+            scheduler.stop();
+        } catch (Exception e) {
+            LOG.warn("Unable to stop compaction scheduler {}", scheduler, e);
+        }
+    }
+
+    private static long getNumAutoCompactions(RocksDB db, ColumnFamilyHandle columnFamily) {
+        try {
+            return db.getLongProperty(
+                    columnFamily, RocksDBProperty.NumRunningCompactions.getRocksDBProperty());
+        } catch (RocksDBException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java
new file mode 100644
index 00000000000..a48c63bf6f5
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/sstmerge/RocksDBManualCompactionOptions.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.MemorySize;
+
+import java.time.Duration;
+
+/** Configuration options for manual compaction for the RocksDB backend. */
+public class RocksDBManualCompactionOptions {
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Duration> MIN_INTERVAL =
+            ConfigOptions.key("state.backend.rocksdb.manual-compaction.min-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(0))
+                    .withDescription(
+                            "The minimum interval between manual compactions. Zero disables manual compactions.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Integer> MAX_PARALLEL_COMPACTIONS =
+            ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-parallel-compactions")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The maximum number of manual compactions to start. "
+                                    + "Note that, as of RocksDB 8.10.0, only one of them can run at a time; all the others will be waiting.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<MemorySize> MAX_FILE_SIZE_TO_COMPACT =
+            ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-file-size-to-compact")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("50k"))
+                    .withDescription(
+                            "The maximum size of an individual input file; larger files are not compacted manually.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Integer> MIN_FILES_TO_COMPACT =
+            ConfigOptions.key("state.backend.rocksdb.manual-compaction.min-files-to-compact")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "The minimum number of input files to compact together in a single compaction run.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Integer> MAX_FILES_TO_COMPACT =
+            ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-files-to-compact")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription(
+                            "The maximum number of input files to compact together in a single compaction run.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<MemorySize> MAX_OUTPUT_FILE_SIZE =
+            ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-output-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("64Mb"))
+                    .withDescription(
+                            "The maximum size of a single output file of a manual compaction.");
+
+    @Documentation.Section(Documentation.Sections.EXPERT_ROCKSDB)
+    public static final ConfigOption<Integer> MAX_AUTO_COMPACTIONS =
+            ConfigOptions.key("state.backend.rocksdb.manual-compaction.max-auto-compactions")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription(
+                            "The maximum number of running automatic compactions up to which a manual compaction may be started. "
+ + "If the actual number is higher, manual compaction won't be started to avoid delaying automatic ones."); +} diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index 41773e4be28..d58f388ab44 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -29,11 +29,13 @@ import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.sstmerge.RocksDBManualCompactionOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.ConfigurableStateBackend; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; @@ -82,6 +84,7 @@ import org.rocksdb.Snapshot; import java.io.File; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -202,6 +205,7 @@ public class EmbeddedRocksDBStateBackendTest configuration.set( RocksDBOptions.TIMER_SERVICE_FACTORY, EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB); + configuration.set(RocksDBManualCompactionOptions.MIN_INTERVAL, Duration.ofMillis(1)); return configuration; } @@ -707,6 +711,55 @@ public class EmbeddedRocksDBStateBackendTest assertEquals(testConfig.getOptional(option).orElse(!option.defaultValue()), value); } + /** Test that most of the non-overlapping small SST files are eventually merged. 
+     */
+    @TestTemplate
+    public void testSmallFilesCompaction() throws Exception {
+        ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class);
+        SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl();
+        CheckpointStreamFactory streamFactory = createStreamFactory();
+
+        final KeyGroupRange range = KeyGroupRange.of(0, 49);
+        double expectedNumSstFiles = range.getNumberOfKeyGroups() * .5;
+        final CheckpointableKeyedStateBackend<Integer> backend =
+                createKeyedBackend(
+                        IntSerializer.INSTANCE,
+                        range.getEndKeyGroup() - range.getStartKeyGroup() + 1,
+                        range,
+                        env);
+        try {
+            ValueState<String> state =
+                    backend.getPartitionedState(
+                            VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+            for (int i = range.getStartKeyGroup(); i < range.getEndKeyGroup(); i++) {
+                backend.setCurrentKey(i);
+                state.update(Integer.toString(i));
+                // snapshot to force flushing memtables to disk and create a new SST file
+                runSnapshot(
+                        backend.snapshot(
+                                i, // checkpoint id
+                                i, // timestamp
+                                streamFactory,
+                                CheckpointOptions.forCheckpointWithDefaultLocation()),
+                        sharedStateRegistry);
+            }
+
+            // expect files under dbPath: job_123_op_456/db/*.sst
+            File sstPath = new File(dbPath).listFiles()[0].listFiles()[0];
+
+            int length = sstPath.listFiles((dir, name) -> name.endsWith(".sst")).length;
+            assertThat(length)
+                    .withFailMessage("actual: " + length + ", expected: " + expectedNumSstFiles)
+                    .isLessThanOrEqualTo((int) expectedNumSstFiles);
+
+        } finally {
+            IOUtils.closeQuietly(backend);
+            backend.dispose();
+        }
+        // allow some time for any lingering background compaction to run;
+        // it would fail hard if it tried to access the already closed db
+        Thread.sleep(100);
+    }
+
     private void runStateUpdates() throws Exception {
         for (int i = 50; i < 150; ++i) {
             if (i % 10 == 0) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionSchedulerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionSchedulerTest.java
new file mode 100644
index 00000000000..d5281f60a31
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionSchedulerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+
+class CompactionSchedulerTest {
+    @Test
+    void testClose() throws InterruptedException {
+        RocksDBManualCompactionConfig config = RocksDBManualCompactionConfig.getDefault();
+        ManuallyTriggeredScheduledExecutorService scheduledExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        ManuallyTriggeredScheduledExecutorService ioExecutor =
+                new ManuallyTriggeredScheduledExecutorService();
+        CompactionScheduler compactionScheduler =
+                new CompactionScheduler(
+                        config,
+                        ioExecutor,
+                        new CompactionTaskProducer(
+                                Collections::emptyList, config, new ColumnFamilyLookup()),
+                        new Compactor(Compactor.CompactionTarget.NO_OP, 1L),
+                        new CompactionTracker(config, ign -> 0L),
+                        scheduledExecutor);
+        compactionScheduler.start();
+        scheduledExecutor.triggerScheduledTasks();
+        compactionScheduler.stop();
+        ioExecutor.triggerAll(); // must not fail, e.g. because compactionScheduler was stopped
+        ioExecutor.shutdown();
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducerTest.java
new file mode 100644
index 00000000000..200547a9fb6
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/sstmerge/CompactionTaskProducerTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state.sstmerge;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.contrib.streaming.state.RocksDBExtension;
+import org.apache.flink.contrib.streaming.state.sstmerge.CompactionTaskProducer.SstFileMetaData;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * {@link CompactionTaskProducer} test.
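+ *
+ * <p>Most cases below follow the same pattern (a sketch; {@code produce}, {@code configBuilder}
+ * and {@code sstBuilder} are the helpers defined further down in this class):
+ *
+ * <pre>{@code
+ * List<CompactionTask> tasks =
+ *         produce(configBuilder().build(), sstBuilder().setLevel(1).build());
+ * assertThat(tasks).isNotEmpty(); // a single small level-1 file yields a compaction task
+ * }</pre>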
+ */
+class CompactionTaskProducerTest {
+    @RegisterExtension public RocksDBExtension rocksDBExtension = new RocksDBExtension();
+    private static final Random RANDOM = new Random();
+
+    private ColumnFamilyLookup defaultCfLookup;
+
+    @BeforeEach
+    public void beforeEach() {
+        defaultCfLookup = new ColumnFamilyLookup();
+        defaultCfLookup.add(rocksDBExtension.getDefaultColumnFamily());
+    }
+
+    @Test
+    void testEmpty() {
+        assertThat(produce(configBuilder().build())).isEmpty();
+    }
+
+    @Test
+    void testSingleFile() {
+        assertThat(produce(configBuilder().build(), sstBuilder().build())).isNotEmpty();
+    }
+
+    @Test
+    void testMinFilesToCompact() {
+        assertThat(produce(configBuilder().setMinFilesToCompact(2).build(), sstBuilder().build()))
+                .isEmpty();
+    }
+
+    @Test
+    void testMaxFilesToCompact() {
+        assertThat(
+                        produce(
+                                configBuilder().setMaxFilesToCompact(1).build(),
+                                sstBuilder().build(),
+                                sstBuilder().build()))
+                .hasSize(2);
+    }
+
+    @Test
+    void testMaxParallelCompactions() {
+        assertThat(
+                        produce(
+                                configBuilder()
+                                        .setMaxFilesToCompact(1)
+                                        .setMaxParallelCompactions(2)
+                                        .build(),
+                                sstBuilder().build(),
+                                sstBuilder().build(),
+                                sstBuilder().build(),
+                                sstBuilder().build()))
+                .hasSize(2);
+    }
+
+    @Test
+    void testMaxFileSizeToCompact() {
+        assertThat(
+                        produce(
+                                configBuilder().setMaxFileSizeToCompact(new MemorySize(1)).build(),
+                                sstBuilder().build()))
+                .isEmpty();
+    }
+
+    @Test
+    void testMaxOutputFileSize() {
+        final int numFiles = 5;
+        final int fileSize = 10;
+        final long totalSize = fileSize * numFiles;
+        assertThat(
+                        produce(
+                                configBuilder()
+                                        .setMaxOutputFileSize(new MemorySize(totalSize - 1))
+                                        .build(),
+                                buildSstFiles(1, fileSize, numFiles)))
+                .hasSize(2);
+    }
+
+    @Test
+    void testGrouping() {
+        int level = 3;
+        SstFileMetaData[] files = buildSstFiles(level, 1024, 10);
+        assertThat(produce(configBuilder().build(), files))
+                .hasSameElementsAs(singletonList(createTask(level, files)));
+    }
+
+    @Test
+    void testGroupingWithGap() {
+        SstFileMetaData sst0 = sstBuilder().setLevel(1).setSmallestKey("0".getBytes()).build();
+        SstFileMetaData sst1 = sstBuilder().setLevel(1).setSmallestKey("1".getBytes()).build();
+        SstFileMetaData sst2 =
+                sstBuilder()
+                        .setLevel(2)
+                        .setSmallestKey("2".getBytes())
+                        .setBeingCompacted(true)
+                        .build();
+        SstFileMetaData sst3 = sstBuilder().setLevel(3).setSmallestKey("3".getBytes()).build();
+        assertThat(produce(configBuilder().build(), sst0, sst1, sst2, sst3))
+                .hasSameElementsAs(
+                        Arrays.asList(
+                                createTask(sst0.level(), sst0, sst1),
+                                createTask(sst3.level(), sst3)));
+    }
+
+    @Test
+    void testNotGroupingOnDifferentLevels() {
+        SstFileMetaData sst1 = sstBuilder().setLevel(1).build();
+        SstFileMetaData sst2 = sstBuilder().setLevel(2).build();
+        assertThat(produce(configBuilder().build(), sst1, sst2)).hasSize(2);
+    }
+
+    @Test
+    void testSkipBeingCompacted() {
+        assertThat(produce(configBuilder().build(), sstBuilder().setBeingCompacted(true).build()))
+                .isEmpty();
+    }
+
+    @Test
+    void testSkipZeroLevel() {
+        assertThat(produce(configBuilder().build(), sstBuilder().setLevel(0).build())).isEmpty();
+    }
+
+    @Test
+    void testNotGroupingDifferentColumnFamilies() {
+        ColumnFamilyHandle cf1 = rocksDBExtension.createNewColumnFamily("cf1");
+        defaultCfLookup.add(cf1);
+        ColumnFamilyHandle cf2 = rocksDBExtension.createNewColumnFamily("cf2");
+        defaultCfLookup.add(cf2);
+        assertThat(
+                        produce(
+                                configBuilder().build(),
+                                sstBuilder().setColumnFamily(cf1).build(),
+                                sstBuilder().setColumnFamily(cf2).build()))
+                .hasSize(2);
+    }
+
+    //////////////////
+    // utility methods
+
+    static class SstFileMetaDataBuilder {
+        private byte[] columnFamilyName;
+        private String fileName;
+        private int level;
+        private long size;
+        private byte[] smallestKey;
+        private boolean beingCompacted;
+
+        public SstFileMetaDataBuilder(ColumnFamilyHandle columnFamily) {
+            try {
+                this.columnFamilyName = columnFamily.getName();
+            } catch (RocksDBException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        public SstFileMetaDataBuilder setColumnFamily(ColumnFamilyHandle columnFamily) {
+            try {
+                this.columnFamilyName = columnFamily.getName();
+            } catch (RocksDBException e) {
+                throw new RuntimeException(e);
+            }
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setFileName(String fileName) {
+            this.fileName = fileName;
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setLevel(int level) {
+            this.level = level;
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setSize(long size) {
+            this.size = size;
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setSmallestKey(byte[] smallestKey) {
+            this.smallestKey = smallestKey;
+            return this;
+        }
+
+        public SstFileMetaDataBuilder setBeingCompacted(boolean beingCompacted) {
+            this.beingCompacted = beingCompacted;
+            return this;
+        }
+
+        public SstFileMetaData build() {
+            return new SstFileMetaData(
+                    columnFamilyName, fileName, level, size, smallestKey, beingCompacted);
+        }
+    }
+
+    private List<CompactionTask> produce(
+            RocksDBManualCompactionConfig config, SstFileMetaData... sst) {
+        return new CompactionTaskProducer(() -> Arrays.asList(sst), config, defaultCfLookup)
+                .produce();
+    }
+
+    private static RocksDBManualCompactionConfig.Builder configBuilder() {
+        return RocksDBManualCompactionConfig.builder()
+                .setMaxFilesToCompact(Integer.MAX_VALUE)
+                .setMaxAutoCompactions(Integer.MAX_VALUE)
+                .setMaxParallelCompactions(Integer.MAX_VALUE)
+                .setMaxOutputFileSize(MemorySize.MAX_VALUE)
+                .setMinFilesToCompact(1)
+                .setMinInterval(1L);
+    }
+
+    private SstFileMetaDataBuilder sstBuilder() {
+        byte[] bytes = new byte[128];
+        RANDOM.nextBytes(bytes);
+        return new SstFileMetaDataBuilder(rocksDBExtension.getDefaultColumnFamily())
+                .setFileName(RANDOM.nextInt() + ".sst")
+                .setLevel(1)
+                .setSize(4)
+                .setSmallestKey(bytes);
+    }
+
+    private SstFileMetaData[] buildSstFiles(int level, int fileSize, int numFiles) {
+        return IntStream.range(0, numFiles)
+                .mapToObj(
+                        i ->
+                                sstBuilder()
+                                        .setSmallestKey(new byte[] {(byte) i})
+                                        .setLevel(level)
+                                        .setSize(fileSize)
+                                        .build())
+                .toArray(SstFileMetaData[]::new);
+    }
+
+    private CompactionTask createTask(int level, SstFileMetaData... files) {
+        return new CompactionTask(
+                level,
+                Arrays.stream(files).map(SstFileMetaData::fileName).collect(Collectors.toList()),
+                rocksDBExtension.getDefaultColumnFamily());
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
index 638b2f316bd..cc0d0bc3ed2 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
@@ -32,6 +32,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -349,6 +350,9 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu
         final ScheduledTask<V> scheduledTask =
                 new ScheduledTask<>(callable, unit.convert(delay, TimeUnit.MILLISECONDS));
 
+        if (shutdown) {
+            throw new RejectedExecutionException();
+        }
         nonPeriodicScheduledTasks.offer(scheduledTask);
 
         return scheduledTask;