This is an automated email from the ASF dual-hosted git repository. rayman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new d20b9b3 SAMZA-2397: Updating gauge-val function on newGauge on same metric name (#1223) d20b9b3 is described below commit d20b9b32a8285e936511e4be2643610671ac2922 Author: rmatharu <40646191+rmath...@users.noreply.github.com> AuthorDate: Tue Jan 21 15:43:53 2020 -0800 SAMZA-2397: Updating gauge-val function on newGauge on same metric name (#1223) * SAMZA-2397: Updating gauge-val function on newGauge on same metric name * Adding unit test for metrics --- .../apache/samza/metrics/MetricsRegistryMap.scala | 6 ++++-- .../storage/kv/TestRocksDbKeyValueStore.scala | 25 +++++++++++++++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala index 40ffee2..9751f68 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala @@ -50,8 +50,10 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with } def newGauge[T](group: String, gauge: Gauge[T]) = { - debug("Adding new gauge %s %s %s." format (group, gauge.getName, gauge)) - putAndGetGroup(group).putIfAbsent(gauge.getName, gauge) + if (putAndGetGroup(group).containsKey(gauge.getName)) { + debug("Updating existing gauge %s %s %s" format (group, gauge.getName, gauge)) + } + putAndGetGroup(group).put(gauge.getName, gauge) val realGauge = metrics.get(group).get(gauge.getName).asInstanceOf[Gauge[T]] listeners.foreach(_.onGauge(group, realGauge)) realGauge diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala index ca9c023..2a4f44e 100644 --- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala +++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala @@ -25,7 +25,7 @@ import java.util import org.apache.samza.SamzaException import org.apache.samza.config.MapConfig -import org.apache.samza.metrics.{Gauge, MetricsRegistryMap} +import org.apache.samza.metrics.{Counter, Gauge, MetricsRegistryMap, MetricsVisitor, Timer} import org.apache.samza.util.ExponentialSleepStrategy import org.junit.{Assert, Test} import org.rocksdb.{FlushOptions, Options, RocksDB, RocksIterator} @@ -210,4 +210,27 @@ class TestRocksDbKeyValueStore rocksDB.close() } + + @Test + def testRocksDBMetricsWithBulkLoadRWRecreate(): Unit = { + val registry = new MetricsRegistryMap("registrymap") + val metrics = new KeyValueStoreMetrics("dbstore", registry) + + // Sample metric values for estimate-num-keys metrics + val bulkloadStoreMetricValue = "100" + val readWriteStoreMetricValue = "10" + + // Metric during bulk-load/bootstrap + metrics.newGauge("estimate-num-keys", () => bulkloadStoreMetricValue) + + assert(registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics"). + get("dbstore-estimate-num-keys").asInstanceOf[Gauge[String]].getValue.eq("100")) + + // Bulk-load complete, new store in read-write mode + metrics.newGauge("estimate-num-keys", () => readWriteStoreMetricValue.toString) + + assert(registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics"). + get("dbstore-estimate-num-keys").asInstanceOf[Gauge[String]].getValue.eq("10")) + } + }