[ 
https://issues.apache.org/jira/browse/FLINK-10423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656477#comment-16656477
 ] 

ASF GitHub Bot commented on FLINK-10423:
----------------------------------------

zentol closed pull request #6814: [FLINK-10423][rocksdb][metrics] rocksdb 
native metrics monitor
URL: https://github.com/apache/flink/pull/6814
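
For readers who want to try the feature, here is a minimal sketch of how two of the new options might be switched on in flink-conf.yaml. The two metric keys are taken from the generated configuration table in the diff; the choice of these two metrics and the `state.backend: rocksdb` line are assumptions for illustration. All of the new options default to false.

```yaml
# Assumption: the job uses the RocksDB state backend.
state.backend: rocksdb

# Each native metric is opt-in (default: false). Enable only what you need,
# since the docs in this PR warn that native metrics may degrade performance.
state.backend.rocksdb.metrics.estimate-num-keys: true
state.backend.rocksdb.metrics.num-running-compactions: true
```

According to the documentation added in this PR, the values are scoped to operators, broken down by column family, and reported as unsigned longs.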
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/_includes/generated/rocks_db_native_metric_configuration.html 
b/docs/_includes/generated/rocks_db_native_metric_configuration.html
new file mode 100644
index 00000000000..78fc523b35b
--- /dev/null
+++ b/docs/_includes/generated/rocks_db_native_metric_configuration.html
@@ -0,0 +1,116 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.actual-delayed-write-rate</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the current actual delayed write rate. 0 means no 
delay.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.background-errors</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the number of background errors in RocksDB.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.compaction-pending</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Track pending compactions in RocksDB. Returns 1 if a 
compaction is pending, 0 otherwise.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.cur-size-active-mem-table</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the approximate size of the active memtable in 
bytes.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.cur-size-all-mem-tables</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the approximate size of the active and unflushed 
immutable memtables in bytes.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.estimate-live-data-size</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Estimate of the amount of live data in bytes.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.estimate-num-keys</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Estimate the number of keys in RocksDB.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.estimate-pending-compaction-bytes</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Estimated total number of bytes compaction needs to rewrite to 
get all levels down to under target size. Only valid for level-based 
compaction.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.estimate-table-readers-mem</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Estimate the memory used for reading SST tables, excluding 
memory used in block cache (e.g., filter and index blocks) in bytes.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.mem-table-flush-pending</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the number of pending memtable flushes in RocksDB.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.num-deletes-active-mem-table</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the total number of delete entries in the active 
memtable.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.num-deletes-imm-mem-tables</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the total number of delete entries in the unflushed 
immutable memtables.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.num-entries-active-mem-table</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the total number of entries in the active 
memtable.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.num-entries-imm-mem-tables</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the total number of entries in the unflushed immutable 
memtables.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.num-immutable-mem-table</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the number of immutable memtables in RocksDB.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.num-live-versions</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the number of live versions. Version is an internal data 
structure. See RocksDB file version_set.h for details. More live versions often 
mean more SST files are held from being deleted, by iterators or unfinished 
compactions.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.num-running-compactions</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the number of currently running compactions.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.num-running-flushes</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the number of currently running flushes.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.num-snapshots</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the number of unreleased snapshots of the 
database.</td>
+        </tr>
+        <tr>
+            <td><h5>state.backend.rocksdb.metrics.size-all-mem-tables</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the approximate size of the active, unflushed 
immutable, and pinned immutable memtables in bytes.</td>
+        </tr>
+        <tr>
+            
<td><h5>state.backend.rocksdb.metrics.total-sst-files-size</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Monitor the total size (bytes) of all SST files. WARNING: may 
slow down online queries if there are too many files.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 85c60a67a22..04efdef26fc 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1205,6 +1205,9 @@ Thus, in order to infer the metric identifier:
   </tbody>
 </table>
 
+### RocksDB
+Certain RocksDB native metrics are available but disabled by default; you can 
find full documentation [here]({{ site.baseurl 
}}/ops/config.html#rocksdb-native-metrics)
+
 ### IO
 <table class="table table-bordered">
   <thead>
diff --git a/docs/ops/config.md b/docs/ops/config.md
index d370e56d695..267b670d15a 100644
--- a/docs/ops/config.md
+++ b/docs/ops/config.md
@@ -162,6 +162,16 @@ The configuration keys in this section are independent of 
the used resource mana
 
 {% include generated/metric_configuration.html %}
 
+### RocksDB Native Metrics
+Certain RocksDB native metrics may be forwarded to Flink's metrics reporter.
+All native metrics are scoped to operators and then further broken down by 
column family; values are reported as unsigned longs. 
+
+<div class="alert alert-warning">
+  <strong>Note:</strong> Enabling native metrics may degrade performance, so 
enable them with care. 
+</div>
+
+{% include generated/rocks_db_native_metric_configuration.html %}
+
 ### History Server
 
 You have to configure `jobmanager.archive.fs.dir` in order to archive 
terminated jobs and add it to the list of monitored directories via 
`historyserver.archive.fs.dir` if you want to display them via the 
HistoryServer's web frontend.
diff --git a/docs/ops/state/state_backends.md b/docs/ops/state/state_backends.md
index 4e09b345e10..b95d2b7e4db 100644
--- a/docs/ops/state/state_backends.md
+++ b/docs/ops/state/state_backends.md
@@ -121,6 +121,8 @@ on-heap representation as the heap-based backends are doing.
 
 RocksDBStateBackend is currently the only backend that offers incremental 
checkpoints (see [here](large_state_tuning.html)). 
 
+Certain RocksDB native metrics are available but disabled by default; you can 
find full documentation [here]({{ site.baseurl 
}}/ops/config.html#rocksdb-native-metrics)
+
 ## Configuring a State Backend
 
 The default state backend, if you specify nothing, is the jobmanager. If you 
wish to establish a different default for all jobs on your cluster, you can do 
so by defining a new default state backend in **flink-conf.yaml**. The default 
state backend can be overridden on a per-job basis, as shown below.
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 176e55a7d8c..fff8f02285f 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -25,7 +25,9 @@
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.contrib.streaming.state.PredefinedOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -82,7 +84,9 @@ public void testListSerialization() throws Exception {
                                false,
                                TestLocalRecoveryConfig.disabled(),
                                RocksDBStateBackend.PriorityQueueStateType.HEAP,
-                               TtlTimeProvider.DEFAULT
+                               TtlTimeProvider.DEFAULT,
+                               new RocksDBNativeMetricOptions(),
+                               new UnregisteredMetricsGroup()
                        );
                longHeapKeyedStateBackend.restore(null);
                longHeapKeyedStateBackend.setCurrentKey(key);
@@ -124,7 +128,10 @@ public void testMapSerialization() throws Exception {
                                false,
                                TestLocalRecoveryConfig.disabled(),
                                RocksDBStateBackend.PriorityQueueStateType.HEAP,
-                               TtlTimeProvider.DEFAULT);
+                               TtlTimeProvider.DEFAULT,
+                               new RocksDBNativeMetricOptions(),
+                               new UnregisteredMetricsGroup()
+                       );
                longHeapKeyedStateBackend.restore(null);
                longHeapKeyedStateBackend.setCurrentKey(key);
 
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index adcf3aefb7c..bfadb75cfe6 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -26,6 +26,7 @@
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
@@ -763,6 +764,7 @@ public void notifyKvStateUnregistered(JobID jobId,
                        numKeyGroups,
                        new KeyGroupRange(0, 0),
                        registry.createTaskRegistry(dummyEnv.getJobID(), 
dummyEnv.getJobVertexId()),
-                       TtlTimeProvider.DEFAULT);
+                       TtlTimeProvider.DEFAULT,
+                       new UnregisteredMetricsGroup());
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index d397a88c214..6c72acd888e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -50,7 +51,8 @@
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
                TaskKvStateRegistry kvStateRegistry,
-               TtlTimeProvider ttlTimeProvider) throws IOException;
+               TtlTimeProvider ttlTimeProvider,
+               MetricGroup metricGroup) throws IOException;
 
        @Override
        public abstract OperatorStateBackend createOperatorStateBackend(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 2775b71ac72..4235f762ff1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -21,6 +21,8 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
@@ -144,7 +146,42 @@
                        numberOfKeyGroups,
                        keyGroupRange,
                        kvStateRegistry,
-                       TtlTimeProvider.DEFAULT);
+                       TtlTimeProvider.DEFAULT
+               );
+       }
+
+       /**
+        * Creates a new {@link AbstractKeyedStateBackend} that is responsible 
for holding <b>keyed state</b>
+        * and checkpointing it.
+        *
+        * <p><i>Keyed State</i> is state where each value is bound to a key.
+        *
+        * @param <K> The type of the keys by which the state is organized.
+        *
+        * @return The Keyed State Backend for the given job, operator, and key 
group range.
+        *
+        * @throws Exception This method may forward all exceptions that occur 
while instantiating the backend.
+        */
+       default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+               Environment env,
+               JobID jobID,
+               String operatorIdentifier,
+               TypeSerializer<K> keySerializer,
+               int numberOfKeyGroups,
+               KeyGroupRange keyGroupRange,
+               TaskKvStateRegistry kvStateRegistry,
+               TtlTimeProvider ttlTimeProvider
+       ) throws Exception {
+               return createKeyedStateBackend(
+                       env,
+                       jobID,
+                       operatorIdentifier,
+                       keySerializer,
+                       numberOfKeyGroups,
+                       keyGroupRange,
+                       kvStateRegistry,
+                       ttlTimeProvider,
+                       new UnregisteredMetricsGroup());
        }
 
        /**
@@ -167,7 +204,8 @@
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
                TaskKvStateRegistry kvStateRegistry,
-               TtlTimeProvider ttlTimeProvider) throws Exception;
+               TtlTimeProvider ttlTimeProvider,
+               MetricGroup metricGroup) throws Exception;
        
        /**
         * Creates a new {@link OperatorStateBackend} that can be used for 
storing operator state.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index f5a86e1ac51..135030e88d6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -25,6 +25,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -456,7 +457,8 @@ public CheckpointStorage createCheckpointStorage(JobID 
jobId) throws IOException
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
                TaskKvStateRegistry kvStateRegistry,
-               TtlTimeProvider ttlTimeProvider) {
+               TtlTimeProvider ttlTimeProvider,
+               MetricGroup metricGroup) {
 
                TaskStateManager taskStateManager = env.getTaskStateManager();
                LocalRecoveryConfig localRecoveryConfig = 
taskStateManager.createLocalRecoveryConfig();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 1c464d7fa97..4cc3a2c7376 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -309,7 +310,8 @@ public OperatorStateBackend createOperatorStateBackend(
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry,
-                       TtlTimeProvider ttlTimeProvider) {
+                       TtlTimeProvider ttlTimeProvider,
+                       MetricGroup metricGroup) {
 
                TaskStateManager taskStateManager = env.getTaskStateManager();
                HeapPriorityQueueSetFactory priorityQueueSetFactory =
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index ffebc52f8a0..4b5ae06f217 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.execution.Environment;
@@ -170,7 +171,8 @@ public CheckpointStorage createCheckpointStorage(JobID 
jobId) throws IOException
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry,
-                       TtlTimeProvider ttlTimeProvider) throws Exception {
+                       TtlTimeProvider ttlTimeProvider,
+                       MetricGroup metricGroup) throws Exception {
                        throw new UnsupportedOperationException();
                }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
index ccfafec806f..f88e6d7d31b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -29,6 +29,7 @@
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -95,7 +96,8 @@
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
                ExecutionConfig executionConfig,
-               TtlTimeProvider ttlTimeProvider) {
+               TtlTimeProvider ttlTimeProvider,
+               MetricGroup operatorMetricGroup) {
                super(kvStateRegistry, keySerializer, userCodeClassLoader,
                        numberOfKeyGroups, keyGroupRange, executionConfig, 
ttlTimeProvider);
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
index a8d49dde3ff..8ed84c031a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -93,7 +94,8 @@ public CheckpointStreamFactory 
resolveCheckpointStorageLocation(long checkpointI
                int numberOfKeyGroups,
                KeyGroupRange keyGroupRange,
                TaskKvStateRegistry kvStateRegistry,
-               TtlTimeProvider ttlTimeProvider) {
+               TtlTimeProvider ttlTimeProvider,
+               MetricGroup metricGroup) {
                return new MockKeyedStateBackend<>(
                        new KvStateRegistry().createTaskRegistry(jobID, new 
JobVertexID()),
                        keySerializer,
@@ -101,7 +103,8 @@ public CheckpointStreamFactory 
resolveCheckpointStorageLocation(long checkpointI
                        numberOfKeyGroups,
                        keyGroupRange,
                        env.getExecutionConfig(),
-                       ttlTimeProvider);
+                       ttlTimeProvider,
+                       metricGroup);
        }
 
        @Override
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 84db465f12c..32045804aab 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -69,4 +69,16 @@
         */
        ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions 
currentOptions);
 
+       /**
+        * This method should enable certain RocksDB metrics to be forwarded to
+        * Flink's metrics reporter.
+        *
+        * <p>Enabling these monitoring options may degrade RocksDB performance
+        * and should be set with care.
+        * @param nativeMetricOptions The options object with the pre-defined 
options.
+        * @return The options object on which the additional options are set.
+        */
+       default RocksDBNativeMetricOptions 
createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
+               return nativeMetricOptions;
+       }
 }
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 42a1e26b8b5..c56bd20b580 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -47,6 +47,7 @@
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -241,6 +242,13 @@
        /** Shared wrapper for batch writes to the RocksDB instance. */
        private RocksDBWriteBatchWrapper writeBatchWrapper;
 
+       private final RocksDBNativeMetricOptions metricOptions;
+
+       private final MetricGroup metricGroup;
+
+       /** The native metrics monitor. */
+       private RocksDBNativeMetricMonitor nativeMetricMonitor;
+
        public RocksDBKeyedStateBackend(
                String operatorIdentifier,
                ClassLoader userCodeClassLoader,
@@ -255,7 +263,9 @@ public RocksDBKeyedStateBackend(
                boolean enableIncrementalCheckpointing,
                LocalRecoveryConfig localRecoveryConfig,
                RocksDBStateBackend.PriorityQueueStateType 
priorityQueueStateType,
-               TtlTimeProvider ttlTimeProvider
+               TtlTimeProvider ttlTimeProvider,
+               RocksDBNativeMetricOptions metricOptions,
+               MetricGroup metricGroup
        ) throws IOException {
 
                super(kvStateRegistry, keySerializer, userCodeClassLoader,
@@ -291,6 +301,9 @@ public RocksDBKeyedStateBackend(
 
                this.writeOptions = new WriteOptions().setDisableWAL(true);
 
+               this.metricOptions = metricOptions;
+               this.metricGroup = metricGroup;
+
                switch (priorityQueueStateType) {
                        case HEAP:
                                this.priorityQueueFactory = new 
HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
@@ -358,6 +371,14 @@ public ColumnFamilyHandle getColumnFamilyHandle(String 
state) {
                return columnInfo != null ? columnInfo.f0 : null;
        }
 
+       private void registerKvStateInformation(String columnFamilyName, 
Tuple2<ColumnFamilyHandle, RegisteredStateMetaInfoBase> registeredColumn) {
+               kvStateInformation.put(columnFamilyName, registeredColumn);
+
+               if (nativeMetricMonitor != null) {
+                       
nativeMetricMonitor.registerColumnFamily(columnFamilyName, registeredColumn.f0);
+               }
+       }
+
        /**
         * Should only be called by one thread, and only after all accesses to 
the DB happened.
         */
@@ -375,6 +396,13 @@ public void dispose() {
 
                        IOUtils.closeQuietly(writeBatchWrapper);
 
+                       // Metric collection occurs on a background thread. 
When this method returns
+                       // it is guaranteed that the RocksDB reference has been 
invalidated
+                       // and no more metric collection will be attempted 
against the database.
+                       if (nativeMetricMonitor != null) {
+                               nativeMetricMonitor.close();
+                       }
+
                        // RocksDB's native memory management requires that 
*all* CFs (including default) are closed before the
                        // DB is closed. See:
                        // 
https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
@@ -606,6 +634,14 @@ private RocksDB openDB(
                Preconditions.checkState(1 + 
stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
                        "Not all requested column family handles have been 
created");
 
+               if (this.metricOptions.isEnabled()) {
+                       this.nativeMetricMonitor = new 
RocksDBNativeMetricMonitor(
+                               dbRef,
+                               metricOptions,
+                               metricGroup
+                       );
+               }
+
                return dbRef;
        }
 
@@ -733,7 +769,7 @@ private void restoreKVStateMetaData() throws IOException, 
StateMigrationExceptio
                                        ColumnFamilyHandle columnFamily = 
rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
                                        registeredColumn = new 
Tuple2<>(columnFamily, stateMetaInfo);
-                                       
rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), 
registeredColumn);
+                                       
rocksDBKeyedStateBackend.registerKvStateInformation(stateMetaInfo.getName(), 
registeredColumn);
 
                                } else {
                                        // TODO with eager state registration 
in place, check here for serializer migration strategies
@@ -1051,7 +1087,7 @@ private ColumnFamilyHandle 
getOrRegisterColumnFamilyHandle(
                                                columnFamilyHandle != null ? 
columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
                                                stateMetaInfo);
 
-                               stateBackend.kvStateInformation.put(
+                               stateBackend.registerKvStateInformation(
                                        stateMetaInfoSnapshot.getName(),
                                        registeredStateMetaInfoEntry);
                        }
@@ -1180,7 +1216,7 @@ private void restoreLocalStateIntoFullInstance(
                                RegisteredStateMetaInfoBase stateMetaInfo =
                                        
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
 
-                               stateBackend.kvStateInformation.put(
+                               stateBackend.registerKvStateInformation(
                                        stateMetaInfoSnapshot.getName(),
                                        new Tuple2<>(columnFamilyHandle, 
stateMetaInfo));
                        }
@@ -1291,6 +1327,7 @@ private void transferAllDataFromStateHandles(
                                StreamStateHandle remoteFileHandle = 
entry.getValue();
                                copyStateDataHandleData(new 
Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
                        }
+
                }
 
                /**
@@ -1383,7 +1420,7 @@ private void copyStateDataHandleData(
                        ColumnFamilyHandle columnFamily = 
createColumnFamily(stateName);
 
                        stateInfo = Tuple2.of(columnFamily, newMetaInfo);
-                       kvStateInformation.put(stateDesc.getName(), stateInfo);
+                       registerKvStateInformation(stateDesc.getName(), 
stateInfo);
                }
 
                return Tuple2.of(stateInfo.f0, newMetaInfo);
@@ -1567,7 +1604,7 @@ public static RocksIteratorWrapper getRocksIterator(
                                new 
RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, 
byteOrderedElementSerializer);
 
                        metaInfoTuple = new Tuple2<>(columnFamilyHandle, 
metaInfo);
-                       kvStateInformation.put(stateName, metaInfoTuple);
+                       registerKvStateInformation(stateName, metaInfoTuple);
                } else {
                        // TODO we implement the simple way of supporting the 
current functionality, mimicking keyed state
                        // because this should be reworked in FLINK-9376 and 
then we should have a common algorithm over
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
new file mode 100644
index 00000000000..4e3a526dc23
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitor.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.View;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.Closeable;
+import java.math.BigInteger;
+
+/**
+ * A monitor which pulls {@link RocksDB} native metrics
+ * and forwards them to Flink's metric group. All metrics are
+ * unsigned longs and are reported at the column family level.
+ */
+@Internal
+public class RocksDBNativeMetricMonitor implements Closeable {
+       private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBNativeMetricMonitor.class);
+
+       private final RocksDBNativeMetricOptions options;
+
+       private final MetricGroup metricGroup;
+
+       private final Object lock;
+
+       @GuardedBy("lock")
+       private RocksDB rocksDB;
+
+       RocksDBNativeMetricMonitor(
+               @Nonnull RocksDB rocksDB,
+               @Nonnull RocksDBNativeMetricOptions options,
+               @Nonnull MetricGroup metricGroup
+       ) {
+               this.options = options;
+               this.metricGroup = metricGroup;
+               this.rocksDB = rocksDB;
+
+               this.lock = new Object();
+       }
+
+       /**
+        * Register gauges to pull native metrics for the column family.
+        * @param columnFamilyName group name for the new gauges
+        * @param handle native handle to the column family
+        */
+       void registerColumnFamily(String columnFamilyName, ColumnFamilyHandle 
handle) {
+               MetricGroup group = metricGroup.addGroup(columnFamilyName);
+
+               for (String property : options.getProperties()) {
+                       RocksDBNativeMetricView gauge = new 
RocksDBNativeMetricView(handle, property);
+                       group.gauge(property, gauge);
+               }
+       }
+
+       /**
+        * Updates the value of metricView if the reference is still valid.
+        */
+       private void setProperty(ColumnFamilyHandle handle, String property, 
RocksDBNativeMetricView metricView) {
+               if (metricView.isClosed()) {
+                       return;
+               }
+               try {
+                       synchronized (lock) {
+                               if (rocksDB != null) {
+                                       long value = 
rocksDB.getLongProperty(handle, property);
+                                       metricView.setValue(value);
+                               }
+                       }
+               } catch (RocksDBException e) {
+                       metricView.close();
+                       LOG.warn("Failed to read native metric {} from 
RocksDB", property, e);
+               }
+       }
+
+       @Override
+       public void close() {
+               synchronized (lock) {
+                       rocksDB = null;
+               }
+       }
+
+       /**
+        * A gauge which periodically pulls a RocksDB native metric
+        * for the specified column family / metric pair.
+        *
+        * <p><strong>Note</strong>: The underlying property is of type
+        * {@code uint64_t} on the C++ side, so the raw Java {@code long} can appear negative.
+        * Because Java does not support unsigned long types, this gauge
+        * wraps the result in a {@link BigInteger}.
+        */
+       class RocksDBNativeMetricView implements Gauge<BigInteger>, View {
+               private final String property;
+
+               private final ColumnFamilyHandle handle;
+
+               private BigInteger bigInteger;
+
+               private boolean closed;
+
+               private RocksDBNativeMetricView(
+                       @Nonnull ColumnFamilyHandle handle,
+                       @Nonnull String property
+               ) {
+                       this.handle = handle;
+                       this.property = property;
+                       this.bigInteger = BigInteger.ZERO;
+                       this.closed = false;
+               }
+
+               public void setValue(long value) {
+                       if (value >= 0L) {
+                               bigInteger = BigInteger.valueOf(value);
+                       } else {
+                               int upper = (int) (value >>> 32);
+                               int lower = (int) value;
+
+                               bigInteger = BigInteger
+                                       .valueOf(Integer.toUnsignedLong(upper))
+                                       .shiftLeft(32)
+                                       
.add(BigInteger.valueOf(Integer.toUnsignedLong(lower)));
+                       }
+               }
+
+               public void close() {
+                       closed = true;
+               }
+
+               public boolean isClosed() {
+                       return closed;
+               }
+
+               @Override
+               public BigInteger getValue() {
+                       return bigInteger;
+               }
+
+               @Override
+               public void update() {
+                       setProperty(handle, property, this);
+               }
+       }
+}
+
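The unsigned conversion in setValue above can be checked in isolation. The following is a minimal sketch, not part of the pull request: the class name UnsignedLongSketch and the method name toUnsignedBigInteger are hypothetical, and the body repeats the same split-and-recombine steps so the result can be compared with the JDK's own Long.toUnsignedString.

```java
import java.math.BigInteger;

// Hypothetical helper class, for illustration only; not part of the PR.
final class UnsignedLongSketch {

    // Interprets the 64 bits of a Java long as an unsigned value,
    // using the same upper/lower 32-bit split as setValue in the diff.
    static BigInteger toUnsignedBigInteger(long value) {
        if (value >= 0L) {
            // Non-negative longs already equal their unsigned interpretation.
            return BigInteger.valueOf(value);
        }
        int upper = (int) (value >>> 32); // high 32 bits
        int lower = (int) value;          // low 32 bits
        return BigInteger
            .valueOf(Integer.toUnsignedLong(upper))
            .shiftLeft(32)
            .add(BigInteger.valueOf(Integer.toUnsignedLong(lower)));
    }

    public static void main(String[] args) {
        long raw = -1L; // bit pattern of the largest uint64_t value
        System.out.println(toUnsignedBigInteger(raw));
        System.out.println(Long.toUnsignedString(raw)); // JDK reference result
    }
}
```

Both lines print the same decimal string, which suggests the manual split is equivalent to the JDK routine for negative inputs.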
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
new file mode 100644
index 00000000000..f647e4c153f
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptions.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Selects which RocksDB metrics to forward to Flink's metrics reporter.
+ * All metrics report at the column family level and return unsigned long 
values.
+ *
+ * <p>Properties and doc comments are taken from RocksDB documentation. See
+ * <a 
href="https://github.com/facebook/rocksdb/blob/64324e329eb0a9b4e77241a425a1615ff524c7f1/include/rocksdb/db.h#L429">
+ * db.h</a> for more information.
+ */
+public class RocksDBNativeMetricOptions implements Serializable {
+
+       public static final ConfigOption<Boolean> 
MONITOR_NUM_IMMUTABLE_MEM_TABLES = ConfigOptions
+               .key(RocksDBProperty.NumImmutableMemTable.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the number of immutable memtables in 
RocksDB.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_MEM_TABLE_FLUSH_PENDING = ConfigOptions
+               .key(RocksDBProperty.MemTableFlushPending.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Track pending memtable flushes in RocksDB. Returns 
1 if a flush is pending, 0 otherwise.");
+
+       public static final ConfigOption<Boolean> TRACK_COMPACTION_PENDING = 
ConfigOptions
+               .key(RocksDBProperty.CompactionPending.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Track pending compactions in RocksDB. Returns 
1 if a compaction is pending, 0 otherwise.");
+
+       public static final ConfigOption<Boolean> MONITOR_BACKGROUND_ERRORS = 
ConfigOptions
+               .key(RocksDBProperty.BackgroundErrors.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the number of background errors in 
RocksDB.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE = ConfigOptions
+               .key(RocksDBProperty.CurSizeActiveMemTable.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the approximate size of the active 
memtable in bytes.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_CUR_SIZE_ALL_MEM_TABLE = ConfigOptions
+               .key(RocksDBProperty.CurSizeAllMemTables.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the approximate size of the active 
and unflushed immutable memtables" +
+                       " in bytes.");
+
+       public static final ConfigOption<Boolean> MONITOR_SIZE_ALL_MEM_TABLES = 
ConfigOptions
+               .key(RocksDBProperty.SizeAllMemTables.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the approximate size of the active, 
unflushed immutable, " +
+                       "and pinned immutable memtables in bytes.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLE = ConfigOptions
+               .key(RocksDBProperty.NumEntriesActiveMemTable.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the total number of entries in the 
active memtable.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_NUM_ENTRIES_IMM_MEM_TABLES = ConfigOptions
+               .key(RocksDBProperty.NumEntriesImmMemTables.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the total number of entries in the 
unflushed immutable memtables.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_NUM_DELETES_ACTIVE_MEM_TABLE = ConfigOptions
+               .key(RocksDBProperty.NumDeletesActiveMemTable.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the total number of delete entries in 
the active memtable.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_NUM_DELETES_IMM_MEM_TABLE = ConfigOptions
+               .key(RocksDBProperty.NumDeletesImmMemTables.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the total number of delete entries in 
the unflushed immutable memtables.");
+
+       public static final ConfigOption<Boolean> ESTIMATE_NUM_KEYS = 
ConfigOptions
+               .key(RocksDBProperty.EstimateNumKeys.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Estimate the number of keys in RocksDB.");
+
+       public static final ConfigOption<Boolean> ESTIMATE_TABLE_READERS_MEM = 
ConfigOptions
+               .key(RocksDBProperty.EstimateTableReadersMem.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Estimate the memory used for reading SST 
tables, excluding memory" +
+                       " used in block cache (e.g., filter and index blocks) in 
bytes.");
+
+       public static final ConfigOption<Boolean> MONITOR_NUM_SNAPSHOTS = 
ConfigOptions
+               .key(RocksDBProperty.NumSnapshots.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the number of unreleased snapshots of 
the database.");
+
+       public static final ConfigOption<Boolean> MONITOR_NUM_LIVE_VERSIONS = 
ConfigOptions
+               .key(RocksDBProperty.NumLiveVersions.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor number of live versions. Version is 
an internal data structure. " +
+                       "See RocksDB file version_set.h for details. More live 
versions often mean more SST files are held " +
+                       "from being deleted, by iterators or unfinished 
compactions.");
+
+       public static final ConfigOption<Boolean> ESTIMATE_LIVE_DATA_SIZE = 
ConfigOptions
+               .key(RocksDBProperty.EstimateLiveDataSize.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Estimate of the amount of live data in 
bytes.");
+
+       public static final ConfigOption<Boolean> MONITOR_TOTAL_SST_FILES_SIZE 
= ConfigOptions
+               .key(RocksDBProperty.TotalSstFilesSize.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the total size (bytes) of all SST 
files. " +
+                       "WARNING: may slow down online queries if there are too 
many files.");
+
+       public static final ConfigOption<Boolean> 
ESTIMATE_PENDING_COMPACTION_BYTES = ConfigOptions
+               
.key(RocksDBProperty.EstimatePendingCompactionBytes.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Estimated total number of bytes compaction 
needs to rewrite to get all levels " +
+                       "down to under target size. Not valid for other 
compactions than level-based.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_NUM_RUNNING_COMPACTIONS = ConfigOptions
+               .key(RocksDBProperty.NumRunningCompactions.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the number of currently running 
compactions.");
+
+       public static final ConfigOption<Boolean> MONITOR_NUM_RUNNING_FLUSHES = 
ConfigOptions
+               .key(RocksDBProperty.NumRunningFlushes.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the number of currently running 
flushes.");
+
+       public static final ConfigOption<Boolean> 
MONITOR_ACTUAL_DELAYED_WRITE_RATE = ConfigOptions
+               .key(RocksDBProperty.ActualDelayedWriteRate.getConfigKey())
+               .defaultValue(false)
+               .withDescription("Monitor the current actual delayed write 
rate. 0 means no delay.");
+       /**
+        * Creates a {@link RocksDBNativeMetricOptions} based on an
+        * external configuration.
+        */
+       public static RocksDBNativeMetricOptions fromConfig(Configuration 
config) {
+               RocksDBNativeMetricOptions options = new 
RocksDBNativeMetricOptions();
+               if (config.getBoolean(MONITOR_NUM_IMMUTABLE_MEM_TABLES)) {
+                       options.enableNumImmutableMemTable();
+               }
+
+               if (config.getBoolean(MONITOR_MEM_TABLE_FLUSH_PENDING)) {
+                       options.enableMemTableFlushPending();
+               }
+
+               if (config.getBoolean(TRACK_COMPACTION_PENDING)) {
+                       options.enableCompactionPending();
+               }
+
+               if (config.getBoolean(MONITOR_BACKGROUND_ERRORS)) {
+                       options.enableBackgroundErrors();
+               }
+
+               if (config.getBoolean(MONITOR_CUR_SIZE_ACTIVE_MEM_TABLE)) {
+                       options.enableCurSizeActiveMemTable();
+               }
+
+               if (config.getBoolean(MONITOR_CUR_SIZE_ALL_MEM_TABLE)) {
+                       options.enableCurSizeAllMemTables();
+               }
+
+               if (config.getBoolean(MONITOR_SIZE_ALL_MEM_TABLES)) {
+                       options.enableSizeAllMemTables();
+               }
+
+               if (config.getBoolean(MONITOR_NUM_ENTRIES_ACTIVE_MEM_TABLE)) {
+                       options.enableNumEntriesActiveMemTable();
+               }
+
+               if (config.getBoolean(MONITOR_NUM_ENTRIES_IMM_MEM_TABLES)) {
+                       options.enableNumEntriesImmMemTables();
+               }
+
+               if (config.getBoolean(MONITOR_NUM_DELETES_ACTIVE_MEM_TABLE)) {
+                       options.enableNumDeletesActiveMemTable();
+               }
+
+               if (config.getBoolean(MONITOR_NUM_DELETES_IMM_MEM_TABLE)) {
+                       options.enableNumDeletesImmMemTables();
+               }
+
+               if (config.getBoolean(ESTIMATE_NUM_KEYS)) {
+                       options.enableEstimateNumKeys();
+               }
+
+               if (config.getBoolean(ESTIMATE_TABLE_READERS_MEM)) {
+                       options.enableEstimateTableReadersMem();
+               }
+
+               if (config.getBoolean(MONITOR_NUM_SNAPSHOTS)) {
+                       options.enableNumSnapshots();
+               }
+
+               if (config.getBoolean(MONITOR_NUM_LIVE_VERSIONS)) {
+                       options.enableNumLiveVersions();
+               }
+
+               if (config.getBoolean(ESTIMATE_LIVE_DATA_SIZE)) {
+                       options.enableEstimateLiveDataSize();
+               }
+
+               if (config.getBoolean(MONITOR_TOTAL_SST_FILES_SIZE)) {
+                       options.enableTotalSstFilesSize();
+               }
+
+               if (config.getBoolean(ESTIMATE_PENDING_COMPACTION_BYTES)) {
+                       options.enableEstimatePendingCompactionBytes();
+               }
+
+               if (config.getBoolean(MONITOR_NUM_RUNNING_COMPACTIONS)) {
+                       options.enableNumRunningCompactions();
+               }
+
+               if (config.getBoolean(MONITOR_NUM_RUNNING_FLUSHES)) {
+                       options.enableNumRunningFlushes();
+               }
+
+               if (config.getBoolean(MONITOR_ACTUAL_DELAYED_WRITE_RATE)) {
+                       options.enableActualDelayedWriteRate();
+               }
+
+               return options;
+       }
+
+       private Set<String> properties;
+
+       public RocksDBNativeMetricOptions() {
+               this.properties = new HashSet<>();
+       }
+
+       /**
+        * Returns number of immutable memtables that have not yet been flushed.
+        */
+       public void enableNumImmutableMemTable() {
+               
this.properties.add(RocksDBProperty.NumImmutableMemTable.getRocksDBProperty());
+       }
+
+       /**
+        * Returns 1 if a memtable flush is pending; otherwise, returns 0.
+        */
+       public void enableMemTableFlushPending() {
+               
this.properties.add(RocksDBProperty.MemTableFlushPending.getRocksDBProperty());
+       }
+
+       /**
+        * Returns 1 if at least one compaction is pending; otherwise, returns 
0.
+        */
+       public void enableCompactionPending() {
+               
this.properties.add(RocksDBProperty.CompactionPending.getRocksDBProperty());
+       }
+
+       /**
+        * Returns accumulated number of background errors.
+        */
+       public void enableBackgroundErrors() {
+               
this.properties.add(RocksDBProperty.BackgroundErrors.getRocksDBProperty());
+       }
+
+       /**
+        * Returns approximate size of active memtable (bytes).
+        */
+       public void enableCurSizeActiveMemTable() {
+               
this.properties.add(RocksDBProperty.CurSizeActiveMemTable.getRocksDBProperty());
+       }
+
+       /**
+        * Returns approximate size of active and unflushed immutable memtables 
(bytes).
+        */
+       public void enableCurSizeAllMemTables() {
+               
this.properties.add(RocksDBProperty.CurSizeAllMemTables.getRocksDBProperty());
+       }
+
+       /**
+        * Returns approximate size of active, unflushed immutable, and pinned 
immutable memtables (bytes).
+        */
+       public void enableSizeAllMemTables() {
+               
this.properties.add(RocksDBProperty.SizeAllMemTables.getRocksDBProperty());
+       }
+
+       /**
+        * Returns total number of entries in the active memtable.
+        */
+       public void enableNumEntriesActiveMemTable() {
+               
this.properties.add(RocksDBProperty.NumEntriesActiveMemTable.getRocksDBProperty());
+       }
+
+       /**
+        * Returns total number of entries in the unflushed immutable memtables.
+        */
+       public void enableNumEntriesImmMemTables() {
+               
this.properties.add(RocksDBProperty.NumEntriesImmMemTables.getRocksDBProperty());
+       }
+
+       /**
+        * Returns total number of delete entries in the active memtable.
+        */
+       public void enableNumDeletesActiveMemTable() {
+               
this.properties.add(RocksDBProperty.NumDeletesActiveMemTable.getRocksDBProperty());
+       }
+
+       /**
+        * Returns total number of delete entries in the unflushed immutable 
memtables.
+        */
+       public void enableNumDeletesImmMemTables() {
+               
this.properties.add(RocksDBProperty.NumDeletesImmMemTables.getRocksDBProperty());
+       }
+
+       /**
+        * Returns estimated number of total keys in the active and unflushed 
immutable memtables and storage.
+        */
+       public void enableEstimateNumKeys() {
+               
this.properties.add(RocksDBProperty.EstimateNumKeys.getRocksDBProperty());
+       }
+
+       /**
+        * Returns estimated memory used for reading SST tables, excluding 
memory
+        * used in block cache (e.g., filter and index blocks).
+        */
+       public void enableEstimateTableReadersMem() {
+               
this.properties.add(RocksDBProperty.EstimateTableReadersMem.getRocksDBProperty());
+       }
+
+       /**
+        * Returns number of unreleased snapshots of the database.
+        */
+       public void enableNumSnapshots() {
+               
this.properties.add(RocksDBProperty.NumSnapshots.getRocksDBProperty());
+       }
+
+       /**
+        * Returns number of live versions. `Version`
+        * is an internal data structure. See version_set.h for details. More
+        * live versions often mean more SST files are held from being deleted,
+        * by iterators or unfinished compactions.
+        */
+       public void enableNumLiveVersions() {
+               
this.properties.add(RocksDBProperty.NumLiveVersions.getRocksDBProperty());
+       }
+
+       /**
+        * Returns an estimate of the amount of live data in bytes.
+        */
+       public void enableEstimateLiveDataSize() {
+               
this.properties.add(RocksDBProperty.EstimateLiveDataSize.getRocksDBProperty());
+       }
+
+       /**
+        * Returns total size (bytes) of all SST files.
+        * <strong>WARNING</strong>: may slow down online queries if there are 
too many files.
+        */
+       public void enableTotalSstFilesSize() {
+               
this.properties.add(RocksDBProperty.TotalSstFilesSize.getRocksDBProperty());
+       }
+
+       /**
+        * Returns estimated total number of bytes compaction needs to rewrite 
to get all levels down
+        * to under target size. Not valid for other compactions than 
level-based.
+        */
+       public void enableEstimatePendingCompactionBytes() {
+               
this.properties.add(RocksDBProperty.EstimatePendingCompactionBytes.getRocksDBProperty());
+       }
+
+       /**
+        * Returns the number of currently running compactions.
+        */
+       public void enableNumRunningCompactions() {
+               
this.properties.add(RocksDBProperty.NumRunningCompactions.getRocksDBProperty());
+       }
+
+       /**
+        * Returns the number of currently running flushes.
+        */
+       public void enableNumRunningFlushes() {
+               
this.properties.add(RocksDBProperty.NumRunningFlushes.getRocksDBProperty());
+       }
+
+       /**
+        * Returns the current actual delayed write rate. 0 means no delay.
+        */
+       public void enableActualDelayedWriteRate() {
+               
this.properties.add(RocksDBProperty.ActualDelayedWriteRate.getRocksDBProperty());
+       }
+
+       /**
+        * @return the enabled RocksDB metrics
+        */
+       public Collection<String> getProperties() {
+               return Collections.unmodifiableCollection(properties);
+       }
+
+       /**
+        * {@link RocksDBNativeMetricMonitor} is enabled if any property is 
set.
+        *
+        * @return true if {@link RocksDBNativeMetricMonitor} should be enabled, 
false otherwise.
+        */
+       public boolean isEnabled() {
+               return !properties.isEmpty();
+       }
+}
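The relationship between enabled config keys, the collected property set, and isEnabled() can be sketched without Flink on the classpath. This is an illustrative sketch under stated assumptions: MetricOptionsSketch and fromFlags are hypothetical names, and a plain Map stands in for Flink's Configuration. Unlike the class in the diff, the sketch accepts any key suffix instead of a fixed list of known properties.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for RocksDBNativeMetricOptions; not the PR's class.
final class MetricOptionsSketch {
    private static final String CONFIG_PREFIX = "state.backend.rocksdb.metrics.";
    private static final String PROPERTY_PREFIX = "rocksdb.";

    private final Set<String> properties = new HashSet<>();

    // Assumption: a Map<String, Boolean> replaces Flink's Configuration here.
    static MetricOptionsSketch fromFlags(Map<String, Boolean> flags) {
        MetricOptionsSketch options = new MetricOptionsSketch();
        for (Map.Entry<String, Boolean> entry : flags.entrySet()) {
            String key = entry.getKey();
            // Only keys under the metrics prefix that are set to true are enabled.
            if (key.startsWith(CONFIG_PREFIX) && Boolean.TRUE.equals(entry.getValue())) {
                options.properties.add(PROPERTY_PREFIX + key.substring(CONFIG_PREFIX.length()));
            }
        }
        return options;
    }

    Collection<String> getProperties() {
        return Collections.unmodifiableCollection(properties);
    }

    // Mirrors the diff: the monitor is only needed if at least one metric is on.
    boolean isEnabled() {
        return !properties.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, Boolean> flags = new LinkedHashMap<>();
        flags.put("state.backend.rocksdb.metrics.num-snapshots", true);
        flags.put("state.backend.rocksdb.metrics.estimate-num-keys", false);
        MetricOptionsSketch options = fromFlags(flags);
        System.out.println(options.isEnabled());
        System.out.println(options.getProperties());
    }
}
```

Since every option defaults to false, an empty configuration yields an empty property set, so the monitor is never created in openDB unless a metric is explicitly switched on.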
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBProperty.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBProperty.java
new file mode 100644
index 00000000000..ef680c90075
--- /dev/null
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBProperty.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.annotation.Internal;
+
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+
+/**
+ * {@link RocksDB} properties that can be queried by Flink's metrics reporter.
+ *
+ * <p>Note: New metric properties may be added in each version of {@link 
RocksDB}.
+ * When upgrading to a later version, consider updating this class with the newly 
added
+ * properties.
+ */
+@Internal
+public enum RocksDBProperty {
+       NumImmutableMemTable("num-immutable-mem-table"),
+       MemTableFlushPending("mem-table-flush-pending"),
+       CompactionPending("compaction-pending"),
+       BackgroundErrors("background-errors"),
+       CurSizeActiveMemTable("cur-size-active-mem-table"),
+       CurSizeAllMemTables("cur-size-all-mem-tables"),
+       SizeAllMemTables("size-all-mem-tables"),
+       NumEntriesActiveMemTable("num-entries-active-mem-table"),
+       NumEntriesImmMemTables("num-entries-imm-mem-tables"),
+       NumDeletesActiveMemTable("num-deletes-active-mem-table"),
+       NumDeletesImmMemTables("num-deletes-imm-mem-tables"),
+       EstimateNumKeys("estimate-num-keys"),
+       EstimateTableReadersMem("estimate-table-readers-mem"),
+       NumSnapshots("num-snapshots"),
+       NumLiveVersions("num-live-versions"),
+       EstimateLiveDataSize("estimate-live-data-size"),
+       TotalSstFilesSize("total-sst-files-size"),
+       EstimatePendingCompactionBytes("estimate-pending-compaction-bytes"),
+       NumRunningCompactions("num-running-compactions"),
+       NumRunningFlushes("num-running-flushes"),
+       ActualDelayedWriteRate("actual-delayed-write-rate");
+
+       private static final String ROCKS_DB_PROPERTY_FORMAT = "rocksdb.%s";
+
+       private static final String CONFIG_KEY_FORMAT = "state.backend.rocksdb.metrics.%s";
+
+       private final String property;
+
+       RocksDBProperty(String property) {
+               this.property = property;
+       }
+
+       /**
+        * @return property string that can be used to query {@link RocksDB#getLongProperty(ColumnFamilyHandle, String)}.
+        */
+       public String getRocksDBProperty() {
+               return String.format(ROCKS_DB_PROPERTY_FORMAT, property);
+       }
+
+       /**
+        * @return key for enabling metric using {@link org.apache.flink.configuration.Configuration}.
+        */
+       public String getConfigKey() {
+               return String.format(CONFIG_KEY_FORMAT, property);
+       }
+}
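
Note on the enum above: each constant carries one short name, and both the RocksDB property string and the Flink configuration key are derived from it with the two format strings. The following is a minimal standalone sketch of that derivation, not part of the patch; the class name RocksDBPropertyKeysSketch and its helper methods are hypothetical, and only the two format strings and the short names are taken from the diff.

```java
public class RocksDBPropertyKeysSketch {

    // Format strings copied from RocksDBProperty in the diff above.
    private static final String ROCKS_DB_PROPERTY_FORMAT = "rocksdb.%s";
    private static final String CONFIG_KEY_FORMAT = "state.backend.rocksdb.metrics.%s";

    // Hypothetical helper: mirrors what getRocksDBProperty() computes.
    static String rocksDBProperty(String shortName) {
        return String.format(ROCKS_DB_PROPERTY_FORMAT, shortName);
    }

    // Hypothetical helper: mirrors what getConfigKey() computes.
    static String configKey(String shortName) {
        return String.format(CONFIG_KEY_FORMAT, shortName);
    }

    public static void main(String[] args) {
        // A few of the short names declared by the enum.
        String[] shortNames = {
            "num-immutable-mem-table",
            "estimate-num-keys",
            "actual-delayed-write-rate"
        };
        for (String shortName : shortNames) {
            // Prints the config key next to the RocksDB property it enables.
            System.out.println(configKey(shortName) + " -> " + rocksDBProperty(shortName));
        }
    }
}
```

Keeping a single short name per constant means the config key and the queried property cannot drift apart, which is what RocksDBNativeMetricOptionsTest and RocksDBPropertyTest further down rely on.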
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index b8bd73c08e1..794e22160a2 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -24,6 +24,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -122,6 +123,9 @@
        /** This determines the type of priority queue state. */
        private final PriorityQueueStateType priorityQueueStateType;
 
+       /** The default RocksDB native metrics options. */
+       private final RocksDBNativeMetricOptions defaultMetricOptions;
+
        // -- runtime values, set on TaskManager when initializing / using the backend
 
        /** Base paths for RocksDB directory, as initialized. */
@@ -236,6 +240,7 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean
                this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
                // for now, we use still the heap-based implementation as default
                this.priorityQueueStateType = PriorityQueueStateType.HEAP;
+               this.defaultMetricOptions = new RocksDBNativeMetricOptions();
        }
 
        /**
@@ -295,6 +300,9 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config)
                        }
                }
 
+               // configure metric options
+               this.defaultMetricOptions = RocksDBNativeMetricOptions.fromConfig(config);
+
                // copy remaining settings
                this.predefinedOptions = original.predefinedOptions;
                this.optionsFactory = original.optionsFactory;
@@ -412,7 +420,8 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry,
-                       TtlTimeProvider ttlTimeProvider) throws IOException {
+                       TtlTimeProvider ttlTimeProvider,
+                       MetricGroup metricGroup) throws IOException {
 
                // first, make sure that the RocksDB JNI library is loaded
                // we do this explicitly here to have better error handling
@@ -445,7 +454,9 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
                                isIncrementalCheckpointsEnabled(),
                                localRecoveryConfig,
                                priorityQueueStateType,
-                               ttlTimeProvider);
+                               ttlTimeProvider,
+                               getMemoryWatcherOptions(),
+                               metricGroup);
        }
 
        @Override
@@ -666,6 +677,15 @@ public ColumnFamilyOptions getColumnOptions() {
                return opt;
        }
 
+       public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
+               RocksDBNativeMetricOptions options = this.defaultMetricOptions;
+               if (optionsFactory != null) {
+                       options = optionsFactory.createNativeMetricsOptions(options);
+               }
+
+               return options;
+       }
+
        // ------------------------------------------------------------------------
        //  utilities
        // ------------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
new file mode 100644
index 00000000000..7a817ebb0e1
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricMonitorTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+
+import javax.annotation.Nullable;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+
+/**
+ * Validates the native metric monitor.
+ */
+public class RocksDBNativeMetricMonitorTest {
+
+       private static final String OPERATOR_NAME = "dummy";
+
+       private static final String COLUMN_FAMILY_NAME = "column-family";
+
+       @Rule
+       public RocksDBResource rocksDBResource = new RocksDBResource();
+
+       @Test
+       public void testMetricMonitorLifecycle() throws Throwable {
+               // We use a local variable here to manually control the life-cycle.
+               // This allows us to verify that metrics do not try to access
+               // RocksDB after the monitor was closed.
+               RocksDBResource localRocksDBResource = new RocksDBResource();
+               localRocksDBResource.before();
+
+               SimpleMetricRegistry registry = new SimpleMetricRegistry();
+               GenericMetricGroup group = new GenericMetricGroup(
+                       registry,
+                       UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                       OPERATOR_NAME
+               );
+
+               RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
+               // always returns a non-zero
+               // value since empty memtables
+               // have overhead.
+               options.enableSizeAllMemTables();
+
+               RocksDBNativeMetricMonitor monitor = new RocksDBNativeMetricMonitor(
+                       localRocksDBResource.getRocksDB(),
+                       options,
+                       group
+               );
+
+               ColumnFamilyHandle handle = localRocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
+               monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+
+               Assert.assertEquals("Failed to register metrics for column family", 1, registry.metrics.size());
+
+               RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+
+               view.update();
+
+               Assert.assertNotEquals("Failed to pull metric from RocksDB", BigInteger.ZERO, view.getValue());
+
+               view.setValue(0L);
+
+               // After the monitor is closed, no metric should be accessing RocksDB anymore.
+               // If one does, this test will likely fail with a segmentation fault.
+               monitor.close();
+
+               localRocksDBResource.after();
+
+               view.update();
+
+               Assert.assertEquals("Failed to release RocksDB reference", BigInteger.ZERO, view.getValue());
+       }
+
+       @Test
+       public void testReturnsUnsigned() {
+               SimpleMetricRegistry registry = new SimpleMetricRegistry();
+               GenericMetricGroup group = new GenericMetricGroup(
+                       registry,
+                       UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                       OPERATOR_NAME
+               );
+
+               RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
+               options.enableSizeAllMemTables();
+
+               RocksDBNativeMetricMonitor monitor = new RocksDBNativeMetricMonitor(
+                       null,
+                       options,
+                       group
+               );
+
+               monitor.registerColumnFamily(COLUMN_FAMILY_NAME, null);
+               RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+
+               view.setValue(-1);
+               BigInteger result = view.getValue();
+
+               Assert.assertEquals("Failed to interpret RocksDB result as an unsigned long", 1, result.signum());
+       }
+
+       @Test
+       public void testClosedGaugesDontRead() {
+               SimpleMetricRegistry registry = new SimpleMetricRegistry();
+               GenericMetricGroup group = new GenericMetricGroup(
+                       registry,
+                       UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
+                       OPERATOR_NAME
+               );
+
+               RocksDBNativeMetricOptions options = new RocksDBNativeMetricOptions();
+               options.enableSizeAllMemTables();
+
+               RocksDBNativeMetricMonitor monitor = new RocksDBNativeMetricMonitor(
+                       rocksDBResource.getRocksDB(),
+                       options,
+                       group
+               );
+
+               ColumnFamilyHandle handle = rocksDBResource.createNewColumnFamily(COLUMN_FAMILY_NAME);
+               monitor.registerColumnFamily(COLUMN_FAMILY_NAME, handle);
+
+               RocksDBNativeMetricMonitor.RocksDBNativeMetricView view = registry.metrics.get(0);
+
+               view.close();
+               view.update();
+
+               Assert.assertEquals("Closed gauge still queried RocksDB", BigInteger.ZERO, view.getValue());
+       }
+
+       static class SimpleMetricRegistry implements MetricRegistry {
+               ArrayList<RocksDBNativeMetricMonitor.RocksDBNativeMetricView> metrics = new ArrayList<>();
+
+               @Override
+               public char getDelimiter() {
+                       return 0;
+               }
+
+               @Override
+               public char getDelimiter(int index) {
+                       return 0;
+               }
+
+               @Override
+               public int getNumberReporters() {
+                       return 0;
+               }
+
+               @Override
+               public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+                       if (metric instanceof RocksDBNativeMetricMonitor.RocksDBNativeMetricView) {
+                               metrics.add((RocksDBNativeMetricMonitor.RocksDBNativeMetricView) metric);
+                       }
+               }
+
+               @Override
+               public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+
+               }
+
+               @Override
+               public ScopeFormats getScopeFormats() {
+                       Configuration config = new Configuration();
+
+                       config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
+                       config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+                       config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
+                       config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
+
+                       return ScopeFormats.fromConfig(config);
+               }
+
+               @Nullable
+               @Override
+               public String getMetricQueryServicePath() {
+                       return null;
+               }
+       }
+}
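
Note on testReturnsUnsigned above: RocksDB reports these properties as unsigned 64-bit values, while Java only has a signed long, so a raw value of -1 must surface as a large positive number. The following is a minimal standalone sketch of one way to do that conversion with the JDK alone. It is an assumption for illustration and not necessarily how RocksDBNativeMetricView implements getValue(); the class and method names are hypothetical.

```java
import java.math.BigInteger;

public class UnsignedLongSketch {

    // Hypothetical helper: reinterprets the 64 bits of a signed long as an
    // unsigned value. Long.toUnsignedString is part of the JDK since Java 8.
    static BigInteger toUnsignedBigInteger(long value) {
        return new BigInteger(Long.toUnsignedString(value));
    }

    public static void main(String[] args) {
        // -1L has all 64 bits set, i.e. 2^64 - 1 when read as unsigned.
        BigInteger result = toUnsignedBigInteger(-1L);
        System.out.println(result);          // 18446744073709551615
        System.out.println(result.signum()); // 1
    }
}
```

Non-negative inputs pass through unchanged, so the conversion only matters for values at or above 2^63.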
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptionsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptionsTest.java
new file mode 100644
index 00000000000..f5becef43b8
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptionsTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that all native metrics can be enabled via configuration.
+ */
+public class RocksDBNativeMetricOptionsTest {
+       @Test
+       public void testNativeMetricsConfigurable() {
+               for (RocksDBProperty property : RocksDBProperty.values()) {
+                       Configuration config = new Configuration();
+                       config.setBoolean(property.getConfigKey(), true);
+
+                       RocksDBNativeMetricOptions options = RocksDBNativeMetricOptions.fromConfig(config);
+
+                       Assert.assertTrue(
+                               String.format("Failed to enable native metrics with property %s", property.getConfigKey()),
+                               options.isEnabled());
+
+                       Assert.assertTrue(
+                               String.format("Failed to enable native metric %s using config", property.getConfigKey()),
+                               options.getProperties().contains(property.getRocksDBProperty())
+                       );
+               }
+       }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBPropertyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBPropertyTest.java
new file mode 100644
index 00000000000..da8d07c9434
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBPropertyTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+
+
+/**
+ * Validate RocksDB properties.
+ */
+public class RocksDBPropertyTest {
+
+       @Rule
+       public RocksDBResource rocksDBResource = new RocksDBResource();
+
+       @Test
+       public void testRocksDBPropertiesValid() throws RocksDBException {
+               RocksDB db = rocksDBResource.getRocksDB();
+               ColumnFamilyHandle handle = rocksDBResource.getDefaultColumnFamily();
+
+               for (RocksDBProperty property : RocksDBProperty.values()) {
+                       try {
+                               db.getLongProperty(handle, property.getRocksDBProperty());
+                       } catch (RocksDBException e) {
+                               throw new AssertionError(String.format("Invalid RocksDB property %s", property.getRocksDBProperty()), e);
+                       }
+               }
+       }
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 4916251fc1f..d7d6bdea879 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -250,7 +251,9 @@ public void testCorrectMergeOperatorSet() throws IOException {
                                enableIncrementalCheckpointing,
                                TestLocalRecoveryConfig.disabled(),
                                RocksDBStateBackend.PriorityQueueStateType.HEAP,
-                               TtlTimeProvider.DEFAULT);
+                               TtlTimeProvider.DEFAULT,
+                               new RocksDBNativeMetricOptions(),
+                               new UnregisteredMetricsGroup());
 
                        verify(columnFamilyOptions, Mockito.times(1))
                                .setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f3c22080ab7..a63a7971679 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -252,7 +252,8 @@ public final void initializeState() throws Exception {
                                getClass().getSimpleName(),
                                this,
                                keySerializer,
-                               streamTaskCloseableRegistry);
+                               streamTaskCloseableRegistry,
+                               metrics);
 
                this.operatorStateBackend = context.operatorStateBackend();
                this.keyedStateBackend = context.keyedStateBackend();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java
index 4c9153506be..60e81828c54 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializer.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 
 import javax.annotation.Nonnull;
@@ -41,6 +42,7 @@
         * @param keyContext the key context of the operator instance for which the context is created Cannot be null.
         * @param keySerializer the key-serializer for the operator. Can be null.
         * @param streamTaskCloseableRegistry the closeable registry to which created closeable objects will be registered.
+        * @param metricGroup the parent metric group for all state backend metrics.
         * @return a context from which the given operator can initialize everything related to state.
         * @throws Exception when something went wrong while creating the context.
         */
@@ -49,5 +51,6 @@ StreamOperatorStateContext streamOperatorStateContext(
                @Nonnull String operatorClassName,
                @Nonnull KeyContext keyContext,
                @Nullable TypeSerializer<?> keySerializer,
-               @Nonnull CloseableRegistry streamTaskCloseableRegistry) throws Exception;
+               @Nonnull CloseableRegistry streamTaskCloseableRegistry,
+               @Nonnull MetricGroup metricGroup) throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 64af9939834..42abb5139bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.execution.Environment;
@@ -106,7 +107,8 @@ public StreamOperatorStateContext streamOperatorStateContext(
                @Nonnull String operatorClassName,
                @Nonnull KeyContext keyContext,
                @Nullable TypeSerializer<?> keySerializer,
-               @Nonnull CloseableRegistry streamTaskCloseableRegistry) throws Exception {
+               @Nonnull CloseableRegistry streamTaskCloseableRegistry,
+               @Nonnull MetricGroup metricGroup) throws Exception {
 
                TaskInfo taskInfo = environment.getTaskInfo();
                OperatorSubtaskDescriptionText operatorSubtaskDescription =
@@ -134,7 +136,8 @@ public StreamOperatorStateContext streamOperatorStateContext(
                                keySerializer,
                                operatorIdentifierText,
                                prioritizedOperatorSubtaskStates,
-                               streamTaskCloseableRegistry);
+                               streamTaskCloseableRegistry,
+                               metricGroup);
 
                        // -------------- Operator State Backend --------------
                        operatorStateBackend = operatorStateBackend(
@@ -247,7 +250,8 @@ protected OperatorStateBackend operatorStateBackend(
                TypeSerializer<K> keySerializer,
                String operatorIdentifierText,
                PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,
-               CloseableRegistry backendCloseableRegistry) throws Exception {
+               CloseableRegistry backendCloseableRegistry,
+               MetricGroup metricGroup) throws Exception {
 
                if (keySerializer == null) {
                        return null;
@@ -272,7 +276,8 @@ protected OperatorStateBackend operatorStateBackend(
                                        taskInfo.getMaxNumberOfParallelSubtasks(),
                                        keyGroupRange,
                                        environment.getTaskKvStateRegistry(),
-                                       TtlTimeProvider.DEFAULT),
+                                       TtlTimeProvider.DEFAULT,
+                                       metricGroup),
                                backendCloseableRegistry,
                                logDescription);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index 7d9dcd5ef11..bcfe195f340 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -28,6 +28,7 @@
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
@@ -196,7 +197,8 @@ public void setUp() throws Exception {
                        // notice that this essentially disables the previous test of the keyed stream because it was and is always
                        // consumed by the timer service.
                        IntSerializer.INSTANCE,
-                       closableRegistry);
+                       closableRegistry,
+                       new UnregisteredMetricsGroup());
 
                this.initializationContext =
                                new StateInitializationContextImpl(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 6233c4c12c4..3dafa024f8b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -22,6 +22,8 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -91,7 +93,8 @@ public void testNoRestore() throws Exception {
                        streamOperator.getClass().getSimpleName(),
                        streamOperator,
                        typeSerializer,
-                       closeableRegistry);
+                       closeableRegistry,
+                       new UnregisteredMetricsGroup());
 
                OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
                AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend();
@@ -140,7 +143,8 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
                                TypeSerializer<K> keySerializer,
                                int numberOfKeyGroups, KeyGroupRange keyGroupRange,
                                TaskKvStateRegistry kvStateRegistry,
-                               TtlTimeProvider ttlTimeProvider) throws Exception {
+                               TtlTimeProvider ttlTimeProvider,
+                               MetricGroup metricGroup) throws Exception {
                                return mock(AbstractKeyedStateBackend.class);
                        }
 
@@ -192,7 +196,8 @@ public OperatorStateBackend createOperatorStateBackend(
                        streamOperator.getClass().getSimpleName(),
                        streamOperator,
                        typeSerializer,
-                       closeableRegistry);
+                       closeableRegistry,
+                       new UnregisteredMetricsGroup());
 
                OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
                AbstractKeyedStateBackend<?> keyedStateBackend = stateContext.keyedStateBackend();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index aebad543d06..09276986c58 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -23,6 +23,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.blob.BlobCacheService;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
@@ -263,7 +264,8 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry,
-                       TtlTimeProvider ttlTimeProvider) {
+                       TtlTimeProvider ttlTimeProvider,
+                       MetricGroup metricGroup) {
                        return null;
                }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 997e05c5bf4..bc145799fb3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -1073,14 +1073,15 @@ protected void cancelTask() throws Exception {}
                @Override
                public StreamTaskStateInitializer createStreamTaskStateInitializer() {
                        final StreamTaskStateInitializer streamTaskStateManager = super.createStreamTaskStateInitializer();
-                       return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry) -> {
+                       return (operatorID, operatorClassName, keyContext, keySerializer, closeableRegistry, metricGroup) -> {
 
                                final StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext(
                                        operatorID,
                                        operatorClassName,
                                        keyContext,
                                        keySerializer,
-                                       closeableRegistry);
+                                       closeableRegistry,
+                                       metricGroup);
 
                                return new StreamOperatorStateContext() {
                                        @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
index 214fab5b5a6..15f1ddd29f7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -55,7 +56,8 @@ public TestSpyWrapperStateBackend(AbstractStateBackend delegate) {
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry,
-                       TtlTimeProvider ttlTimeProvider) throws IOException {
+                       TtlTimeProvider ttlTimeProvider,
+                       MetricGroup metricGroup) throws IOException {
                        return spy(delegate.createKeyedStateBackend(
                                env,
                                jobID,
@@ -64,7 +66,8 @@ public TestSpyWrapperStateBackend(AbstractStateBackend delegate) {
                                numberOfKeyGroups,
                                keyGroupRange,
                                kvStateRegistry,
-                               ttlTimeProvider));
+                               ttlTimeProvider,
+                               metricGroup));
                }
 
                @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index f3bae166a03..d1543b5765b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -113,7 +114,8 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
                        int numberOfKeyGroups,
                        KeyGroupRange keyGroupRange,
                        TaskKvStateRegistry kvStateRegistry,
-                       TtlTimeProvider ttlTimeProvider) throws IOException {
+                       TtlTimeProvider ttlTimeProvider,
+                       MetricGroup metricGroup) throws IOException {
                        throw new SuccessException();
                }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Forward RocksDB native metrics to Flink metrics reporter 
> ---------------------------------------------------------
>
>                 Key: FLINK-10423
>                 URL: https://issues.apache.org/jira/browse/FLINK-10423
>             Project: Flink
>          Issue Type: New Feature
>          Components: Metrics, State Backends, Checkpointing
>            Reporter: Seth Wiesman
>            Assignee: Seth Wiesman
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> RocksDB exposes a number of metrics at the column family level, such as current 
> memory usage and open memtables, that would be useful to users who want 
> greater insight into what RocksDB is doing. This work is heavily inspired by the 
> comments on this RocksDB issue thread 
> (https://github.com/facebook/rocksdb/issues/3216#issuecomment-348779233).
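
For illustration only, here is a minimal sketch of how the options documented in this PR's generated table might be switched on. It assumes the keys are set in flink-conf.yaml and that changing the documented default of false to true enables the corresponding metric; only the three keys visible at the top of that table are shown.

```yaml
# Assumed location: flink-conf.yaml. Each option defaults to false per the generated docs table.
state.backend.rocksdb.metrics.actual-delayed-write-rate: true
state.backend.rocksdb.metrics.background-errors: true
state.backend.rocksdb.metrics.compaction-pending: true
```

Each option is off by default, so a deployment would likely enable only the properties it actually wants forwarded to its metrics reporter.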



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
