Repository: samza
Updated Branches:
  refs/heads/master 13a38f28a -> 06488bf7a


SAMZA-1733: Adding comments, adding emptiness check to MetricsSnapshotReporter 
message

* Added comments for MetricsConfig
* Added simple emptiness check for MetricsSnapshotReporter

Author: rmath...@linkedin.com <rmath...@linkedin.com>

Reviewers: Prateek M<pmahe...@linkedin.com>

Closes #610 from rmatharu/bugfix-emptymessage


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/06488bf7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/06488bf7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/06488bf7

Branch: refs/heads/master
Commit: 06488bf7a7dfdfa6d26ed91f79b6c316460960b1
Parents: 13a38f2
Author: rmath...@linkedin.com <rmath...@linkedin.com>
Authored: Fri Aug 17 18:41:09 2018 -0700
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Fri Aug 17 18:41:09 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/MetricsConfig.scala | 11 +++---
 .../scala/org/apache/samza/job/JobRunner.scala  |  2 +-
 .../reporter/MetricsSnapshotReporter.scala      | 35 +++++++++++---------
 .../MetricsSnapshotReporterFactory.scala        |  6 ++--
 4 files changed, 30 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/06488bf7/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
index dab9527..95350cf 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala
@@ -27,9 +27,12 @@ object MetricsConfig {
   // metrics config constants
   val METRICS_REPORTERS = "metrics.reporters"
   val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class"
+  val METRICS_TIMER_ENABLED= "metrics.timer.enabled"
+
+  // The following configs are applicable only to {@link 
MetricsSnapshotReporter}
+  // added here only to maintain backwards compatibility of config
   val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream"
   val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval"
-  val METRICS_TIMER_ENABLED= "metrics.timer.enabled"
   val METRICS_SNAPSHOT_REPORTER_BLACKLIST = "metrics.reporter.%s.blacklist"
   val METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS = "diagnosticsreporter"
 
@@ -41,11 +44,11 @@ class MetricsConfig(config: Config) extends 
ScalaMapConfig(config) {
 
   def getMetricsFactoryClass(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_REPORTER_FACTORY format name)
 
-  def getMetricsReporterStream(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name)
+  def getMetricsSnapshotReporterStream(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name)
 
-  def getMetricsReporterInterval(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name)
+  def getMetricsSnapshotReporterInterval(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name)
 
-  def getBlacklist(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name)
+  def getMetricsSnapshotReporterBlacklist(name: String): Option[String] = 
getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name)
 
   /**
    * Returns a list of all metrics names from the config file. Useful for

http://git-wip-us.apache.org/repos/asf/samza/blob/06488bf7/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala 
b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 399aa14..bf4f252 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -122,7 +122,7 @@ class JobRunner(config: Config) extends Logging {
     if (new JobConfig(config).getDiagnosticsEnabled) {
       val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id"
       val diagnosticsSystemStreamName = new MetricsConfig(config).
-        
getMetricsReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS).
+        
getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS).
         getOrElse(throw new ConfigException("Missing required config: " +
           String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
             MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)))

http://git-wip-us.apache.org/repos/asf/samza/blob/06488bf7/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index 95e6705..8c13840 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
@@ -132,31 +132,34 @@ class MetricsSnapshotReporter(
         }
 
         // dont emit empty groups
-        if(!groupMsg.isEmpty) {
+        if (!groupMsg.isEmpty) {
           metricsMsg.put(group, groupMsg)
         }
       })
 
-      val header = new MetricsHeader(jobName, jobId, containerName, 
execEnvironmentContainerId, source, version, samzaVersion, host, clock(), 
resetTime)
-      val metrics = new Metrics(metricsMsg)
+      // publish to Kafka only if the metricsMsg carries any metrics
+      if (!metricsMsg.isEmpty) {
+        val header = new MetricsHeader(jobName, jobId, containerName, 
execEnvironmentContainerId, source, version, samzaVersion, host, clock(), 
resetTime)
+        val metrics = new Metrics(metricsMsg)
 
-      debug("Flushing metrics for %s to %s with header and map: header=%s, 
map=%s." format (source, out, header.getAsMap, metrics.getAsMap))
+        debug("Flushing metrics for %s to %s with header and map: header=%s, 
map=%s." format(source, out, header.getAsMap, metrics.getAsMap))
 
-      val metricsSnapshot = new MetricsSnapshot(header, metrics)
-      val maybeSerialized = if (serializer != null) {
-        serializer.toBytes(metricsSnapshot)
-      } else {
-        metricsSnapshot
-      }
+        val metricsSnapshot = new MetricsSnapshot(header, metrics)
+        val maybeSerialized = if (serializer != null) {
+          serializer.toBytes(metricsSnapshot)
+        } else {
+          metricsSnapshot
+        }
 
-      try {
+        try {
 
-        producer.send(source, new OutgoingMessageEnvelope(out, host, null, 
maybeSerialized))
+          producer.send(source, new OutgoingMessageEnvelope(out, host, null, 
maybeSerialized))
 
-        // Always flush, since we don't want metrics to get batched up.
-        producer.flush(source)
-      } catch  {
-        case e: Exception => error("Exception when flushing metrics for source 
%s " format(source), e)
+          // Always flush, since we don't want metrics to get batched up.
+          producer.flush(source)
+        } catch {
+          case e: Exception => error("Exception when flushing metrics for 
source %s " format (source), e)
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/06488bf7/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
index 3fd66da..d1e6554 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
@@ -64,7 +64,7 @@ class MetricsSnapshotReporterFactory extends 
MetricsReporterFactory with Logging
       })
 
     val metricsSystemStreamName = config
-      .getMetricsReporterStream(name)
+      .getMetricsSnapshotReporterStream(name)
       .getOrElse(throw new SamzaException("No metrics stream defined in 
config."))
 
     val systemStream = 
StreamUtil.getSystemStreamFromNames(metricsSystemStreamName)
@@ -103,12 +103,12 @@ class MetricsSnapshotReporterFactory extends 
MetricsReporterFactory with Logging
     info("Got serde %s." format serde)
 
     val pollingInterval: Int = config
-      .getMetricsReporterInterval(name)
+      .getMetricsSnapshotReporterInterval(name)
       .getOrElse("60").toInt
 
     info("Setting polling interval to %d" format pollingInterval)
 
-    val blacklist = config.getBlacklist(name)
+    val blacklist = config.getMetricsSnapshotReporterBlacklist(name)
     info("Setting blacklist to %s" format blacklist)
 
     val reporter = new MetricsSnapshotReporter(

Reply via email to