This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d20b9b3  SAMZA-2397: Updating gauge-val function on newGauge on same metric name (#1223)
d20b9b3 is described below

commit d20b9b32a8285e936511e4be2643610671ac2922
Author: rmatharu <40646191+rmath...@users.noreply.github.com>
AuthorDate: Tue Jan 21 15:43:53 2020 -0800

    SAMZA-2397: Updating gauge-val function on newGauge on same metric name (#1223)
    
    * SAMZA-2397: Updating gauge-val function on newGauge on same metric name
    * Adding unit test for metrics
---
 .../apache/samza/metrics/MetricsRegistryMap.scala  |  6 ++++--
 .../storage/kv/TestRocksDbKeyValueStore.scala      | 25 +++++++++++++++++++++-
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
index 40ffee2..9751f68 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/MetricsRegistryMap.scala
@@ -50,8 +50,10 @@ class MetricsRegistryMap(val name: String) extends ReadableMetricsRegistry with
   }
 
   def newGauge[T](group: String, gauge: Gauge[T]) = {
-    debug("Adding new gauge %s %s %s." format (group, gauge.getName, gauge))
-    putAndGetGroup(group).putIfAbsent(gauge.getName, gauge)
+    if (putAndGetGroup(group).containsKey(gauge.getName)) {
+      debug("Updating existing gauge %s %s %s" format (group, gauge.getName, 
gauge))
+    }
+    putAndGetGroup(group).put(gauge.getName, gauge)
     val realGauge = 
metrics.get(group).get(gauge.getName).asInstanceOf[Gauge[T]]
     listeners.foreach(_.onGauge(group, realGauge))
     realGauge
diff --git a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
index ca9c023..2a4f44e 100644
--- a/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/test/scala/org/apache/samza/storage/kv/TestRocksDbKeyValueStore.scala
@@ -25,7 +25,7 @@ import java.util
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
-import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
+import org.apache.samza.metrics.{Counter, Gauge, MetricsRegistryMap, 
MetricsVisitor, Timer}
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.junit.{Assert, Test}
 import org.rocksdb.{FlushOptions, Options, RocksDB, RocksIterator}
@@ -210,4 +210,27 @@ class TestRocksDbKeyValueStore
 
     rocksDB.close()
   }
+
+  @Test
+  def testRocksDBMetricsWithBulkLoadRWRecreate(): Unit = {
+    val registry = new MetricsRegistryMap("registrymap")
+    val metrics = new KeyValueStoreMetrics("dbstore", registry)
+
+    // Sample metric values for estimate-num-keys metrics
+    val bulkloadStoreMetricValue = "100"
+    val readWriteStoreMetricValue = "10"
+
+    // Metric during bulk-load/bootstrap
+    metrics.newGauge("estimate-num-keys", () => bulkloadStoreMetricValue)
+
+    
assert(registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics").
+      
get("dbstore-estimate-num-keys").asInstanceOf[Gauge[String]].getValue.eq("100"))
+
+    // Bulk-load complete, new store in read-write mode
+    metrics.newGauge("estimate-num-keys", () => 
readWriteStoreMetricValue.toString)
+
+    
assert(registry.getGroup("org.apache.samza.storage.kv.KeyValueStoreMetrics").
+      
get("dbstore-estimate-num-keys").asInstanceOf[Gauge[String]].getValue.eq("10"))
+  }
+
 }

Reply via email to