This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5f5d5022c03b59f943e357c0c8028325bf12e8c0 Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Mon Apr 15 23:16:43 2024 +0800 [FLINK-35047][state] Introduce ResourceContainer for ForStStateBackend --- .../state/forst/ForStMemoryConfiguration.java | 248 +++++++++++ .../state/forst/ForStMemoryControllerUtils.java | 195 +++++++++ .../flink/state/forst/ForStResourceContainer.java | 468 +++++++++++++++++++++ .../flink/state/forst/ForStSharedResources.java | 73 ++++ .../state/forst/ForStSharedResourcesFactory.java | 185 ++++++++ .../forst/ForStMemoryControllerUtilsTest.java | 129 ++++++ .../state/forst/ForStResourceContainerTest.java | 297 +++++++++++++ 7 files changed, 1595 insertions(+) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryConfiguration.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryConfiguration.java new file mode 100644 index 00000000000..558f48cb0a1 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryConfiguration.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.forst; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** The settings regarding ForSt memory usage. */ +public final class ForStMemoryConfiguration implements Serializable { + + private static final long serialVersionUID = 1L; + + /** Flag whether to use the managed memory budget for ForSt. Null is not set. */ + @Nullable private Boolean useManagedMemory; + + /** The total memory for all ForSt instances at this slot. Null is not set. */ + @Nullable private MemorySize fixedMemoryPerSlot; + + /** + * The maximum fraction of the total shared memory consumed by the write buffers. Null if not + * set. + */ + @Nullable private Double writeBufferRatio; + + /** + * The high priority pool ratio in the shared cache, used for index & filter blocks. Null if not + * set. + */ + @Nullable private Double highPriorityPoolRatio; + + /** Flag whether to use partition index/filters. Null if not set. */ + @Nullable private Boolean usePartitionedIndexFilters; + + // ------------------------------------------------------------------------ + + /** + * Configures ForSt to use the managed memory of a slot. See {@link + * ForStOptions#USE_MANAGED_MEMORY} for details. + */ + public void setUseManagedMemory(boolean useManagedMemory) { + this.useManagedMemory = useManagedMemory; + } + + /** + * Configures ForSt to use a fixed amount of memory shared between all instances (operators) in + * a slot. See {@link ForStOptions#FIX_PER_SLOT_MEMORY_SIZE} for details. 
+ */ + public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot) { + checkArgument( + fixedMemoryPerSlot == null || fixedMemoryPerSlot.getBytes() > 0, + "Total memory per slot must be > 0"); + + this.fixedMemoryPerSlot = fixedMemoryPerSlot; + } + + /** + * Configures ForSt to use a fixed amount of memory shared between all instances (operators) in + * a slot. See {@link #setFixedMemoryPerSlot(MemorySize)} for details. + */ + public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr) { + setFixedMemoryPerSlot(MemorySize.parse(totalMemoryPerSlotStr)); + } + + /** + * Sets the fraction of the total memory to be used for write buffers. This only has an effect + * if either {@link #setUseManagedMemory(boolean)} or {@link #setFixedMemoryPerSlot(MemorySize)} + * is set. + * + * <p>See {@link ForStOptions#WRITE_BUFFER_RATIO} for details. + */ + public void setWriteBufferRatio(double writeBufferRatio) { + Preconditions.checkArgument( + writeBufferRatio > 0 && writeBufferRatio < 1.0, + "Write Buffer ratio %s must be in (0, 1)", + writeBufferRatio); + this.writeBufferRatio = writeBufferRatio; + } + + /** + * Sets the fraction of the total memory to be used for high priority blocks like indexes, + * dictionaries, etc. This only has an effect if either {@link #setUseManagedMemory(boolean)} or + * {@link #setFixedMemoryPerSlot(MemorySize)} is set. + * + * <p>See {@link ForStOptions#HIGH_PRIORITY_POOL_RATIO} for details. + */ + public void setHighPriorityPoolRatio(double highPriorityPoolRatio) { + Preconditions.checkArgument( + highPriorityPoolRatio > 0 && highPriorityPoolRatio < 1.0, + "High priority pool ratio %s must be in (0, 1)", + highPriorityPoolRatio); + this.highPriorityPoolRatio = highPriorityPoolRatio; + } + + /** + * Gets whether the state backend is configured to use the managed memory of a slot for ForSt. + * See {@link ForStOptions#USE_MANAGED_MEMORY} for details. + */ + public boolean isUsingManagedMemory() { + return useManagedMemory != null + ? 
useManagedMemory + : ForStOptions.USE_MANAGED_MEMORY.defaultValue(); + } + + /** + * Gets whether the state backend is configured to use a fixed amount of memory shared between + * all ForSt instances (in all tasks and operators) of a slot. See {@link + * ForStOptions#FIX_PER_SLOT_MEMORY_SIZE} for details. + */ + public boolean isUsingFixedMemoryPerSlot() { + return fixedMemoryPerSlot != null; + } + + /** + * Gets the fixed amount of memory to be shared between all ForSt instances (in all tasks and + * operators) of a slot. Null if not configured. See {@link ForStOptions#FIX_PER_SLOT_MEMORY_SIZE} for + * details. + */ + @Nullable + public MemorySize getFixedMemoryPerSlot() { + return fixedMemoryPerSlot; + } + + /** + * Gets the fraction of the total memory to be used for write buffers. This only has an effect + * if either {@link #setUseManagedMemory(boolean)} or {@link #setFixedMemoryPerSlot(MemorySize)} + * is set. + * + * <p>See {@link ForStOptions#WRITE_BUFFER_RATIO} for details. + */ + public double getWriteBufferRatio() { + return writeBufferRatio != null + ? writeBufferRatio + : ForStOptions.WRITE_BUFFER_RATIO.defaultValue(); + } + + /** + * Gets the fraction of the total memory to be used for high priority blocks like indexes, + * dictionaries, etc. This only has an effect if either {@link #setUseManagedMemory(boolean)} or + * {@link #setFixedMemoryPerSlot(MemorySize)} is set. + * + * <p>See {@link ForStOptions#HIGH_PRIORITY_POOL_RATIO} for details. + */ + public double getHighPriorityPoolRatio() { + return highPriorityPoolRatio != null + ? highPriorityPoolRatio + : ForStOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(); + } + + /** + * Gets whether the state backend is configured to use partitioned index/filters for ForSt. + * + * <p>See {@link ForStOptions#USE_PARTITIONED_INDEX_FILTERS} for details. + */ + public Boolean isUsingPartitionedIndexFilters() { + return usePartitionedIndexFilters != null + ? 
usePartitionedIndexFilters + : ForStOptions.USE_PARTITIONED_INDEX_FILTERS.defaultValue(); + } + + // ------------------------------------------------------------------------ + + /** Validates if the configured options are valid with respect to one another. */ + public void validate() { + // As FLINK-15512 introduce a new mechanism to calculate the cache capacity, + // the relationship of write_buffer_manager_capacity and cache_capacity has changed to: + // write_buffer_manager_capacity / cache_capacity = 2 * writeBufferRatio / (3 - + // writeBufferRatio) + // we should ensure the sum of write buffer manager capacity and high priority pool less + // than cache capacity. + // TODO change the formula once FLINK-15532 resolved. + if (writeBufferRatio != null + && highPriorityPoolRatio != null + && 2 * writeBufferRatio / (3 - writeBufferRatio) + highPriorityPoolRatio >= 1.0) { + throw new IllegalArgumentException( + String.format( + "Invalid configuration: writeBufferRatio %s with highPriPoolRatio %s", + writeBufferRatio, highPriorityPoolRatio)); + } + } + + // ------------------------------------------------------------------------ + + /** + * Derives a ForStMemoryConfiguration from another object and a configuration. The values set on + * the other object take precedence, and the values from the configuration are used if no values + * are set on the other config object. + */ + public static ForStMemoryConfiguration fromOtherAndConfiguration( + ForStMemoryConfiguration other, ReadableConfig config) { + + final ForStMemoryConfiguration newConfig = new ForStMemoryConfiguration(); + + newConfig.useManagedMemory = + other.useManagedMemory != null + ? other.useManagedMemory + : config.get(ForStOptions.USE_MANAGED_MEMORY); + + newConfig.fixedMemoryPerSlot = + other.fixedMemoryPerSlot != null + ? other.fixedMemoryPerSlot + : config.get(ForStOptions.FIX_PER_SLOT_MEMORY_SIZE); + + newConfig.writeBufferRatio = + other.writeBufferRatio != null + ? 
other.writeBufferRatio + : config.get(ForStOptions.WRITE_BUFFER_RATIO); + + newConfig.highPriorityPoolRatio = + other.highPriorityPoolRatio != null + ? other.highPriorityPoolRatio + : config.get(ForStOptions.HIGH_PRIORITY_POOL_RATIO); + + newConfig.usePartitionedIndexFilters = + other.usePartitionedIndexFilters != null + ? other.usePartitionedIndexFilters + : config.get(ForStOptions.USE_PARTITIONED_INDEX_FILTERS); + + return newConfig; + } + + public static ForStMemoryConfiguration fromConfiguration(Configuration configuration) { + return fromOtherAndConfiguration(new ForStMemoryConfiguration(), configuration); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryControllerUtils.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryControllerUtils.java new file mode 100644 index 00000000000..670c991cd60 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryControllerUtils.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.forst; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.rocksdb.Cache; +import org.rocksdb.LRUCache; +import org.rocksdb.WriteBufferManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; + +/** + * Utils to create {@link Cache} and {@link WriteBufferManager} which are used to control total + * memory usage of ForSt. + */ +public class ForStMemoryControllerUtils { + private static final Logger LOG = LoggerFactory.getLogger(ForStMemoryControllerUtils.class); + + /** + * Allocate memory controllable ForSt shared resources. + * + * @param totalMemorySize The total memory limit size. + * @param writeBufferRatio The ratio of total memory which is occupied by write buffer manager. + * @param highPriorityPoolRatio The high priority pool ratio of cache. + * @param factory creates Write Buffer Manager and Block Cache + * @return memory controllable ForSt shared resources. + */ + public static ForStSharedResources allocateForStSharedResources( + long totalMemorySize, + double writeBufferRatio, + double highPriorityPoolRatio, + boolean usingPartitionedIndexFilters, + ForStMemoryFactory factory) { + + long calculatedCacheCapacity = + ForStMemoryControllerUtils.calculateActualCacheCapacity( + totalMemorySize, writeBufferRatio); + final Cache cache = factory.createCache(calculatedCacheCapacity, highPriorityPoolRatio); + + long writeBufferManagerCapacity = + ForStMemoryControllerUtils.calculateWriteBufferManagerCapacity( + totalMemorySize, writeBufferRatio); + final WriteBufferManager wbm = + factory.createWriteBufferManager(writeBufferManagerCapacity, cache); + + LOG.debug( + "Allocated ForSt shared resources, calculatedCacheCapacity: {}, highPriorityPoolRatio: {}, writeBufferManagerCapacity: {}, usingPartitionedIndexFilters: {}", + calculatedCacheCapacity, + highPriorityPoolRatio, + writeBufferManagerCapacity, + usingPartitionedIndexFilters); + return new 
ForStSharedResources( + cache, wbm, writeBufferManagerCapacity, usingPartitionedIndexFilters); + } + + /** + * Calculate the actual memory capacity of cache, which would be shared among ForSt instance(s). + * We introduce this method because: a) We cannot create a strict capacity limit cache until + * FLINK-15532 is resolved. b) Regardless of the memory usage of blocks pinned by ForSt iterators, + * which is difficult to calculate and only happens when we iterate over entries in MapState, the + * overuse of memory is mainly occupied by at most half of the write buffer usage. (see <a + * href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L51">the + * flush implementation of write buffer manager</a>). Thus, we have four equations below: + * write_buffer_manager_memory = 1.5 * write_buffer_manager_capacity; write_buffer_manager_memory + * = total_memory_size * write_buffer_ratio; write_buffer_manager_memory + other_part = + * total_memory_size; write_buffer_manager_capacity + other_part = cache_capacity. And we would + * deduce the formulas: cache_capacity = (3 - write_buffer_ratio) * total_memory_size / 3 and + * write_buffer_manager_capacity = 2 * total_memory_size * write_buffer_ratio / 3. + * + * @param totalMemorySize Total off-heap memory size reserved for ForSt instance(s). + * @param writeBufferRatio The ratio of total memory size which would be reserved for write + * buffer manager and its over-capacity part. + * @return The actual calculated cache capacity. + */ + @VisibleForTesting + public static long calculateActualCacheCapacity(long totalMemorySize, double writeBufferRatio) { + return (long) ((3 - writeBufferRatio) * totalMemorySize / 3); + } + + /** + * Calculate the actual memory capacity of write buffer manager, which would be shared among + * ForSt instance(s). The formula to use here could refer to the doc of {@link + * #calculateActualCacheCapacity(long, double)}. 
+ * + * @param totalMemorySize Total off-heap memory size reserved for ForSt instance(s). + * @param writeBufferRatio The ratio of total memory size which would be reserved for write + * buffer manager and its over-capacity part. + * @return The actual calculated write buffer manager capacity. + */ + @VisibleForTesting + static long calculateWriteBufferManagerCapacity(long totalMemorySize, double writeBufferRatio) { + return (long) (2 * totalMemorySize * writeBufferRatio / 3); + } + + @VisibleForTesting + static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) { + // TODO use strict capacity limit until FLINK-15532 resolved + return new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio); + } + + @VisibleForTesting + static WriteBufferManager createWriteBufferManager( + long writeBufferManagerCapacity, Cache cache) { + return new WriteBufferManager(writeBufferManagerCapacity, cache); + } + + /** + * Calculate the default arena block size as ForSt calculates it in <a + * href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196-L201"> + * here</a>. + * + * @return the default arena block size + * @param writeBufferSize the write buffer size (bytes) + */ + static long calculateForStDefaultArenaBlockSize(long writeBufferSize) { + long arenaBlockSize = writeBufferSize / 8; + + // Align up to 4k + final long align = 4 * 1024; + return ((arenaBlockSize + align - 1) / align) * align; + } + + /** + * Calculate {@code mutable_limit_} as ForSt calculates it in <a + * href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54"> + * here</a>. 
+ * + * @param bufferSize write buffer size + * @return mutableLimit + */ + static long calculateForStMutableLimit(long bufferSize) { + return bufferSize * 7 / 8; + } + + /** + * ForSt starts flushing the active memtable constantly in the case when the arena block size is + * greater than mutable limit (as calculated in {@link #calculateForStMutableLimit(long)}). + * + * <p>This happens because in such a case the check <a + * href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47"> + * here</a> is always true. + * + * <p>This method checks that arena block size does not exceed the mutable limit. + * + * @param arenaBlockSize Arena block size + * @param mutableLimit mutable limit + * @return whether arena block size is sensible + */ + @VisibleForTesting + static boolean validateArenaBlockSize(long arenaBlockSize, long mutableLimit) { + return arenaBlockSize <= mutableLimit; + } + + /** Factory for Write Buffer Manager and Block Cache. 
*/ + public interface ForStMemoryFactory extends Serializable { + Cache createCache(long cacheCapacity, double highPriorityPoolRatio); + + WriteBufferManager createWriteBufferManager(long writeBufferManagerCapacity, Cache cache); + + ForStMemoryFactory DEFAULT = + new ForStMemoryFactory() { + @Override + public Cache createCache(long cacheCapacity, double highPriorityPoolRatio) { + return ForStMemoryControllerUtils.createCache( + cacheCapacity, highPriorityPoolRatio); + } + + @Override + public WriteBufferManager createWriteBufferManager( + long writeBufferManagerCapacity, Cache cache) { + return ForStMemoryControllerUtils.createWriteBufferManager( + writeBufferManagerCapacity, cache); + } + }; + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java new file mode 100644 index 00000000000..2f54522d1c7 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.forst; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.runtime.memory.OpaqueMemoryResource; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; + +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; +import org.rocksdb.Cache; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.Filter; +import org.rocksdb.IndexType; +import org.rocksdb.PlainTableConfig; +import org.rocksdb.ReadOptions; +import org.rocksdb.Statistics; +import org.rocksdb.TableFormatConfig; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; + +/** + * The container for ForSt resources, including option factory and shared resource among instances. + * + * <p>This should be the only entrance for ForStStateBackend to get ForSt options, and should be + * properly (and necessarily) closed to prevent resource leak. 
+ */ +public final class ForStResourceContainer implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(ForStResourceContainer.class); + + private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG"; + + // the filename length limit is 255 on most operating systems + private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - FORST_RELOCATE_LOG_SUFFIX.length(); + + private static final String LOCAL_DB_DIR_STRING = "db"; + + @Nullable private final File localBasePath; + + @Nullable private final File localForStPath; + + /** The configurations from file. */ + private final ReadableConfig configuration; + + /** The options factory to create the ForSt options. */ + @Nullable private final ForStOptionsFactory optionsFactory; + + /** + * The shared resource among ForSt instances. This resource is not part of the 'handlesToClose', + * because the handles to close are closed quietly, whereas for this one, we want exceptions to + * be reported. + */ + @Nullable private final OpaqueMemoryResource<ForStSharedResources> sharedResources; + + private final boolean enableStatistics; + + /** The handles to be closed when the container is closed. 
*/ + private final ArrayList<AutoCloseable> handlesToClose; + + @Nullable private Path relocatedDbLogBaseDir; + + @VisibleForTesting + public ForStResourceContainer() { + this(new Configuration(), null, null, null, false); + } + + @VisibleForTesting + public ForStResourceContainer(@Nullable ForStOptionsFactory optionsFactory) { + this(new Configuration(), optionsFactory, null, null, false); + } + + @VisibleForTesting + public ForStResourceContainer( + @Nullable ForStOptionsFactory optionsFactory, + @Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources) { + this(new Configuration(), optionsFactory, sharedResources, null, false); + } + + public ForStResourceContainer( + ReadableConfig configuration, + @Nullable ForStOptionsFactory optionsFactory, + @Nullable OpaqueMemoryResource<ForStSharedResources> sharedResources, + @Nullable File localBasePath, + boolean enableStatistics) { + + this.configuration = configuration; + this.optionsFactory = optionsFactory; + this.sharedResources = sharedResources; + + this.localBasePath = localBasePath; + this.localForStPath = + localBasePath != null ? new File(localBasePath, LOCAL_DB_DIR_STRING) : null; + + this.enableStatistics = enableStatistics; + this.handlesToClose = new ArrayList<>(); + } + + /** Gets the ForSt {@link DBOptions} to be used for ForSt instances. */ + public DBOptions getDbOptions() { + // initial options from common profile + DBOptions opt = createBaseCommonDBOptions(); + handlesToClose.add(opt); + + // load configurable options on top of pre-defined profile + setDBOptionsFromConfigurableOptions(opt); + + // add user-defined options factory, if specified + if (optionsFactory != null) { + opt = optionsFactory.createDBOptions(opt, handlesToClose); + } + + // add necessary default options + opt = opt.setCreateIfMissing(true).setAvoidFlushDuringShutdown(true); + + // if sharedResources is non-null, use the write buffer manager from it. 
+ if (sharedResources != null) { + opt.setWriteBufferManager(sharedResources.getResourceHandle().getWriteBufferManager()); + } + + if (enableStatistics) { + Statistics statistics = new Statistics(); + opt.setStatistics(statistics); + handlesToClose.add(statistics); + } + + return opt; + } + + /** Gets the ForSt {@link ColumnFamilyOptions} to be used for all ForSt instances. */ + public ColumnFamilyOptions getColumnOptions() { + // initial options from common profile + ColumnFamilyOptions opt = createBaseCommonColumnOptions(); + handlesToClose.add(opt); + + // load configurable options on top of pre-defined profile + setColumnFamilyOptionsFromConfigurableOptions(opt, handlesToClose); + + // add user-defined options, if specified + if (optionsFactory != null) { + opt = optionsFactory.createColumnOptions(opt, handlesToClose); + } + + // if sharedResources is non-null, use the block cache from it and + // set necessary options for performance consideration with memory control + if (sharedResources != null) { + final ForStSharedResources rocksResources = sharedResources.getResourceHandle(); + final Cache blockCache = rocksResources.getCache(); + TableFormatConfig tableFormatConfig = opt.tableFormatConfig(); + BlockBasedTableConfig blockBasedTableConfig; + if (tableFormatConfig == null) { + blockBasedTableConfig = new BlockBasedTableConfig(); + } else { + Preconditions.checkArgument( + tableFormatConfig instanceof BlockBasedTableConfig, + "We currently only support BlockBasedTableConfig When bounding total memory."); + blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig; + } + if (rocksResources.isUsingPartitionedIndexFilters() + && overwriteFilterIfExist(blockBasedTableConfig)) { + blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch); + blockBasedTableConfig.setPartitionFilters(true); + blockBasedTableConfig.setPinTopLevelIndexAndFilter(true); + } + blockBasedTableConfig.setBlockCache(blockCache); + 
blockBasedTableConfig.setCacheIndexAndFilterBlocks(true); + blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true); + blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true); + opt.setTableFormatConfig(blockBasedTableConfig); + } + + return opt; + } + + /** Gets the ForSt {@link WriteOptions} to be used for write operations. */ + public WriteOptions getWriteOptions() { + // Disable WAL by default + WriteOptions opt = new WriteOptions().setDisableWAL(true); + handlesToClose.add(opt); + + // add user-defined options factory, if specified + if (optionsFactory != null) { + opt = optionsFactory.createWriteOptions(opt, handlesToClose); + } + + return opt; + } + + /** Gets the ForSt {@link ReadOptions} to be used for read operations. */ + public ReadOptions getReadOptions() { + ReadOptions opt = new ReadOptions(); + handlesToClose.add(opt); + + // add user-defined options factory, if specified + if (optionsFactory != null) { + opt = optionsFactory.createReadOptions(opt, handlesToClose); + } + + return opt; + } + + @Nullable + public File getLocalBasePath() { + return localBasePath; + } + + @Nullable + public File getLocalForStPath() { + return localForStPath; + } + + ForStNativeMetricOptions getMemoryWatcherOptions( + ForStNativeMetricOptions defaultMetricOptions) { + return optionsFactory == null + ? defaultMetricOptions + : optionsFactory.createNativeMetricsOptions(defaultMetricOptions); + } + + @Override + public void close() throws Exception { + handlesToClose.forEach(IOUtils::closeQuietly); + handlesToClose.clear(); + + if (sharedResources != null) { + sharedResources.close(); + } + cleanRelocatedDbLogs(); + } + + /** + * Overwrite configured {@link Filter} if enable partitioned filter. Partitioned filter only + * worked in full bloom filter, not blocked based. 
+ */ + private boolean overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) { + if (blockBasedTableConfig.filterPolicy() != null) { + // TODO Can get filter's config in the future ForSt version, and build new filter use + // existing config. + BloomFilter newFilter = new BloomFilter(10, false); + LOG.info( + "Existing filter has been overwritten to full filters since partitioned index filters is enabled."); + blockBasedTableConfig.setFilterPolicy(newFilter); + handlesToClose.add(newFilter); + } + return true; + } + + /** Create a {@link DBOptions} for ForSt, including some common settings. */ + DBOptions createBaseCommonDBOptions() { + return new DBOptions().setUseFsync(false).setStatsDumpPeriodSec(0); + } + + /** Create a {@link ColumnFamilyOptions} for ForSt, including some common settings. */ + ColumnFamilyOptions createBaseCommonColumnOptions() { + return new ColumnFamilyOptions(); + } + + /** + * Get a value for option from pre-defined option and configurable option settings. The priority + * relationship is as below. + * + * <p>Configured value > pre-defined value > default value. + * + * @param option the wanted option + * @param <T> the value type + * @return the final value for the option according to the priority above. 
+ */ + @Nullable + private <T> T internalGetOption(ConfigOption<T> option) { + return configuration.getOptional(option).orElseGet(option::defaultValue); + } + + @SuppressWarnings("ConstantConditions") + private DBOptions setDBOptionsFromConfigurableOptions(DBOptions currentOptions) { + + currentOptions.setMaxBackgroundJobs( + internalGetOption(ForStConfigurableOptions.MAX_BACKGROUND_THREADS)); + + currentOptions.setMaxOpenFiles(internalGetOption(ForStConfigurableOptions.MAX_OPEN_FILES)); + + currentOptions.setInfoLogLevel(internalGetOption(ForStConfigurableOptions.LOG_LEVEL)); + + String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR); + if (logDir == null || logDir.isEmpty()) { + if (localForStPath == null + || localForStPath.getAbsolutePath().length() <= INSTANCE_PATH_LENGTH_LIMIT) { + relocateDefaultDbLogDir(currentOptions); + } else { + // disable log relocate when instance path length exceeds limit to prevent ForSt + // log file creation failure, details in FLINK-31743 + LOG.warn( + "ForSt local path length exceeds limit : {}, disable log relocate.", + localForStPath); + } + } else { + currentOptions.setDbLogDir(logDir); + } + + currentOptions.setMaxLogFileSize( + internalGetOption(ForStConfigurableOptions.LOG_MAX_FILE_SIZE).getBytes()); + + currentOptions.setKeepLogFileNum(internalGetOption(ForStConfigurableOptions.LOG_FILE_NUM)); + + return currentOptions; + } + + @SuppressWarnings("ConstantConditions") + private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions( + ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) { + + currentOptions.setCompactionStyle( + internalGetOption(ForStConfigurableOptions.COMPACTION_STYLE)); + + currentOptions.setCompressionPerLevel( + internalGetOption(ForStConfigurableOptions.COMPRESSION_PER_LEVEL)); + + currentOptions.setLevelCompactionDynamicLevelBytes( + internalGetOption(ForStConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE)); + + currentOptions.setTargetFileSizeBase( + 
internalGetOption(ForStConfigurableOptions.TARGET_FILE_SIZE_BASE).getBytes()); + + currentOptions.setMaxBytesForLevelBase( + internalGetOption(ForStConfigurableOptions.MAX_SIZE_LEVEL_BASE).getBytes()); + + currentOptions.setWriteBufferSize( + internalGetOption(ForStConfigurableOptions.WRITE_BUFFER_SIZE).getBytes()); + + currentOptions.setMaxWriteBufferNumber( + internalGetOption(ForStConfigurableOptions.MAX_WRITE_BUFFER_NUMBER)); + + currentOptions.setMinWriteBufferNumberToMerge( + internalGetOption(ForStConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE)); + + TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig(); + + BlockBasedTableConfig blockBasedTableConfig; + if (tableFormatConfig == null) { + blockBasedTableConfig = new BlockBasedTableConfig(); + } else { + if (tableFormatConfig instanceof PlainTableConfig) { + // if the table format config is PlainTableConfig, we just return current + // column-family options + return currentOptions; + } else { + blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig; + } + } + + blockBasedTableConfig.setBlockSize( + internalGetOption(ForStConfigurableOptions.BLOCK_SIZE).getBytes()); + + blockBasedTableConfig.setMetadataBlockSize( + internalGetOption(ForStConfigurableOptions.METADATA_BLOCK_SIZE).getBytes()); + + blockBasedTableConfig.setBlockCacheSize( + internalGetOption(ForStConfigurableOptions.BLOCK_CACHE_SIZE).getBytes()); + + if (internalGetOption(ForStConfigurableOptions.USE_BLOOM_FILTER)) { + final double bitsPerKey = + internalGetOption(ForStConfigurableOptions.BLOOM_FILTER_BITS_PER_KEY); + final boolean blockBasedMode = + internalGetOption(ForStConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE); + BloomFilter bloomFilter = new BloomFilter(bitsPerKey, blockBasedMode); + handlesToClose.add(bloomFilter); + blockBasedTableConfig.setFilterPolicy(bloomFilter); + } + + return currentOptions.setTableFormatConfig(blockBasedTableConfig); + } + + /** + * Relocates the default log directory of 
ForSt with the Flink log directory. Finds the Flink
+     * log directory using log.file Java property that is set during startup.
+     *
+     * @param dbOptions The ForSt {@link DBOptions}.
+     */
+    private void relocateDefaultDbLogDir(DBOptions dbOptions) {
+        final String flinkLogFilePath = System.getProperty("log.file");
+        if (flinkLogFilePath == null) {
+            return;
+        }
+        final File flinkLogFile = resolveFileLocation(flinkLogFilePath);
+        if (flinkLogFile == null) {
+            return;
+        }
+        final String flinkLogDir = flinkLogFile.getParent();
+        if (resolveFileLocation(flinkLogDir) == null) {
+            return;
+        }
+        // remember the directory so that the relocated logs can be cleaned up later
+        this.relocatedDbLogBaseDir = new File(flinkLogDir).toPath();
+        dbOptions.setDbLogDir(flinkLogDir);
+    }
+
+    /**
+     * Verify log file location.
+     *
+     * @param logFilePath Path to log file
+     * @return File or null if not a valid log file
+     */
+    private File resolveFileLocation(String logFilePath) {
+        final File candidate = new File(logFilePath);
+        if (candidate.exists() && candidate.canRead()) {
+            return candidate;
+        }
+        return null;
+    }
+
+    /** Clean all relocated ForSt logs. */
+    private void cleanRelocatedDbLogs() {
+        if (localForStPath == null || relocatedDbLogBaseDir == null) {
+            return;
+        }
+        LOG.info("Cleaning up relocated ForSt logs: {}.", relocatedDbLogBaseDir);
+
+        final String logNamePrefix = resolveRelocatedDbLogPrefix(localForStPath.getAbsolutePath());
+        try {
+            // only plain files whose name carries this instance's prefix are removed
+            Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
+                    .filter(entry -> !Files.isDirectory(entry))
+                    .filter(entry -> entry.toFile().getName().startsWith(logNamePrefix))
+                    .forEach(IOUtils::deleteFileQuietly);
+        } catch (IOException e) {
+            LOG.warn("Could not list relocated ForSt log directory: {}", relocatedDbLogBaseDir);
+        }
+    }
+
+    /**
+     * Resolve the prefix of ForSt's log file name according to ForSt's log file name rules.
+     *
+     * @param instanceForStAbsolutePath The path where the ForSt directory is located.
+     * @return Resolved ForSt log name prefix.
+     */
+    private String resolveRelocatedDbLogPrefix(String instanceForStAbsolutePath) {
+        // a leading character outside [a-zA-Z0-9-._] is dropped rather than replaced
+        final boolean dropFirstChar =
+                !instanceForStAbsolutePath.isEmpty()
+                        && !instanceForStAbsolutePath.matches("^[a-zA-Z0-9\\-._].*");
+        final String remainingPath =
+                dropFirstChar
+                        ? instanceForStAbsolutePath.substring(1)
+                        : instanceForStAbsolutePath;
+        final String sanitizedPath = remainingPath.replaceAll("[^a-zA-Z0-9\\-._]", "_");
+        return sanitizedPath + FORST_RELOCATE_LOG_SUFFIX;
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResources.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResources.java
new file mode 100644
index 00000000000..e6451fefda6
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResources.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.Cache;
+import org.rocksdb.WriteBufferManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The set of resources that can be shared by all ForSt instances in a slot. Sharing these resources
+ * helps ForSt maintain a predictable resource footprint.
+ */
+final class ForStSharedResources implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ForStSharedResources.class);
+
+    /** Cache handle exposed through {@link #getCache()}; closed last in {@link #close()}. */
+    private final Cache cache;
+
+    /** Write buffer manager handle; closed before the cache in {@link #close()}. */
+    private final WriteBufferManager writeBufferManager;
+
+    /** Capacity value reported by {@link #getWriteBufferManagerCapacity()}. */
+    private final long writeBufferManagerCapacity;
+
+    /** Flag reported by {@link #isUsingPartitionedIndexFilters()}. */
+    private final boolean usingPartitionedIndexFilters;
+
+    ForStSharedResources(
+            Cache cache,
+            WriteBufferManager writeBufferManager,
+            long writeBufferManagerCapacity,
+            boolean usingPartitionedIndexFilters) {
+        this.usingPartitionedIndexFilters = usingPartitionedIndexFilters;
+        this.writeBufferManagerCapacity = writeBufferManagerCapacity;
+        this.writeBufferManager = writeBufferManager;
+        this.cache = cache;
+    }
+
+    /** Returns the cache handle passed to the constructor. */
+    public Cache getCache() {
+        return this.cache;
+    }
+
+    /** Returns the write buffer manager handle passed to the constructor. */
+    public WriteBufferManager getWriteBufferManager() {
+        return this.writeBufferManager;
+    }
+
+    /** Returns the write buffer manager capacity passed to the constructor. */
+    public long getWriteBufferManagerCapacity() {
+        return this.writeBufferManagerCapacity;
+    }
+
+    /** Returns whether partitioned index filters were requested at construction time. */
+    public boolean isUsingPartitionedIndexFilters() {
+        return this.usingPartitionedIndexFilters;
+    }
+
+    /** Closes the write buffer manager first, then the cache. */
+    @Override
+    public void close() {
+        LOG.debug("Closing ForStSharedResources");
+        this.writeBufferManager.close();
+        this.cache.close();
+    }
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResourcesFactory.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResourcesFactory.java
new file mode 100644
index 00000000000..beb98e71509
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResourcesFactory.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.runtime.memory.SharedResources;
+import org.apache.flink.util.function.LongFunctionWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.state.forst.ForStOptions.FIX_PER_TM_MEMORY_SIZE;
+
+/**
+ * A factory of {@link ForStSharedResources}. Encapsulates memory share scope (e.g. TM, Slot) and
+ * lifecycle (managed/unmanaged).
+ */
+enum ForStSharedResourcesFactory {
+    /** Memory allocated per Slot (shared across slot tasks), managed by Flink. */
+    // The flag must be true here: create() derives the resource id from it, so a false value
+    // would register managed memory under the unmanaged resource id and log managed=false.
+    SLOT_SHARED_MANAGED(true, MemoryShareScope.SLOT) {
+        @Override
+        protected OpaqueMemoryResource<ForStSharedResources> createInternal(
+                ForStMemoryConfiguration jobMemoryConfig,
+                String resourceId,
+                Environment env,
+                double memoryFraction,
+                LongFunctionWithException<ForStSharedResources, Exception> allocator)
+                throws Exception {
+            return env.getMemoryManager()
+                    .getSharedMemoryResourceForManagedMemory(resourceId, allocator, memoryFraction);
+        }
+    },
+    /** Memory allocated per Slot (shared across slot tasks), unmanaged. */
+    SLOT_SHARED_UNMANAGED(false, MemoryShareScope.SLOT) {
+        @Override
+        protected OpaqueMemoryResource<ForStSharedResources> createInternal(
+                ForStMemoryConfiguration jobMemoryConfig,
+                String resourceId,
+                Environment env,
+                double memoryFraction,
+                LongFunctionWithException<ForStSharedResources, Exception> allocator)
+                throws Exception {
+            return env.getMemoryManager()
+                    .getExternalSharedMemoryResource(
+                            resourceId,
+                            allocator,
+                            jobMemoryConfig.getFixedMemoryPerSlot().getBytes());
+        }
+    },
+    /** Memory allocated per TM (shared across all tasks), unmanaged. */
+    TM_SHARED_UNMANAGED(false, MemoryShareScope.TM) {
+        @Override
+        protected OpaqueMemoryResource<ForStSharedResources> createInternal(
+                ForStMemoryConfiguration jobMemoryConfig,
+                String resourceId,
+                Environment env,
+                double memoryFraction,
+                LongFunctionWithException<ForStSharedResources, Exception> allocator)
+                throws Exception {
+
+            SharedResources sharedResources = env.getSharedResources();
+            // a fresh lease holder per call; the disposer below releases exactly this lease
+            Object leaseHolder = new Object();
+            SharedResources.ResourceAndSize<ForStSharedResources> resource =
+                    sharedResources.getOrAllocateSharedResource(
+                            resourceId, leaseHolder, allocator, getTmSharedMemorySize(env));
+            ThrowingRunnable<Exception> disposer =
+                    () -> sharedResources.release(resourceId, leaseHolder, unused -> {});
+
+            return new OpaqueMemoryResource<>(resource.resourceHandle(), resource.size(), disposer);
+        }
+    };
+
+    /** Selects the resource id used by {@link #create} and is reported in its log line. */
+    private final boolean managed;
+
+    /** Decides which configuration the allocator is built from. */
+    private final MemoryShareScope shareScope;
+
+    ForStSharedResourcesFactory(boolean managed, MemoryShareScope shareScope) {
+        this.managed = managed;
+        this.shareScope = shareScope;
+    }
+
+    /**
+     * Picks the factory for the given configuration. Fixed per-slot memory is checked first, then
+     * managed memory, then a positive TM-wide fixed size.
+     *
+     * @param jobMemoryConfig the memory configuration of the job.
+     * @param env the task environment, used to read the TM-wide fixed memory size.
+     * @return the matching factory, or {@code null} if none of the three modes applies.
+     */
+    @Nullable
+    public static ForStSharedResourcesFactory from(
+            ForStMemoryConfiguration jobMemoryConfig, Environment env) {
+        if (jobMemoryConfig.isUsingFixedMemoryPerSlot()) {
+            return ForStSharedResourcesFactory.SLOT_SHARED_UNMANAGED;
+        } else if (jobMemoryConfig.isUsingManagedMemory()) {
+            return ForStSharedResourcesFactory.SLOT_SHARED_MANAGED;
+        } else if (getTmSharedMemorySize(env) > 0) {
+            return ForStSharedResourcesFactory.TM_SHARED_UNMANAGED;
+        } else {
+            // not shared and not managed - allocate per column family
+            return null;
+        }
+    }
+
+    /**
+     * Logs the share scope and managed flag, then delegates to {@link #createInternal} with the
+     * resource id matching the managed flag and an allocator built from the configuration of this
+     * factory's share scope.
+     *
+     * @param jobMemoryConfig the memory configuration of the job.
+     * @param env the task environment.
+     * @param memoryFraction passed through unchanged to {@link #createInternal}.
+     * @param logger the logger used for the info message.
+     * @param forStMemoryFactory factory handed to the allocator.
+     * @return the resource produced by {@link #createInternal}.
+     * @throws Exception if {@link #createInternal} fails.
+     */
+    public final OpaqueMemoryResource<ForStSharedResources> create(
+            ForStMemoryConfiguration jobMemoryConfig,
+            Environment env,
+            double memoryFraction,
+            Logger logger,
+            ForStMemoryControllerUtils.ForStMemoryFactory forStMemoryFactory)
+            throws Exception {
+        logger.info(
+                "Getting shared memory for ForSt: shareScope={}, managed={}", shareScope, managed);
+        return createInternal(
+                jobMemoryConfig,
+                managed ? MANAGED_MEMORY_RESOURCE_ID : UNMANAGED_MEMORY_RESOURCE_ID,
+                env,
+                memoryFraction,
+                createAllocator(
+                        shareScope.getConfiguration(jobMemoryConfig, env), forStMemoryFactory));
+    }
+
+    /** Scope- and lifecycle-specific acquisition of the shared resource. */
+    protected abstract OpaqueMemoryResource<ForStSharedResources> createInternal(
+            ForStMemoryConfiguration jobMemoryConfig,
+            String resourceId,
+            Environment env,
+            double memoryFraction,
+            LongFunctionWithException<ForStSharedResources, Exception> allocator)
+            throws Exception;
+
+    /** Reads the TM-wide fixed memory size in bytes; zero when the option is not set. */
+    private static long getTmSharedMemorySize(Environment env) {
+        return env.getTaskManagerInfo()
+                .getConfiguration()
+                .getOptional(FIX_PER_TM_MEMORY_SIZE)
+                .orElse(MemorySize.ZERO)
+                .getBytes();
+    }
+
+    private static final String MANAGED_MEMORY_RESOURCE_ID = "state-forst-managed-memory";
+
+    private static final String UNMANAGED_MEMORY_RESOURCE_ID = "state-forst-fixed-slot-memory";
+
+    /** Builds the size-to-resources allocator from the given configuration. */
+    private static LongFunctionWithException<ForStSharedResources, Exception> createAllocator(
+            ForStMemoryConfiguration config,
+            ForStMemoryControllerUtils.ForStMemoryFactory forStMemoryFactory) {
+        return size ->
+                ForStMemoryControllerUtils.allocateForStSharedResources(
+                        size,
+                        config.getWriteBufferRatio(),
+                        config.getHighPriorityPoolRatio(),
+                        config.isUsingPartitionedIndexFilters(),
+                        forStMemoryFactory);
+    }
+}
+
+/** Scope of memory sharing; determines which configuration the allocator is built from. */
+enum MemoryShareScope {
+    /** TM scope: the configuration is re-read from the task manager configuration. */
+    TM {
+        @Override
+        public ForStMemoryConfiguration getConfiguration(
+                ForStMemoryConfiguration jobMemoryConfig, Environment env) {
+            return ForStMemoryConfiguration.fromConfiguration(
+                    env.getTaskManagerInfo().getConfiguration());
+        }
+    },
+    /** Slot scope: the job's memory configuration is used as is. */
+    SLOT {
+        @Override
+        public ForStMemoryConfiguration getConfiguration(
+                ForStMemoryConfiguration jobMemoryConfig, Environment env) {
+            return jobMemoryConfig;
+        }
+    };
+
+    public abstract ForStMemoryConfiguration getConfiguration(
+            ForStMemoryConfiguration jobMemoryConfig, Environment env);
+}
diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMemoryControllerUtilsTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMemoryControllerUtilsTest.java
new file mode 100644
index 00000000000..ae9a1ae6391
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMemoryControllerUtilsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.Cache;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.WriteBufferManager;
+
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests to guard {@link ForStMemoryControllerUtils}. */
+public class ForStMemoryControllerUtilsTest {
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    /** Loads the native library from a fresh temporary folder before each test. */
+    @Before
+    public void ensureRocksDbNativeLibraryLoaded() throws IOException {
+        final String nativeLibDir = temporaryFolder.newFolder().getAbsolutePath();
+        NativeLibraryLoader.getInstance().loadLibrary(nativeLibDir);
+    }
+
+    /** The capacities seen by the memory factory must match the calculated ones. */
+    @Test
+    public void testCreateSharedResourcesWithExpectedCapacity() {
+        final long totalBytes = 2048L;
+        final double wbRatio = 0.5;
+        final double highPriRatio = 0.1;
+        final TestingForStMemoryFactory recordingFactory = new TestingForStMemoryFactory();
+
+        final ForStSharedResources allocated =
+                ForStMemoryControllerUtils.allocateForStSharedResources(
+                        totalBytes, wbRatio, highPriRatio, false, recordingFactory);
+
+        final long cacheCapacity =
+                ForStMemoryControllerUtils.calculateActualCacheCapacity(totalBytes, wbRatio);
+        final long wbmCapacity =
+                ForStMemoryControllerUtils.calculateWriteBufferManagerCapacity(
+                        totalBytes, wbRatio);
+
+        assertThat(recordingFactory.actualCacheCapacity, is(cacheCapacity));
+        assertThat(recordingFactory.actualWbmCapacity, is(wbmCapacity));
+        assertThat(allocated.getWriteBufferManagerCapacity(), is(wbmCapacity));
+    }
+
+    @Test
+    public void testCalculateForStDefaultArenaBlockSize() {
+        final long alignment = 4 * 1024;
+        final long bufferBytes = 64 * 1024 * 1024;
+        final long expectedBlockBytes = bufferBytes / 8;
+
+        // Normal case test
+        assertThat(
+                "Arena block size calculation error for normal case",
+                ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(bufferBytes),
+                is(expectedBlockBytes));
+
+        // Alignment tests
+        assertThat(
+                "Arena block size calculation error for alignment case",
+                ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(bufferBytes - 1),
+                is(expectedBlockBytes));
+        assertThat(
+                "Arena block size calculation error for alignment case2",
+                ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(bufferBytes + 8),
+                is(expectedBlockBytes + alignment));
+    }
+
+    @Test
+    public void testCalculateForStMutableLimit() {
+        final long bufferBytes = 64 * 1024 * 1024;
+        final long expectedLimit = bufferBytes * 7 / 8;
+        assertThat(
+                ForStMemoryControllerUtils.calculateForStMutableLimit(bufferBytes),
+                is(expectedLimit));
+    }
+
+    @Test
+    public void testValidateArenaBlockSize() {
+        final long blockBytes = 8 * 1024 * 1024;
+        final long halfOfBlock = (long) (blockBytes * 0.5);
+        final long oneAndHalfBlocks = (long) (blockBytes * 1.5);
+
+        assertFalse(ForStMemoryControllerUtils.validateArenaBlockSize(blockBytes, halfOfBlock));
+        assertTrue(
+                ForStMemoryControllerUtils.validateArenaBlockSize(blockBytes, oneAndHalfBlocks));
+    }
+
+    /** Memory factory that records the requested capacities and delegates to the default one. */
+    private static final class TestingForStMemoryFactory
+            implements ForStMemoryControllerUtils.ForStMemoryFactory {
+        private Long actualCacheCapacity;
+        private Long actualWbmCapacity;
+
+        @Override
+        public Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
+            this.actualCacheCapacity = cacheCapacity;
+            return ForStMemoryControllerUtils.ForStMemoryFactory.DEFAULT.createCache(
+                    cacheCapacity, highPriorityPoolRatio);
+        }
+
+        @Override
+        public WriteBufferManager createWriteBufferManager(
+                long writeBufferManagerCapacity, Cache cache) {
+            this.actualWbmCapacity = writeBufferManagerCapacity;
+            return ForStMemoryControllerUtils.ForStMemoryFactory.DEFAULT.createWriteBufferManager(
+                    writeBufferManagerCapacity, cache);
+        }
+    }
+}
diff --git
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java new file mode 100644 index 00000000000..53c0524308b --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.IndexType;
+import org.rocksdb.LRUCache;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteBufferManager;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/** Tests to guard {@link ForStResourceContainer}. */
+public class ForStResourceContainerTest {
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+    /** Loads the native library once from a fresh temporary folder. */
+    @BeforeClass
+    public static void ensureForStNativeLibraryLoaded() throws IOException {
+        final String nativeLibDir = TMP_FOLDER.newFolder().getAbsolutePath();
+        NativeLibraryLoader.getInstance().loadLibrary(nativeLibDir);
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    public void testFreeDBOptionsAfterClose() throws Exception {
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer();
+        final DBOptions options = resourceContainer.getDbOptions();
+        assertThat(options.isOwningHandle(), is(true));
+
+        resourceContainer.close();
+        assertThat(options.isOwningHandle(), is(false));
+    }
+
+    @Test
+    public void testFreeMultipleDBOptionsAfterClose() throws Exception {
+        final int optionNumber = 20;
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer();
+        final ArrayList<DBOptions> handedOut = new ArrayList<>(optionNumber);
+        for (int created = 0; created < optionNumber; created++) {
+            handedOut.add(resourceContainer.getDbOptions());
+        }
+
+        resourceContainer.close();
+        for (DBOptions options : handedOut) {
+            assertThat(options.isOwningHandle(), is(false));
+        }
+    }
+
+    /**
+     * Guard the shared resources will be released after {@link ForStResourceContainer#close()} when
+     * the {@link ForStResourceContainer} instance is initiated with {@link OpaqueMemoryResource}.
+     *
+     * @throws Exception if unexpected error happened.
+     */
+    @Test
+    public void testSharedResourcesAfterClose() throws Exception {
+        final OpaqueMemoryResource<ForStSharedResources> opaque = getSharedResources();
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer(null, opaque);
+
+        resourceContainer.close();
+
+        final ForStSharedResources shared = opaque.getResourceHandle();
+        assertThat(shared.getCache().isOwningHandle(), is(false));
+        assertThat(shared.getWriteBufferManager().isOwningHandle(), is(false));
+    }
+
+    /**
+     * Guard that {@link ForStResourceContainer#getDbOptions()} shares the same {@link
+     * WriteBufferManager} instance if the {@link ForStResourceContainer} instance is initiated with
+     * {@link OpaqueMemoryResource}.
+     *
+     * @throws Exception if unexpected error happened.
+     */
+    @Test
+    public void testGetDbOptionsWithSharedResources() throws Exception {
+        final int optionNumber = 20;
+        final OpaqueMemoryResource<ForStSharedResources> opaque = getSharedResources();
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer(null, opaque);
+
+        final HashSet<WriteBufferManager> seenManagers = new HashSet<>();
+        for (int created = 0; created < optionNumber; created++) {
+            final DBOptions options = resourceContainer.getDbOptions();
+            seenManagers.add(getWriteBufferManager(options));
+        }
+
+        assertThat(seenManagers.size(), is(1));
+        assertThat(
+                seenManagers.iterator().next(),
+                is(opaque.getResourceHandle().getWriteBufferManager()));
+        resourceContainer.close();
+    }
+
+    /**
+     * Guard that {@link ForStResourceContainer#getColumnOptions()} shares the same {@link Cache}
+     * instance if the {@link ForStResourceContainer} instance is initiated with {@link
+     * OpaqueMemoryResource}.
+     *
+     * @throws Exception if unexpected error happened.
+     */
+    @Test
+    public void testGetColumnFamilyOptionsWithSharedResources() throws Exception {
+        final int optionNumber = 20;
+        final OpaqueMemoryResource<ForStSharedResources> opaque = getSharedResources();
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer(null, opaque);
+
+        final HashSet<Cache> seenCaches = new HashSet<>();
+        for (int created = 0; created < optionNumber; created++) {
+            final ColumnFamilyOptions options = resourceContainer.getColumnOptions();
+            seenCaches.add(getBlockCache(options));
+        }
+
+        assertThat(seenCaches.size(), is(1));
+        assertThat(seenCaches.iterator().next(), is(opaque.getResourceHandle().getCache()));
+        resourceContainer.close();
+    }
+
+    /** Builds a small cache plus write buffer manager wrapped as an opaque memory resource. */
+    private OpaqueMemoryResource<ForStSharedResources> getSharedResources() {
+        final long cacheSize = 1024L;
+        final long writeBufferSize = 512L;
+        final LRUCache lruCache = new LRUCache(cacheSize, -1, false, 0.1);
+        final WriteBufferManager manager = new WriteBufferManager(writeBufferSize, lruCache);
+        final ForStSharedResources shared =
+                new ForStSharedResources(lruCache, manager, writeBufferSize, false);
+        return new OpaqueMemoryResource<>(shared, cacheSize, shared::close);
+    }
+
+    /** Reads the {@code blockCache} field of the options' table config via reflection. */
+    private Cache getBlockCache(ColumnFamilyOptions columnOptions) {
+        BlockBasedTableConfig tableConfig = null;
+        try {
+            tableConfig = (BlockBasedTableConfig) columnOptions.tableFormatConfig();
+        } catch (ClassCastException e) {
+            fail("Table config got from ColumnFamilyOptions is not BlockBasedTableConfig");
+        }
+
+        try {
+            final Field blockCacheField =
+                    BlockBasedTableConfig.class.getDeclaredField("blockCache");
+            blockCacheField.setAccessible(true);
+            return (Cache) blockCacheField.get(tableConfig);
+        } catch (NoSuchFieldException e) {
+            fail("blockCache is not defined");
+        } catch (IllegalAccessException e) {
+            fail("Cannot access blockCache field.");
+        }
+        return null;
+    }
+
+    /** Reads the {@code writeBufferManager_} field of the given options via reflection. */
+    private WriteBufferManager getWriteBufferManager(DBOptions dbOptions) {
+        try {
+            final Field managerField = DBOptions.class.getDeclaredField("writeBufferManager_");
+            managerField.setAccessible(true);
+            return (WriteBufferManager) managerField.get(dbOptions);
+        } catch (NoSuchFieldException e) {
+            fail("writeBufferManager_ is not defined.");
+        } catch (IllegalAccessException e) {
+            fail("Cannot access writeBufferManager_ field.");
+        }
+        return null;
+    }
+
+    @Test
+    public void testFreeColumnOptionsAfterClose() throws Exception {
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer();
+        final ColumnFamilyOptions options = resourceContainer.getColumnOptions();
+        assertThat(options.isOwningHandle(), is(true));
+
+        resourceContainer.close();
+        assertThat(options.isOwningHandle(), is(false));
+    }
+
+    @Test
+    public void testFreeMultipleColumnOptionsAfterClose() throws Exception {
+        final int optionNumber = 20;
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer();
+        final ArrayList<ColumnFamilyOptions> handedOut = new ArrayList<>(optionNumber);
+        for (int created = 0; created < optionNumber; created++) {
+            handedOut.add(resourceContainer.getColumnOptions());
+        }
+
+        resourceContainer.close();
+        for (ColumnFamilyOptions options : handedOut) {
+            assertThat(options.isOwningHandle(), is(false));
+        }
+    }
+
+    @Test
+    public void testFreeSharedResourcesAfterClose() throws Exception {
+        final LRUCache lruCache = new LRUCache(1024L);
+        final WriteBufferManager manager = new WriteBufferManager(1024L, lruCache);
+        final ForStSharedResources shared =
+                new ForStSharedResources(lruCache, manager, 1024L, false);
+        final ThrowingRunnable<Exception> disposer = shared::close;
+        final OpaqueMemoryResource<ForStSharedResources> opaque =
+                new OpaqueMemoryResource<>(shared, 1024L, disposer);
+
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer(null, opaque);
+        resourceContainer.close();
+
+        assertThat(lruCache.isOwningHandle(), is(false));
+        assertThat(manager.isOwningHandle(), is(false));
+    }
+
+    @Test
+    public void testFreeWriteReadOptionsAfterClose() throws Exception {
+        final ForStResourceContainer resourceContainer = new ForStResourceContainer();
+        final WriteOptions forWrites = resourceContainer.getWriteOptions();
+        final ReadOptions forReads = resourceContainer.getReadOptions();
+        assertThat(forWrites.isOwningHandle(), is(true));
+        assertThat(forReads.isOwningHandle(), is(true));
+
+        resourceContainer.close();
+        assertThat(forWrites.isOwningHandle(), is(false));
+        assertThat(forReads.isOwningHandle(), is(false));
+    }
+
+    /**
+     * With shared resources that request partitioned index filters, the column options must use a
+     * two-level index with partitioned, pinned filters, and the user-supplied filter must still be
+     * closed together with the container.
+     */
+    @Test
+    public void testGetColumnFamilyOptionsWithPartitionedIndex() throws Exception {
+        final LRUCache lruCache = new LRUCache(1024L);
+        final WriteBufferManager manager = new WriteBufferManager(1024L, lruCache);
+        final ForStSharedResources shared =
+                new ForStSharedResources(lruCache, manager, 1024L, true);
+        final ThrowingRunnable<Exception> disposer = shared::close;
+        final OpaqueMemoryResource<ForStSharedResources> opaque =
+                new OpaqueMemoryResource<>(shared, 1024L, disposer);
+
+        final BloomFilter userFilter = new BloomFilter();
+        final ForStOptionsFactory userFilterFactory =
+                new ForStOptionsFactory() {
+
+                    @Override
+                    public DBOptions createDBOptions(
+                            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+                        return currentOptions;
+                    }
+
+                    @Override
+                    public ColumnFamilyOptions createColumnOptions(
+                            ColumnFamilyOptions currentOptions,
+                            Collection<AutoCloseable> handlesToClose) {
+                        final TableFormatConfig existing = currentOptions.tableFormatConfig();
+                        final BlockBasedTableConfig tableConfig =
+                                existing == null
+                                        ? new BlockBasedTableConfig()
+                                        : (BlockBasedTableConfig) existing;
+                        tableConfig.setFilter(userFilter);
+                        handlesToClose.add(userFilter);
+                        currentOptions.setTableFormatConfig(tableConfig);
+                        return currentOptions;
+                    }
+                };
+
+        try (ForStResourceContainer resourceContainer =
+                new ForStResourceContainer(userFilterFactory, opaque)) {
+            final ColumnFamilyOptions options = resourceContainer.getColumnOptions();
+            final BlockBasedTableConfig effective =
+                    (BlockBasedTableConfig) options.tableFormatConfig();
+            assertThat(effective.indexType(), is(IndexType.kTwoLevelIndexSearch));
+            assertThat(effective.partitionFilters(), is(true));
+            assertThat(effective.pinTopLevelIndexAndFilter(), is(true));
+            assertThat(effective.filterPolicy(), not(userFilter));
+        }
+        assertFalse("Block based filter is left unclosed.", userFilter.isOwningHandle());
+    }
+}