Repository: samza Updated Branches: refs/heads/master 13a38f28a -> 06488bf7a
SAMZA-1733: Adding comments, adding emptiness check to MetricsSnapshotReporter message * Added comments for MetricsConfig * Added simple emptiness check for MetricsSnapshotReporter Author: rmath...@linkedin.com <rmath...@linkedin.com> Reviewers: Prateek M <pmahe...@linkedin.com> Closes #610 from rmatharu/bugfix-emptymessage Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/06488bf7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/06488bf7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/06488bf7 Branch: refs/heads/master Commit: 06488bf7a7dfdfa6d26ed91f79b6c316460960b1 Parents: 13a38f2 Author: rmath...@linkedin.com <rmath...@linkedin.com> Authored: Fri Aug 17 18:41:09 2018 -0700 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Fri Aug 17 18:41:09 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/MetricsConfig.scala | 11 +++--- .../scala/org/apache/samza/job/JobRunner.scala | 2 +- .../reporter/MetricsSnapshotReporter.scala | 35 +++++++++++--------- .../MetricsSnapshotReporterFactory.scala | 6 ++-- 4 files changed, 30 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/06488bf7/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala index dab9527..95350cf 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/MetricsConfig.scala @@ -27,9 +27,12 @@ object MetricsConfig { // metrics config constants val METRICS_REPORTERS = "metrics.reporters" val METRICS_REPORTER_FACTORY = "metrics.reporter.%s.class" + 
val METRICS_TIMER_ENABLED= "metrics.timer.enabled" + + // The following configs are applicable only to {@link MetricsSnapshotReporter} + // added here only to maintain backwards compatibility of config val METRICS_SNAPSHOT_REPORTER_STREAM = "metrics.reporter.%s.stream" val METRICS_SNAPSHOT_REPORTER_INTERVAL= "metrics.reporter.%s.interval" - val METRICS_TIMER_ENABLED= "metrics.timer.enabled" val METRICS_SNAPSHOT_REPORTER_BLACKLIST = "metrics.reporter.%s.blacklist" val METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS = "diagnosticsreporter" @@ -41,11 +44,11 @@ class MetricsConfig(config: Config) extends ScalaMapConfig(config) { def getMetricsFactoryClass(name: String): Option[String] = getOption(MetricsConfig.METRICS_REPORTER_FACTORY format name) - def getMetricsReporterStream(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name) + def getMetricsSnapshotReporterStream(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM format name) - def getMetricsReporterInterval(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name) + def getMetricsSnapshotReporterInterval(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_INTERVAL format name) - def getBlacklist(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name) + def getMetricsSnapshotReporterBlacklist(name: String): Option[String] = getOption(MetricsConfig.METRICS_SNAPSHOT_REPORTER_BLACKLIST format name) /** * Returns a list of all metrics names from the config file. 
Useful for http://git-wip-us.apache.org/repos/asf/samza/blob/06488bf7/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala index 399aa14..bf4f252 100644 --- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala +++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala @@ -122,7 +122,7 @@ class JobRunner(config: Config) extends Logging { if (new JobConfig(config).getDiagnosticsEnabled) { val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id" val diagnosticsSystemStreamName = new MetricsConfig(config). - getMetricsReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS). + getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS). getOrElse(throw new ConfigException("Missing required config: " + String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM, MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS))) http://git-wip-us.apache.org/repos/asf/samza/blob/06488bf7/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index 95e6705..8c13840 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -132,31 +132,34 @@ class MetricsSnapshotReporter( } // dont emit empty groups - if(!groupMsg.isEmpty) { + if (!groupMsg.isEmpty) { metricsMsg.put(group, groupMsg) } }) - val header = new MetricsHeader(jobName, jobId, 
containerName, execEnvironmentContainerId, source, version, samzaVersion, host, clock(), resetTime) - val metrics = new Metrics(metricsMsg) + // publish to Kafka only if the metricsMsg carries any metrics + if (!metricsMsg.isEmpty) { + val header = new MetricsHeader(jobName, jobId, containerName, execEnvironmentContainerId, source, version, samzaVersion, host, clock(), resetTime) + val metrics = new Metrics(metricsMsg) - debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format (source, out, header.getAsMap, metrics.getAsMap)) + debug("Flushing metrics for %s to %s with header and map: header=%s, map=%s." format(source, out, header.getAsMap, metrics.getAsMap)) - val metricsSnapshot = new MetricsSnapshot(header, metrics) - val maybeSerialized = if (serializer != null) { - serializer.toBytes(metricsSnapshot) - } else { - metricsSnapshot - } + val metricsSnapshot = new MetricsSnapshot(header, metrics) + val maybeSerialized = if (serializer != null) { + serializer.toBytes(metricsSnapshot) + } else { + metricsSnapshot + } - try { + try { - producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized)) + producer.send(source, new OutgoingMessageEnvelope(out, host, null, maybeSerialized)) - // Always flush, since we don't want metrics to get batched up. - producer.flush(source) - } catch { - case e: Exception => error("Exception when flushing metrics for source %s " format(source), e) + // Always flush, since we don't want metrics to get batched up. 
+ producer.flush(source) + } catch { + case e: Exception => error("Exception when flushing metrics for source %s " format (source), e) + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/06488bf7/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala index 3fd66da..d1e6554 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala @@ -64,7 +64,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging }) val metricsSystemStreamName = config - .getMetricsReporterStream(name) + .getMetricsSnapshotReporterStream(name) .getOrElse(throw new SamzaException("No metrics stream defined in config.")) val systemStream = StreamUtil.getSystemStreamFromNames(metricsSystemStreamName) @@ -103,12 +103,12 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging info("Got serde %s." format serde) val pollingInterval: Int = config - .getMetricsReporterInterval(name) + .getMetricsSnapshotReporterInterval(name) .getOrElse("60").toInt info("Setting polling interval to %d" format pollingInterval) - val blacklist = config.getBlacklist(name) + val blacklist = config.getMetricsSnapshotReporterBlacklist(name) info("Setting blacklist to %s" format blacklist) val reporter = new MetricsSnapshotReporter(