This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8411367ab0a213ec55c679a496352853bf96af51 Author: Hangxiang Yu <master...@gmail.com> AuthorDate: Mon Apr 15 21:25:47 2024 +0800 [FLINK-35047][state] Introduce Metrics for ForStStateBackend --- .../state/forst/ForStNativeMetricMonitor.java | 233 +++++++ .../state/forst/ForStNativeMetricOptions.java | 700 +++++++++++++++++++++ .../flink/state/forst/ForStOptionsFactory.java | 14 +- .../apache/flink/state/forst/ForStProperty.java | 115 ++++ .../apache/flink/state/forst/ForStExtension.java | 212 +++++++ .../state/forst/ForStNativeMetricMonitorTest.java | 254 ++++++++ .../state/forst/ForStNativeMetricOptionsTest.java | 53 ++ .../flink/state/forst/ForStPropertyTest.java | 47 ++ 8 files changed, 1627 insertions(+), 1 deletion(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricMonitor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricMonitor.java new file mode 100644 index 00000000000..71b511853d5 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricMonitor.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.View; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; +import org.rocksdb.Statistics; +import org.rocksdb.TickerType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.io.Closeable; +import java.math.BigInteger; + +/** + * A monitor which pulls {{@link RocksDB}} native metrics and forwards them to Flink's metric group. + * All metrics are unsigned longs and are reported at the column family level. + */ +@Internal +public class ForStNativeMetricMonitor implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(ForStNativeMetricMonitor.class); + + private final ForStNativeMetricOptions options; + + private final MetricGroup metricGroup; + + private final Object lock; + + static final String COLUMN_FAMILY_KEY = "column_family"; + + @GuardedBy("lock") + private RocksDB rocksDB; + + @Nullable + @GuardedBy("lock") + private Statistics statistics; + + public ForStNativeMetricMonitor( + @Nonnull ForStNativeMetricOptions options, + @Nonnull MetricGroup metricGroup, + @Nonnull RocksDB rocksDB, + @Nullable Statistics statistics) { + this.options = options; + this.metricGroup = metricGroup; + this.rocksDB = rocksDB; + this.statistics = statistics; + this.lock = new Object(); + registerStatistics(); + } + + /** Register gauges to pull native metrics for the database. */ + private void registerStatistics() { + if (statistics != null) { + for (TickerType tickerType : options.getMonitorTickerTypes()) { + metricGroup.gauge( + String.format("rocksdb.%s", tickerType.name().toLowerCase()), + new ForStNativeStatisticsMetricView(tickerType)); + } + } + } + + /** + * Register gauges to pull native metrics for the column family. + * + * @param columnFamilyName group name for the new gauges + * @param handle native handle to the column family + */ + void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle handle) { + + boolean columnFamilyAsVariable = options.isColumnFamilyAsVariable(); + MetricGroup group = + columnFamilyAsVariable + ? metricGroup.addGroup(COLUMN_FAMILY_KEY, columnFamilyName) + : metricGroup.addGroup(columnFamilyName); + + for (ForStProperty property : options.getProperties()) { + ForStNativePropertyMetricView gauge = + new ForStNativePropertyMetricView(handle, property); + group.gauge(property.getForStProperty(), gauge); + } + } + + /** Updates the value of metricView if the reference is still valid. */ + private void setProperty(ForStNativePropertyMetricView metricView) { + if (metricView.isClosed()) { + return; + } + try { + synchronized (lock) { + if (rocksDB != null) { + long value = + metricView.property.getNumericalPropertyValue( + rocksDB, metricView.handle); + metricView.setValue(value); + } + } + } catch (Exception e) { + metricView.close(); + LOG.warn("Failed to read native metric {} from ForSt.", metricView.property, e); + } + } + + private void setStatistics(ForStNativeStatisticsMetricView metricView) { + if (metricView.isClosed()) { + return; + } + if (statistics != null) { + synchronized (lock) { + metricView.setValue(statistics.getTickerCount(metricView.tickerType)); + } + } + } + + @Override + public void close() { + synchronized (lock) { + rocksDB = null; + statistics = null; + } + } + + abstract static class ForStNativeView implements View { + private boolean closed; + + ForStNativeView() { + this.closed = false; + } + + void close() { + closed = true; + } + + boolean isClosed() { + return closed; + } + } + + /** + * A gauge which periodically pulls a ForSt property-based native metric for the specified + * column family / metric pair. + * + * <p><strong>Note</strong>: As the returned property is of type {@code uint64_t} on C++ side + * the returning value can be negative. Because java does not support unsigned long types, this + * gauge wraps the result in a {@link BigInteger}. + */ + class ForStNativePropertyMetricView extends ForStNativeView implements Gauge<BigInteger> { + private final ForStProperty property; + + private final ColumnFamilyHandle handle; + + private BigInteger bigInteger; + + private ForStNativePropertyMetricView( + ColumnFamilyHandle handle, @Nonnull ForStProperty property) { + this.handle = handle; + this.property = property; + this.bigInteger = BigInteger.ZERO; + } + + public void setValue(long value) { + if (value >= 0L) { + bigInteger = BigInteger.valueOf(value); + } else { + int upper = (int) (value >>> 32); + int lower = (int) value; + + bigInteger = + BigInteger.valueOf(Integer.toUnsignedLong(upper)) + .shiftLeft(32) + .add(BigInteger.valueOf(Integer.toUnsignedLong(lower))); + } + } + + @Override + public BigInteger getValue() { + return bigInteger; + } + + @Override + public void update() { + setProperty(this); + } + } + + /** A gauge which periodically pulls a ForSt statistics-based native metric for the database. */ + class ForStNativeStatisticsMetricView extends ForStNativeView implements Gauge<Long> { + private final TickerType tickerType; + private long value; + + private ForStNativeStatisticsMetricView(TickerType tickerType) { + this.tickerType = tickerType; + } + + @Override + public Long getValue() { + return value; + } + + void setValue(long value) { + this.value = value; + } + + @Override + public void update() { + setStatistics(this); + } + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricOptions.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricOptions.java new file mode 100644 index 00000000000..40a973b61ef --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStNativeMetricOptions.java @@ -0,0 +1,700 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; + +import org.rocksdb.TickerType; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * Enable which ForSt metrics to forward to Flink's metrics reporter. + * + * <p>Property based metrics would report at the column family level and return unsigned long + * values. + * + * <p>Statistics based metrics would report at the database level, it can return ticker or histogram + * kind results. + * + * <p>Properties and doc comments are taken from RocksDB documentation. See <a + * href="https://github.com/facebook/rocksdb/blob/64324e329eb0a9b4e77241a425a1615ff524c7f1/include/rocksdb/db.h#L429"> + * db.h</a> for more information. + */ +@Experimental +public class ForStNativeMetricOptions implements Serializable { + private static final long serialVersionUID = 1L; + + // -------------------------------------------------------------------------------------------- + // RocksDB property based metrics, report at column family level + // -------------------------------------------------------------------------------------------- + + public static final String METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY = + "state.backend.rocksdb.metrics" + ".column-family-as-variable"; + + public static final ConfigOption<Boolean> MONITOR_NUM_IMMUTABLE_MEM_TABLES = + ConfigOptions.key(ForStProperty.NumImmutableMemTable.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Monitor the number of immutable memtables in RocksDB."); + + public static final ConfigOption<Boolean> MONITOR_MEM_TABLE_FLUSH_PENDING = + ConfigOptions.key(ForStProperty.MemTableFlushPending.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Monitor the number of pending memtable flushes in RocksDB."); + + public static final ConfigOption<Boolean> TRACK_COMPACTION_PENDING = + ConfigOptions.key(ForStProperty.CompactionPending.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Track pending compactions in RocksDB. Returns 1 if a compaction is pending, 0 otherwise."); + + public static final ConfigOption<Boolean> MONITOR_BACKGROUND_ERRORS = + ConfigOptions.key(ForStProperty.BackgroundErrors.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Monitor the number of background errors in RocksDB."); + + public static final ConfigOption<Boolean> MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE = + ConfigOptions.key(ForStProperty.CurSizeActiveMemTable.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the approximate size of the active memtable in bytes."); + + public static final ConfigOption<Boolean> MONITOR_CUR_SIZE_ALL_MEM_TABLE = + ConfigOptions.key(ForStProperty.CurSizeAllMemTables.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the approximate size of the active and unflushed immutable memtables" + + " in bytes."); + + public static final ConfigOption<Boolean> MONITOR_SIZE_ALL_MEM_TABLES = + ConfigOptions.key(ForStProperty.SizeAllMemTables.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the approximate size of the active, unflushed immutable, " + + "and pinned immutable memtables in bytes."); + + public static final ConfigOption<Boolean> MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLE = + ConfigOptions.key(ForStProperty.NumEntriesActiveMemTable.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Monitor the total number of entries in the active memtable."); + + public static final ConfigOption<Boolean> MONITOR_NUM_ENTRIES_IMM_MEM_TABLES = + ConfigOptions.key(ForStProperty.NumEntriesImmMemTables.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total number of entries in the unflushed immutable memtables."); + + public static final ConfigOption<Boolean> MONITOR_NUM_DELETES_ACTIVE_MEM_TABLE = + ConfigOptions.key(ForStProperty.NumDeletesActiveMemTable.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total number of delete entries in the active memtable."); + + public static final ConfigOption<Boolean> MONITOR_NUM_DELETES_IMM_MEM_TABLE = + ConfigOptions.key(ForStProperty.NumDeletesImmMemTables.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total number of delete entries in the unflushed immutable memtables."); + + public static final ConfigOption<Boolean> ESTIMATE_NUM_KEYS = + ConfigOptions.key(ForStProperty.EstimateNumKeys.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Estimate the number of keys in RocksDB."); + + public static final ConfigOption<Boolean> ESTIMATE_TABLE_READERS_MEM = + ConfigOptions.key(ForStProperty.EstimateTableReadersMem.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Estimate the memory used for reading SST tables, excluding memory" + + " used in block cache (e.g.,filter and index blocks) in bytes."); + + public static final ConfigOption<Boolean> MONITOR_NUM_SNAPSHOTS = + ConfigOptions.key(ForStProperty.NumSnapshots.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Monitor the number of unreleased snapshots of the database."); + + public static final ConfigOption<Boolean> MONITOR_NUM_LIVE_VERSIONS = + ConfigOptions.key(ForStProperty.NumLiveVersions.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor number of live versions. Version is an internal data structure. " + + "See RocksDB file version_set.h for details. More live versions often mean more SST files are held " + + "from being deleted, by iterators or unfinished compactions."); + + public static final ConfigOption<Boolean> ESTIMATE_LIVE_DATA_SIZE = + ConfigOptions.key(ForStProperty.EstimateLiveDataSize.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Estimate of the amount of live data in bytes (usually smaller than sst files size due to space amplification)."); + + public static final ConfigOption<Boolean> MONITOR_TOTAL_SST_FILES_SIZE = + ConfigOptions.key(ForStProperty.TotalSstFilesSize.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total size (bytes) of all SST files of all versions." + + "WARNING: may slow down online queries if there are too many files."); + + public static final ConfigOption<Boolean> MONITOR_LIVE_SST_FILES_SIZE = + ConfigOptions.key(ForStProperty.LiveSstFilesSize.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total size (bytes) of all SST files belonging to the latest version." + + "WARNING: may slow down online queries if there are too many files."); + + public static final ConfigOption<Boolean> ESTIMATE_PENDING_COMPACTION_BYTES = + ConfigOptions.key(ForStProperty.EstimatePendingCompactionBytes.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Estimated total number of bytes compaction needs to rewrite to get all levels " + + "down to under target size. Not valid for other compactions than level-based."); + + public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_COMPACTIONS = + ConfigOptions.key(ForStProperty.NumRunningCompactions.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Monitor the number of currently running compactions."); + + public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_FLUSHES = + ConfigOptions.key(ForStProperty.NumRunningFlushes.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Monitor the number of currently running flushes."); + + public static final ConfigOption<Boolean> MONITOR_ACTUAL_DELAYED_WRITE_RATE = + ConfigOptions.key(ForStProperty.ActualDelayedWriteRate.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the current actual delayed write rate. 0 means no delay."); + + public static final ConfigOption<Boolean> IS_WRITE_STOPPED = + ConfigOptions.key(ForStProperty.IsWriteStopped.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Track whether write has been stopped in RocksDB. Returns 1 if write has been stopped, 0 otherwise."); + + public static final ConfigOption<Boolean> BLOCK_CACHE_CAPACITY = + ConfigOptions.key(ForStProperty.BlockCacheCapacity.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription("Monitor block cache capacity."); + + public static final ConfigOption<Boolean> BLOCK_CACHE_USAGE = + ConfigOptions.key(ForStProperty.BlockCacheUsage.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the memory size for the entries residing in block cache."); + + public static final ConfigOption<Boolean> BLOCK_CACHE_PINNED_USAGE = + ConfigOptions.key(ForStProperty.BlockCachePinnedUsage.getConfigKey()) + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the memory size for the entries being pinned in block cache."); + + public static final ConfigOption<Boolean> COLUMN_FAMILY_AS_VARIABLE = + ConfigOptions.key(METRICS_COLUMN_FAMILY_AS_VARIABLE_KEY) + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to expose the column family as a variable for RocksDB property based metrics."); + + public static final ConfigOption<Boolean> MONITOR_NUM_FILES_AT_LEVEL = + ConfigOptions.key("state.backend.rocksdb.metrics.num-files-at-level") + .booleanType() + .defaultValue(false) + .withDescription("Monitor the number of files at each level."); + + // -------------------------------------------------------------------------------------------- + // RocksDB statistics based metrics, report at database level + // -------------------------------------------------------------------------------------------- + + public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_HIT = + ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-hit") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total count of block cache hit in RocksDB (BLOCK_CACHE_HIT == BLOCK_CACHE_INDEX_HIT + BLOCK_CACHE_FILTER_HIT + BLOCK_CACHE_DATA_HIT)."); + + public static final ConfigOption<Boolean> MONITOR_BLOCK_CACHE_MISS = + ConfigOptions.key("state.backend.rocksdb.metrics.block-cache-miss") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total count of block cache misses in RocksDB (BLOCK_CACHE_MISS == BLOCK_CACHE_INDEX_MISS + BLOCK_CACHE_FILTER_MISS + BLOCK_CACHE_DATA_MISS)."); + + public static final ConfigOption<Boolean> MONITOR_BLOOM_FILTER_USEFUL = + ConfigOptions.key("state.backend.rocksdb.metrics.bloom-filter-useful") + .booleanType() + .defaultValue(false) + .withDescription("Monitor the total count of reads avoided by bloom filter."); + + public static final ConfigOption<Boolean> MONITOR_BLOOM_FILTER_FULL_POSITIVE = + ConfigOptions.key("state.backend.rocksdb.metrics.bloom-filter-full-positive") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total count of reads not avoided by bloom full filter."); + + public static final ConfigOption<Boolean> MONITOR_BLOOM_FILTER_FULL_TRUE_POSITIVE = + ConfigOptions.key("state.backend.rocksdb.metrics.bloom-filter-full-true-positive") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the total count of reads not avoided by bloom full filter and the data actually exists in RocksDB."); + + public static final ConfigOption<Boolean> MONITOR_BYTES_READ = + ConfigOptions.key("state.backend.rocksdb.metrics.bytes-read") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the number of uncompressed bytes read (from memtables/cache/sst) from Get() operation in RocksDB."); + + public static final ConfigOption<Boolean> MONITOR_ITER_BYTES_READ = + ConfigOptions.key("state.backend.rocksdb.metrics.iter-bytes-read") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the number of uncompressed bytes read (from memtables/cache/sst) from an iterator operation in RocksDB."); + + public static final ConfigOption<Boolean> MONITOR_BYTES_WRITTEN = + ConfigOptions.key("state.backend.rocksdb.metrics.bytes-written") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the number of uncompressed bytes written by DB::{Put(), Delete(), Merge(), Write()} operations, which does not include the compaction written bytes, in RocksDB."); + + public static final ConfigOption<Boolean> MONITOR_COMPACTION_READ_BYTES = + ConfigOptions.key("state.backend.rocksdb.metrics.compaction-read-bytes") + .booleanType() + .defaultValue(false) + .withDescription("Monitor the bytes read during compaction in RocksDB."); + + public static final ConfigOption<Boolean> MONITOR_COMPACTION_WRITE_BYTES = + ConfigOptions.key("state.backend.rocksdb.metrics.compaction-write-bytes") + .booleanType() + .defaultValue(false) + .withDescription("Monitor the bytes written during compaction in RocksDB."); + + public static final ConfigOption<Boolean> MONITOR_STALL_MICROS = + ConfigOptions.key("state.backend.rocksdb.metrics.stall-micros") + .booleanType() + .defaultValue(false) + .withDescription( + "Monitor the duration of writer requiring to wait for compaction or flush to finish in RocksDB."); + + /** Creates a {@link ForStNativeMetricOptions} based on an external configuration. */ + public static ForStNativeMetricOptions fromConfig(ReadableConfig config) { + ForStNativeMetricOptions options = new ForStNativeMetricOptions(); + configurePropertyMetrics(options, config); + configureStatisticsMetrics(options, config); + return options; + } + + private static void configurePropertyMetrics( + ForStNativeMetricOptions options, ReadableConfig config) { + if (config.get(MONITOR_NUM_IMMUTABLE_MEM_TABLES)) { + options.enableNumImmutableMemTable(); + } + + if (config.get(MONITOR_MEM_TABLE_FLUSH_PENDING)) { + options.enableMemTableFlushPending(); + } + + if (config.get(TRACK_COMPACTION_PENDING)) { + options.enableCompactionPending(); + } + + if (config.get(MONITOR_BACKGROUND_ERRORS)) { + options.enableBackgroundErrors(); + } + + if (config.get(MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE)) { + options.enableCurSizeActiveMemTable(); + } + + if (config.get(MONITOR_CUR_SIZE_ALL_MEM_TABLE)) { + options.enableCurSizeAllMemTables(); + } + + if (config.get(MONITOR_SIZE_ALL_MEM_TABLES)) { + options.enableSizeAllMemTables(); + } + + if (config.get(MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLE)) { + options.enableNumEntriesActiveMemTable(); + } + + if (config.get(MONITOR_NUM_ENTRIES_IMM_MEM_TABLES)) { + options.enableNumEntriesImmMemTables(); + } + + if (config.get(MONITOR_NUM_DELETES_ACTIVE_MEM_TABLE)) { + options.enableNumDeletesActiveMemTable(); + } + + if (config.get(MONITOR_NUM_DELETES_IMM_MEM_TABLE)) { + options.enableNumDeletesImmMemTables(); + } + + if (config.get(ESTIMATE_NUM_KEYS)) { + options.enableEstimateNumKeys(); + } + + if (config.get(ESTIMATE_TABLE_READERS_MEM)) { + options.enableEstimateTableReadersMem(); + } + + if (config.get(MONITOR_NUM_SNAPSHOTS)) { + options.enableNumSnapshots(); + } + + if (config.get(MONITOR_NUM_LIVE_VERSIONS)) { + options.enableNumLiveVersions(); + } + + if (config.get(ESTIMATE_LIVE_DATA_SIZE)) { + options.enableEstimateLiveDataSize(); + } + + if (config.get(MONITOR_TOTAL_SST_FILES_SIZE)) { + options.enableTotalSstFilesSize(); + } + + if (config.get(MONITOR_LIVE_SST_FILES_SIZE)) { + options.enableLiveSstFilesSize(); + } + + if (config.get(ESTIMATE_PENDING_COMPACTION_BYTES)) { + options.enableEstimatePendingCompactionBytes(); + } + + if (config.get(MONITOR_NUM_RUNNING_COMPACTIONS)) { + options.enableNumRunningCompactions(); + } + + if (config.get(MONITOR_NUM_RUNNING_FLUSHES)) { + options.enableNumRunningFlushes(); + } + + if (config.get(MONITOR_ACTUAL_DELAYED_WRITE_RATE)) { + options.enableActualDelayedWriteRate(); + } + + if (config.get(IS_WRITE_STOPPED)) { + options.enableIsWriteStopped(); + } + + if (config.get(BLOCK_CACHE_CAPACITY)) { + options.enableBlockCacheCapacity(); + } + + if (config.get(BLOCK_CACHE_USAGE)) { + options.enableBlockCacheUsage(); + } + + if (config.get(BLOCK_CACHE_PINNED_USAGE)) { + options.enableBlockCachePinnedUsage(); + } + + if (config.get(MONITOR_NUM_FILES_AT_LEVEL)) { + options.enableNumFilesAtLevel(); + } + + options.setColumnFamilyAsVariable(config.get(COLUMN_FAMILY_AS_VARIABLE)); + } + + private static void configureStatisticsMetrics( + ForStNativeMetricOptions options, ReadableConfig config) { + for (Map.Entry<ConfigOption<Boolean>, TickerType> entry : tickerTypeMapping.entrySet()) { + if (config.get(entry.getKey())) { + options.monitorTickerTypes.add(entry.getValue()); + } + } + } + + private static final Map<ConfigOption<Boolean>, TickerType> tickerTypeMapping = + new HashMap<ConfigOption<Boolean>, TickerType>() { + private static final long serialVersionUID = 1L; + + { + put(MONITOR_BLOCK_CACHE_HIT, TickerType.BLOCK_CACHE_HIT); + put(MONITOR_BLOCK_CACHE_MISS, TickerType.BLOCK_CACHE_MISS); + put(MONITOR_BLOOM_FILTER_USEFUL, TickerType.BLOOM_FILTER_USEFUL); + put(MONITOR_BLOOM_FILTER_FULL_POSITIVE, TickerType.BLOOM_FILTER_FULL_POSITIVE); + put( + MONITOR_BLOOM_FILTER_FULL_TRUE_POSITIVE, + TickerType.BLOOM_FILTER_FULL_TRUE_POSITIVE); + put(MONITOR_BYTES_READ, TickerType.BYTES_READ); + put(MONITOR_ITER_BYTES_READ, TickerType.ITER_BYTES_READ); + put(MONITOR_BYTES_WRITTEN, TickerType.BYTES_WRITTEN); + put(MONITOR_COMPACTION_READ_BYTES, TickerType.COMPACT_READ_BYTES); + put(MONITOR_COMPACTION_WRITE_BYTES, TickerType.COMPACT_WRITE_BYTES); + put(MONITOR_STALL_MICROS, TickerType.STALL_MICROS); + } + }; + + private final Set<ForStProperty> properties; + private final Set<TickerType> monitorTickerTypes; + private boolean columnFamilyAsVariable = COLUMN_FAMILY_AS_VARIABLE.defaultValue(); + + public ForStNativeMetricOptions() { + this.properties = new HashSet<>(); + this.monitorTickerTypes = new HashSet<>(); + } + + @VisibleForTesting + public void enableNativeStatistics(ConfigOption<Boolean> nativeStatisticsOption) { + TickerType tickerType = tickerTypeMapping.get(nativeStatisticsOption); + if (tickerType != null) { + monitorTickerTypes.add(tickerType); + } else { + throw new IllegalArgumentException( + "Unknown configurable native statistics option " + nativeStatisticsOption); + } + } + + /** Returns number of immutable memtables that have not yet been flushed. */ + public void enableNumImmutableMemTable() { + this.properties.add(ForStProperty.NumImmutableMemTable); + } + + /** Returns 1 if a memtable flush is pending; otherwise, returns 0. */ + public void enableMemTableFlushPending() { + this.properties.add(ForStProperty.MemTableFlushPending); + } + + /** Returns 1 if at least one compaction is pending; otherwise, returns 0. */ + public void enableCompactionPending() { + this.properties.add(ForStProperty.CompactionPending); + } + + /** Returns accumulated number of background errors. */ + public void enableBackgroundErrors() { + this.properties.add(ForStProperty.BackgroundErrors); + } + + /** Returns approximate size of active memtable (bytes). */ + public void enableCurSizeActiveMemTable() { + this.properties.add(ForStProperty.CurSizeActiveMemTable); + } + + /** Returns approximate size of active and unflushed immutable memtables (bytes). */ + public void enableCurSizeAllMemTables() { + this.properties.add(ForStProperty.CurSizeAllMemTables); + } + + /** + * Returns approximate size of active, unflushed immutable, and pinned immutable memtables + * (bytes). + */ + public void enableSizeAllMemTables() { + this.properties.add(ForStProperty.SizeAllMemTables); + } + + /** Returns total number of entries in the active memtable. */ + public void enableNumEntriesActiveMemTable() { + this.properties.add(ForStProperty.NumEntriesActiveMemTable); + } + + /** Returns total number of entries in the unflushed immutable memtables. */ + public void enableNumEntriesImmMemTables() { + this.properties.add(ForStProperty.NumEntriesImmMemTables); + } + + /** Returns total number of delete entries in the active memtable. */ + public void enableNumDeletesActiveMemTable() { + this.properties.add(ForStProperty.NumDeletesActiveMemTable); + } + + /** Returns total number of delete entries in the unflushed immutable memtables. */ + public void enableNumDeletesImmMemTables() { + this.properties.add(ForStProperty.NumDeletesImmMemTables); + } + + /** + * Returns estimated number of total keys in the active and unflushed immutable memtables and + * storage. + */ + public void enableEstimateNumKeys() { + this.properties.add(ForStProperty.EstimateNumKeys); + } + + /** + * Returns estimated memory used for reading SST tables, excluding memory used in block cache + * (e.g.,filter and index blocks). + */ + public void enableEstimateTableReadersMem() { + this.properties.add(ForStProperty.EstimateTableReadersMem); + } + + /** Returns number of unreleased snapshots of the database. */ + public void enableNumSnapshots() { + this.properties.add(ForStProperty.NumSnapshots); + } + + /** + * Returns number of live versions. `Version` is an internal data structure. See version_set.h + * for details. More live versions often mean more SST files are held from being deleted, by + * iterators or unfinished compactions. + */ + public void enableNumLiveVersions() { + this.properties.add(ForStProperty.NumLiveVersions); + } + + /** Returns an estimate of the amount of live data in bytes. */ + public void enableEstimateLiveDataSize() { + this.properties.add(ForStProperty.EstimateLiveDataSize); + } + + /** + * Returns total size (bytes) of all SST files. <strong>WARNING</strong>: may slow down online + * queries if there are too many files. + */ + public void enableTotalSstFilesSize() { + this.properties.add(ForStProperty.TotalSstFilesSize); + } + + public void enableLiveSstFilesSize() { + this.properties.add(ForStProperty.LiveSstFilesSize); + } + + /** + * Returns estimated total number of bytes compaction needs to rewrite to get all levels down to + * under target size. Not valid for other compactions than level-based. + */ + public void enableEstimatePendingCompactionBytes() { + this.properties.add(ForStProperty.EstimatePendingCompactionBytes); + } + + /** Returns the number of currently running compactions. */ + public void enableNumRunningCompactions() { + this.properties.add(ForStProperty.NumRunningCompactions); + } + + /** Returns the number of currently running flushes. */ + public void enableNumRunningFlushes() { + this.properties.add(ForStProperty.NumRunningFlushes); + } + + /** Returns the current actual delayed write rate. 0 means no delay. */ + public void enableActualDelayedWriteRate() { + this.properties.add(ForStProperty.ActualDelayedWriteRate); + } + + /** Returns 1 if write has been stopped. */ + public void enableIsWriteStopped() { + this.properties.add(ForStProperty.IsWriteStopped); + } + + /** Returns block cache capacity. */ + public void enableBlockCacheCapacity() { + this.properties.add(ForStProperty.BlockCacheCapacity); + } + + /** Returns the memory size for the entries residing in block cache. */ + public void enableBlockCacheUsage() { + this.properties.add(ForStProperty.BlockCacheUsage); + } + + /** Returns the memory size for the entries being pinned in block cache. */ + public void enableBlockCachePinnedUsage() { + this.properties.add(ForStProperty.BlockCachePinnedUsage); + } + + /** Returns the number of files per level. */ + public void enableNumFilesAtLevel() { + this.properties.add(ForStProperty.NumFilesAtLevel0); + this.properties.add(ForStProperty.NumFilesAtLevel1); + this.properties.add(ForStProperty.NumFilesAtLevel2); + this.properties.add(ForStProperty.NumFilesAtLevel3); + this.properties.add(ForStProperty.NumFilesAtLevel4); + this.properties.add(ForStProperty.NumFilesAtLevel5); + this.properties.add(ForStProperty.NumFilesAtLevel6); + } + + /** Returns the column family as variable. */ + public void setColumnFamilyAsVariable(boolean columnFamilyAsVariable) { + this.columnFamilyAsVariable = columnFamilyAsVariable; + } + + /** @return the enabled RocksDB property-based metrics */ + public Collection<ForStProperty> getProperties() { + return Collections.unmodifiableCollection(properties); + } + + /** @return the enabled RocksDB statistics metrics. */ + public Collection<TickerType> getMonitorTickerTypes() { + return Collections.unmodifiableCollection(monitorTickerTypes); + } + + /** + * {{@link ForStNativeMetricMonitor}} is enabled if any property or ticker type is set. + * + * @return true if {{RocksDBNativeMetricMonitor}} should be enabled, false otherwise. + */ + public boolean isEnabled() { + return !properties.isEmpty() || isStatisticsEnabled(); + } + + /** @return true if RocksDB statistics metrics are enabled, false otherwise. */ + public boolean isStatisticsEnabled() { + return !monitorTickerTypes.isEmpty(); + } + + /** + * {{@link ForStNativeMetricMonitor}} Whether to expose the column family as a variable.. + * + * @return true is column family to expose variable, false otherwise. + */ + public boolean isColumnFamilyAsVariable() { + return this.columnFamilyAsVariable; + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptionsFactory.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptionsFactory.java index 7fbc432e659..80ac8511fa6 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptionsFactory.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStOptionsFactory.java @@ -66,7 +66,19 @@ public interface ForStOptionsFactory extends java.io.Serializable { ColumnFamilyOptions createColumnOptions( ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose); - // TODO: Support Metrics Options + /** + * This method should enable certain ForSt metrics to be forwarded to Flink's metrics reporter. + * + * <p>Enabling these monitoring options may degrade ForSt performance and should be set with + * care. + * + * @param nativeMetricOptions The options object with the pre-defined options. + * @return The options object on which the additional options are set. + */ + default ForStNativeMetricOptions createNativeMetricsOptions( + ForStNativeMetricOptions nativeMetricOptions) { + return nativeMetricOptions; + } /** * This method should set the additional options on top of the current options object. The diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStProperty.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStProperty.java new file mode 100644 index 00000000000..042fb5916b8 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStProperty.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.annotation.Internal; + +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; + +/** + * {@link RocksDB} properties that can be queried by Flink's metrics reporter. + * + * <p>Note: Metrics properties are added in each new version of {@link RocksDB}, when upgrading to a + * latter version consider updating this class with newly added properties. + */ +@Internal +public enum ForStProperty { + NumImmutableMemTable("num-immutable-mem-table", PropertyType.NUMBER), + MemTableFlushPending("mem-table-flush-pending", PropertyType.NUMBER), + CompactionPending("compaction-pending", PropertyType.NUMBER), + BackgroundErrors("background-errors", PropertyType.NUMBER), + CurSizeActiveMemTable("cur-size-active-mem-table", PropertyType.NUMBER), + CurSizeAllMemTables("cur-size-all-mem-tables", PropertyType.NUMBER), + SizeAllMemTables("size-all-mem-tables", PropertyType.NUMBER), + NumEntriesActiveMemTable("num-entries-active-mem-table", PropertyType.NUMBER), + NumEntriesImmMemTables("num-entries-imm-mem-tables", PropertyType.NUMBER), + NumDeletesActiveMemTable("num-deletes-active-mem-table", PropertyType.NUMBER), + NumDeletesImmMemTables("num-deletes-imm-mem-tables", PropertyType.NUMBER), + EstimateNumKeys("estimate-num-keys", PropertyType.NUMBER), + EstimateTableReadersMem("estimate-table-readers-mem", PropertyType.NUMBER), + NumSnapshots("num-snapshots", PropertyType.NUMBER), + NumLiveVersions("num-live-versions", PropertyType.NUMBER), + EstimateLiveDataSize("estimate-live-data-size", PropertyType.NUMBER), + TotalSstFilesSize("total-sst-files-size", PropertyType.NUMBER), + LiveSstFilesSize("live-sst-files-size", PropertyType.NUMBER), + EstimatePendingCompactionBytes("estimate-pending-compaction-bytes", PropertyType.NUMBER), + NumRunningCompactions("num-running-compactions", PropertyType.NUMBER), + NumRunningFlushes("num-running-flushes", PropertyType.NUMBER), + ActualDelayedWriteRate("actual-delayed-write-rate", PropertyType.NUMBER), + IsWriteStopped("is-write-stopped", PropertyType.NUMBER), + BlockCacheCapacity("block-cache-capacity", PropertyType.NUMBER), + BlockCacheUsage("block-cache-usage", PropertyType.NUMBER), + BlockCachePinnedUsage("block-cache-pinned-usage", PropertyType.NUMBER), + NumFilesAtLevel0("num-files-at-level0", PropertyType.STRING), + NumFilesAtLevel1("num-files-at-level1", PropertyType.STRING), + NumFilesAtLevel2("num-files-at-level2", PropertyType.STRING), + NumFilesAtLevel3("num-files-at-level3", PropertyType.STRING), + NumFilesAtLevel4("num-files-at-level4", PropertyType.STRING), + NumFilesAtLevel5("num-files-at-level5", PropertyType.STRING), + NumFilesAtLevel6("num-files-at-level6", PropertyType.STRING); + + private static final String ROCKS_DB_PROPERTY_FORMAT = "rocksdb.%s"; + + private static final String CONFIG_KEY_FORMAT = "state.backend.forst.metrics.%s"; + + private final String property; + + private final PropertyType type; + + /** Property type. */ + private enum PropertyType { + NUMBER, + STRING + } + + ForStProperty(String property, PropertyType type) { + this.property = property; + this.type = type; + } + + /** + * @return property string that can be used to query {@link + * RocksDB#getLongProperty(ColumnFamilyHandle, String)}. + */ + public String getForStProperty() { + return String.format(ROCKS_DB_PROPERTY_FORMAT, property); + } + + public long getNumericalPropertyValue(RocksDB rocksDB, ColumnFamilyHandle handle) + throws Exception { + String forStProperty = getForStProperty(); + switch (type) { + case NUMBER: + return rocksDB.getLongProperty(handle, forStProperty); + case STRING: + return Long.parseLong(rocksDB.getProperty(handle, forStProperty)); + default: + throw new RuntimeException( + String.format("ForSt property type: %s not supported", type)); + } + } + + /** + * @return key for enabling metric using {@link org.apache.flink.configuration.Configuration}. + */ + public String getConfigKey() { + return String.format(CONFIG_KEY_FORMAT, property); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java new file mode 100644 index 00000000000..a6a2bfa9a22 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStExtension.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.IOUtils; + +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.rules.TemporaryFolder; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.Statistics; +import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +/** External extension for tests that require an instance of ForSt. */ +public class ForStExtension implements BeforeEachCallback, AfterEachCallback { + private static final Logger LOG = LoggerFactory.getLogger(ForStExtension.class); + + /** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */ + private final ForStOptionsFactory optionsFactory; + + private final boolean enableStatistics; + + /** Temporary folder that provides the working directory for the ForSt instance. */ + private TemporaryFolder temporaryFolder; + + /** The options for the ForSt instance. */ + private DBOptions dbOptions; + + /** The options for column families created with the ForSt instance. */ + private ColumnFamilyOptions columnFamilyOptions; + + /** The options for writes. */ + private WriteOptions writeOptions; + + /** The options for reads. */ + private ReadOptions readOptions; + + /** The ForSt instance object. */ + private RocksDB db; + + /** List of all column families that have been created with the ForSt instance. */ + private List<ColumnFamilyHandle> columnFamilyHandles; + + /** Resources to close. */ + private final ArrayList<AutoCloseable> handlesToClose = new ArrayList<>(); + + public ForStExtension() { + this(false); + } + + public ForStExtension(boolean enableStatistics) { + this( + new ForStOptionsFactory() { + private static final long serialVersionUID = 1L; + + @Override + public DBOptions createDBOptions( + DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) { + // close it before reuse the reference. + try { + currentOptions.close(); + } catch (Exception e) { + LOG.error("Close previous DBOptions's instance failed.", e); + } + + return new DBOptions() + .setMaxBackgroundJobs(4) + .setUseFsync(false) + .setMaxOpenFiles(-1) + .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) + .setStatsDumpPeriodSec(0); + } + + @Override + public ColumnFamilyOptions createColumnOptions( + ColumnFamilyOptions currentOptions, + Collection<AutoCloseable> handlesToClose) { + // close it before reuse the reference. + try { + currentOptions.close(); + } catch (Exception e) { + LOG.error("Close previous ColumnOptions's instance failed.", e); + } + + return new ColumnFamilyOptions().optimizeForPointLookup(40960); + } + }, + enableStatistics); + } + + public ForStExtension(@Nonnull ForStOptionsFactory optionsFactory, boolean enableStatistics) { + this.optionsFactory = optionsFactory; + this.enableStatistics = enableStatistics; + } + + public ColumnFamilyHandle getDefaultColumnFamily() { + return columnFamilyHandles.get(0); + } + + public RocksDB getDB() { + return db; + } + + public DBOptions getDbOptions() { + return dbOptions; + } + + /** Creates and returns a new column family with the given name. */ + public ColumnFamilyHandle createNewColumnFamily(String name) { + try { + final ColumnFamilyHandle columnFamily = + db.createColumnFamily( + new ColumnFamilyDescriptor(name.getBytes(), columnFamilyOptions)); + columnFamilyHandles.add(columnFamily); + return columnFamily; + } catch (Exception ex) { + throw new FlinkRuntimeException("Could not create column family.", ex); + } + } + + public void before() throws Exception { + this.temporaryFolder = new TemporaryFolder(); + this.temporaryFolder.create(); + final File rocksFolder = temporaryFolder.newFolder(); + this.dbOptions = + optionsFactory + .createDBOptions( + new DBOptions() + .setUseFsync(false) + .setInfoLogLevel(InfoLogLevel.HEADER_LEVEL) + .setStatsDumpPeriodSec(0), + handlesToClose) + .setCreateIfMissing(true); + if (enableStatistics) { + Statistics statistics = new Statistics(); + dbOptions.setStatistics(statistics); + handlesToClose.add(statistics); + } + this.columnFamilyOptions = + optionsFactory.createColumnOptions(new ColumnFamilyOptions(), handlesToClose); + this.writeOptions = new WriteOptions(); + this.writeOptions.disableWAL(); + this.readOptions = new ReadOptions(); + this.columnFamilyHandles = new ArrayList<>(1); + this.db = + RocksDB.open( + dbOptions, + rocksFolder.getAbsolutePath(), + Collections.singletonList( + new ColumnFamilyDescriptor( + "default".getBytes(), columnFamilyOptions)), + columnFamilyHandles); + } + + public void after() { + // destruct in reversed order of creation. + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(columnFamilyHandle); + } + IOUtils.closeQuietly(this.db); + IOUtils.closeQuietly(this.readOptions); + IOUtils.closeQuietly(this.writeOptions); + IOUtils.closeQuietly(this.columnFamilyOptions); + IOUtils.closeQuietly(this.dbOptions); + handlesToClose.forEach(IOUtils::closeQuietly); + temporaryFolder.delete(); + } + + @Override + public void beforeEach(ExtensionContext context) throws Exception { + before(); + } + + @Override + public void afterEach(ExtensionContext context) { + after(); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricMonitorTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricMonitorTest.java new file mode 100644 index 00000000000..b1a2716e8a3 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricMonitorTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.metrics.Metric; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.groups.GenericMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.metrics.scope.ScopeFormats; +import org.apache.flink.traces.SpanBuilder; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDBException; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** validate native metric monitor. */ +class ForStNativeMetricMonitorTest { + + private static final String OPERATOR_NAME = "dummy"; + + private static final String COLUMN_FAMILY_NAME = "column-family"; + + @RegisterExtension public ForStExtension forStExtension = new ForStExtension(true); + + @Test + void testMetricMonitorLifecycle() throws Throwable { + // We use a local variable here to manually control the life-cycle. + // This allows us to verify that metrics do not try to access + // RocksDB after the monitor was closed. + ForStExtension localForStExtension = new ForStExtension(true); + localForStExtension.before(); + + SimpleMetricRegistry registry = new SimpleMetricRegistry(); + GenericMetricGroup group = + new GenericMetricGroup( + registry, + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), + OPERATOR_NAME); + + ForStNativeMetricOptions options = new ForStNativeMetricOptions(); + // always returns a non-zero + // value since empty memtables + // have overhead. + options.enableSizeAllMemTables(); + options.enableNativeStatistics(ForStNativeMetricOptions.MONITOR_BYTES_WRITTEN); + + ForStNativeMetricMonitor monitor = + new ForStNativeMetricMonitor( + options, + group, + localForStExtension.getDB(), + localForStExtension.getDbOptions().statistics()); + + ColumnFamilyHandle handle = localForStExtension.createNewColumnFamily(COLUMN_FAMILY_NAME); + monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle); + + assertThat(registry.propertyMetrics) + .withFailMessage("Failed to register metrics for column family") + .hasSize(1); + + // write something to ensure the bytes-written is not zero. + localForStExtension.getDB().put(new byte[4], new byte[10]); + + for (ForStNativeMetricMonitor.ForStNativePropertyMetricView view : + registry.propertyMetrics) { + view.update(); + assertThat(view.getValue()) + .withFailMessage("Failed to pull metric from ForSt") + .isNotEqualTo(BigInteger.ZERO); + view.setValue(0L); + } + + for (ForStNativeMetricMonitor.ForStNativeStatisticsMetricView view : + registry.statisticsMetrics) { + view.update(); + assertThat(view.getValue()).isNotZero(); + view.setValue(0L); + } + + // After the monitor is closed no metric should be accessing RocksDB anymore. + // If they do, then this test will likely fail with a segmentation fault. + monitor.close(); + + localForStExtension.after(); + + for (ForStNativeMetricMonitor.ForStNativePropertyMetricView view : + registry.propertyMetrics) { + view.update(); + assertThat(view.getValue()) + .withFailMessage("Failed to release ForSt reference") + .isEqualTo(BigInteger.ZERO); + } + + for (ForStNativeMetricMonitor.ForStNativeStatisticsMetricView view : + registry.statisticsMetrics) { + view.update(); + assertThat(view.getValue()).isZero(); + } + } + + @Test + void testReturnsUnsigned() throws Throwable { + ForStExtension localForStExtension = new ForStExtension(); + localForStExtension.before(); + + SimpleMetricRegistry registry = new SimpleMetricRegistry(); + GenericMetricGroup group = + new GenericMetricGroup( + registry, + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), + OPERATOR_NAME); + + ForStNativeMetricOptions options = new ForStNativeMetricOptions(); + options.enableSizeAllMemTables(); + + ForStNativeMetricMonitor monitor = + new ForStNativeMetricMonitor( + options, + group, + localForStExtension.getDB(), + localForStExtension.getDbOptions().statistics()); + + ColumnFamilyHandle handle = forStExtension.createNewColumnFamily(COLUMN_FAMILY_NAME); + monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle); + ForStNativeMetricMonitor.ForStNativePropertyMetricView view = + registry.propertyMetrics.get(0); + + view.setValue(-1); + BigInteger result = view.getValue(); + + localForStExtension.after(); + + assertThat(result.signum()) + .withFailMessage("Failed to interpret ForSt result as an unsigned long") + .isOne(); + } + + @Test + void testClosedGaugesDontRead() throws RocksDBException { + SimpleMetricRegistry registry = new SimpleMetricRegistry(); + GenericMetricGroup group = + new GenericMetricGroup( + registry, + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), + OPERATOR_NAME); + + ForStNativeMetricOptions options = new ForStNativeMetricOptions(); + options.enableSizeAllMemTables(); + options.enableNativeStatistics(ForStNativeMetricOptions.MONITOR_BLOCK_CACHE_HIT); + + ForStNativeMetricMonitor monitor = + new ForStNativeMetricMonitor( + options, + group, + forStExtension.getDB(), + forStExtension.getDbOptions().statistics()); + + ColumnFamilyHandle handle = forStExtension.createNewColumnFamily(COLUMN_FAMILY_NAME); + monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle); + + forStExtension.getDB().put(new byte[4], new byte[10]); + + for (ForStNativeMetricMonitor.ForStNativePropertyMetricView view : + registry.propertyMetrics) { + view.close(); + view.update(); + assertThat(view.getValue()) + .withFailMessage("Closed gauge still queried ForSt") + .isEqualTo(BigInteger.ZERO); + } + + for (ForStNativeMetricMonitor.ForStNativeStatisticsMetricView view : + registry.statisticsMetrics) { + view.close(); + view.update(); + assertThat(view.getValue()) + .withFailMessage("Closed gauge still queried ForSt") + .isZero(); + } + } + + static class SimpleMetricRegistry implements MetricRegistry { + List<ForStNativeMetricMonitor.ForStNativePropertyMetricView> propertyMetrics = + new ArrayList<>(); + + List<ForStNativeMetricMonitor.ForStNativeStatisticsMetricView> statisticsMetrics = + new ArrayList<>(); + + @Override + public char getDelimiter() { + return 0; + } + + @Override + public int getNumberReporters() { + return 0; + } + + @Override + public void addSpan(SpanBuilder spanBuilder) {} + + @Override + public void register(Metric metric, String metricName, AbstractMetricGroup group) { + if (metric instanceof ForStNativeMetricMonitor.ForStNativePropertyMetricView) { + propertyMetrics.add( + (ForStNativeMetricMonitor.ForStNativePropertyMetricView) metric); + } else if (metric instanceof ForStNativeMetricMonitor.ForStNativeStatisticsMetricView) { + statisticsMetrics.add( + (ForStNativeMetricMonitor.ForStNativeStatisticsMetricView) metric); + } + } + + @Override + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {} + + @Override + public ScopeFormats getScopeFormats() { + Configuration config = new Configuration(); + + config.set(MetricOptions.SCOPE_NAMING_TM, "A"); + config.set(MetricOptions.SCOPE_NAMING_TM_JOB, "B"); + config.set(MetricOptions.SCOPE_NAMING_TASK, "C"); + config.set(MetricOptions.SCOPE_NAMING_OPERATOR, "D"); + + return ScopeFormats.fromConfig(config); + } + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java new file mode 100644 index 00000000000..1c8a3bddb61 --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.apache.flink.configuration.Configuration; + +import org.junit.Assert; +import org.junit.Test; + +/** Test all native metrics can be set using configuration. */ +public class ForStNativeMetricOptionsTest { + @Test + public void testNativeMetricsConfigurable() { + for (ForStProperty property : ForStProperty.values()) { + Configuration config = new Configuration(); + if (property.getConfigKey().contains("num-files-at-level")) { + config.set(ForStNativeMetricOptions.MONITOR_NUM_FILES_AT_LEVEL, true); + } else { + config.setBoolean(property.getConfigKey(), true); + } + + ForStNativeMetricOptions options = ForStNativeMetricOptions.fromConfig(config); + + Assert.assertTrue( + String.format( + "Failed to enable native metrics with property %s", + property.getConfigKey()), + options.isEnabled()); + + Assert.assertTrue( + String.format( + "Failed to enable native metric %s using config", + property.getConfigKey()), + options.getProperties().contains(property)); + } + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStPropertyTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStPropertyTest.java new file mode 100644 index 00000000000..56e577b803f --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStPropertyTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.RocksDB; + +/** Validate ForSt properties. */ +class ForStPropertyTest { + + @RegisterExtension public ForStExtension forStExtension = new ForStExtension(); + + @Test + void testForStPropertiesValid() { + RocksDB db = forStExtension.getDB(); + ColumnFamilyHandle handle = forStExtension.getDefaultColumnFamily(); + + for (ForStProperty forStProperty : ForStProperty.values()) { + try { + forStProperty.getNumericalPropertyValue(db, handle); + } catch (Exception e) { + throw new AssertionError( + String.format( + "Invalid ForSt property %s", forStProperty.getForStProperty()), + e); + } + } + } +}