This is an automated email from the ASF dual-hosted git repository. shanthoosh pushed a commit to branch 1.1.0 in repository https://gitbox.apache.org/repos/asf/samza.git
commit 655d60f5ed6a8b3cc6bace6b535744373736945a Author: Ray Matharu <rmath...@linkedin.com> AuthorDate: Fri Mar 8 12:21:20 2019 -0800 Bugfix: A recent ContainerStorageManager (CSM) refactor caused some metrics not to be emitted. Fixed the <task>-restore-time metric. Author: Ray Matharu <rmath...@linkedin.com> Reviewers: prateekm Closes #942 from rmatharu/test-metricsfix and squashes the following commits: c5a072f4 [Ray Matharu] minor fix 3c1e25ad [Ray Matharu] minor b13b485a [Ray Matharu] minor 7e60ad8b [Ray Matharu] minor 7634a470 [Ray Matharu] removing CSM's registerMetrics 78ee37e1 [Ray Matharu] removing unused imports f207f03d [Ray Matharu] minor 8aec4fa7 [Ray Matharu] minor 0c121262 [Ray Matharu] Fixing metrics after moving sideInputs to CSM 8bd3b19f [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 6b58c862 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 89be3652 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 6fe29268 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 96e3d8f3 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 40f68a61 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 497602ab [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 1a72dc48 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 36c0b339 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 2c09b081 [Ray Matharu] Rocksdb bug fix --- .../org/apache/samza/container/SamzaContainer.scala | 9 +++++---- .../apache/samza/container/SamzaContainerMetrics.scala | 4 ++-- .../scala/org/apache/samza/container/TaskInstance.scala 
| 7 ------- .../apache/samza/storage/ContainerStorageManager.java | 17 +++++++++-------- .../apache/samza/system/SystemConsumersMetrics.scala | 7 +++++-- .../java/org/apache/samza/task/TestAsyncRunLoop.java | 1 - .../org/apache/samza/container/TestSamzaContainer.scala | 1 + .../samza/processor/StreamProcessorTestUtils.scala | 1 + 8 files changed, 23 insertions(+), 24 deletions(-) diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 5df4678..ab89396 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -594,7 +594,6 @@ object SamzaContainer extends Logging { offsetManager = offsetManager, storageManager = storageManager, tableManager = tableManager, - reporters = reporters, systemStreamPartitions = taskSSPs -- taskSideInputSSPs, exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, config), jobModel = jobModel, @@ -663,6 +662,7 @@ object SamzaContainer extends Logging { new SamzaContainer( config = config, taskInstances = taskInstances, + taskInstanceMetrics = taskInstanceMetrics, runLoop = runLoop, systemAdmins = systemAdmins, consumerMultiplexer = consumerMultiplexer, @@ -700,6 +700,7 @@ object SamzaContainer extends Logging { class SamzaContainer( config: Config, taskInstances: Map[TaskName, TaskInstance], + taskInstanceMetrics: Map[TaskName, TaskInstanceMetrics], runLoop: Runnable, systemAdmins: SystemAdmins, consumerMultiplexer: SystemConsumers, @@ -879,9 +880,9 @@ class SamzaContainer( } def startMetrics { - info("Registering task instances with metrics.") - - taskInstances.values.foreach(_.registerMetrics) + info("Registering task instance metrics.") + reporters.values.foreach(reporter => + taskInstanceMetrics.values.foreach(taskMetrics => reporter.register(taskMetrics.source, 
taskMetrics.registry))) info("Starting JVM metrics.") diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala index d5cf6c6..326156b 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala @@ -51,8 +51,8 @@ class SamzaContainerMetrics( val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions") - def addStoreRestorationGauge(taskName: TaskName, storeName: String) { - taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L)) + def addStoresRestorationGauge(taskName: TaskName) { + taskStoreRestorationMetrics.put(taskName, newGauge("%s-restore-time" format(taskName.toString), -1L)) } } diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index 0c8102b..fa17f24 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -52,7 +52,6 @@ class TaskInstance( val offsetManager: OffsetManager = new OffsetManager, storageManager: TaskStorageManager = null, tableManager: TableManager = null, - reporters: Map[String, MetricsReporter] = Map(), val systemStreamPartitions: Set[SystemStreamPartition] = Set(), val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler, jobModel: JobModel = null, @@ -105,12 +104,6 @@ class TaskInstance( val streamsToDeleteCommittedMessages: Set[String] = config.getStreamIds.filter(config.getDeleteCommittedMessages).map(config.getPhysicalName).toSet - def registerMetrics { - debug("Registering metrics for taskName: %s" format taskName) - - reporters.values.foreach(_.register(metrics.source, 
metrics.registry)) - } - def registerOffsets { debug("Registering offsets for taskName: %s" format taskName) offsetManager.register(taskName, systemStreamPartitions) diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java index ad9637d..da61a35 100644 --- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java +++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java @@ -113,7 +113,8 @@ public class ContainerStorageManager { private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class); private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d"; private static final String SIDEINPUTS_FLUSH_THREAD_NAME = "SideInputs Flush Thread"; - private static final String SIDEINPUTS_METRICS_NAME = "samza-container-%s-sideinputs"; + private static final String SIDEINPUTS_METRICS_PREFIX = "side-inputs-"; + // We use a prefix to differentiate the SystemConsumersMetrics for side-inputs from the ones in SamzaContainer /** Maps containing relevant per-task objects */ private final Map<TaskName, Map<String, StorageEngine>> taskStores; @@ -215,7 +216,7 @@ public class ContainerStorageManager { this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers); // creating task restore managers - this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock); + this.taskRestoreManagers = createTaskRestoreManagers(systemAdmins, clock, this.samzaContainerMetrics); // create side input storage managers sideInputStorageManagers = createSideInputStorageManagers(clock); @@ -229,15 +230,15 @@ public class ContainerStorageManager { scala.collection.immutable.Map<SystemStream, SystemStreamMetadata> inputStreamMetadata = streamMetadataCache.getStreamMetadata(JavaConversions.asScalaSet( 
this.sideInputSystemStreams.values().stream().flatMap(Set::stream).collect(Collectors.toSet())).toSet(), false); - SystemConsumersMetrics systemConsumersMetrics = new SystemConsumersMetrics( - new MetricsRegistryMap(String.format(SIDEINPUTS_METRICS_NAME, containerModel.getId()))); + SystemConsumersMetrics sideInputSystemConsumersMetrics = new SystemConsumersMetrics(samzaContainerMetrics.registry(), SIDEINPUTS_METRICS_PREFIX); + // we use the same registry as samza-container-metrics MessageChooser chooser = DefaultChooser.apply(inputStreamMetadata, new RoundRobinChooserFactory(), config, - systemConsumersMetrics.registry(), systemAdmins); + sideInputSystemConsumersMetrics.registry(), systemAdmins); sideInputSystemConsumers = new SystemConsumers(chooser, ScalaJavaUtil.toScalaMap(this.sideInputConsumers), serdeManager, - systemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), + sideInputSystemConsumersMetrics, SystemConsumers.DEFAULT_NO_NEW_MESSAGES_TIMEOUT(), SystemConsumers.DEFAULT_DROP_SERIALIZATION_ERROR(), SystemConsumers.DEFAULT_POLL_INTERVAL_MS(), ScalaJavaUtil.toScalaFunction(() -> System.nanoTime())); } @@ -336,11 +337,12 @@ public class ContainerStorageManager { return storeConsumers; } - private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock) { + private Map<TaskName, TaskRestoreManager> createTaskRestoreManagers(SystemAdmins systemAdmins, Clock clock, SamzaContainerMetrics samzaContainerMetrics) { Map<TaskName, TaskRestoreManager> taskRestoreManagers = new HashMap<>(); containerModel.getTasks().forEach((taskName, taskModel) -> { taskRestoreManagers.put(taskName, new TaskRestoreManager(taskModel, changelogSystemStreams, getNonSideInputStores(taskName), systemAdmins, clock)); + samzaContainerMetrics.addStoresRestorationGauge(taskName); }); return taskRestoreManagers; } @@ -573,7 +575,6 @@ public class ContainerStorageManager { return 
this.sideInputStorageManagers.values().stream().collect(Collectors.toSet()); } - public void start() throws SamzaException { restoreStores(); if (sideInputsPresent()) { diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala index 43d381b..afdce08 100644 --- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala @@ -19,12 +19,13 @@ package org.apache.samza.system -import org.apache.samza.metrics.MetricsRegistry import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.metrics.Counter import org.apache.samza.metrics.MetricsHelper +import org.apache.samza.metrics.ReadableMetricsRegistry -class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistryMap) extends MetricsHelper { +class SystemConsumersMetrics(val registry: ReadableMetricsRegistry = new MetricsRegistryMap, + val prefix: String = "") extends MetricsHelper { val choseNull = newCounter("chose-null") val choseObject = newCounter("chose-object") val deserializationError = newCounter("deserialization error") @@ -55,6 +56,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry } } + override def getPrefix: String = prefix + def registerSystemStreamPartition(systemStreamPartition: SystemStreamPartition) { systemStreamMessagesChosen += systemStreamPartition -> newCounter("%s-%s-%d-messages-chosen" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId)) } diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index acaecdb..48f8619 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ 
b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -97,7 +97,6 @@ public class TestAsyncRunLoop { manager, null, null, - null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()), null, diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index d1f60bc..e75fe54 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -280,6 +280,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar { this.samzaContainer = new SamzaContainer( this.config, Map(TASK_NAME -> this.taskInstance), + Map(TASK_NAME -> new TaskInstanceMetrics), this.runLoop, this.systemAdmins, this.consumerMultiplexer, diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala index 9bb485a..3ff651b 100644 --- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala +++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala @@ -63,6 +63,7 @@ object StreamProcessorTestUtils { val container = new SamzaContainer( config = config, taskInstances = Map(taskName -> taskInstance), + taskInstanceMetrics = Map(taskName -> new TaskInstanceMetrics), runLoop = mockRunloop, systemAdmins = adminMultiplexer, consumerMultiplexer = consumerMultiplexer,