This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5f5d5022c03b59f943e357c0c8028325bf12e8c0
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Mon Apr 15 23:16:43 2024 +0800

    [FLINK-35047][state] Introduce ResourceContainer for ForStStateBackend
---
 .../state/forst/ForStMemoryConfiguration.java      | 248 +++++++++++
 .../state/forst/ForStMemoryControllerUtils.java    | 195 +++++++++
 .../flink/state/forst/ForStResourceContainer.java  | 468 +++++++++++++++++++++
 .../flink/state/forst/ForStSharedResources.java    |  73 ++++
 .../state/forst/ForStSharedResourcesFactory.java   | 185 ++++++++
 .../forst/ForStMemoryControllerUtilsTest.java      | 129 ++++++
 .../state/forst/ForStResourceContainerTest.java    | 297 +++++++++++++
 7 files changed, 1595 insertions(+)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryConfiguration.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryConfiguration.java
new file mode 100644
index 00000000000..558f48cb0a1
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryConfiguration.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The settings regarding ForSt memory usage. */
+public final class ForStMemoryConfiguration implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Flag whether to use the managed memory budget for ForSt. Null is not 
set. */
+    @Nullable private Boolean useManagedMemory;
+
+    /** The total memory for all ForSt instances at this slot. Null is not 
set. */
+    @Nullable private MemorySize fixedMemoryPerSlot;
+
+    /**
+     * The maximum fraction of the total shared memory consumed by the write 
buffers. Null if not
+     * set.
+     */
+    @Nullable private Double writeBufferRatio;
+
+    /**
+     * The high priority pool ratio in the shared cache, used for index & 
filter blocks. Null if not
+     * set.
+     */
+    @Nullable private Double highPriorityPoolRatio;
+
+    /** Flag whether to use partition index/filters. Null if not set. */
+    @Nullable private Boolean usePartitionedIndexFilters;
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Configures ForSt to use the managed memory of a slot. See {@link
+     * ForStOptions#USE_MANAGED_MEMORY} for details.
+     */
+    public void setUseManagedMemory(boolean useManagedMemory) {
+        this.useManagedMemory = useManagedMemory;
+    }
+
+    /**
+     * Configures ForSt to use a fixed amount of memory shared between all 
instances (operators) in
+     * a slot. See {@link ForStOptions#FIX_PER_SLOT_MEMORY_SIZE} for details.
+     */
+    public void setFixedMemoryPerSlot(MemorySize fixedMemoryPerSlot) {
+        checkArgument(
+                fixedMemoryPerSlot == null || fixedMemoryPerSlot.getBytes() > 
0,
+                "Total memory per slot must be > 0");
+
+        this.fixedMemoryPerSlot = fixedMemoryPerSlot;
+    }
+
+    /**
+     * Configures ForSt to use a fixed amount of memory shared between all 
instances (operators) in
+     * a slot. See {@link #setFixedMemoryPerSlot(MemorySize)} for details.
+     */
+    public void setFixedMemoryPerSlot(String totalMemoryPerSlotStr) {
+        setFixedMemoryPerSlot(MemorySize.parse(totalMemoryPerSlotStr));
+    }
+
+    /**
+     * Sets the fraction of the total memory to be used for write buffers. 
This only has an effect
+     * is either {@link #setUseManagedMemory(boolean)} or {@link 
#setFixedMemoryPerSlot(MemorySize)}
+     * are set.
+     *
+     * <p>See {@link ForStOptions#WRITE_BUFFER_RATIO} for details.
+     */
+    public void setWriteBufferRatio(double writeBufferRatio) {
+        Preconditions.checkArgument(
+                writeBufferRatio > 0 && writeBufferRatio < 1.0,
+                "Write Buffer ratio %s must be in (0, 1)",
+                writeBufferRatio);
+        this.writeBufferRatio = writeBufferRatio;
+    }
+
+    /**
+     * Sets the fraction of the total memory to be used for high priority 
blocks like indexes,
+     * dictionaries, etc. This only has an effect is either {@link 
#setUseManagedMemory(boolean)} or
+     * {@link #setFixedMemoryPerSlot(MemorySize)} are set.
+     *
+     * <p>See {@link ForStOptions#HIGH_PRIORITY_POOL_RATIO} for details.
+     */
+    public void setHighPriorityPoolRatio(double highPriorityPoolRatio) {
+        Preconditions.checkArgument(
+                highPriorityPoolRatio > 0 && highPriorityPoolRatio < 1.0,
+                "High priority pool ratio %s must be in (0, 1)",
+                highPriorityPoolRatio);
+        this.highPriorityPoolRatio = highPriorityPoolRatio;
+    }
+
+    /**
+     * Gets whether the state backend is configured to use the managed memory 
of a slot for ForSt.
+     * See {@link ForStOptions#USE_MANAGED_MEMORY} for details.
+     */
+    public boolean isUsingManagedMemory() {
+        return useManagedMemory != null
+                ? useManagedMemory
+                : ForStOptions.USE_MANAGED_MEMORY.defaultValue();
+    }
+
+    /**
+     * Gets whether the state backend is configured to use a fixed amount of 
memory shared between
+     * all ForSt instances (in all tasks and operators) of a slot. See {@link
+     * ForStOptions#FIX_PER_SLOT_MEMORY_SIZE} for details.
+     */
+    public boolean isUsingFixedMemoryPerSlot() {
+        return fixedMemoryPerSlot != null;
+    }
+
+    /**
+     * Gets the fixed amount of memory to be shared between all RocksDB 
instances (in all tasks and
+     * operators) of a slot. Null is not configured. See {@link 
ForStOptions#USE_MANAGED_MEMORY} for
+     * details.
+     */
+    @Nullable
+    public MemorySize getFixedMemoryPerSlot() {
+        return fixedMemoryPerSlot;
+    }
+
+    /**
+     * Gets the fraction of the total memory to be used for write buffers. 
This only has an effect
+     * is either {@link #setUseManagedMemory(boolean)} or {@link 
#setFixedMemoryPerSlot(MemorySize)}
+     * are set.
+     *
+     * <p>See {@link ForStOptions#WRITE_BUFFER_RATIO} for details.
+     */
+    public double getWriteBufferRatio() {
+        return writeBufferRatio != null
+                ? writeBufferRatio
+                : ForStOptions.WRITE_BUFFER_RATIO.defaultValue();
+    }
+
+    /**
+     * Gets the fraction of the total memory to be used for high priority 
blocks like indexes,
+     * dictionaries, etc. This only has an effect is either {@link 
#setUseManagedMemory(boolean)} or
+     * {@link #setFixedMemoryPerSlot(MemorySize)} are set.
+     *
+     * <p>See {@link ForStOptions#HIGH_PRIORITY_POOL_RATIO} for details.
+     */
+    public double getHighPriorityPoolRatio() {
+        return highPriorityPoolRatio != null
+                ? highPriorityPoolRatio
+                : ForStOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue();
+    }
+
+    /**
+     * Gets whether the state backend is configured to use partitioned 
index/filters for ForSt.
+     *
+     * <p>See {@link ForStOptions#USE_PARTITIONED_INDEX_FILTERS} for details.
+     */
+    public Boolean isUsingPartitionedIndexFilters() {
+        return usePartitionedIndexFilters != null
+                ? usePartitionedIndexFilters
+                : ForStOptions.USE_PARTITIONED_INDEX_FILTERS.defaultValue();
+    }
+
+    // ------------------------------------------------------------------------
+
+    /** Validates if the configured options are valid with respect to one 
another. */
+    public void validate() {
+        // As FLINK-15512 introduce a new mechanism to calculate the cache 
capacity,
+        // the relationship of write_buffer_manager_capacity and 
cache_capacity has changed to:
+        // write_buffer_manager_capacity / cache_capacity = 2 * 
writeBufferRatio / (3 -
+        // writeBufferRatio)
+        // we should ensure the sum of write buffer manager capacity and high 
priority pool less
+        // than cache capacity.
+        // TODO change the formula once FLINK-15532 resolved.
+        if (writeBufferRatio != null
+                && highPriorityPoolRatio != null
+                && 2 * writeBufferRatio / (3 - writeBufferRatio) + 
highPriorityPoolRatio >= 1.0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Invalid configuration: writeBufferRatio %s with 
highPriPoolRatio %s",
+                            writeBufferRatio, highPriorityPoolRatio));
+        }
+    }
+
+    // ------------------------------------------------------------------------
+
+    /**
+     * Derives a ForStMemoryConfiguration from another object and a 
configuration. The values set on
+     * the other object take precedence, and the values from the configuration 
are used if no values
+     * are set on the other config object.
+     */
+    public static ForStMemoryConfiguration fromOtherAndConfiguration(
+            ForStMemoryConfiguration other, ReadableConfig config) {
+
+        final ForStMemoryConfiguration newConfig = new 
ForStMemoryConfiguration();
+
+        newConfig.useManagedMemory =
+                other.useManagedMemory != null
+                        ? other.useManagedMemory
+                        : config.get(ForStOptions.USE_MANAGED_MEMORY);
+
+        newConfig.fixedMemoryPerSlot =
+                other.fixedMemoryPerSlot != null
+                        ? other.fixedMemoryPerSlot
+                        : config.get(ForStOptions.FIX_PER_SLOT_MEMORY_SIZE);
+
+        newConfig.writeBufferRatio =
+                other.writeBufferRatio != null
+                        ? other.writeBufferRatio
+                        : config.get(ForStOptions.WRITE_BUFFER_RATIO);
+
+        newConfig.highPriorityPoolRatio =
+                other.highPriorityPoolRatio != null
+                        ? other.highPriorityPoolRatio
+                        : config.get(ForStOptions.HIGH_PRIORITY_POOL_RATIO);
+
+        newConfig.usePartitionedIndexFilters =
+                other.usePartitionedIndexFilters != null
+                        ? other.usePartitionedIndexFilters
+                        : 
config.get(ForStOptions.USE_PARTITIONED_INDEX_FILTERS);
+
+        return newConfig;
+    }
+
+    public static ForStMemoryConfiguration fromConfiguration(Configuration 
configuration) {
+        return fromOtherAndConfiguration(new ForStMemoryConfiguration(), 
configuration);
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryControllerUtils.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryControllerUtils.java
new file mode 100644
index 00000000000..670c991cd60
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStMemoryControllerUtils.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.rocksdb.Cache;
+import org.rocksdb.LRUCache;
+import org.rocksdb.WriteBufferManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+/**
+ * Utils to create {@link Cache} and {@link WriteBufferManager} which are used 
to control total
+ * memory usage of ForSt.
+ */
+public class ForStMemoryControllerUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForStMemoryControllerUtils.class);
+
+    /**
+     * Allocate memory controllable ForSt shared resources.
+     *
+     * @param totalMemorySize The total memory limit size.
+     * @param writeBufferRatio The ratio of total memory which is occupied by 
write buffer manager.
+     * @param highPriorityPoolRatio The high priority pool ratio of cache.
+     * @param factory creates Write Buffer Manager and Bock Cache
+     * @return memory controllable RocksDB shared resources.
+     */
+    public static ForStSharedResources allocateForStSharedResources(
+            long totalMemorySize,
+            double writeBufferRatio,
+            double highPriorityPoolRatio,
+            boolean usingPartitionedIndexFilters,
+            ForStMemoryFactory factory) {
+
+        long calculatedCacheCapacity =
+                ForStMemoryControllerUtils.calculateActualCacheCapacity(
+                        totalMemorySize, writeBufferRatio);
+        final Cache cache = factory.createCache(calculatedCacheCapacity, 
highPriorityPoolRatio);
+
+        long writeBufferManagerCapacity =
+                ForStMemoryControllerUtils.calculateWriteBufferManagerCapacity(
+                        totalMemorySize, writeBufferRatio);
+        final WriteBufferManager wbm =
+                factory.createWriteBufferManager(writeBufferManagerCapacity, 
cache);
+
+        LOG.debug(
+                "Allocated ForSt shared resources, calculatedCacheCapacity: 
{}, highPriorityPoolRatio: {}, writeBufferManagerCapacity: {}, 
usingPartitionedIndexFilters: {}",
+                calculatedCacheCapacity,
+                highPriorityPoolRatio,
+                writeBufferManagerCapacity,
+                usingPartitionedIndexFilters);
+        return new ForStSharedResources(
+                cache, wbm, writeBufferManagerCapacity, 
usingPartitionedIndexFilters);
+    }
+
+    /**
+     * Calculate the actual memory capacity of cache, which would be shared 
among ForSt instance(s).
+     * We introduce this method because: a) We cannot create a strict capacity 
limit cache util
+     * FLINK-15532 resolved. b) Regardless of the memory usage of blocks 
pinned by ForSt iterators,
+     * which is difficult to calculate and only happened when we iterator 
entries in MapState, the
+     * overuse of memory is mainly occupied by at most half of the write 
buffer usage. (see <a
+     * 
href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L51";>the
+     * flush implementation of write buffer manager</a>). Thus, we have four 
equations below:
+     * write_buffer_manager_memory = 1.5 * write_buffer_manager_capacity 
write_buffer_manager_memory
+     * = total_memory_size * write_buffer_ratio write_buffer_manager_memory + 
other_part =
+     * total_memory_size write_buffer_manager_capacity + other_part = 
cache_capacity And we would
+     * deduce the formula: cache_capacity = (3 - write_buffer_ratio) * 
total_memory_size / 3
+     * write_buffer_manager_capacity = 2 * total_memory_size * 
write_buffer_ratio / 3
+     *
+     * @param totalMemorySize Total off-heap memory size reserved for ForSt 
instance(s).
+     * @param writeBufferRatio The ratio of total memory size which would be 
reserved for write
+     *     buffer manager and its over-capacity part.
+     * @return The actual calculated cache capacity.
+     */
+    @VisibleForTesting
+    public static long calculateActualCacheCapacity(long totalMemorySize, 
double writeBufferRatio) {
+        return (long) ((3 - writeBufferRatio) * totalMemorySize / 3);
+    }
+
+    /**
+     * Calculate the actual memory capacity of write buffer manager, which 
would be shared among
+     * ForSt instance(s). The formula to use here could refer to the doc of 
{@link
+     * #calculateActualCacheCapacity(long, double)}.
+     *
+     * @param totalMemorySize Total off-heap memory size reserved for ForSt 
instance(s).
+     * @param writeBufferRatio The ratio of total memory size which would be 
reserved for write
+     *     buffer manager and its over-capacity part.
+     * @return The actual calculated write buffer manager capacity.
+     */
+    @VisibleForTesting
+    static long calculateWriteBufferManagerCapacity(long totalMemorySize, 
double writeBufferRatio) {
+        return (long) (2 * totalMemorySize * writeBufferRatio / 3);
+    }
+
+    @VisibleForTesting
+    static Cache createCache(long cacheCapacity, double highPriorityPoolRatio) 
{
+        // TODO use strict capacity limit until FLINK-15532 resolved
+        return new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
+    }
+
+    @VisibleForTesting
+    static WriteBufferManager createWriteBufferManager(
+            long writeBufferManagerCapacity, Cache cache) {
+        return new WriteBufferManager(writeBufferManagerCapacity, cache);
+    }
+
+    /**
+     * Calculate the default arena block size as ForSt calculates it in <a
+     * 
href="https://github.com/dataArtisans/frocksdb/blob/49bc897d5d768026f1eb816d960c1f2383396ef4/db/column_family.cc#L196-L201";>
+     * here</a>.
+     *
+     * @return the default arena block size
+     * @param writeBufferSize the write buffer size (bytes)
+     */
+    static long calculateForStDefaultArenaBlockSize(long writeBufferSize) {
+        long arenaBlockSize = writeBufferSize / 8;
+
+        // Align up to 4k
+        final long align = 4 * 1024;
+        return ((arenaBlockSize + align - 1) / align) * align;
+    }
+
+    /**
+     * Calculate {@code mutable_limit_} as ForSt calculates it in <a
+     * 
href="https://github.com/dataArtisans/frocksdb/blob/FRocksDB-5.17.2/memtable/write_buffer_manager.cc#L54";>
+     * here</a>.
+     *
+     * @param bufferSize write buffer size
+     * @return mutableLimit
+     */
+    static long calculateForStMutableLimit(long bufferSize) {
+        return bufferSize * 7 / 8;
+    }
+
+    /**
+     * ForSt starts flushing the active memtable constantly in the case when 
the arena block size is
+     * greater than mutable limit (as calculated in {@link 
#calculateForStMutableLimit(long)}).
+     *
+     * <p>This happens because in such a case the check <a
+     * 
href="https://github.com/dataArtisans/frocksdb/blob/958f191d3f7276ae59b270f9db8390034d549ee0/include/rocksdb/write_buffer_manager.h#L47";>
+     * here</a> is always true.
+     *
+     * <p>This method checks that arena block size is smaller than mutable 
limit.
+     *
+     * @param arenaBlockSize Arena block size
+     * @param mutableLimit mutable limit
+     * @return whether arena block size is sensible
+     */
+    @VisibleForTesting
+    static boolean validateArenaBlockSize(long arenaBlockSize, long 
mutableLimit) {
+        return arenaBlockSize <= mutableLimit;
+    }
+
+    /** Factory for Write Buffer Manager and Bock Cache. */
+    public interface ForStMemoryFactory extends Serializable {
+        Cache createCache(long cacheCapacity, double highPriorityPoolRatio);
+
+        WriteBufferManager createWriteBufferManager(long 
writeBufferManagerCapacity, Cache cache);
+
+        ForStMemoryFactory DEFAULT =
+                new ForStMemoryFactory() {
+                    @Override
+                    public Cache createCache(long cacheCapacity, double 
highPriorityPoolRatio) {
+                        return ForStMemoryControllerUtils.createCache(
+                                cacheCapacity, highPriorityPoolRatio);
+                    }
+
+                    @Override
+                    public WriteBufferManager createWriteBufferManager(
+                            long writeBufferManagerCapacity, Cache cache) {
+                        return 
ForStMemoryControllerUtils.createWriteBufferManager(
+                                writeBufferManagerCapacity, cache);
+                    }
+                };
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
new file mode 100644
index 00000000000..2f54522d1c7
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStResourceContainer.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Filter;
+import org.rocksdb.IndexType;
+import org.rocksdb.PlainTableConfig;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.Statistics;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * The container for ForSt resources, including option factory and shared 
resource among instances.
+ *
+ * <p>This should be the only entrance for ForStStateBackend to get ForSt 
options, and should be
+ * properly (and necessarily) closed to prevent resource leak.
+ */
+public final class ForStResourceContainer implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForStResourceContainer.class);
+
+    private static final String FORST_RELOCATE_LOG_SUFFIX = "_LOG";
+
+    // the filename length limit is 255 on most operating systems
+    private static final int INSTANCE_PATH_LENGTH_LIMIT = 255 - 
FORST_RELOCATE_LOG_SUFFIX.length();
+
+    private static final String LOCAL_DB_DIR_STRING = "db";
+
+    @Nullable private final File localBasePath;
+
+    @Nullable private final File localForStPath;
+
+    /** The configurations from file. */
+    private final ReadableConfig configuration;
+
+    /** The options factory to create the ForSt options. */
+    @Nullable private final ForStOptionsFactory optionsFactory;
+
+    /**
+     * The shared resource among ForSt instances. This resource is not part of 
the 'handlesToClose',
+     * because the handles to close are closed quietly, whereas for this one, 
we want exceptions to
+     * be reported.
+     */
+    @Nullable private final OpaqueMemoryResource<ForStSharedResources> 
sharedResources;
+
+    private final boolean enableStatistics;
+
+    /** The handles to be closed when the container is closed. */
+    private final ArrayList<AutoCloseable> handlesToClose;
+
+    @Nullable private Path relocatedDbLogBaseDir;
+
+    @VisibleForTesting
+    public ForStResourceContainer() {
+        this(new Configuration(), null, null, null, false);
+    }
+
+    @VisibleForTesting
+    public ForStResourceContainer(@Nullable ForStOptionsFactory 
optionsFactory) {
+        this(new Configuration(), optionsFactory, null, null, false);
+    }
+
+    @VisibleForTesting
+    public ForStResourceContainer(
+            @Nullable ForStOptionsFactory optionsFactory,
+            @Nullable OpaqueMemoryResource<ForStSharedResources> 
sharedResources) {
+        this(new Configuration(), optionsFactory, sharedResources, null, 
false);
+    }
+
+    public ForStResourceContainer(
+            ReadableConfig configuration,
+            @Nullable ForStOptionsFactory optionsFactory,
+            @Nullable OpaqueMemoryResource<ForStSharedResources> 
sharedResources,
+            @Nullable File localBasePath,
+            boolean enableStatistics) {
+
+        this.configuration = configuration;
+        this.optionsFactory = optionsFactory;
+        this.sharedResources = sharedResources;
+
+        this.localBasePath = localBasePath;
+        this.localForStPath =
+                localBasePath != null ? new File(localBasePath, 
LOCAL_DB_DIR_STRING) : null;
+
+        this.enableStatistics = enableStatistics;
+        this.handlesToClose = new ArrayList<>();
+    }
+
+    /** Gets the ForSt {@link DBOptions} to be used for ForSt instances. */
+    public DBOptions getDbOptions() {
+        // initial options from common profile
+        DBOptions opt = createBaseCommonDBOptions();
+        handlesToClose.add(opt);
+
+        // load configurable options on top of pre-defined profile
+        setDBOptionsFromConfigurableOptions(opt);
+
+        // add user-defined options factory, if specified
+        if (optionsFactory != null) {
+            opt = optionsFactory.createDBOptions(opt, handlesToClose);
+        }
+
+        // add necessary default options
+        opt = opt.setCreateIfMissing(true).setAvoidFlushDuringShutdown(true);
+
+        // if sharedResources is non-null, use the write buffer manager from 
it.
+        if (sharedResources != null) {
+            
opt.setWriteBufferManager(sharedResources.getResourceHandle().getWriteBufferManager());
+        }
+
+        if (enableStatistics) {
+            Statistics statistics = new Statistics();
+            opt.setStatistics(statistics);
+            handlesToClose.add(statistics);
+        }
+
+        return opt;
+    }
+
+    /** Gets the ForSt {@link ColumnFamilyOptions} to be used for all ForSt 
instances. */
+    public ColumnFamilyOptions getColumnOptions() {
+        // initial options from common profile
+        ColumnFamilyOptions opt = createBaseCommonColumnOptions();
+        handlesToClose.add(opt);
+
+        // load configurable options on top of pre-defined profile
+        setColumnFamilyOptionsFromConfigurableOptions(opt, handlesToClose);
+
+        // add user-defined options, if specified
+        if (optionsFactory != null) {
+            opt = optionsFactory.createColumnOptions(opt, handlesToClose);
+        }
+
+        // if sharedResources is non-null, use the block cache from it and
+        // set necessary options for performance consideration with memory 
control
+        if (sharedResources != null) {
+            final ForStSharedResources rocksResources = 
sharedResources.getResourceHandle();
+            final Cache blockCache = rocksResources.getCache();
+            TableFormatConfig tableFormatConfig = opt.tableFormatConfig();
+            BlockBasedTableConfig blockBasedTableConfig;
+            if (tableFormatConfig == null) {
+                blockBasedTableConfig = new BlockBasedTableConfig();
+            } else {
+                Preconditions.checkArgument(
+                        tableFormatConfig instanceof BlockBasedTableConfig,
+                        "We currently only support BlockBasedTableConfig When 
bounding total memory.");
+                blockBasedTableConfig = (BlockBasedTableConfig) 
tableFormatConfig;
+            }
+            if (rocksResources.isUsingPartitionedIndexFilters()
+                    && overwriteFilterIfExist(blockBasedTableConfig)) {
+                
blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
+                blockBasedTableConfig.setPartitionFilters(true);
+                blockBasedTableConfig.setPinTopLevelIndexAndFilter(true);
+            }
+            blockBasedTableConfig.setBlockCache(blockCache);
+            blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
+            
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
+            blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true);
+            opt.setTableFormatConfig(blockBasedTableConfig);
+        }
+
+        return opt;
+    }
+
+    /** Gets the ForSt {@link WriteOptions} to be used for write operations. */
+    public WriteOptions getWriteOptions() {
+        // Disable WAL by default
+        WriteOptions opt = new WriteOptions().setDisableWAL(true);
+        handlesToClose.add(opt);
+
+        // add user-defined options factory, if specified
+        if (optionsFactory != null) {
+            opt = optionsFactory.createWriteOptions(opt, handlesToClose);
+        }
+
+        return opt;
+    }
+
+    /** Gets the ForSt {@link ReadOptions} to be used for read operations. */
+    public ReadOptions getReadOptions() {
+        ReadOptions opt = new ReadOptions();
+        handlesToClose.add(opt);
+
+        // add user-defined options factory, if specified
+        if (optionsFactory != null) {
+            opt = optionsFactory.createReadOptions(opt, handlesToClose);
+        }
+
+        return opt;
+    }
+
+    @Nullable
+    public File getLocalBasePath() {
+        return localBasePath;
+    }
+
+    @Nullable
+    public File getLocalForStPath() {
+        return localForStPath;
+    }
+
+    ForStNativeMetricOptions getMemoryWatcherOptions(
+            ForStNativeMetricOptions defaultMetricOptions) {
+        return optionsFactory == null
+                ? defaultMetricOptions
+                : 
optionsFactory.createNativeMetricsOptions(defaultMetricOptions);
+    }
+
+    @Override
+    public void close() throws Exception {
+        handlesToClose.forEach(IOUtils::closeQuietly);
+        handlesToClose.clear();
+
+        if (sharedResources != null) {
+            sharedResources.close();
+        }
+        cleanRelocatedDbLogs();
+    }
+
+    /**
+     * Overwrite configured {@link Filter} if enable partitioned filter. 
Partitioned filter only
+     * worked in full bloom filter, not blocked based.
+     */
+    private boolean overwriteFilterIfExist(BlockBasedTableConfig 
blockBasedTableConfig) {
+        if (blockBasedTableConfig.filterPolicy() != null) {
+            // TODO Can get filter's config in the future ForSt version, and 
build new filter use
+            // existing config.
+            BloomFilter newFilter = new BloomFilter(10, false);
+            LOG.info(
+                    "Existing filter has been overwritten to full filters 
since partitioned index filters is enabled.");
+            blockBasedTableConfig.setFilterPolicy(newFilter);
+            handlesToClose.add(newFilter);
+        }
+        return true;
+    }
+
+    /** Create a {@link DBOptions} for ForSt, including some common settings. 
*/
+    DBOptions createBaseCommonDBOptions() {
+        return new DBOptions().setUseFsync(false).setStatsDumpPeriodSec(0);
+    }
+
+    /** Create a {@link ColumnFamilyOptions} for ForSt, including some common 
settings. */
+    ColumnFamilyOptions createBaseCommonColumnOptions() {
+        return new ColumnFamilyOptions();
+    }
+
+    /**
+     * Get a value for option from pre-defined option and configurable option 
settings. The priority
+     * relationship is as below.
+     *
+     * <p>Configured value > pre-defined value > default value.
+     *
+     * @param option the wanted option
+     * @param <T> the value type
+     * @return the final value for the option according to the priority above.
+     */
+    @Nullable
+    private <T> T internalGetOption(ConfigOption<T> option) {
+        return 
configuration.getOptional(option).orElseGet(option::defaultValue);
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private DBOptions setDBOptionsFromConfigurableOptions(DBOptions 
currentOptions) {
+
+        currentOptions.setMaxBackgroundJobs(
+                
internalGetOption(ForStConfigurableOptions.MAX_BACKGROUND_THREADS));
+
+        
currentOptions.setMaxOpenFiles(internalGetOption(ForStConfigurableOptions.MAX_OPEN_FILES));
+
+        
currentOptions.setInfoLogLevel(internalGetOption(ForStConfigurableOptions.LOG_LEVEL));
+
+        String logDir = internalGetOption(ForStConfigurableOptions.LOG_DIR);
+        if (logDir == null || logDir.isEmpty()) {
+            if (localForStPath == null
+                    || localForStPath.getAbsolutePath().length() <= 
INSTANCE_PATH_LENGTH_LIMIT) {
+                relocateDefaultDbLogDir(currentOptions);
+            } else {
+                // disable log relocate when instance path length exceeds 
limit to prevent ForSt
+                // log file creation failure, details in FLINK-31743
+                LOG.warn(
+                        "ForSt local path length exceeds limit : {}, disable 
log relocate.",
+                        localForStPath);
+            }
+        } else {
+            currentOptions.setDbLogDir(logDir);
+        }
+
+        currentOptions.setMaxLogFileSize(
+                
internalGetOption(ForStConfigurableOptions.LOG_MAX_FILE_SIZE).getBytes());
+
+        
currentOptions.setKeepLogFileNum(internalGetOption(ForStConfigurableOptions.LOG_FILE_NUM));
+
+        return currentOptions;
+    }
+
+    @SuppressWarnings("ConstantConditions")
+    private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(
+            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> 
handlesToClose) {
+
+        currentOptions.setCompactionStyle(
+                internalGetOption(ForStConfigurableOptions.COMPACTION_STYLE));
+
+        currentOptions.setCompressionPerLevel(
+                
internalGetOption(ForStConfigurableOptions.COMPRESSION_PER_LEVEL));
+
+        currentOptions.setLevelCompactionDynamicLevelBytes(
+                
internalGetOption(ForStConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE));
+
+        currentOptions.setTargetFileSizeBase(
+                
internalGetOption(ForStConfigurableOptions.TARGET_FILE_SIZE_BASE).getBytes());
+
+        currentOptions.setMaxBytesForLevelBase(
+                
internalGetOption(ForStConfigurableOptions.MAX_SIZE_LEVEL_BASE).getBytes());
+
+        currentOptions.setWriteBufferSize(
+                
internalGetOption(ForStConfigurableOptions.WRITE_BUFFER_SIZE).getBytes());
+
+        currentOptions.setMaxWriteBufferNumber(
+                
internalGetOption(ForStConfigurableOptions.MAX_WRITE_BUFFER_NUMBER));
+
+        currentOptions.setMinWriteBufferNumberToMerge(
+                
internalGetOption(ForStConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE));
+
+        TableFormatConfig tableFormatConfig = 
currentOptions.tableFormatConfig();
+
+        BlockBasedTableConfig blockBasedTableConfig;
+        if (tableFormatConfig == null) {
+            blockBasedTableConfig = new BlockBasedTableConfig();
+        } else {
+            if (tableFormatConfig instanceof PlainTableConfig) {
+                // if the table format config is PlainTableConfig, we just 
return current
+                // column-family options
+                return currentOptions;
+            } else {
+                blockBasedTableConfig = (BlockBasedTableConfig) 
tableFormatConfig;
+            }
+        }
+
+        blockBasedTableConfig.setBlockSize(
+                
internalGetOption(ForStConfigurableOptions.BLOCK_SIZE).getBytes());
+
+        blockBasedTableConfig.setMetadataBlockSize(
+                
internalGetOption(ForStConfigurableOptions.METADATA_BLOCK_SIZE).getBytes());
+
+        blockBasedTableConfig.setBlockCacheSize(
+                
internalGetOption(ForStConfigurableOptions.BLOCK_CACHE_SIZE).getBytes());
+
+        if (internalGetOption(ForStConfigurableOptions.USE_BLOOM_FILTER)) {
+            final double bitsPerKey =
+                    
internalGetOption(ForStConfigurableOptions.BLOOM_FILTER_BITS_PER_KEY);
+            final boolean blockBasedMode =
+                    
internalGetOption(ForStConfigurableOptions.BLOOM_FILTER_BLOCK_BASED_MODE);
+            BloomFilter bloomFilter = new BloomFilter(bitsPerKey, 
blockBasedMode);
+            handlesToClose.add(bloomFilter);
+            blockBasedTableConfig.setFilterPolicy(bloomFilter);
+        }
+
+        return currentOptions.setTableFormatConfig(blockBasedTableConfig);
+    }
+
+    /**
+     * Relocates the default log directory of ForSt with the Flink log 
directory. Finds the Flink
+     * log directory using log.file Java property that is set during startup.
+     *
+     * @param dbOptions The ForSt {@link DBOptions}.
+     */
+    private void relocateDefaultDbLogDir(DBOptions dbOptions) {
+        String logFilePath = System.getProperty("log.file");
+        if (logFilePath != null) {
+            File logFile = resolveFileLocation(logFilePath);
+            if (logFile != null && resolveFileLocation(logFile.getParent()) != 
null) {
+                String relocatedDbLogDir = logFile.getParent();
+                this.relocatedDbLogBaseDir = new 
File(relocatedDbLogDir).toPath();
+                dbOptions.setDbLogDir(relocatedDbLogDir);
+            }
+        }
+    }
+
+    /**
+     * Verify log file location.
+     *
+     * @param logFilePath Path to log file
+     * @return File or null if not a valid log file
+     */
+    private File resolveFileLocation(String logFilePath) {
+        File logFile = new File(logFilePath);
+        return (logFile.exists() && logFile.canRead()) ? logFile : null;
+    }
+
+    /** Clean all relocated ForSt logs. */
+    private void cleanRelocatedDbLogs() {
+        if (localForStPath != null && relocatedDbLogBaseDir != null) {
+            LOG.info("Cleaning up relocated ForSt logs: {}.", 
relocatedDbLogBaseDir);
+
+            String relocatedDbLogPrefix =
+                    
resolveRelocatedDbLogPrefix(localForStPath.getAbsolutePath());
+            try {
+                Arrays.stream(FileUtils.listDirectory(relocatedDbLogBaseDir))
+                        .filter(
+                                path ->
+                                        !Files.isDirectory(path)
+                                                && path.toFile()
+                                                        .getName()
+                                                        
.startsWith(relocatedDbLogPrefix))
+                        .forEach(IOUtils::deleteFileQuietly);
+            } catch (IOException e) {
+                LOG.warn("Could not list relocated ForSt log directory: {}", 
relocatedDbLogBaseDir);
+            }
+        }
+    }
+
+    /**
+     * Resolve the prefix of ForSt's log file name according to ForSt's log 
file name rules.
+     *
+     * @param instanceForStAbsolutePath The path where the ForSt directory is 
located.
+     * @return Resolved ForSt log name prefix.
+     */
+    private String resolveRelocatedDbLogPrefix(String 
instanceForStAbsolutePath) {
+        if (!instanceForStAbsolutePath.isEmpty()
+                && !instanceForStAbsolutePath.matches("^[a-zA-Z0-9\\-._].*")) {
+            instanceForStAbsolutePath = instanceForStAbsolutePath.substring(1);
+        }
+        return instanceForStAbsolutePath.replaceAll("[^a-zA-Z0-9\\-._]", "_")
+                + FORST_RELOCATE_LOG_SUFFIX;
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResources.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResources.java
new file mode 100644
index 00000000000..e6451fefda6
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResources.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.rocksdb.Cache;
+import org.rocksdb.WriteBufferManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The set of resources that can be shared by all ForSt instances in a slot. 
Sharing these resources
+ * helps ForSt a predictable resource footprint.
+ */
+final class ForStSharedResources implements AutoCloseable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForStSharedResources.class);
+
+    private final Cache cache;
+
+    private final WriteBufferManager writeBufferManager;
+    private final long writeBufferManagerCapacity;
+
+    private final boolean usingPartitionedIndexFilters;
+
+    ForStSharedResources(
+            Cache cache,
+            WriteBufferManager writeBufferManager,
+            long writeBufferManagerCapacity,
+            boolean usingPartitionedIndexFilters) {
+        this.cache = cache;
+        this.writeBufferManager = writeBufferManager;
+        this.writeBufferManagerCapacity = writeBufferManagerCapacity;
+        this.usingPartitionedIndexFilters = usingPartitionedIndexFilters;
+    }
+
+    public Cache getCache() {
+        return cache;
+    }
+
+    public WriteBufferManager getWriteBufferManager() {
+        return writeBufferManager;
+    }
+
+    public long getWriteBufferManagerCapacity() {
+        return writeBufferManagerCapacity;
+    }
+
+    public boolean isUsingPartitionedIndexFilters() {
+        return usingPartitionedIndexFilters;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("Closing ForStSharedResources");
+        writeBufferManager.close();
+        cache.close();
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResourcesFactory.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResourcesFactory.java
new file mode 100644
index 00000000000..beb98e71509
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStSharedResourcesFactory.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.runtime.memory.SharedResources;
+import org.apache.flink.util.function.LongFunctionWithException;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.state.forst.ForStOptions.FIX_PER_TM_MEMORY_SIZE;
+
+/**
+ * A factory of {@link ForStSharedResources}. Encapsulates memory share scope 
(e.g. TM, Slot) and
+ * lifecycle (managed/unmanaged).
+ */
+enum ForStSharedResourcesFactory {
+    /** Memory allocated per Slot (shared across slot tasks), managed by 
Flink. */
+    SLOT_SHARED_MANAGED(false, MemoryShareScope.SLOT) {
+        @Override
+        protected OpaqueMemoryResource<ForStSharedResources> createInternal(
+                ForStMemoryConfiguration jobMemoryConfig,
+                String resourceId,
+                Environment env,
+                double memoryFraction,
+                LongFunctionWithException<ForStSharedResources, Exception> 
allocator)
+                throws Exception {
+            return env.getMemoryManager()
+                    .getSharedMemoryResourceForManagedMemory(resourceId, 
allocator, memoryFraction);
+        }
+    },
+    /** Memory allocated per Slot (shared across slot tasks), unmanaged. */
+    SLOT_SHARED_UNMANAGED(false, MemoryShareScope.SLOT) {
+        @Override
+        protected OpaqueMemoryResource<ForStSharedResources> createInternal(
+                ForStMemoryConfiguration jobMemoryConfig,
+                String resourceId,
+                Environment env,
+                double memoryFraction,
+                LongFunctionWithException<ForStSharedResources, Exception> 
allocator)
+                throws Exception {
+            return env.getMemoryManager()
+                    .getExternalSharedMemoryResource(
+                            resourceId,
+                            allocator,
+                            
jobMemoryConfig.getFixedMemoryPerSlot().getBytes());
+        }
+    },
+    /** Memory allocated per TM (shared across all tasks), unmanaged. */
+    TM_SHARED_UNMANAGED(false, MemoryShareScope.TM) {
+        @Override
+        protected OpaqueMemoryResource<ForStSharedResources> createInternal(
+                ForStMemoryConfiguration jobMemoryConfig,
+                String resourceId,
+                Environment env,
+                double memoryFraction,
+                LongFunctionWithException<ForStSharedResources, Exception> 
allocator)
+                throws Exception {
+
+            SharedResources sharedResources = env.getSharedResources();
+            Object leaseHolder = new Object();
+            SharedResources.ResourceAndSize<ForStSharedResources> resource =
+                    sharedResources.getOrAllocateSharedResource(
+                            resourceId, leaseHolder, allocator, 
getTmSharedMemorySize(env));
+            ThrowingRunnable<Exception> disposer =
+                    () -> sharedResources.release(resourceId, leaseHolder, 
unused -> {});
+
+            return new OpaqueMemoryResource<>(resource.resourceHandle(), 
resource.size(), disposer);
+        }
+    };
+
+    private final boolean managed;
+    private final MemoryShareScope shareScope;
+
+    ForStSharedResourcesFactory(boolean managed, MemoryShareScope shareScope) {
+        this.managed = managed;
+        this.shareScope = shareScope;
+    }
+
+    @Nullable
+    public static ForStSharedResourcesFactory from(
+            ForStMemoryConfiguration jobMemoryConfig, Environment env) {
+        if (jobMemoryConfig.isUsingFixedMemoryPerSlot()) {
+            return ForStSharedResourcesFactory.SLOT_SHARED_UNMANAGED;
+        } else if (jobMemoryConfig.isUsingManagedMemory()) {
+            return ForStSharedResourcesFactory.SLOT_SHARED_MANAGED;
+        } else if (getTmSharedMemorySize(env) > 0) {
+            return ForStSharedResourcesFactory.TM_SHARED_UNMANAGED;
+        } else {
+            // not shared and not managed - allocate per column family
+            return null;
+        }
+    }
+
+    public final OpaqueMemoryResource<ForStSharedResources> create(
+            ForStMemoryConfiguration jobMemoryConfig,
+            Environment env,
+            double memoryFraction,
+            Logger logger,
+            ForStMemoryControllerUtils.ForStMemoryFactory forStMemoryFactory)
+            throws Exception {
+        logger.info(
+                "Getting shared memory for ForSt: shareScope={}, managed={}", 
shareScope, managed);
+        return createInternal(
+                jobMemoryConfig,
+                managed ? MANAGED_MEMORY_RESOURCE_ID : 
UNMANAGED_MEMORY_RESOURCE_ID,
+                env,
+                memoryFraction,
+                createAllocator(
+                        shareScope.getConfiguration(jobMemoryConfig, env), 
forStMemoryFactory));
+    }
+
+    protected abstract OpaqueMemoryResource<ForStSharedResources> 
createInternal(
+            ForStMemoryConfiguration jobMemoryConfig,
+            String resourceId,
+            Environment env,
+            double memoryFraction,
+            LongFunctionWithException<ForStSharedResources, Exception> 
allocator)
+            throws Exception;
+
+    private static long getTmSharedMemorySize(Environment env) {
+        return env.getTaskManagerInfo()
+                .getConfiguration()
+                .getOptional(FIX_PER_TM_MEMORY_SIZE)
+                .orElse(MemorySize.ZERO)
+                .getBytes();
+    }
+
+    private static final String MANAGED_MEMORY_RESOURCE_ID = 
"state-forst-managed-memory";
+
+    private static final String UNMANAGED_MEMORY_RESOURCE_ID = 
"state-forst-fixed-slot-memory";
+
+    private static LongFunctionWithException<ForStSharedResources, Exception> 
createAllocator(
+            ForStMemoryConfiguration config,
+            ForStMemoryControllerUtils.ForStMemoryFactory forStMemoryFactory) {
+        return size ->
+                ForStMemoryControllerUtils.allocateForStSharedResources(
+                        size,
+                        config.getWriteBufferRatio(),
+                        config.getHighPriorityPoolRatio(),
+                        config.isUsingPartitionedIndexFilters(),
+                        forStMemoryFactory);
+    }
+}
+
+enum MemoryShareScope {
+    TM {
+        @Override
+        public ForStMemoryConfiguration getConfiguration(
+                ForStMemoryConfiguration jobMemoryConfig, Environment env) {
+            return ForStMemoryConfiguration.fromConfiguration(
+                    env.getTaskManagerInfo().getConfiguration());
+        }
+    },
+    SLOT {
+        @Override
+        public ForStMemoryConfiguration getConfiguration(
+                ForStMemoryConfiguration jobMemoryConfig, Environment env) {
+            return jobMemoryConfig;
+        }
+    };
+
+    public abstract ForStMemoryConfiguration getConfiguration(
+            ForStMemoryConfiguration jobMemoryConfig, Environment env);
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMemoryControllerUtilsTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMemoryControllerUtilsTest.java
new file mode 100644
index 00000000000..ae9a1ae6391
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStMemoryControllerUtilsTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.Cache;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.WriteBufferManager;
+
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests to guard {@link ForStMemoryControllerUtils}. */
+public class ForStMemoryControllerUtilsTest {
+
+    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+    @Before
+    public void ensureRocksDbNativeLibraryLoaded() throws IOException {
+        NativeLibraryLoader.getInstance()
+                .loadLibrary(temporaryFolder.newFolder().getAbsolutePath());
+    }
+
+    @Test
+    public void testCreateSharedResourcesWithExpectedCapacity() {
+        long totalMemorySize = 2048L;
+        double writeBufferRatio = 0.5;
+        double highPriPoolRatio = 0.1;
+        TestingForStMemoryFactory factory = new TestingForStMemoryFactory();
+        ForStSharedResources forStSharedResources =
+                ForStMemoryControllerUtils.allocateForStSharedResources(
+                        totalMemorySize, writeBufferRatio, highPriPoolRatio, 
false, factory);
+        long expectedCacheCapacity =
+                ForStMemoryControllerUtils.calculateActualCacheCapacity(
+                        totalMemorySize, writeBufferRatio);
+        long expectedWbmCapacity =
+                ForStMemoryControllerUtils.calculateWriteBufferManagerCapacity(
+                        totalMemorySize, writeBufferRatio);
+
+        assertThat(factory.actualCacheCapacity, is(expectedCacheCapacity));
+        assertThat(factory.actualWbmCapacity, is(expectedWbmCapacity));
+        assertThat(forStSharedResources.getWriteBufferManagerCapacity(), 
is(expectedWbmCapacity));
+    }
+
+    @Test
+    public void testCalculateForStDefaultArenaBlockSize() {
+        final long align = 4 * 1024;
+        final long writeBufferSize = 64 * 1024 * 1024;
+        final long expectArenaBlockSize = writeBufferSize / 8;
+
+        // Normal case test
+        assertThat(
+                "Arena block size calculation error for normal case",
+                
ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(writeBufferSize),
+                is(expectArenaBlockSize));
+
+        // Alignment tests
+        assertThat(
+                "Arena block size calculation error for alignment case",
+                
ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(writeBufferSize 
- 1),
+                is(expectArenaBlockSize));
+        assertThat(
+                "Arena block size calculation error for alignment case2",
+                
ForStMemoryControllerUtils.calculateForStDefaultArenaBlockSize(writeBufferSize 
+ 8),
+                is(expectArenaBlockSize + align));
+    }
+
+    @Test
+    public void testCalculateForStMutableLimit() {
+        long bufferSize = 64 * 1024 * 1024;
+        long limit = bufferSize * 7 / 8;
+        
assertThat(ForStMemoryControllerUtils.calculateForStMutableLimit(bufferSize), 
is(limit));
+    }
+
+    @Test
+    public void testValidateArenaBlockSize() {
+        long arenaBlockSize = 8 * 1024 * 1024;
+        assertFalse(
+                ForStMemoryControllerUtils.validateArenaBlockSize(
+                        arenaBlockSize, (long) (arenaBlockSize * 0.5)));
+        assertTrue(
+                ForStMemoryControllerUtils.validateArenaBlockSize(
+                        arenaBlockSize, (long) (arenaBlockSize * 1.5)));
+    }
+
+    private static final class TestingForStMemoryFactory
+            implements ForStMemoryControllerUtils.ForStMemoryFactory {
+        private Long actualCacheCapacity = null;
+        private Long actualWbmCapacity = null;
+
+        @Override
+        public Cache createCache(long cacheCapacity, double 
highPriorityPoolRatio) {
+            actualCacheCapacity = cacheCapacity;
+            return 
ForStMemoryControllerUtils.ForStMemoryFactory.DEFAULT.createCache(
+                    cacheCapacity, highPriorityPoolRatio);
+        }
+
+        @Override
+        public WriteBufferManager createWriteBufferManager(
+                long writeBufferManagerCapacity, Cache cache) {
+            actualWbmCapacity = writeBufferManagerCapacity;
+            return 
ForStMemoryControllerUtils.ForStMemoryFactory.DEFAULT.createWriteBufferManager(
+                    writeBufferManagerCapacity, cache);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
new file mode 100644
index 00000000000..53c0524308b
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStResourceContainerTest.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.runtime.memory.OpaqueMemoryResource;
+import org.apache.flink.util.function.ThrowingRunnable;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
+import org.rocksdb.Cache;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.IndexType;
+import org.rocksdb.LRUCache;
+import org.rocksdb.NativeLibraryLoader;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.TableFormatConfig;
+import org.rocksdb.WriteBufferManager;
+import org.rocksdb.WriteOptions;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+/** Tests to guard {@link ForStResourceContainer}. */
+public class ForStResourceContainerTest {
+
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new 
TemporaryFolder();
+
+    @BeforeClass
+    public static void ensureForStNativeLibraryLoaded() throws IOException {
+        
NativeLibraryLoader.getInstance().loadLibrary(TMP_FOLDER.newFolder().getAbsolutePath());
+    }
+
+    // ------------------------------------------------------------------------
+
+    @Test
+    public void testFreeDBOptionsAfterClose() throws Exception {
+        ForStResourceContainer container = new ForStResourceContainer();
+        DBOptions dbOptions = container.getDbOptions();
+        assertThat(dbOptions.isOwningHandle(), is(true));
+        container.close();
+        assertThat(dbOptions.isOwningHandle(), is(false));
+    }
+
+    @Test
+    public void testFreeMultipleDBOptionsAfterClose() throws Exception {
+        ForStResourceContainer container = new ForStResourceContainer();
+        final int optionNumber = 20;
+        ArrayList<DBOptions> dbOptions = new ArrayList<>(optionNumber);
+        for (int i = 0; i < optionNumber; i++) {
+            dbOptions.add(container.getDbOptions());
+        }
+        container.close();
+        for (DBOptions dbOption : dbOptions) {
+            assertThat(dbOption.isOwningHandle(), is(false));
+        }
+    }
+
+    /**
+     * Guard the shared resources will be released after {@link 
ForStResourceContainer#close()} when
+     * the {@link ForStResourceContainer} instance is initiated with {@link 
OpaqueMemoryResource}.
+     *
+     * @throws Exception if unexpected error happened.
+     */
+    @Test
+    public void testSharedResourcesAfterClose() throws Exception {
+        OpaqueMemoryResource<ForStSharedResources> sharedResources = 
getSharedResources();
+        ForStResourceContainer container = new ForStResourceContainer(null, 
sharedResources);
+        container.close();
+        ForStSharedResources forStSharedResources = 
sharedResources.getResourceHandle();
+        assertThat(forStSharedResources.getCache().isOwningHandle(), 
is(false));
+        
assertThat(forStSharedResources.getWriteBufferManager().isOwningHandle(), 
is(false));
+    }
+
+    /**
+     * Guard that {@link ForStResourceContainer#getDbOptions()} shares the 
same {@link
+     * WriteBufferManager} instance if the {@link ForStResourceContainer} 
instance is initiated with
+     * {@link OpaqueMemoryResource}.
+     *
+     * @throws Exception if unexpected error happened.
+     */
+    @Test
+    public void testGetDbOptionsWithSharedResources() throws Exception {
+        final int optionNumber = 20;
+        OpaqueMemoryResource<ForStSharedResources> sharedResources = 
getSharedResources();
+        ForStResourceContainer container = new ForStResourceContainer(null, 
sharedResources);
+        HashSet<WriteBufferManager> writeBufferManagers = new HashSet<>();
+        for (int i = 0; i < optionNumber; i++) {
+            DBOptions dbOptions = container.getDbOptions();
+            WriteBufferManager writeBufferManager = 
getWriteBufferManager(dbOptions);
+            writeBufferManagers.add(writeBufferManager);
+        }
+        assertThat(writeBufferManagers.size(), is(1));
+        assertThat(
+                writeBufferManagers.iterator().next(),
+                
is(sharedResources.getResourceHandle().getWriteBufferManager()));
+        container.close();
+    }
+
+    /**
+     * Guard that {@link ForStResourceContainer#getColumnOptions()} shares the 
same {@link Cache}
+     * instance if the {@link ForStResourceContainer} instance is initiated 
with {@link
+     * OpaqueMemoryResource}.
+     *
+     * @throws Exception if unexpected error happened.
+     */
+    @Test
+    public void testGetColumnFamilyOptionsWithSharedResources() throws 
Exception {
+        final int optionNumber = 20;
+        OpaqueMemoryResource<ForStSharedResources> sharedResources = 
getSharedResources();
+        ForStResourceContainer container = new ForStResourceContainer(null, 
sharedResources);
+        HashSet<Cache> caches = new HashSet<>();
+        for (int i = 0; i < optionNumber; i++) {
+            ColumnFamilyOptions columnOptions = container.getColumnOptions();
+            Cache cache = getBlockCache(columnOptions);
+            caches.add(cache);
+        }
+        assertThat(caches.size(), is(1));
+        assertThat(caches.iterator().next(), 
is(sharedResources.getResourceHandle().getCache()));
+        container.close();
+    }
+
+    private OpaqueMemoryResource<ForStSharedResources> getSharedResources() {
+        final long cacheSize = 1024L, writeBufferSize = 512L;
+        final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1);
+        final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, 
cache);
+        ForStSharedResources forStSharedResources =
+                new ForStSharedResources(cache, wbm, writeBufferSize, false);
+        return new OpaqueMemoryResource<>(
+                forStSharedResources, cacheSize, forStSharedResources::close);
+    }
+
+    private Cache getBlockCache(ColumnFamilyOptions columnOptions) {
+        BlockBasedTableConfig blockBasedTableConfig = null;
+        try {
+            blockBasedTableConfig = (BlockBasedTableConfig) 
columnOptions.tableFormatConfig();
+        } catch (ClassCastException e) {
+            fail("Table config got from ColumnFamilyOptions is not 
BlockBasedTableConfig");
+        }
+        Field cacheField = null;
+        try {
+            cacheField = 
BlockBasedTableConfig.class.getDeclaredField("blockCache");
+        } catch (NoSuchFieldException e) {
+            fail("blockCache is not defined");
+        }
+        cacheField.setAccessible(true);
+        try {
+            return (Cache) cacheField.get(blockBasedTableConfig);
+        } catch (IllegalAccessException e) {
+            fail("Cannot access blockCache field.");
+            return null;
+        }
+    }
+
+    private WriteBufferManager getWriteBufferManager(DBOptions dbOptions) {
+
+        Field writeBufferManagerField = null;
+        try {
+            writeBufferManagerField = 
DBOptions.class.getDeclaredField("writeBufferManager_");
+        } catch (NoSuchFieldException e) {
+            fail("writeBufferManager_ is not defined.");
+        }
+        writeBufferManagerField.setAccessible(true);
+        try {
+            return (WriteBufferManager) writeBufferManagerField.get(dbOptions);
+        } catch (IllegalAccessException e) {
+            fail("Cannot access writeBufferManager_ field.");
+            return null;
+        }
+    }
+
+    @Test
+    public void testFreeColumnOptionsAfterClose() throws Exception {
+        ForStResourceContainer container = new ForStResourceContainer();
+        ColumnFamilyOptions columnFamilyOptions = container.getColumnOptions();
+        assertThat(columnFamilyOptions.isOwningHandle(), is(true));
+        container.close();
+        assertThat(columnFamilyOptions.isOwningHandle(), is(false));
+    }
+
+    @Test
+    public void testFreeMultipleColumnOptionsAfterClose() throws Exception {
+        ForStResourceContainer container = new ForStResourceContainer();
+        final int optionNumber = 20;
+        ArrayList<ColumnFamilyOptions> columnFamilyOptions = new 
ArrayList<>(optionNumber);
+        for (int i = 0; i < optionNumber; i++) {
+            columnFamilyOptions.add(container.getColumnOptions());
+        }
+        container.close();
+        for (ColumnFamilyOptions columnFamilyOption : columnFamilyOptions) {
+            assertThat(columnFamilyOption.isOwningHandle(), is(false));
+        }
+    }
+
+    @Test
+    public void testFreeSharedResourcesAfterClose() throws Exception {
+        LRUCache cache = new LRUCache(1024L);
+        WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+        ForStSharedResources sharedResources = new ForStSharedResources(cache, 
wbm, 1024L, false);
+        final ThrowingRunnable<Exception> disposer = sharedResources::close;
+        OpaqueMemoryResource<ForStSharedResources> opaqueResource =
+                new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+
+        ForStResourceContainer container = new ForStResourceContainer(null, 
opaqueResource);
+
+        container.close();
+        assertThat(cache.isOwningHandle(), is(false));
+        assertThat(wbm.isOwningHandle(), is(false));
+    }
+
+    @Test
+    public void testFreeWriteReadOptionsAfterClose() throws Exception {
+        ForStResourceContainer container = new ForStResourceContainer();
+        WriteOptions writeOptions = container.getWriteOptions();
+        ReadOptions readOptions = container.getReadOptions();
+        assertThat(writeOptions.isOwningHandle(), is(true));
+        assertThat(readOptions.isOwningHandle(), is(true));
+        container.close();
+        assertThat(writeOptions.isOwningHandle(), is(false));
+        assertThat(readOptions.isOwningHandle(), is(false));
+    }
+
+    @Test
+    public void testGetColumnFamilyOptionsWithPartitionedIndex() throws 
Exception {
+        LRUCache cache = new LRUCache(1024L);
+        WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+        ForStSharedResources sharedResources = new ForStSharedResources(cache, 
wbm, 1024L, true);
+        final ThrowingRunnable<Exception> disposer = sharedResources::close;
+        OpaqueMemoryResource<ForStSharedResources> opaqueResource =
+                new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+        BloomFilter blockBasedFilter = new BloomFilter();
+        ForStOptionsFactory blockBasedBloomFilterOptionFactory =
+                new ForStOptionsFactory() {
+
+                    @Override
+                    public DBOptions createDBOptions(
+                            DBOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
+                        return currentOptions;
+                    }
+
+                    @Override
+                    public ColumnFamilyOptions createColumnOptions(
+                            ColumnFamilyOptions currentOptions,
+                            Collection<AutoCloseable> handlesToClose) {
+                        TableFormatConfig tableFormatConfig = 
currentOptions.tableFormatConfig();
+                        BlockBasedTableConfig blockBasedTableConfig =
+                                tableFormatConfig == null
+                                        ? new BlockBasedTableConfig()
+                                        : (BlockBasedTableConfig) 
tableFormatConfig;
+                        blockBasedTableConfig.setFilter(blockBasedFilter);
+                        handlesToClose.add(blockBasedFilter);
+                        
currentOptions.setTableFormatConfig(blockBasedTableConfig);
+                        return currentOptions;
+                    }
+                };
+        try (ForStResourceContainer container =
+                new ForStResourceContainer(blockBasedBloomFilterOptionFactory, 
opaqueResource)) {
+            ColumnFamilyOptions columnOptions = container.getColumnOptions();
+            BlockBasedTableConfig actual =
+                    (BlockBasedTableConfig) columnOptions.tableFormatConfig();
+            assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
+            assertThat(actual.partitionFilters(), is(true));
+            assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
+            assertThat(actual.filterPolicy(), not(blockBasedFilter));
+        }
+        assertFalse("Block based filter is left unclosed.", 
blockBasedFilter.isOwningHandle());
+    }
+}

Reply via email to