This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch 1.1.0
in repository https://gitbox.apache.org/repos/asf/samza.git

commit 655d60f5ed6a8b3cc6bace6b535744373736945a
Author: Ray Matharu <rmath...@linkedin.com>
AuthorDate: Fri Mar 8 12:21:20 2019 -0800

    Bugfix: Recent CSM refactor was causing some metrics to not be emitted. 
Fixed <task>-restore-time metric.
    
    Author: Ray Matharu <rmath...@linkedin.com>
    
    Reviewers: prateekm
    
    Closes #942 from rmatharu/test-metricsfix and squashes the following 
commits:
    
    c5a072f4 [Ray Matharu] minor fix
    3c1e25ad [Ray Matharu] minor
    b13b485a [Ray Matharu] minor
    7e60ad8b [Ray Matharu] minor
    7634a470 [Ray Matharu] removing CSM's registerMetrics
    78ee37e1 [Ray Matharu] removing unused imports
    f207f03d [Ray Matharu] minor
    8aec4fa7 [Ray Matharu] minor
    0c121262 [Ray Matharu] Fixing metrics after moving sideInputs to CSM
    8bd3b19f [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    6b58c862 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    89be3652 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    6fe29268 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    96e3d8f3 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    40f68a61 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    497602ab [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    1a72dc48 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    36c0b339 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    12ca96bb [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    ee7daac8 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    08006871 [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    916f66ae [Ray Matharu] Merge branch 'master' of 
https://github.com/apache/samza
    2c09b081 [Ray Matharu] Rocksdb bug fix
---
 .../org/apache/samza/container/SamzaContainer.scala     |  9 +++++----
 .../apache/samza/container/SamzaContainerMetrics.scala  |  4 ++--
 .../scala/org/apache/samza/container/TaskInstance.scala |  7 -------
 .../apache/samza/storage/ContainerStorageManager.java   | 17 +++++++++--------
 .../apache/samza/system/SystemConsumersMetrics.scala    |  7 +++++--
 .../java/org/apache/samza/task/TestAsyncRunLoop.java    |  1 -
 .../org/apache/samza/container/TestSamzaContainer.scala |  1 +
 .../samza/processor/StreamProcessorTestUtils.scala      |  1 +
 8 files changed, 23 insertions(+), 24 deletions(-)

diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5df4678..ab89396 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -594,7 +594,6 @@ object SamzaContainer extends Logging {
           offsetManager = offsetManager,
           storageManager = storageManager,
           tableManager = tableManager,
-          reporters = reporters,
           systemStreamPartitions = taskSSPs -- taskSideInputSSPs,
           exceptionHandler = 
TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config),
           jobModel = jobModel,
@@ -663,6 +662,7 @@ object SamzaContainer extends Logging {
     new SamzaContainer(
       config = config,
       taskInstances = taskInstances,
+      taskInstanceMetrics = taskInstanceMetrics,
       runLoop = runLoop,
       systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
@@ -700,6 +700,7 @@ object SamzaContainer extends Logging {
 class SamzaContainer(
   config: Config,
   taskInstances: Map[TaskName, TaskInstance],
+  taskInstanceMetrics: Map[TaskName, TaskInstanceMetrics],
   runLoop: Runnable,
   systemAdmins: SystemAdmins,
   consumerMultiplexer: SystemConsumers,
@@ -879,9 +880,9 @@ class SamzaContainer(
   }
 
   def startMetrics {
-    info("Registering task instances with metrics.")
-
-    taskInstances.values.foreach(_.registerMetrics)
+    info("Registering task instance metrics.")
+    reporters.values.foreach(reporter =>
+      taskInstanceMetrics.values.foreach(taskMetrics => 
reporter.register(taskMetrics.source, taskMetrics.registry)))
 
     info("Starting JVM metrics.")
 
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index d5cf6c6..326156b 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -51,8 +51,8 @@ class SamzaContainerMetrics(
 
   val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
 
-  def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
-    taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" 
format(taskName.toString, storeName), -1L))
+  def addStoresRestorationGauge(taskName: TaskName) {
+    taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" 
format(taskName.toString), -1L))
   }
 
 }
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 0c8102b..fa17f24 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -52,7 +52,6 @@ class TaskInstance(
   val offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
   tableManager: TableManager = null,
-  reporters: Map[String, MetricsReporter] = Map(),
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new 
TaskInstanceExceptionHandler,
   jobModel: JobModel = null,
@@ -105,12 +104,6 @@ class TaskInstance(
 
   val streamsToDeleteCommittedMessages: Set[String] = 
config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet
 
-  def registerMetrics {
-    debug("Registering metrics for taskName: %s" format taskName)
-
-    reporters.values.foreach(_.register(metrics.source, metrics.registry))
-  }
-
   def registerOffsets {
     debug("Registering offsets for taskName: %s" format taskName)
     offsetManager.register(taskName, systemStreamPartitions)
diff --git 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index ad9637d..da61a35 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ 
b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -113,7 +113,8 @@ public class ContainerStorageManager {
   private static final Logger LOG = 
LoggerFactory.getLogger(ContainerStorageManager.class);
   private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
   private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush 
Thread";
-  private static final String SIDEINPUTS_METRICS_NAME = 
"samza-container-%s-sideinputs";
+  private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-";
+  // We use a prefix to differentiate the SystemConsumersMetrics for 
side-inputs from the ones in SamzaContainer
 
   /** Maps containing relevant per-task objects */
   private final Map<TaskName, Map<String, StorageEngine>> taskStores;
@@ -215,7 +216,7 @@ public class ContainerStorageManager {
     this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, 
storeSystemConsumers);
 
     // creating task restore managers
-    this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock);
+    this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock, 
this.samzaContainerMetrics);
 
     // create side input storage managers
     sideInputStorageManagers = createSideInputStorageManagers(clock);
@@ -229,15 +230,15 @@ public class ContainerStorageManager {
       scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> 
inputStreamMetadata = 
streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet(
           
this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(),
 false);
 
-      SystemConsumersMetrics systemConsumersMetrics = new 
SystemConsumersMetrics(
-          new MetricsRegistryMap(String.format(SIDEINPUTS_METRICS_NAME, 
containerModel.getId())));
+      SystemConsumersMetrics sideInputSystemConsumersMetrics = new 
SystemConsumersMetrics(samzaContainerMetrics.registry(), 
SIDEINPUTS_METRICS_PREFIX);
+      // we use the same registry as samza-container-metrics
 
       MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new 
RoundRobinChooserFactory(), config,
-          systemConsumersMetrics.registry(), systemAdmins);
+          sideInputSystemConsumersMetrics.registry(), systemAdmins);
 
       sideInputSystemConsumers =
           new SystemConsumers(chooser, 
ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager,
-              systemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), 
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
+              sideInputSystemConsumersMetrics, 
SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), 
SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(),
               SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), 
ScalaJavaUtil.toScalaFunction(() -> System.nanoTime()));
     }
 
@@ -336,11 +337,12 @@ public class ContainerStorageManager {
     return storeConsumers;
   }
 
-  private Map<TaskName, TaskRestoreManager> 
createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock) {
+  private Map<TaskName, TaskRestoreManager> 
createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock, 
SamzaContainerMetrics samzaContainerMetrics) {
     Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>();
     containerModel.getTasks().forEach((taskName, taskModel) -> {
         taskRestoreManagers.put(taskName,
             new TaskRestoreManager(taskModel, changelogSystemStreams, 
getNonSideInputStores(taskName), systemAdmins, clock));
+        samzaContainerMetrics.addStoresRestorationGauge(taskName);
       });
     return taskRestoreManagers;
   }
@@ -573,7 +575,6 @@ public class ContainerStorageManager {
     return 
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet());
   }
 
-
   public void start() throws SamzaException {
     restoreStores();
     if (sideInputsPresent()) {
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index 43d381b..afdce08 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -19,12 +19,13 @@
 
 package org.apache.samza.system
 
-import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.Counter
 import org.apache.samza.metrics.MetricsHelper
+import org.apache.samza.metrics.ReadableMetricsRegistry
 
-class SystemConsumersMetrics(val registry: MetricsRegistry = new 
MetricsRegistryMap) extends MetricsHelper {
+class SystemConsumersMetrics(val registry: ReadableMetricsRegistry = new 
MetricsRegistryMap,
+  val prefix: String = "") extends MetricsHelper {
   val choseNull = newCounter("chose-null")
   val choseObject = newCounter("chose-object")
   val deserializationError = newCounter("deserialization error")
@@ -55,6 +56,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = 
new MetricsRegistry
     }
   }
 
+  override def getPrefix: String = prefix
+
   def registerSystemStreamPartition(systemStreamPartition: 
SystemStreamPartition) {
     systemStreamMessagesChosen += systemStreamPartition -> 
newCounter("%s-%s-%d-messages-chosen" format (systemStreamPartition.getSystem, 
systemStreamPartition.getStream, 
systemStreamPartition.getPartition.getPartitionId))
   }
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index acaecdb..48f8619 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -97,7 +97,6 @@ public class TestAsyncRunLoop {
         manager,
         null,
         null,
-        null,
         sspSet,
         new TaskInstanceExceptionHandler(taskInstanceMetrics, new 
scala.collection.immutable.HashSet<String>()),
         null,
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index d1f60bc..e75fe54 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -280,6 +280,7 @@ class TestSamzaContainer extends AssertionsForJUnit with 
MockitoSugar {
     this.samzaContainer = new SamzaContainer(
       this.config,
       Map(TASK_NAME -> this.taskInstance),
+      Map(TASK_NAME -> new TaskInstanceMetrics),
       this.runLoop,
       this.systemAdmins,
       this.consumerMultiplexer,
diff --git 
a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
 
b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 9bb485a..3ff651b 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -63,6 +63,7 @@ object StreamProcessorTestUtils {
     val container = new SamzaContainer(
       config = config,
       taskInstances = Map(taskName -> taskInstance),
+      taskInstanceMetrics = Map(taskName -> new TaskInstanceMetrics),
       runLoop = mockRunloop,
       systemAdmins = adminMultiplexer,
       consumerMultiplexer = consumerMultiplexer,

Reply via email to