This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8411367ab0a213ec55c679a496352853bf96af51
Author: Hangxiang Yu <master...@gmail.com>
AuthorDate: Mon Apr 15 21:25:47 2024 +0800

    [FLINK-35047][state] Introduce Metrics for ForStStateBackend
---
 .../state/forst/ForStNativeMetricMonitor.java      | 233 +++++++
 .../state/forst/ForStNativeMetricOptions.java      | 700 +++++++++++++++++++++
 .../flink/state/forst/ForStOptionsFactory.java     |  14 +-
 .../apache/flink/state/forst/ForStProperty.java    | 115 ++++
 .../apache/flink/state/forst/ForStExtension.java   | 212 +++++++
 .../state/forst/ForStNativeMetricMonitorTest.java  | 254 ++++++++
 .../state/forst/ForStNativeMetricOptionsTest.java  |  53 ++
 .../flink/state/forst/ForStPropertyTest.java       |  47 ++
 8 files changed, 1627 insertions(+), 1 deletion(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricMonitor.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricMonitor.java
new file mode 100644
index 00000000000..71b511853d5
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricMonitor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Statistics;
+import org.rocksdb.TickerType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.math.BigInteger;
+
+/**
+ * A monitor which pulls {{@link RocksDB}} native metrics and forwards them to 
Flink's metric group.
+ * All metrics are unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class ForStNativeMetricMonitor implements Closeable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForStNativeMetricMonitor.class);
+
+    private final ForStNativeMetricOptions options;
+
+    private final MetricGroup metricGroup;
+
+    private final Object lock;
+
+    static final String COLUMN_FAMILY_KEY = "column_family";
+
+    @GuardedBy("lock")
+    private RocksDB rocksDB;
+
+    @Nullable
+    @GuardedBy("lock")
+    private Statistics statistics;
+
+    public ForStNativeMetricMonitor(
+            @Nonnull ForStNativeMetricOptions options,
+            @Nonnull MetricGroup metricGroup,
+            @Nonnull RocksDB rocksDB,
+            @Nullable Statistics statistics) {
+        this.options = options;
+        this.metricGroup = metricGroup;
+        this.rocksDB = rocksDB;
+        this.statistics = statistics;
+        this.lock = new Object();
+        registerStatistics();
+    }
+
+    /** Register gauges to pull native metrics for the database. */
+    private void registerStatistics() {
+        if (statistics != null) {
+            for (TickerType tickerType : options.getMonitorTickerTypes()) {
+                metricGroup.gauge(
+                        String.format("rocksdb.%s", 
tickerType.name().toLowerCase()),
+                        new ForStNativeStatisticsMetricView(tickerType));
+            }
+        }
+    }
+
+    /**
+     * Register gauges to pull native metrics for the column family.
+     *
+     * @param columnFamilyName group name for the new gauges
+     * @param handle native handle to the column family
+     */
+    void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+
+        boolean columnFamilyAsVariable = options.isColumnFamilyAsVariable();
+        MetricGroup group =
+                columnFamilyAsVariable
+                        ? metricGroup.addGroup(COLUMN_FAMILY_KEY, 
columnFamilyName)
+                        : metricGroup.addGroup(columnFamilyName);
+
+        for (ForStProperty property : options.getProperties()) {
+            ForStNativePropertyMetricView gauge =
+                    new ForStNativePropertyMetricView(handle, property);
+            group.gauge(property.getForStProperty(), gauge);
+        }
+    }
+
+    /** Updates the value of metricView if the reference is still valid. */
+    private void setProperty(ForStNativePropertyMetricView metricView) {
+        if (metricView.isClosed()) {
+            return;
+        }
+        try {
+            synchronized (lock) {
+                if (rocksDB != null) {
+                    long value =
+                            metricView.property.getNumericalPropertyValue(
+                                    rocksDB, metricView.handle);
+                    metricView.setValue(value);
+                }
+            }
+        } catch (Exception e) {
+            metricView.close();
+            LOG.warn("Failed to read native metric {} from ForSt.", 
metricView.property, e);
+        }
+    }
+
+    private void setStatistics(ForStNativeStatisticsMetricView metricView) {
+        if (metricView.isClosed()) {
+            return;
+        }
+        if (statistics != null) {
+            synchronized (lock) {
+                
metricView.setValue(statistics.getTickerCount(metricView.tickerType));
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        synchronized (lock) {
+            rocksDB = null;
+            statistics = null;
+        }
+    }
+
+    abstract static class ForStNativeView implements View {
+        private boolean closed;
+
+        ForStNativeView() {
+            this.closed = false;
+        }
+
+        void close() {
+            closed = true;
+        }
+
+        boolean isClosed() {
+            return closed;
+        }
+    }
+
+    /**
+     * A gauge which periodically pulls a ForSt property-based native metric 
for the specified
+     * column family / metric pair.
+     *
+     * <p><strong>Note</strong>: As the returned property is of type {@code 
uint64_t} on C++ side
+     * the returning value can be negative. Because java does not support 
unsigned long types, this
+     * gauge wraps the result in a {@link BigInteger}.
+     */
+    class ForStNativePropertyMetricView extends ForStNativeView implements 
Gauge<BigInteger> {
+        private final ForStProperty property;
+
+        private final ColumnFamilyHandle handle;
+
+        private BigInteger bigInteger;
+
+        private ForStNativePropertyMetricView(
+                ColumnFamilyHandle handle, @Nonnull ForStProperty property) {
+            this.handle = handle;
+            this.property = property;
+            this.bigInteger = BigInteger.ZERO;
+        }
+
+        public void setValue(long value) {
+            if (value >= 0L) {
+                bigInteger = BigInteger.valueOf(value);
+            } else {
+                int upper = (int) (value >>> 32);
+                int lower = (int) value;
+
+                bigInteger =
+                        BigInteger.valueOf(Integer.toUnsignedLong(upper))
+                                .shiftLeft(32)
+                                
.add(BigInteger.valueOf(Integer.toUnsignedLong(lower)));
+            }
+        }
+
+        @Override
+        public BigInteger getValue() {
+            return bigInteger;
+        }
+
+        @Override
+        public void update() {
+            setProperty(this);
+        }
+    }
+
+    /** A gauge which periodically pulls a ForSt statistics-based native 
metric for the database. */
+    class ForStNativeStatisticsMetricView extends ForStNativeView implements 
Gauge<Long> {
+        private final TickerType tickerType;
+        private long value;
+
+        private ForStNativeStatisticsMetricView(TickerType tickerType) {
+            this.tickerType = tickerType;
+        }
+
+        @Override
+        public Long getValue() {
+            return value;
+        }
+
+        void setValue(long value) {
+            this.value = value;
+        }
+
+        @Override
+        public void update() {
+            setStatistics(this);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricOptions.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricOptions.java
new file mode 100644
index 00000000000..40a973b61ef
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricOptions.java
@@ -0,0 +1,700 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+
+import org.rocksdb.TickerType;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Enable which ForSt metrics to forward to Flink's metrics reporter.
+ *
+ * <p>Property based metrics would report at the column family level and 
return unsigned long
+ * values.
+ *
+ * <p>Statistics based metrics would report at the database level, it can 
return ticker or histogram
+ * kind results.
+ *
+ * <p>Properties and doc comments are taken from RocksDB documentation. See <a
+ * 
href="https://github.com/facebook/rocksdb/blob/64324e329eb0a9b4e77241a425a1615ff524c7f1/include/rocksdb/db.h#L429";>
+ * db.h</a> for more information.
+ */
+@Experimental
+public class ForStNativeMetricOptions implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    // 
--------------------------------------------------------------------------------------------
+    //  RocksDB property based metrics, report at column family level
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final String METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY =
+            "state.backend.rocksdb.metrics" + ".column-family-as-variable";
+
+    public static final ConfigOption<Boolean> MONITOR_NUM_IMMUTABLE_MEM_TABLES 
=
+            
ConfigOptions.key(ForStProperty.NumImmutableMemTable.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the number of immutable 
memtables in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_MEM_TABLE_FLUSH_PENDING =
+            
ConfigOptions.key(ForStProperty.MemTableFlushPending.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the number of pending memtable 
flushes in RocksDB.");
+
+    public static final ConfigOption<Boolean> TRACK_COMPACTION_PENDING =
+            ConfigOptions.key(ForStProperty.CompactionPending.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Track pending compactions in RocksDB. Returns 1 
if a compaction is pending, 0 otherwise.");
+
+    public static final ConfigOption<Boolean> MONITOR_BACKGROUND_ERRORS =
+            ConfigOptions.key(ForStProperty.BackgroundErrors.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the number of background errors 
in RocksDB.");
+
+    public static final ConfigOption<Boolean> 
MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE =
+            
ConfigOptions.key(ForStProperty.CurSizeActiveMemTable.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the approximate size of the active 
memtable in bytes.");
+
+    public static final ConfigOption<Boolean> MONITOR_CUR_SIZE_ALL_MEM_TABLE =
+            ConfigOptions.key(ForStProperty.CurSizeAllMemTables.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the approximate size of the active and 
unflushed immutable memtables"
+                                    + " in bytes.");
+
+    public static final ConfigOption<Boolean> MONITOR_SIZE_ALL_MEM_TABLES =
+            ConfigOptions.key(ForStProperty.SizeAllMemTables.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the approximate size of the active, 
unflushed immutable, "
+                                    + "and pinned immutable memtables in 
bytes.");
+
+    public static final ConfigOption<Boolean> 
MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLE =
+            
ConfigOptions.key(ForStProperty.NumEntriesActiveMemTable.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the total number of entries in 
the active memtable.");
+
+    public static final ConfigOption<Boolean> 
MONITOR_NUM_ENTRIES_IMM_MEM_TABLES =
+            
ConfigOptions.key(ForStProperty.NumEntriesImmMemTables.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total number of entries in the 
unflushed immutable memtables.");
+
+    public static final ConfigOption<Boolean> 
MONITOR_NUM_DELETES_ACTIVE_MEM_TABLE =
+            
ConfigOptions.key(ForStProperty.NumDeletesActiveMemTable.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total number of delete entries in the 
active memtable.");
+
+    public static final ConfigOption<Boolean> 
MONITOR_NUM_DELETES_IMM_MEM_TABLE =
+            
ConfigOptions.key(ForStProperty.NumDeletesImmMemTables.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total number of delete entries in the 
unflushed immutable memtables.");
+
+    public static final ConfigOption<Boolean> ESTIMATE_NUM_KEYS =
+            ConfigOptions.key(ForStProperty.EstimateNumKeys.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Estimate the number of keys in 
RocksDB.");
+
+    public static final ConfigOption<Boolean> ESTIMATE_TABLE_READERS_MEM =
+            
ConfigOptions.key(ForStProperty.EstimateTableReadersMem.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Estimate the memory used for reading SST tables, 
excluding memory"
+                                    + " used in block cache (e.g.,filter and 
index blocks) in bytes.");
+
+    public static final ConfigOption<Boolean> MONITOR_NUM_SNAPSHOTS =
+            ConfigOptions.key(ForStProperty.NumSnapshots.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the number of unreleased 
snapshots of the database.");
+
+    public static final ConfigOption<Boolean> MONITOR_NUM_LIVE_VERSIONS =
+            ConfigOptions.key(ForStProperty.NumLiveVersions.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor number of live versions. Version is an 
internal data structure. "
+                                    + "See RocksDB file version_set.h for 
details. More live versions often mean more SST files are held "
+                                    + "from being deleted, by iterators or 
unfinished compactions.");
+
+    public static final ConfigOption<Boolean> ESTIMATE_LIVE_DATA_SIZE =
+            
ConfigOptions.key(ForStProperty.EstimateLiveDataSize.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Estimate of the amount of live data in bytes 
(usually smaller than sst files size due to space amplification).");
+
+    public static final ConfigOption<Boolean> MONITOR_TOTAL_SST_FILES_SIZE =
+            ConfigOptions.key(ForStProperty.TotalSstFilesSize.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total size (bytes) of all SST files 
of all versions."
+                                    + "WARNING: may slow down online queries 
if there are too many files.");
+
+    public static final ConfigOption<Boolean> MONITOR_LIVE_SST_FILES_SIZE =
+            ConfigOptions.key(ForStProperty.LiveSstFilesSize.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total size (bytes) of all SST files 
belonging to the latest version."
+                                    + "WARNING: may slow down online queries 
if there are too many files.");
+
+    public static final ConfigOption<Boolean> 
ESTIMATE_PENDING_COMPACTION_BYTES =
+            
ConfigOptions.key(ForStProperty.EstimatePendingCompactionBytes.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Estimated total number of bytes compaction needs 
to rewrite to get all levels "
+                                    + "down to under target size. Not valid 
for other compactions than level-based.");
+
+    public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_COMPACTIONS =
+            
ConfigOptions.key(ForStProperty.NumRunningCompactions.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the number of currently running 
compactions.");
+
+    public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_FLUSHES =
+            ConfigOptions.key(ForStProperty.NumRunningFlushes.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the number of currently running 
flushes.");
+
+    public static final ConfigOption<Boolean> 
MONITOR_ACTUAL_DELAYED_WRITE_RATE =
+            
ConfigOptions.key(ForStProperty.ActualDelayedWriteRate.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the current actual delayed write rate. 0 
means no delay.");
+
+    public static final ConfigOption<Boolean> IS_WRITE_STOPPED =
+            ConfigOptions.key(ForStProperty.IsWriteStopped.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Track whether write has been stopped in RocksDB. 
Returns 1 if write has been stopped, 0 otherwise.");
+
+    public static final ConfigOption<Boolean> BLOCK_CACHE_CAPACITY =
+            ConfigOptions.key(ForStProperty.BlockCacheCapacity.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor block cache capacity.");
+
+    public static final ConfigOption<Boolean> BLOCK_CACHE_USAGE =
+            ConfigOptions.key(ForStProperty.BlockCacheUsage.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the memory size for the entries residing 
in block cache.");
+
+    public static final ConfigOption<Boolean> BLOCK_CACHE_PINNED_USAGE =
+            
ConfigOptions.key(ForStProperty.BlockCachePinnedUsage.getConfigKey())
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the memory size for the entries being 
pinned in block cache.");
+
+    public static final ConfigOption<Boolean> COLUMN_FAMILY_AS_VARIABLE =
+            ConfigOptions.key(METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY)
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to expose the column family as a variable 
for RocksDB property based metrics.");
+
+    public static final ConfigOption<Boolean> MONITOR_NUM_FILES_AT_LEVEL =
+            
ConfigOptions.key("state.backend.rocksdb.metrics.num-files-at-level")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the number of files at each 
level.");
+
+    // 
--------------------------------------------------------------------------------------------
+    //  RocksDB statistics based metrics, report at database level
+    // 
--------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_HIT =
+            ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-hit")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total count of block cache hit in 
RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + 
BLOCK_CACHE_DATA_HIT).");
+
+    public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_MISS =
+            ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-miss")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total count of block cache misses in 
RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + 
BLOCK_CACHE_DATA_MISS).");
+
+    public static final ConfigOption<Boolean> MONITOR_BLOOM_FILTER_USEFUL =
+            
ConfigOptions.key("state.backend.rocksdb.metrics.bloom-filter-useful")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the total count of reads avoided 
by bloom filter.");
+
+    public static final ConfigOption<Boolean> 
MONITOR_BLOOM_FILTER_FULL_POSITIVE =
+            
ConfigOptions.key("state.backend.rocksdb.metrics.bloom-filter-full-positive")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total count of reads not avoided by 
bloom full filter.");
+
+    public static final ConfigOption<Boolean> 
MONITOR_BLOOM_FILTER_FULL_TRUE_POSITIVE =
+            
ConfigOptions.key("state.backend.rocksdb.metrics.bloom-filter-full-true-positive")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the total count of reads not avoided by 
bloom full filter and the data actually exists in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_BYTES_READ =
+            ConfigOptions.key("state.backend.rocksdb.metrics.bytes-read")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the number of uncompressed bytes read 
(from memtables/cache/sst) from Get() operation in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_ITER_BYTES_READ =
+            ConfigOptions.key("state.backend.rocksdb.metrics.iter-bytes-read")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the number of uncompressed bytes read 
(from memtables/cache/sst) from an iterator operation in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_BYTES_WRITTEN =
+            ConfigOptions.key("state.backend.rocksdb.metrics.bytes-written")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the number of uncompressed bytes written 
by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include 
the compaction written bytes, in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_COMPACTION_READ_BYTES =
+            
ConfigOptions.key("state.backend.rocksdb.metrics.compaction-read-bytes")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the bytes read during compaction 
in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_COMPACTION_WRITE_BYTES =
+            
ConfigOptions.key("state.backend.rocksdb.metrics.compaction-write-bytes")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Monitor the bytes written during 
compaction in RocksDB.");
+
+    public static final ConfigOption<Boolean> MONITOR_STALL_MICROS =
+            ConfigOptions.key("state.backend.rocksdb.metrics.stall-micros")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Monitor the duration of writer requiring to wait 
for compaction or flush to finish in RocksDB.");
+
+    /** Creates a {@link ForStNativeMetricOptions} based on an external 
configuration. */
+    public static ForStNativeMetricOptions fromConfig(ReadableConfig config) {
+        ForStNativeMetricOptions options = new ForStNativeMetricOptions();
+        configurePropertyMetrics(options, config);
+        configureStatisticsMetrics(options, config);
+        return options;
+    }
+
+    private static void configurePropertyMetrics(
+            ForStNativeMetricOptions options, ReadableConfig config) {
+        if (config.get(MONITOR_NUM_IMMUTABLE_MEM_TABLES)) {
+            options.enableNumImmutableMemTable();
+        }
+
+        if (config.get(MONITOR_MEM_TABLE_FLUSH_PENDING)) {
+            options.enableMemTableFlushPending();
+        }
+
+        if (config.get(TRACK_COMPACTION_PENDING)) {
+            options.enableCompactionPending();
+        }
+
+        if (config.get(MONITOR_BACKGROUND_ERRORS)) {
+            options.enableBackgroundErrors();
+        }
+
+        if (config.get(MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE)) {
+            options.enableCurSizeActiveMemTable();
+        }
+
+        if (config.get(MONITOR_CUR_SIZE_ALL_MEM_TABLE)) {
+            options.enableCurSizeAllMemTables();
+        }
+
+        if (config.get(MONITOR_SIZE_ALL_MEM_TABLES)) {
+            options.enableSizeAllMemTables();
+        }
+
+        if (config.get(MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLE)) {
+            options.enableNumEntriesActiveMemTable();
+        }
+
+        if (config.get(MONITOR_NUM_ENTRIES_IMM_MEM_TABLES)) {
+            options.enableNumEntriesImmMemTables();
+        }
+
+        if (config.get(MONITOR_NUM_DELETES_ACTIVE_MEM_TABLE)) {
+            options.enableNumDeletesActiveMemTable();
+        }
+
+        if (config.get(MONITOR_NUM_DELETES_IMM_MEM_TABLE)) {
+            options.enableNumDeletesImmMemTables();
+        }
+
+        if (config.get(ESTIMATE_NUM_KEYS)) {
+            options.enableEstimateNumKeys();
+        }
+
+        if (config.get(ESTIMATE_TABLE_READERS_MEM)) {
+            options.enableEstimateTableReadersMem();
+        }
+
+        if (config.get(MONITOR_NUM_SNAPSHOTS)) {
+            options.enableNumSnapshots();
+        }
+
+        if (config.get(MONITOR_NUM_LIVE_VERSIONS)) {
+            options.enableNumLiveVersions();
+        }
+
+        if (config.get(ESTIMATE_LIVE_DATA_SIZE)) {
+            options.enableEstimateLiveDataSize();
+        }
+
+        if (config.get(MONITOR_TOTAL_SST_FILES_SIZE)) {
+            options.enableTotalSstFilesSize();
+        }
+
+        if (config.get(MONITOR_LIVE_SST_FILES_SIZE)) {
+            options.enableLiveSstFilesSize();
+        }
+
+        if (config.get(ESTIMATE_PENDING_COMPACTION_BYTES)) {
+            options.enableEstimatePendingCompactionBytes();
+        }
+
+        if (config.get(MONITOR_NUM_RUNNING_COMPACTIONS)) {
+            options.enableNumRunningCompactions();
+        }
+
+        if (config.get(MONITOR_NUM_RUNNING_FLUSHES)) {
+            options.enableNumRunningFlushes();
+        }
+
+        if (config.get(MONITOR_ACTUAL_DELAYED_WRITE_RATE)) {
+            options.enableActualDelayedWriteRate();
+        }
+
+        if (config.get(IS_WRITE_STOPPED)) {
+            options.enableIsWriteStopped();
+        }
+
+        if (config.get(BLOCK_CACHE_CAPACITY)) {
+            options.enableBlockCacheCapacity();
+        }
+
+        if (config.get(BLOCK_CACHE_USAGE)) {
+            options.enableBlockCacheUsage();
+        }
+
+        if (config.get(BLOCK_CACHE_PINNED_USAGE)) {
+            options.enableBlockCachePinnedUsage();
+        }
+
+        if (config.get(MONITOR_NUM_FILES_AT_LEVEL)) {
+            options.enableNumFilesAtLevel();
+        }
+
+        
options.setColumnFamilyAsVariable(config.get(COLUMN_FAMILY_AS_VARIABLE));
+    }
+
+    private static void configureStatisticsMetrics(
+            ForStNativeMetricOptions options, ReadableConfig config) {
+        for (Map.Entry<ConfigOption<Boolean>, TickerType> entry : 
tickerTypeMapping.entrySet()) {
+            if (config.get(entry.getKey())) {
+                options.monitorTickerTypes.add(entry.getValue());
+            }
+        }
+    }
+
+    private static final Map<ConfigOption<Boolean>, TickerType> 
tickerTypeMapping =
+            new HashMap<ConfigOption<Boolean>, TickerType>() {
+                private static final long serialVersionUID = 1L;
+
+                {
+                    put(MONITOR_BLOCK_CACHE_HIT, TickerType.BLOCK_CACHE_HIT);
+                    put(MONITOR_BLOCK_CACHE_MISS, TickerType.BLOCK_CACHE_MISS);
+                    put(MONITOR_BLOOM_FILTER_USEFUL, 
TickerType.BLOOM_FILTER_USEFUL);
+                    put(MONITOR_BLOOM_FILTER_FULL_POSITIVE, 
TickerType.BLOOM_FILTER_FULL_POSITIVE);
+                    put(
+                            MONITOR_BLOOM_FILTER_FULL_TRUE_POSITIVE,
+                            TickerType.BLOOM_FILTER_FULL_TRUE_POSITIVE);
+                    put(MONITOR_BYTES_READ, TickerType.BYTES_READ);
+                    put(MONITOR_ITER_BYTES_READ, TickerType.ITER_BYTES_READ);
+                    put(MONITOR_BYTES_WRITTEN, TickerType.BYTES_WRITTEN);
+                    put(MONITOR_COMPACTION_READ_BYTES, 
TickerType.COMPACT_READ_BYTES);
+                    put(MONITOR_COMPACTION_WRITE_BYTES, 
TickerType.COMPACT_WRITE_BYTES);
+                    put(MONITOR_STALL_MICROS, TickerType.STALL_MICROS);
+                }
+            };
+
+    private final Set<ForStProperty> properties;
+    private final Set<TickerType> monitorTickerTypes;
+    private boolean columnFamilyAsVariable = 
COLUMN_FAMILY_AS_VARIABLE.defaultValue();
+
+    public ForStNativeMetricOptions() {
+        this.properties = new HashSet<>();
+        this.monitorTickerTypes = new HashSet<>();
+    }
+
+    @VisibleForTesting
+    public void enableNativeStatistics(ConfigOption<Boolean> 
nativeStatisticsOption) {
+        TickerType tickerType = tickerTypeMapping.get(nativeStatisticsOption);
+        if (tickerType != null) {
+            monitorTickerTypes.add(tickerType);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unknown configurable native statistics option " + 
nativeStatisticsOption);
+        }
+    }
+
+    /** Returns number of immutable memtables that have not yet been flushed. 
*/
+    public void enableNumImmutableMemTable() {
+        this.properties.add(ForStProperty.NumImmutableMemTable);
+    }
+
+    /** Returns 1 if a memtable flush is pending; otherwise, returns 0. */
+    public void enableMemTableFlushPending() {
+        this.properties.add(ForStProperty.MemTableFlushPending);
+    }
+
+    /** Returns 1 if at least one compaction is pending; otherwise, returns 0. 
*/
+    public void enableCompactionPending() {
+        this.properties.add(ForStProperty.CompactionPending);
+    }
+
+    /** Returns accumulated number of background errors. */
+    public void enableBackgroundErrors() {
+        this.properties.add(ForStProperty.BackgroundErrors);
+    }
+
+    /** Returns approximate size of active memtable (bytes). */
+    public void enableCurSizeActiveMemTable() {
+        this.properties.add(ForStProperty.CurSizeActiveMemTable);
+    }
+
+    /** Returns approximate size of active and unflushed immutable memtables 
(bytes). */
+    public void enableCurSizeAllMemTables() {
+        this.properties.add(ForStProperty.CurSizeAllMemTables);
+    }
+
+    /**
+     * Returns approximate size of active, unflushed immutable, and pinned 
immutable memtables
+     * (bytes).
+     */
+    public void enableSizeAllMemTables() {
+        this.properties.add(ForStProperty.SizeAllMemTables);
+    }
+
+    /** Returns total number of entries in the active memtable. */
+    public void enableNumEntriesActiveMemTable() {
+        this.properties.add(ForStProperty.NumEntriesActiveMemTable);
+    }
+
+    /** Returns total number of entries in the unflushed immutable memtables. 
*/
+    public void enableNumEntriesImmMemTables() {
+        this.properties.add(ForStProperty.NumEntriesImmMemTables);
+    }
+
+    /** Returns total number of delete entries in the active memtable. */
+    public void enableNumDeletesActiveMemTable() {
+        this.properties.add(ForStProperty.NumDeletesActiveMemTable);
+    }
+
+    /** Returns total number of delete entries in the unflushed immutable 
memtables. */
+    public void enableNumDeletesImmMemTables() {
+        this.properties.add(ForStProperty.NumDeletesImmMemTables);
+    }
+
+    /**
+     * Returns estimated number of total keys in the active and unflushed 
immutable memtables and
+     * storage.
+     */
+    public void enableEstimateNumKeys() {
+        this.properties.add(ForStProperty.EstimateNumKeys);
+    }
+
+    /**
+     * Returns estimated memory used for reading SST tables, excluding memory 
used in block cache
+     * (e.g.,filter and index blocks).
+     */
+    public void enableEstimateTableReadersMem() {
+        this.properties.add(ForStProperty.EstimateTableReadersMem);
+    }
+
+    /** Returns number of unreleased snapshots of the database. */
+    public void enableNumSnapshots() {
+        this.properties.add(ForStProperty.NumSnapshots);
+    }
+
+    /**
+     * Returns number of live versions. `Version` is an internal data 
structure. See version_set.h
+     * for details. More live versions often mean more SST files are held from 
being deleted, by
+     * iterators or unfinished compactions.
+     */
+    public void enableNumLiveVersions() {
+        this.properties.add(ForStProperty.NumLiveVersions);
+    }
+
+    /** Returns an estimate of the amount of live data in bytes. */
+    public void enableEstimateLiveDataSize() {
+        this.properties.add(ForStProperty.EstimateLiveDataSize);
+    }
+
+    /**
+     * Returns total size (bytes) of all SST files. <strong>WARNING</strong>: 
may slow down online
+     * queries if there are too many files.
+     */
+    public void enableTotalSstFilesSize() {
+        this.properties.add(ForStProperty.TotalSstFilesSize);
+    }
+
+    public void enableLiveSstFilesSize() {
+        this.properties.add(ForStProperty.LiveSstFilesSize);
+    }
+
+    /**
+     * Returns estimated total number of bytes compaction needs to rewrite to 
get all levels down to
+     * under target size. Not valid for other compactions than level-based.
+     */
+    public void enableEstimatePendingCompactionBytes() {
+        this.properties.add(ForStProperty.EstimatePendingCompactionBytes);
+    }
+
+    /** Returns the number of currently running compactions. */
+    public void enableNumRunningCompactions() {
+        this.properties.add(ForStProperty.NumRunningCompactions);
+    }
+
+    /** Returns the number of currently running flushes. */
+    public void enableNumRunningFlushes() {
+        this.properties.add(ForStProperty.NumRunningFlushes);
+    }
+
+    /** Returns the current actual delayed write rate. 0 means no delay. */
+    public void enableActualDelayedWriteRate() {
+        this.properties.add(ForStProperty.ActualDelayedWriteRate);
+    }
+
+    /** Returns 1 if write has been stopped. */
+    public void enableIsWriteStopped() {
+        this.properties.add(ForStProperty.IsWriteStopped);
+    }
+
+    /** Returns block cache capacity. */
+    public void enableBlockCacheCapacity() {
+        this.properties.add(ForStProperty.BlockCacheCapacity);
+    }
+
+    /** Returns the memory size for the entries residing in block cache. */
+    public void enableBlockCacheUsage() {
+        this.properties.add(ForStProperty.BlockCacheUsage);
+    }
+
+    /** Returns the memory size for the entries being pinned in block cache. */
+    public void enableBlockCachePinnedUsage() {
+        this.properties.add(ForStProperty.BlockCachePinnedUsage);
+    }
+
+    /** Returns the number of files per level. */
+    public void enableNumFilesAtLevel() {
+        this.properties.add(ForStProperty.NumFilesAtLevel0);
+        this.properties.add(ForStProperty.NumFilesAtLevel1);
+        this.properties.add(ForStProperty.NumFilesAtLevel2);
+        this.properties.add(ForStProperty.NumFilesAtLevel3);
+        this.properties.add(ForStProperty.NumFilesAtLevel4);
+        this.properties.add(ForStProperty.NumFilesAtLevel5);
+        this.properties.add(ForStProperty.NumFilesAtLevel6);
+    }
+
+    /** Returns the column family as variable. */
+    public void setColumnFamilyAsVariable(boolean columnFamilyAsVariable) {
+        this.columnFamilyAsVariable = columnFamilyAsVariable;
+    }
+
+    /** @return the enabled RocksDB property-based metrics */
+    public Collection<ForStProperty> getProperties() {
+        return Collections.unmodifiableCollection(properties);
+    }
+
+    /** @return the enabled RocksDB statistics metrics. */
+    public Collection<TickerType> getMonitorTickerTypes() {
+        return Collections.unmodifiableCollection(monitorTickerTypes);
+    }
+
+    /**
+     * {{@link ForStNativeMetricMonitor}} is enabled if any property or ticker 
type is set.
+     *
+     * @return true if {{RocksDBNativeMetricMonitor}} should be enabled, false 
otherwise.
+     */
+    public boolean isEnabled() {
+        return !properties.isEmpty() || isStatisticsEnabled();
+    }
+
+    /** @return true if RocksDB statistics metrics are enabled, false 
otherwise. */
+    public boolean isStatisticsEnabled() {
+        return !monitorTickerTypes.isEmpty();
+    }
+
+    /**
+     * {{@link ForStNativeMetricMonitor}} Whether to expose the column family 
as a variable..
+     *
+     * @return true is column family to expose variable, false otherwise.
+     */
+    public boolean isColumnFamilyAsVariable() {
+        return this.columnFamilyAsVariable;
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptionsFactory.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptionsFactory.java
index 7fbc432e659..80ac8511fa6 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptionsFactory.java
@@ -66,7 +66,19 @@ public interface ForStOptionsFactory extends 
java.io.Serializable {
     ColumnFamilyOptions createColumnOptions(
             ColumnFamilyOptions currentOptions, Collection<AutoCloseable> 
handlesToClose);
 
-    // TODO: Support Metrics Options
+    /**
+     * This method should enable certain ForSt metrics to be forwarded to 
Flink's metrics reporter.
+     *
+     * <p>Enabling these monitoring options may degrade ForSt performance and 
should be set with
+     * care.
+     *
+     * @param nativeMetricOptions The options object with the pre-defined 
options.
+     * @return The options object on which the additional options are set.
+     */
+    default ForStNativeMetricOptions createNativeMetricsOptions(
+            ForStNativeMetricOptions nativeMetricOptions) {
+        return nativeMetricOptions;
+    }
 
     /**
      * This method should set the additional options on top of the current 
options object. The
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStProperty.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStProperty.java
new file mode 100644
index 00000000000..042fb5916b8
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStProperty.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.annotation.Internal;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+/**
+ * {@link RocksDB} properties that can be queried by Flink's metrics reporter.
+ *
+ * <p>Note: Metrics properties are added in each new version of {@link 
RocksDB}, when upgrading to a
+ * latter version consider updating this class with newly added properties.
+ */
+@Internal
+public enum ForStProperty {
+    NumImmutableMemTable("num-immutable-mem-table", PropertyType.NUMBER),
+    MemTableFlushPending("mem-table-flush-pending", PropertyType.NUMBER),
+    CompactionPending("compaction-pending", PropertyType.NUMBER),
+    BackgroundErrors("background-errors", PropertyType.NUMBER),
+    CurSizeActiveMemTable("cur-size-active-mem-table", PropertyType.NUMBER),
+    CurSizeAllMemTables("cur-size-all-mem-tables", PropertyType.NUMBER),
+    SizeAllMemTables("size-all-mem-tables", PropertyType.NUMBER),
+    NumEntriesActiveMemTable("num-entries-active-mem-table", 
PropertyType.NUMBER),
+    NumEntriesImmMemTables("num-entries-imm-mem-tables", PropertyType.NUMBER),
+    NumDeletesActiveMemTable("num-deletes-active-mem-table", 
PropertyType.NUMBER),
+    NumDeletesImmMemTables("num-deletes-imm-mem-tables", PropertyType.NUMBER),
+    EstimateNumKeys("estimate-num-keys", PropertyType.NUMBER),
+    EstimateTableReadersMem("estimate-table-readers-mem", PropertyType.NUMBER),
+    NumSnapshots("num-snapshots", PropertyType.NUMBER),
+    NumLiveVersions("num-live-versions", PropertyType.NUMBER),
+    EstimateLiveDataSize("estimate-live-data-size", PropertyType.NUMBER),
+    TotalSstFilesSize("total-sst-files-size", PropertyType.NUMBER),
+    LiveSstFilesSize("live-sst-files-size", PropertyType.NUMBER),
+    EstimatePendingCompactionBytes("estimate-pending-compaction-bytes", 
PropertyType.NUMBER),
+    NumRunningCompactions("num-running-compactions", PropertyType.NUMBER),
+    NumRunningFlushes("num-running-flushes", PropertyType.NUMBER),
+    ActualDelayedWriteRate("actual-delayed-write-rate", PropertyType.NUMBER),
+    IsWriteStopped("is-write-stopped", PropertyType.NUMBER),
+    BlockCacheCapacity("block-cache-capacity", PropertyType.NUMBER),
+    BlockCacheUsage("block-cache-usage", PropertyType.NUMBER),
+    BlockCachePinnedUsage("block-cache-pinned-usage", PropertyType.NUMBER),
+    NumFilesAtLevel0("num-files-at-level0", PropertyType.STRING),
+    NumFilesAtLevel1("num-files-at-level1", PropertyType.STRING),
+    NumFilesAtLevel2("num-files-at-level2", PropertyType.STRING),
+    NumFilesAtLevel3("num-files-at-level3", PropertyType.STRING),
+    NumFilesAtLevel4("num-files-at-level4", PropertyType.STRING),
+    NumFilesAtLevel5("num-files-at-level5", PropertyType.STRING),
+    NumFilesAtLevel6("num-files-at-level6", PropertyType.STRING);
+
+    private static final String ROCKS_DB_PROPERTY_FORMAT = "rocksdb.%s";
+
+    private static final String CONFIG_KEY_FORMAT = 
"state.backend.forst.metrics.%s";
+
+    private final String property;
+
+    private final PropertyType type;
+
+    /** Property type. */
+    private enum PropertyType {
+        NUMBER,
+        STRING
+    }
+
+    ForStProperty(String property, PropertyType type) {
+        this.property = property;
+        this.type = type;
+    }
+
+    /**
+     * @return property string that can be used to query {@link
+     *     RocksDB#getLongProperty(ColumnFamilyHandle, String)}.
+     */
+    public String getForStProperty() {
+        return String.format(ROCKS_DB_PROPERTY_FORMAT, property);
+    }
+
+    public long getNumericalPropertyValue(RocksDB rocksDB, ColumnFamilyHandle 
handle)
+            throws Exception {
+        String forStProperty = getForStProperty();
+        switch (type) {
+            case NUMBER:
+                return rocksDB.getLongProperty(handle, forStProperty);
+            case STRING:
+                return Long.parseLong(rocksDB.getProperty(handle, 
forStProperty));
+            default:
+                throw new RuntimeException(
+                        String.format("ForSt property type: %s not supported", 
type));
+        }
+    }
+
+    /**
+     * @return key for enabling metric using {@link 
org.apache.flink.configuration.Configuration}.
+     */
+    public String getConfigKey() {
+        return String.format(CONFIG_KEY_FORMAT, property);
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
new file mode 100644
index 00000000000..a6a2bfa9a22
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.IOUtils;
+
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/** External extension for tests that require an instance of ForSt. */
+public class ForStExtension implements BeforeEachCallback, AfterEachCallback {
+    private static final Logger LOG = 
LoggerFactory.getLogger(ForStExtension.class);
+
+    /** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */
+    private final ForStOptionsFactory optionsFactory;
+
+    private final boolean enableStatistics;
+
+    /** Temporary folder that provides the working directory for the ForSt 
instance. */
+    private TemporaryFolder temporaryFolder;
+
+    /** The options for the ForSt instance. */
+    private DBOptions dbOptions;
+
+    /** The options for column families created with the ForSt instance. */
+    private ColumnFamilyOptions columnFamilyOptions;
+
+    /** The options for writes. */
+    private WriteOptions writeOptions;
+
+    /** The options for reads. */
+    private ReadOptions readOptions;
+
+    /** The ForSt instance object. */
+    private RocksDB db;
+
+    /** List of all column families that have been created with the ForSt 
instance. */
+    private List<ColumnFamilyHandle> columnFamilyHandles;
+
+    /** Resources to close. */
+    private final ArrayList<AutoCloseable> handlesToClose = new ArrayList<>();
+
+    public ForStExtension() {
+        this(false);
+    }
+
+    public ForStExtension(boolean enableStatistics) {
+        this(
+                new ForStOptionsFactory() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public DBOptions createDBOptions(
+                            DBOptions currentOptions, 
Collection<AutoCloseable> handlesToClose) {
+                        // close it before reuse the reference.
+                        try {
+                            currentOptions.close();
+                        } catch (Exception e) {
+                            LOG.error("Close previous DBOptions's instance 
failed.", e);
+                        }
+
+                        return new DBOptions()
+                                .setMaxBackgroundJobs(4)
+                                .setUseFsync(false)
+                                .setMaxOpenFiles(-1)
+                                .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
+                                .setStatsDumpPeriodSec(0);
+                    }
+
+                    @Override
+                    public ColumnFamilyOptions createColumnOptions(
+                            ColumnFamilyOptions currentOptions,
+                            Collection<AutoCloseable> handlesToClose) {
+                        // close it before reuse the reference.
+                        try {
+                            currentOptions.close();
+                        } catch (Exception e) {
+                            LOG.error("Close previous ColumnOptions's instance 
failed.", e);
+                        }
+
+                        return new 
ColumnFamilyOptions().optimizeForPointLookup(40960);
+                    }
+                },
+                enableStatistics);
+    }
+
+    public ForStExtension(@Nonnull ForStOptionsFactory optionsFactory, boolean 
enableStatistics) {
+        this.optionsFactory = optionsFactory;
+        this.enableStatistics = enableStatistics;
+    }
+
+    public ColumnFamilyHandle getDefaultColumnFamily() {
+        return columnFamilyHandles.get(0);
+    }
+
+    public RocksDB getDB() {
+        return db;
+    }
+
+    public DBOptions getDbOptions() {
+        return dbOptions;
+    }
+
+    /** Creates and returns a new column family with the given name. */
+    public ColumnFamilyHandle createNewColumnFamily(String name) {
+        try {
+            final ColumnFamilyHandle columnFamily =
+                    db.createColumnFamily(
+                            new ColumnFamilyDescriptor(name.getBytes(), 
columnFamilyOptions));
+            columnFamilyHandles.add(columnFamily);
+            return columnFamily;
+        } catch (Exception ex) {
+            throw new FlinkRuntimeException("Could not create column family.", 
ex);
+        }
+    }
+
+    public void before() throws Exception {
+        this.temporaryFolder = new TemporaryFolder();
+        this.temporaryFolder.create();
+        final File rocksFolder = temporaryFolder.newFolder();
+        this.dbOptions =
+                optionsFactory
+                        .createDBOptions(
+                                new DBOptions()
+                                        .setUseFsync(false)
+                                        
.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
+                                        .setStatsDumpPeriodSec(0),
+                                handlesToClose)
+                        .setCreateIfMissing(true);
+        if (enableStatistics) {
+            Statistics statistics = new Statistics();
+            dbOptions.setStatistics(statistics);
+            handlesToClose.add(statistics);
+        }
+        this.columnFamilyOptions =
+                optionsFactory.createColumnOptions(new ColumnFamilyOptions(), 
handlesToClose);
+        this.writeOptions = new WriteOptions();
+        this.writeOptions.disableWAL();
+        this.readOptions = new ReadOptions();
+        this.columnFamilyHandles = new ArrayList<>(1);
+        this.db =
+                RocksDB.open(
+                        dbOptions,
+                        rocksFolder.getAbsolutePath(),
+                        Collections.singletonList(
+                                new ColumnFamilyDescriptor(
+                                        "default".getBytes(), 
columnFamilyOptions)),
+                        columnFamilyHandles);
+    }
+
+    public void after() {
+        // destruct in reversed order of creation.
+        for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+            IOUtils.closeQuietly(columnFamilyHandle);
+        }
+        IOUtils.closeQuietly(this.db);
+        IOUtils.closeQuietly(this.readOptions);
+        IOUtils.closeQuietly(this.writeOptions);
+        IOUtils.closeQuietly(this.columnFamilyOptions);
+        IOUtils.closeQuietly(this.dbOptions);
+        handlesToClose.forEach(IOUtils::closeQuietly);
+        temporaryFolder.delete();
+    }
+
+    @Override
+    public void beforeEach(ExtensionContext context) throws Exception {
+        before();
+    }
+
+    @Override
+    public void afterEach(ExtensionContext context) {
+        after();
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricMonitorTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricMonitorTest.java
new file mode 100644
index 00000000000..b1a2716e8a3
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricMonitorTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.traces.SpanBuilder;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** validate native metric monitor. */
+class ForStNativeMetricMonitorTest {
+
+    private static final String OPERATOR_NAME = "dummy";
+
+    private static final String COLUMN_FAMILY_NAME = "column-family";
+
+    @RegisterExtension public ForStExtension forStExtension = new 
ForStExtension(true);
+
+    @Test
+    void testMetricMonitorLifecycle() throws Throwable {
+        // We use a local variable here to manually control the life-cycle.
+        // This allows us to verify that metrics do not try to access
+        // RocksDB after the monitor was closed.
+        ForStExtension localForStExtension = new ForStExtension(true);
+        localForStExtension.before();
+
+        SimpleMetricRegistry registry = new SimpleMetricRegistry();
+        GenericMetricGroup group =
+                new GenericMetricGroup(
+                        registry,
+                        
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                        OPERATOR_NAME);
+
+        ForStNativeMetricOptions options = new ForStNativeMetricOptions();
+        // always returns a non-zero
+        // value since empty memtables
+        // have overhead.
+        options.enableSizeAllMemTables();
+        
options.enableNativeStatistics(ForStNativeMetricOptions.MONITOR_BYTES_WRITTEN);
+
+        ForStNativeMetricMonitor monitor =
+                new ForStNativeMetricMonitor(
+                        options,
+                        group,
+                        localForStExtension.getDB(),
+                        localForStExtension.getDbOptions().statistics());
+
+        ColumnFamilyHandle handle = 
localForStExtension.createNewColumnFamily(COLUMN_FAMILY_NAME);
+        monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+
+        assertThat(registry.propertyMetrics)
+                .withFailMessage("Failed to register metrics for column 
family")
+                .hasSize(1);
+
+        // write something to ensure the bytes-written is not zero.
+        localForStExtension.getDB().put(new byte[4], new byte[10]);
+
+        for (ForStNativeMetricMonitor.ForStNativePropertyMetricView view :
+                registry.propertyMetrics) {
+            view.update();
+            assertThat(view.getValue())
+                    .withFailMessage("Failed to pull metric from ForSt")
+                    .isNotEqualTo(BigInteger.ZERO);
+            view.setValue(0L);
+        }
+
+        for (ForStNativeMetricMonitor.ForStNativeStatisticsMetricView view :
+                registry.statisticsMetrics) {
+            view.update();
+            assertThat(view.getValue()).isNotZero();
+            view.setValue(0L);
+        }
+
+        // After the monitor is closed no metric should be accessing RocksDB 
anymore.
+        // If they do, then this test will likely fail with a segmentation 
fault.
+        monitor.close();
+
+        localForStExtension.after();
+
+        for (ForStNativeMetricMonitor.ForStNativePropertyMetricView view :
+                registry.propertyMetrics) {
+            view.update();
+            assertThat(view.getValue())
+                    .withFailMessage("Failed to release ForSt reference")
+                    .isEqualTo(BigInteger.ZERO);
+        }
+
+        for (ForStNativeMetricMonitor.ForStNativeStatisticsMetricView view :
+                registry.statisticsMetrics) {
+            view.update();
+            assertThat(view.getValue()).isZero();
+        }
+    }
+
+    @Test
+    void testReturnsUnsigned() throws Throwable {
+        ForStExtension localForStExtension = new ForStExtension();
+        localForStExtension.before();
+
+        SimpleMetricRegistry registry = new SimpleMetricRegistry();
+        GenericMetricGroup group =
+                new GenericMetricGroup(
+                        registry,
+                        
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                        OPERATOR_NAME);
+
+        ForStNativeMetricOptions options = new ForStNativeMetricOptions();
+        options.enableSizeAllMemTables();
+
+        ForStNativeMetricMonitor monitor =
+                new ForStNativeMetricMonitor(
+                        options,
+                        group,
+                        localForStExtension.getDB(),
+                        localForStExtension.getDbOptions().statistics());
+
+        ColumnFamilyHandle handle = 
forStExtension.createNewColumnFamily(COLUMN_FAMILY_NAME);
+        monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+        ForStNativeMetricMonitor.ForStNativePropertyMetricView view =
+                registry.propertyMetrics.get(0);
+
+        view.setValue(-1);
+        BigInteger result = view.getValue();
+
+        localForStExtension.after();
+
+        assertThat(result.signum())
+                .withFailMessage("Failed to interpret ForSt result as an 
unsigned long")
+                .isOne();
+    }
+
+    @Test
+    void testClosedGaugesDontRead() throws RocksDBException {
+        SimpleMetricRegistry registry = new SimpleMetricRegistry();
+        GenericMetricGroup group =
+                new GenericMetricGroup(
+                        registry,
+                        
UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                        OPERATOR_NAME);
+
+        ForStNativeMetricOptions options = new ForStNativeMetricOptions();
+        options.enableSizeAllMemTables();
+        
options.enableNativeStatistics(ForStNativeMetricOptions.MONITOR_BLOCK_CACHE_HIT);
+
+        ForStNativeMetricMonitor monitor =
+                new ForStNativeMetricMonitor(
+                        options,
+                        group,
+                        forStExtension.getDB(),
+                        forStExtension.getDbOptions().statistics());
+
+        ColumnFamilyHandle handle = 
forStExtension.createNewColumnFamily(COLUMN_FAMILY_NAME);
+        monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+
+        forStExtension.getDB().put(new byte[4], new byte[10]);
+
+        for (ForStNativeMetricMonitor.ForStNativePropertyMetricView view :
+                registry.propertyMetrics) {
+            view.close();
+            view.update();
+            assertThat(view.getValue())
+                    .withFailMessage("Closed gauge still queried ForSt")
+                    .isEqualTo(BigInteger.ZERO);
+        }
+
+        for (ForStNativeMetricMonitor.ForStNativeStatisticsMetricView view :
+                registry.statisticsMetrics) {
+            view.close();
+            view.update();
+            assertThat(view.getValue())
+                    .withFailMessage("Closed gauge still queried ForSt")
+                    .isZero();
+        }
+    }
+
+    static class SimpleMetricRegistry implements MetricRegistry {
+        List<ForStNativeMetricMonitor.ForStNativePropertyMetricView> 
propertyMetrics =
+                new ArrayList<>();
+
+        List<ForStNativeMetricMonitor.ForStNativeStatisticsMetricView> 
statisticsMetrics =
+                new ArrayList<>();
+
+        @Override
+        public char getDelimiter() {
+            return 0;
+        }
+
+        @Override
+        public int getNumberReporters() {
+            return 0;
+        }
+
+        @Override
+        public void addSpan(SpanBuilder spanBuilder) {}
+
+        @Override
+        public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
+            if (metric instanceof 
ForStNativeMetricMonitor.ForStNativePropertyMetricView) {
+                propertyMetrics.add(
+                        
(ForStNativeMetricMonitor.ForStNativePropertyMetricView) metric);
+            } else if (metric instanceof 
ForStNativeMetricMonitor.ForStNativeStatisticsMetricView) {
+                statisticsMetrics.add(
+                        
(ForStNativeMetricMonitor.ForStNativeStatisticsMetricView) metric);
+            }
+        }
+
+        @Override
+        public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {}
+
+        @Override
+        public ScopeFormats getScopeFormats() {
+            Configuration config = new Configuration();
+
+            config.set(MetricOptions.SCOPE_NAMING_TM, "A");
+            config.set(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+            config.set(MetricOptions.SCOPE_NAMING_TASK, "C");
+            config.set(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
+
+            return ScopeFormats.fromConfig(config);
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java
new file mode 100644
index 00000000000..1c8a3bddb61
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Test all native metrics can be set using configuration. */
+public class ForStNativeMetricOptionsTest {
+    @Test
+    public void testNativeMetricsConfigurable() {
+        for (ForStProperty property : ForStProperty.values()) {
+            Configuration config = new Configuration();
+            if (property.getConfigKey().contains("num-files-at-level")) {
+                
config.set(ForStNativeMetricOptions.MONITOR_NUM_FILES_AT_LEVEL, true);
+            } else {
+                config.setBoolean(property.getConfigKey(), true);
+            }
+
+            ForStNativeMetricOptions options = 
ForStNativeMetricOptions.fromConfig(config);
+
+            Assert.assertTrue(
+                    String.format(
+                            "Failed to enable native metrics with property %s",
+                            property.getConfigKey()),
+                    options.isEnabled());
+
+            Assert.assertTrue(
+                    String.format(
+                            "Failed to enable native metric %s using config",
+                            property.getConfigKey()),
+                    options.getProperties().contains(property));
+        }
+    }
+}
diff --git 
a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStPropertyTest.java
 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStPropertyTest.java
new file mode 100644
index 00000000000..56e577b803f
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStPropertyTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+/** Validate ForSt properties. */
+class ForStPropertyTest {
+
+    @RegisterExtension public ForStExtension forStExtension = new 
ForStExtension();
+
+    @Test
+    void testForStPropertiesValid() {
+        RocksDB db = forStExtension.getDB();
+        ColumnFamilyHandle handle = forStExtension.getDefaultColumnFamily();
+
+        for (ForStProperty forStProperty : ForStProperty.values()) {
+            try {
+                forStProperty.getNumericalPropertyValue(db, handle);
+            } catch (Exception e) {
+                throw new AssertionError(
+                        String.format(
+                                "Invalid ForSt property %s", 
forStProperty.getForStProperty()),
+                        e);
+            }
+        }
+    }
+}

Reply via email to