Repository: samza Updated Branches: refs/heads/master 267dfc6ba -> 3a9e80642
SAMZA-1500: Added metrics for RocksDB state store memory usage Approximate RocksDB memory usage = Configured Block Cache size + MemTable size + Indexes and Bloom Filters size = rocksdb.block-cache-size + rocksdb.size-all-mem-tables + rocksdb.estimate-table-readers-mem Author: Prateek Maheshwari <pmahe...@linkedin.com> Reviewers: Jagadish <jagad...@apache.org> Closes #404 from prateekm/rocksdb-memory Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a9e8064 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a9e8064 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a9e8064 Branch: refs/heads/master Commit: 3a9e8064211a79827a2a4793c1f159a689dfa256 Parents: 267dfc6 Author: Prateek Maheshwari <pmahe...@linkedin.com> Authored: Wed Jan 17 09:39:26 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Wed Jan 17 09:39:26 2018 -0800 ---------------------------------------------------------------------- .../samza/storage/kv/RocksDbOptionsHelper.java | 12 ++++++---- .../RocksDbKeyValueStorageEngineFactory.scala | 3 +++ .../samza/storage/kv/RocksDbKeyValueStore.scala | 25 ++++++++++++++------ .../storage/kv/TestRocksDbKeyValueStore.scala | 6 ++--- 4 files changed, 32 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java index 9b8f44b..9389681 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java @@ -75,12 +75,10 @@ public class RocksDbOptionsHelper { } options.setCompressionType(compressionType); - Long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L); - Long cacheSizePerContainer = cacheSize / numTasks; - + long blockCacheSize = getBlockCacheSize(storeConfig, containerContext); int blockSize = storeConfig.getInt(ROCKSDB_BLOCK_SIZE_BYTES, 4096); BlockBasedTableConfig tableOptions = new BlockBasedTableConfig(); - tableOptions.setBlockCacheSize(cacheSizePerContainer).setBlockSize(blockSize); + tableOptions.setBlockCacheSize(blockCacheSize).setBlockSize(blockSize); options.setTableFormatConfig(tableOptions); CompactionStyle compactionStyle = CompactionStyle.UNIVERSAL; @@ -110,4 +108,10 @@ public class RocksDbOptionsHelper { return options; } + + public static Long getBlockCacheSize(Config storeConfig, SamzaContainerContext containerContext) { + int numTasks = containerContext.taskNames.size(); + long cacheSize = storeConfig.getLong("container.cache.size.bytes", 100 * 1024 * 1024L); + return cacheSize / numTasks; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala index a7b748f..2b7ffb5 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStorageEngineFactory.scala @@ -44,6 +44,9 @@ class RocksDbKeyValueStorageEngineFactory [K, V] extends BaseKeyValueStorageEngi val storageConfig = containerContext.config.subset("stores." + storeName + ".", true) val isLoggedStore = containerContext.config.getChangelogStream(storeName).isDefined val rocksDbMetrics = new KeyValueStoreMetrics(storeName, registry) + rocksDbMetrics.newGauge("rocksdb.block-cache-size", + () => RocksDbOptionsHelper.getBlockCacheSize(storageConfig, containerContext)) + val rocksDbOptions = RocksDbOptionsHelper.options(storageConfig, containerContext) val rocksDbWriteOptions = new WriteOptions().setDisableWAL(true) val rocksDbFlushOptions = new FlushOptions().setWaitForFlush(true) http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala index 6aad45f..eae7da2 100644 --- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala @@ -72,13 +72,24 @@ object RocksDbKeyValueStore extends Logging { RocksDB.open(options, dir.toString) } - if (storeConfig.containsKey("rocksdb.metrics.list")) { - storeConfig - .get("rocksdb.metrics.list") - .split(",") - .map(property => property.trim) - .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property))) - } + // See https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h for available properties + val rocksDbMetrics = Set ( + "rocksdb.estimate-table-readers-mem", // indexes and bloom filters + "rocksdb.cur-size-active-mem-table", // approximate active memtable size in bytes + "rocksdb.cur-size-all-mem-tables", // approximate active and unflushed memtable size in bytes + "rocksdb.size-all-mem-tables", // approximate active, unflushed and pinned memtable size in bytes + "rocksdb.estimate-num-keys" // approximate number keys in the active and unflushed memtable and storage + ) + + val configuredMetrics = storeConfig + .get("rocksdb.metrics.list", "") + .split(",") + .map(property => property.trim) + .filter(!_.isEmpty) + .toSet + + (configuredMetrics ++ rocksDbMetrics) + .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property))) rocksDb } catch { http://git-wip-us.apache.org/repos/asf/samza/blob/3a9e8064/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala index 418e986..ca9c023 100644 --- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala @@ -44,7 +44,7 @@ class TestRocksDbKeyValueStore config, false, "someStore", - null) + new KeyValueStoreMetrics()) val key = "test".getBytes("UTF-8") rocksDB.put(key, "val".getBytes("UTF-8")) Assert.assertNotNull(rocksDB.get(key)) @@ -76,7 +76,7 @@ class TestRocksDbKeyValueStore config, false, "dbStore", - null) + new KeyValueStoreMetrics()) val key = "key".getBytes("UTF-8") rocksDB.put(key, "val".getBytes("UTF-8")) // SAMZA-836: Mysteriously,calling new FlushOptions() does not invoke the NativeLibraryLoader in rocksdbjni-3.13.1! @@ -136,7 +136,7 @@ class TestRocksDbKeyValueStore config, false, "dbStore", - null) + new KeyValueStoreMetrics()) val key = "key".getBytes("UTF-8") val key1 = "key1".getBytes("UTF-8")