This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b0c20a  SAMZA-2251: Minor diagnostics manager change to emit additional job details
6b0c20a is described below

commit 6b0c20ae63f9f15da89f35dbb15189300cfab371
Author: Ray Matharu <rmath...@linkedin.com>
AuthorDate: Mon Jun 24 17:40:28 2019 -0700

    SAMZA-2251: Minor diagnostics manager change to emit additional job details
    
    Author: Ray Matharu <rmath...@linkedin.com>
    
    Reviewers: Cameron Lee<ca...@linkedin.com>
    
    Closes #1083 from rmatharu/test-diagnostics-improvements
---
 .../clustermanager/ContainerProcessManager.java    |   8 +-
 .../org/apache/samza/config/StorageConfig.java     |   8 +
 .../apache/samza/processor/StreamProcessor.java    |   2 +-
 .../apache/samza/runtime/ContainerLaunchUtil.java  |   2 +-
 .../org/apache/samza/util/DiagnosticsUtil.java     |  35 ++-
 .../samza/diagnostics/DiagnosticsManager.java      | 118 ++++++---
 .../diagnostics/DiagnosticsStreamMessage.java      | 284 +++++++++++++++++++++
 .../samza/diagnostics/ProcessorStopEvent.java      |  63 +++++
 .../samza/diagnostics/TestDiagnosticsManager.java  | 245 ++++++++++++++++++
 .../diagnostics/TestDiagnosticsStreamMessage.java  | 148 +++++++++++
 10 files changed, 862 insertions(+), 51 deletions(-)
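
For context, the "additional job details" are carried by the new DiagnosticsStreamMessage
class introduced in this patch: per-container memory and vCores, the number of stores
configured with a changelog, the job's ContainerModels, and processor stop events. A rough
sketch of what one published message can now contain, assuming a jobModel and a list of
ProcessorStopEvents are already in scope (all literal values below are illustrative):

    // Header fields mirror the DiagnosticsStreamMessage constructor added in this patch.
    DiagnosticsStreamMessage msg = new DiagnosticsStreamMessage(
        "my-job", "1", "samza-container-0", "yarn-container-id",
        "0.0.1", "1.3.0", "host-1", System.currentTimeMillis(), System.currentTimeMillis());

    msg.addContainerMb(1024);                          // cluster-manager container memory (MB)
    msg.addContainerNumCores(2);                       // cluster-manager container vCores
    msg.addNumStoresWithChangelog(3);                  // stores configured with a changelog
    msg.addContainerModels(jobModel.getContainers());  // ContainerModels from the JobModel
    msg.addProcessorStopEvents(stopEvents);            // queued ProcessorStopEvents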

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
index f75e217..b2cf6b9 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
@@ -146,7 +146,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     String jobId = new JobConfig(config).getJobId();
     Optional<String> execEnvContainerId = 
Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY));
     Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair =
-        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, 
METRICS_SOURCE_NAME, execEnvContainerId, config);
+        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, 
state.jobModelManager.jobModel(), METRICS_SOURCE_NAME, execEnvContainerId, 
config);
 
     if (diagnosticsManagerReporterPair.isPresent()) {
       diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
@@ -308,11 +308,13 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
   public void onResourceCompleted(SamzaResourceStatus resourceStatus) {
     String containerId = resourceStatus.getContainerId();
     String processorId = null;
+    String hostName = null;
     for (Map.Entry<String, SamzaResource> entry: 
state.runningProcessors.entrySet()) {
       if 
(entry.getValue().getContainerId().equals(resourceStatus.getContainerId())) {
         log.info("Container ID: {} matched running Processor ID: {} on host: 
{}", containerId, entry.getKey(), entry.getValue().getHost());
 
         processorId = entry.getKey();
+        hostName = entry.getValue().getHost();
         break;
       }
     }
@@ -435,6 +437,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
         }
 
     }
+
+    if (this.diagnosticsManager.isDefined()) {
+      this.diagnosticsManager.get().addProcessorStopEvent(processorId, 
resourceStatus.getContainerId(), hostName, exitStatus);
+    }
   }
 
   @Override
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index bc38e56..32e0d8e 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -205,4 +205,12 @@ public class StorageConfig extends MapConfig {
     Config subConfig = subset(STORE_PREFIX, true);
     return subConfig.keySet().stream().anyMatch(key -> 
key.endsWith(CHANGELOG_SUFFIX));
   }
+
+  /**
+   * Helper method to get the number of stores configured with a changelog.
+   */
+  public int getNumStoresWithChangelog() {
+    Config subConfig = subset(STORE_PREFIX, true);
+    return (int) subConfig.keySet().stream().filter(key -> key.endsWith(CHANGELOG_SUFFIX)).count();
+  }
 }
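
The new helper simply counts store sub-keys under the "stores." prefix that end with the
".changelog" suffix. A small usage sketch, assuming an in-memory MapConfig (store names and
changelog streams below are made up):

    Map<String, String> cfg = new HashMap<>();
    cfg.put("stores.store-a.changelog", "kafka.store-a-changelog");
    cfg.put("stores.store-b.changelog", "kafka.store-b-changelog");
    cfg.put("stores.store-c.key.serde", "string");  // store without a changelog

    // Two of the three stores declare a changelog stream.
    int numChangelogStores = new StorageConfig(new MapConfig(cfg)).getNumStoresWithChangelog();
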
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index c90c8f4..75dc62d 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -341,7 +341,7 @@ public class StreamProcessor {
     String jobName = new JobConfig(config).getName().get();
     String jobId = new JobConfig(config).getJobId();
     Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair =
-        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, processorId, 
Optional.empty(), config);
+        DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, 
processorId, Optional.empty(), config);
     Option<DiagnosticsManager> diagnosticsManager = Option.empty();
     if (diagnosticsManagerReporterPair.isPresent()) {
       diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
index e748dd2..c84a4a1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java
@@ -99,7 +99,7 @@ public class ContainerLaunchUtil {
       Map<String, MetricsReporter> metricsReporters = 
loadMetricsReporters(appDesc, containerId, config);
 
       // Creating diagnostics manager and reporter, and wiring it respectively
-      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair = 
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, containerId, 
execEnvContainerId, config);
+      Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair = 
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, 
execEnvContainerId, config);
       Option<DiagnosticsManager> diagnosticsManager = Option.empty();
       if (diagnosticsManagerReporterPair.isPresent()) {
         diagnosticsManager = 
Option.apply(diagnosticsManagerReporterPair.get().getKey());
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index c290e6c..637c01b 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -24,13 +24,16 @@ import java.util.Optional;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.diagnostics.DiagnosticsManager;
+import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.reporter.Metrics;
 import org.apache.samza.metrics.reporter.MetricsHeader;
@@ -50,7 +53,6 @@ import scala.Option;
 public class DiagnosticsUtil {
   private static final Logger log = 
LoggerFactory.getLogger(DiagnosticsUtil.class);
 
-
   // Write a file in the samza.log.dir named {exec-env-container-id}.metadata 
that contains
   // metadata about the container such as containerId, jobName, jobId, 
hostname, timestamp, version info, and others.
   // The file contents are serialized using {@link JsonSerde}.
@@ -61,9 +63,9 @@ public class DiagnosticsUtil {
 
     if (metadataFile.isDefined()) {
       MetricsHeader metricsHeader =
-          new MetricsHeader(jobName, jobId, "samza-container-" + containerId, 
execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(),
-              Util.getTaskClassVersion(config), Util.getSamzaVersion(), 
Util.getLocalHost().getHostName(),
-              System.currentTimeMillis(), System.currentTimeMillis());
+          new MetricsHeader(jobName, jobId, "samza-container-" + containerId, 
execEnvContainerId.orElse(""),
+              LocalContainerRunner.class.getName(), 
Util.getTaskClassVersion(config), Util.getSamzaVersion(),
+              Util.getLocalHost().getHostName(), System.currentTimeMillis(), 
System.currentTimeMillis());
 
       class MetadataFileContents {
         public final String version;
@@ -76,26 +78,30 @@ public class DiagnosticsUtil {
       }
 
       MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new 
Metrics());
-      MetadataFileContents metadataFileContents = new 
MetadataFileContents("1", new String(new 
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+      MetadataFileContents metadataFileContents =
+          new MetadataFileContents("1", new String(new 
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
       FileUtil.writeToTextFile(metadataFile.get(), new String(new 
JsonSerde<>().toBytes(metadataFileContents)), false);
     } else {
       log.info("Skipping writing metadata file.");
     }
   }
 
-
   /**
    * Create a pair of DiagnosticsManager and Reporter for the given jobName, 
jobId, containerId, and execEnvContainerId,
    * if diagnostics is enabled.
    * execEnvContainerId is the ID assigned to the container by the cluster 
manager (e.g., YARN).
    */
-  public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
buildDiagnosticsManager(String jobName, String jobId,
-      String containerId, Optional<String> execEnvContainerId, Config config) {
+  public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
buildDiagnosticsManager(String jobName,
+      String jobId, JobModel jobModel, String containerId, Optional<String> 
execEnvContainerId, Config config) {
 
     Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> 
diagnosticsManagerReporterPair = Optional.empty();
 
     if (new JobConfig(config).getDiagnosticsEnabled()) {
 
+      ClusterManagerConfig clusterManagerConfig = new 
ClusterManagerConfig(config);
+      int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb();
+      int containerNumCores = clusterManagerConfig.getNumCores();
+
       // Diagnostic stream, producer, and reporter related parameters
       String diagnosticsReporterName = 
MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS;
       MetricsConfig metricsConfig = new MetricsConfig(config);
@@ -111,16 +117,21 @@ public class DiagnosticsUtil {
 
       SystemStream diagnosticsSystemStream = 
StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get());
 
-      Optional<String> diagnosticsSystemFactoryName = new 
SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem());
+      Optional<String> diagnosticsSystemFactoryName =
+          new 
SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem());
       if (!diagnosticsSystemFactoryName.isPresent()) {
         throw new SamzaException("Missing factory in config for system " + 
diagnosticsSystemStream.getSystem());
       }
 
       // Create a systemProducer for giving to diagnostic-reporter and 
diagnosticsManager
       SystemFactory systemFactory = 
Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class);
-      SystemProducer systemProducer = 
systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new 
MetricsRegistryMap());
-      DiagnosticsManager diagnosticsManager = new DiagnosticsManager(jobName, 
jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion,
-          samzaVersion, hostName, diagnosticsSystemStream, systemProducer, 
Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
+      SystemProducer systemProducer =
+          systemFactory.getProducer(diagnosticsSystemStream.getSystem(), 
config, new MetricsRegistryMap());
+      DiagnosticsManager diagnosticsManager =
+          new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), 
containerMemoryMb, containerNumCores,
+              new StorageConfig(config).getNumStoresWithChangelog(), 
containerId, execEnvContainerId.orElse(""),
+              taskClassVersion, samzaVersion, hostName, 
diagnosticsSystemStream, systemProducer,
+              Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
 
       Option<String> blacklist = 
ScalaJavaUtil.JavaOptionals$.MODULE$.toRichOptional(
           
metricsConfig.getMetricsSnapshotReporterBlacklist(diagnosticsReporterName)).toOption();
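
Put together, buildDiagnosticsManager now derives the job-level details before constructing the
manager. A condensed sketch of the wiring in the hunk above, assuming jobName, jobId, jobModel,
containerId, execEnvContainerId, hostName, taskClassVersion, samzaVersion, diagnosticsSystemStream,
systemProducer, and config are in scope as in the patched method:

    ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config);
    DiagnosticsManager diagnosticsManager = new DiagnosticsManager(
        jobName, jobId,
        jobModel.getContainers(),                               // Map<String, ContainerModel>
        clusterManagerConfig.getContainerMemoryMb(),            // container memory (MB)
        clusterManagerConfig.getNumCores(),                     // container vCores
        new StorageConfig(config).getNumStoresWithChangelog(),  // changelog-backed stores
        containerId, execEnvContainerId.orElse(""),
        taskClassVersion, samzaVersion, hostName,
        diagnosticsSystemStream, systemProducer,
        Duration.ofMillis(new TaskConfig(config).getShutdownMs()));
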
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index daaefe4..aa41940 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -19,25 +19,23 @@
 
 package org.apache.samza.diagnostics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.lang.reflect.InvocationTargetException;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Collection;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import org.apache.samza.metrics.reporter.Metrics;
-import org.apache.samza.metrics.reporter.MetricsHeader;
-import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.DiagnosticsUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,8 +53,6 @@ public class DiagnosticsManager {
   // Period size for pushing data to the diagnostic stream
 
   private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager 
Thread-%d";
-  private static final String METRICS_GROUP_NAME = 
"org.apache.samza.container.SamzaContainerMetrics";
-  // Using SamzaContainerMetrics as the group name to maintain compatibility 
with existing diagnostics
 
   // Parameters used for populating the MetricHeader when sending 
diagnostic-stream messages
   private final String jobName;
@@ -68,31 +64,58 @@ public class DiagnosticsManager {
   private final String hostname;
   private final Instant resetTime;
 
-  private SystemProducer systemProducer; // SystemProducer for writing 
diagnostics data
-  private BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList 
for storing DiagnosticExceptionEvent
+  // Job-related params
+  private final Integer containerMemoryMb;
+  private final Integer containerNumCores;
+  private final Integer numStoresWithChangelog;
+  private final Map<String, ContainerModel> containerModels;
+  private boolean jobParamsEmitted = false;
+
+  private final SystemProducer systemProducer; // SystemProducer for writing 
diagnostics data
+  private final BoundedList<DiagnosticsExceptionEvent> exceptions; // A 
BoundedList for storing DiagnosticExceptionEvent
+  private final ConcurrentLinkedQueue<ProcessorStopEvent> processorStopEvents;
+  // A concurrent queue for storing ProcessorStopEvents until they are published
   private final ScheduledExecutorService scheduler; // Scheduler for pushing 
data to the diagnostic stream
   private final Duration terminationDuration; // duration to wait when 
terminating the scheduler
   private final SystemStream diagnosticSystemStream;
 
-  public DiagnosticsManager(String jobName, String jobId, String containerId, 
String executionEnvContainerId,
-      String taskClassVersion, String samzaVersion, String hostname, 
SystemStream diagnosticSystemStream,
-      SystemProducer systemProducer, Duration terminationDuration) {
+  public DiagnosticsManager(String jobName, String jobId, Map<String, 
ContainerModel> containerModels,
+      Integer containerMemoryMb, Integer containerNumCores, Integer 
numStoresWithChangelog, String containerId,
+      String executionEnvContainerId, String taskClassVersion, String 
samzaVersion, String hostname,
+      SystemStream diagnosticSystemStream, SystemProducer systemProducer, 
Duration terminationDuration) {
+
+    this(jobName, jobId, containerModels, containerMemoryMb, 
containerNumCores, numStoresWithChangelog, containerId,
+        executionEnvContainerId, taskClassVersion, samzaVersion, hostname, 
diagnosticSystemStream, systemProducer,
+        terminationDuration, Executors.newSingleThreadScheduledExecutor(
+            new 
ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()));
+  }
+
+  @VisibleForTesting
+  DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel> 
containerModels,
+      int containerMemoryMb, int containerNumCores, int 
numStoresWithChangelog, String containerId,
+      String executionEnvContainerId, String taskClassVersion, String 
samzaVersion, String hostname,
+      SystemStream diagnosticSystemStream, SystemProducer systemProducer, 
Duration terminationDuration,
+      ScheduledExecutorService executorService) {
     this.jobName = jobName;
     this.jobId = jobId;
+    this.containerModels = containerModels;
+    this.containerMemoryMb = containerMemoryMb;
+    this.containerNumCores = containerNumCores;
+    this.numStoresWithChangelog = numStoresWithChangelog;
     this.containerId = containerId;
     this.executionEnvContainerId = executionEnvContainerId;
     this.taskClassVersion = taskClassVersion;
     this.samzaVersion = samzaVersion;
     this.hostname = hostname;
-    resetTime = Instant.now();
-
-    this.systemProducer = systemProducer;
     this.diagnosticSystemStream = diagnosticSystemStream;
+    this.systemProducer = systemProducer;
+    this.terminationDuration = terminationDuration;
 
+    this.processorStopEvents = new ConcurrentLinkedQueue<>();
     this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList 
with default size and time parameters
-    this.scheduler = Executors.newSingleThreadScheduledExecutor(
-        new 
ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build());
-    this.terminationDuration = terminationDuration;
+    this.scheduler = executorService;
+
+    resetTime = Instant.now();
 
     try {
 
@@ -141,36 +164,59 @@ public class DiagnosticsManager {
     this.exceptions.add(diagnosticsExceptionEvent);
   }
 
+  public void addProcessorStopEvent(String processorId, String resourceId, 
String host, int exitStatus) {
+    this.processorStopEvents.add(new ProcessorStopEvent(processorId, 
resourceId, host, exitStatus));
+    LOG.info("Added stop event for Processor Id: {}, resource Id: {}, host: {}, exitStatus: {}", processorId,
+        resourceId, host, exitStatus);
+  }
+
   private class DiagnosticsStreamPublisher implements Runnable {
 
     @Override
     public void run() {
-      // Publish exception events if there are any
-      Collection<DiagnosticsExceptionEvent> exceptionList = 
exceptions.getValues();
+      try {
+        DiagnosticsStreamMessage diagnosticsStreamMessage =
+            new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + 
containerId, executionEnvContainerId,
+                taskClassVersion, samzaVersion, hostname, 
System.currentTimeMillis(), resetTime.toEpochMilli());
+
+        // Add job-related params to the message (if not already published)
+        if (!jobParamsEmitted) {
+          diagnosticsStreamMessage.addContainerMb(containerMemoryMb);
+          diagnosticsStreamMessage.addContainerNumCores(containerNumCores);
+          
diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog);
+          diagnosticsStreamMessage.addContainerModels(containerModels);
+        }
 
-      if (!exceptionList.isEmpty()) {
+        // Add stop event list to the message
+        diagnosticsStreamMessage.addProcessorStopEvents(new 
ArrayList(processorStopEvents));
 
-        // Create the metricHeader
-        MetricsHeader metricsHeader = new MetricsHeader(jobName, jobId, 
"samza-container-" + containerId, executionEnvContainerId,
-            DiagnosticsUtil.class.getName(), taskClassVersion, samzaVersion, 
hostname, System.currentTimeMillis(),
-            resetTime.toEpochMilli());
+        // Add exception events to the message
+        
diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptions.getValues());
 
-        Map<String, Map<String, Object>> metricsMessage = new HashMap<>();
-        metricsMessage.putIfAbsent(METRICS_GROUP_NAME, new HashMap<>());
-        metricsMessage.get(METRICS_GROUP_NAME).put(exceptions.getName(), 
exceptionList);
-        MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, 
new Metrics(metricsMessage));
+        if (!diagnosticsStreamMessage.isEmpty()) {
 
-        try {
           systemProducer.send(DiagnosticsManager.class.getName(),
-              new OutgoingMessageEnvelope(diagnosticSystemStream, 
metricsHeader.getHost(), null,
-                  new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+              new OutgoingMessageEnvelope(diagnosticSystemStream, hostname, 
null,
+                  new 
MetricsSnapshotSerdeV2().toBytes(diagnosticsStreamMessage.convertToMetricsSnapshot())));
+          systemProducer.flush(DiagnosticsManager.class.getName());
+
+          // Remove stop events from list after successful publish
+          if (diagnosticsStreamMessage.getProcessorStopEvents() != null) {
+            
processorStopEvents.removeAll(diagnosticsStreamMessage.getProcessorStopEvents());
+          }
 
           // Remove exceptions from list after successful publish to 
diagnostics stream
-          exceptions.remove(exceptionList);
-        } catch (Exception e) {
-          LOG.error("Exception when flushing exceptions", e);
+          if (diagnosticsStreamMessage.getExceptionEvents() != null) {
+            exceptions.remove(diagnosticsStreamMessage.getExceptionEvents());
+          }
+
+          // Emit jobParams once
+          jobParamsEmitted = true;
         }
+      } catch (Exception e) {
+        LOG.error("Exception when flushing diagnosticsStreamMessage", e);
       }
     }
   }
+
 }
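
Callers record processor terminations through the new addProcessorStopEvent hook; the manager
queues the event and the scheduled publisher drains the queue only after a successful send and
flush to the diagnostics stream. A minimal sketch of the call site, mirroring the
ContainerProcessManager change above (exitStatus is whatever the cluster manager reported):

    if (diagnosticsManager.isDefined()) {
      diagnosticsManager.get().addProcessorStopEvent(
          processorId,                      // Samza processor id
          resourceStatus.getContainerId(),  // cluster-manager resource id
          hostName,                         // host the processor ran on
          exitStatus);                      // exit status reported for the resource
    }
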
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
new file mode 100644
index 0000000..6840912
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Defines the contents for any message emitted to the diagnostic stream by 
the {@link DiagnosticsManager}.
+ * All contents are stored in a {@link MetricsHeader} and a metricsMessage map 
which combine to get a {@link MetricsSnapshot},
+ * which can be serialized using serdes ({@link 
org.apache.samza.serializers.MetricsSnapshotSerdeV2}).
+ * This class serializes {@link ContainerModel} using {@link 
SamzaObjectMapper} before adding to the metrics message.
+ *
+ */
+public class DiagnosticsStreamMessage {
+  private static final Logger LOG = 
LoggerFactory.getLogger(DiagnosticsStreamMessage.class);
+
+  public static final String GROUP_NAME_FOR_DIAGNOSTICS_MANAGER = 
DiagnosticsManager.class.getName();
+  // Using DiagnosticsManager as the group name for processor-stop-events, 
job-related params, and container model
+
+  private static final String SAMZACONTAINER_METRICS_GROUP_NAME = 
"org.apache.samza.container.SamzaContainerMetrics";
+  // Using SamzaContainerMetrics as the group name for exceptions to maintain 
compatibility with existing diagnostics
+  private static final String EXCEPTION_LIST_METRIC_NAME = "exceptions";
+
+  private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents";
+  private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb";
+  private static final String CONTAINER_NUM_CORES_METRIC_NAME = 
"containerNumCores";
+  public static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME = 
"numStoresWithChangelog";
+  private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels";
+
+  private final MetricsHeader metricsHeader;
+  private final Map<String, Map<String, Object>> metricsMessage;
+
+  public DiagnosticsStreamMessage(String jobName, String jobId, String 
containerName, String executionEnvContainerId,
+      String taskClassVersion, String samzaVersion, String hostname, long 
timestamp, long resetTimestamp) {
+
+    // Create the metricHeader
+    metricsHeader =
+        new MetricsHeader(jobName, jobId, containerName, 
executionEnvContainerId, DiagnosticsManager.class.getName(),
+            taskClassVersion, samzaVersion, hostname, timestamp, 
resetTimestamp);
+
+    this.metricsMessage = new HashMap<>();
+  }
+
+  /**
+   * Add the container memory mb parameter to the message.
+   * @param containerMemoryMb the memory mb parameter value.
+   */
+  public void addContainerMb(Integer containerMemoryMb) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MB_METRIC_NAME, containerMemoryMb);
+  }
+
+  /**
+   * Add the container num cores parameter to the message.
+   * @param containerNumCores the num core parameter value.
+   */
+  public void addContainerNumCores(Integer containerNumCores) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_CORES_METRIC_NAME, containerNumCores);
+  }
+
+  /**
+   * Add the num stores with changelog parameter to the message.
+   * @param numStoresWithChangelog the parameter value.
+   */
+  public void addNumStoresWithChangelog(Integer numStoresWithChangelog) {
+    addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME,
+        numStoresWithChangelog);
+  }
+
+  /**
+   * Add a map of container models (indexed by containerID) to the message.
+   * @param containerModelMap the container models map
+   */
+  public void addContainerModels(Map<String, ContainerModel> 
containerModelMap) {
+    if (containerModelMap != null && !containerModelMap.isEmpty()) {
+      addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MODELS_METRIC_NAME,
+          serializeContainerModelMap(containerModelMap));
+    }
+  }
+
+  /**
+   * Add a list of {@link DiagnosticsExceptionEvent}s to the message.
+   * @param exceptionList the list to add.
+   */
+  public void 
addDiagnosticsExceptionEvents(Collection<DiagnosticsExceptionEvent> 
exceptionList) {
+    if (exceptionList != null && !exceptionList.isEmpty()) {
+      addToMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME, 
EXCEPTION_LIST_METRIC_NAME, exceptionList);
+    }
+  }
+
+  /**
+   * Add a list of {@link org.apache.samza.diagnostics.ProcessorStopEvent}s to 
add to the list.
+   * @param stopEventList the list to add.
+   */
+  public void addProcessorStopEvents(List<ProcessorStopEvent> stopEventList) {
+    if (stopEventList != null && !stopEventList.isEmpty()) {
+      addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
STOP_EVENT_LIST_METRIC_NAME, stopEventList);
+    }
+  }
+
+  /**
+   * Convert this message into a {@link MetricsSnapshot}, useful for 
serde-deserde using {@link org.apache.samza.serializers.MetricsSnapshotSerde}.
+   * @return
+   */
+  public MetricsSnapshot convertToMetricsSnapshot() {
+    MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new 
Metrics(metricsMessage));
+    return metricsSnapshot;
+  }
+
+  /**
+   * Check if the message has no contents.
+   * @return True if the message is empty, false otherwise.
+   */
+  public boolean isEmpty() {
+    return metricsMessage.isEmpty();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DiagnosticsStreamMessage that = (DiagnosticsStreamMessage) o;
+    return metricsHeader.getAsMap().equals(that.metricsHeader.getAsMap()) && 
metricsMessage.equals(that.metricsMessage);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(metricsHeader, metricsMessage);
+  }
+
+  public Collection<ProcessorStopEvent> getProcessorStopEvents() {
+    return (Collection<ProcessorStopEvent>) 
getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
+        STOP_EVENT_LIST_METRIC_NAME);
+  }
+
+  public Collection<DiagnosticsExceptionEvent> getExceptionEvents() {
+    return (Collection<DiagnosticsExceptionEvent>) 
getFromMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME,
+        EXCEPTION_LIST_METRIC_NAME);
+  }
+
+  public Integer getContainerMb() {
+    return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MB_METRIC_NAME);
+  }
+
+  public Integer getContainerNumCores() {
+    return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_NUM_CORES_METRIC_NAME);
+  }
+
+  public Integer getNumStoresWithChangelog() {
+    return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER,
+        CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME);
+  }
+
+  public Map<String, ContainerModel> getContainerModels() {
+    return deserializeContainerModelMap((String) 
getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, 
CONTAINER_MODELS_METRIC_NAME));
+  }
+
+  // Helper method to get a {@link DiagnosticsStreamMessage} from a {@link MetricsSnapshot}.
+  // This is typically used when deserializing messages from a diagnostics-stream.
+  // @param metricsSnapshot
+  public static DiagnosticsStreamMessage 
convertToDiagnosticsStreamMessage(MetricsSnapshot metricsSnapshot) {
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        new DiagnosticsStreamMessage(metricsSnapshot.getHeader().getJobName(), 
metricsSnapshot.getHeader().getJobId(),
+            metricsSnapshot.getHeader().getContainerName(), 
metricsSnapshot.getHeader().getExecEnvironmentContainerId(),
+            metricsSnapshot.getHeader().getVersion(), 
metricsSnapshot.getHeader().getSamzaVersion(),
+            metricsSnapshot.getHeader().getHost(), 
metricsSnapshot.getHeader().getTime(),
+            metricsSnapshot.getHeader().getResetTime());
+
+    Map<String, Map<String, Object>> metricsMap = 
metricsSnapshot.getMetrics().getAsMap();
+    Map<String, Object> diagnosticsManagerGroupMap = 
metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER);
+    Map<String, Object> containerMetricsGroupMap = 
metricsMap.get(SAMZACONTAINER_METRICS_GROUP_NAME);
+
+    if (diagnosticsManagerGroupMap != null) {
+
+      diagnosticsStreamMessage.addContainerNumCores((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_NUM_CORES_METRIC_NAME));
+      diagnosticsStreamMessage.addContainerMb((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME));
+      diagnosticsStreamMessage.addNumStoresWithChangelog((Integer) 
diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME));
+      
diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String)
 diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME)));
+
+      
diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>) 
diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME));
+    }
+
+    if (containerMetricsGroupMap != null && 
containerMetricsGroupMap.containsKey(EXCEPTION_LIST_METRIC_NAME)) {
+      diagnosticsStreamMessage.addDiagnosticsExceptionEvents(
+          (Collection<DiagnosticsExceptionEvent>) 
containerMetricsGroupMap.get(EXCEPTION_LIST_METRIC_NAME));
+    }
+
+    return diagnosticsStreamMessage;
+  }
+
+  /**
+   * Helper method to use {@link SamzaObjectMapper} to serialize {@link 
ContainerModel}s.
+   * We use SamzaObjectMapper for ContainerModels, rather than using 
ObjectMapper (in MetricsSnapshotSerdeV2)
+   * because MetricsSnapshotSerdeV2 enables default typing, which writes type 
information for all containerModel (and
+   * underlying) classes, deserializing which requires a large number of 
jackson related changes to those classes
+   * (annotations and/or mixins). We cannot disable default typing to avoid 
backward incompatibility. This is why
+   * we serde-deserde ContainerModel explicitly using SamzaObjectMapper (which 
is also used for reads-writes to coordinator
+   * stream).
+   * {@link SamzaObjectMapper} provides several conventions and optimizations 
for serializing containerModels.
+   * @param containerModelMap map of container models to serialize.
+   * @return
+   */
+  private static String serializeContainerModelMap(Map<String, ContainerModel> 
containerModelMap) {
+    ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper();
+    try {
+      return samzaObjectMapper.writeValueAsString(containerModelMap);
+    } catch (IOException e) {
+      LOG.error("Exception in serializing container model ", e);
+    }
+
+    return null;
+  }
+
+  /**
+   * Helper method to use {@link SamzaObjectMapper} to deserialize {@link 
ContainerModel}s.
+   * {@link SamzaObjectMapper} provides several conventions and optimizations 
for deserializing containerModels.
+   * @return
+   */
+  private static Map<String, ContainerModel> deserializeContainerModelMap(
+      String serializedContainerModel) {
+    Map<String, ContainerModel> containerModelMap = null;
+    ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper();
+
+    try {
+      if (serializedContainerModel != null) {
+        containerModelMap = 
samzaObjectMapper.readValue(serializedContainerModel, new 
TypeReference<Map<String, ContainerModel>>() {
+        });
+      }
+    } catch (IOException e) {
+      LOG.error("Exception in deserializing container model ", e);
+    }
+
+    return containerModelMap;
+  }
+
+  private void addToMetricsMessage(String groupName, String metricName, Object 
value) {
+    if (value != null) {
+      metricsMessage.putIfAbsent(groupName, new HashMap<>());
+      metricsMessage.get(groupName).put(metricName, value);
+    }
+  }
+
+  private Object getFromMetricsMessage(String groupName, String metricName) {
+    if (metricsMessage.containsKey(groupName) && metricsMessage.get(groupName) 
!= null) {
+      return metricsMessage.get(groupName).get(metricName);
+    } else {
+      return null;
+    }
+  }
+}
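
Because DiagnosticsStreamMessage converts to and from a MetricsSnapshot, stream consumers can
round-trip it through MetricsSnapshotSerdeV2, as the tests below do. A brief sketch, where msg
is a populated DiagnosticsStreamMessage as in the earlier example:

    // Serialize for the diagnostics stream...
    byte[] bytes = new MetricsSnapshotSerdeV2().toBytes(msg.convertToMetricsSnapshot());

    // ...and reconstruct on the consumer side.
    MetricsSnapshot snapshot = new MetricsSnapshotSerdeV2().fromBytes(bytes);
    DiagnosticsStreamMessage decoded = DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(snapshot);
    Integer containerMb = decoded.getContainerMb();       // null when the field was not emitted
    Map<String, ContainerModel> models = decoded.getContainerModels();
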
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java
new file mode 100644
index 0000000..786fb78
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.util.Objects;
+
+
+/**
+ * Encapsulates information (emitted to diagnostics stream) about the stopping 
of a processor.
+ * Information emitted includes processorId, resourceId, exit status, and host.
+ */
+public class ProcessorStopEvent {
+  public final String processorId;
+  public final String resourceId;
+  public final String host;
+  public final int exitStatus;
+
+  // Default constructor, required for deserialization with jackson
+  private ProcessorStopEvent() {
+    this("", "", "", -1);
+  }
+
+  public ProcessorStopEvent(String processorId, String resourceId, String 
host, int exitStatus) {
+    this.processorId = processorId;
+    this.resourceId = resourceId;
+    this.host = host;
+    this.exitStatus = exitStatus;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ProcessorStopEvent that = (ProcessorStopEvent) o;
+    return exitStatus == that.exitStatus && Objects.equals(processorId, 
that.processorId) && Objects.equals(
+        resourceId, that.resourceId) && Objects.equals(host, that.host);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(processorId, resourceId, host, exitStatus);
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
new file mode 100644
index 0000000..c69b278
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestDiagnosticsManager {
+  private DiagnosticsManager diagnosticsManager;
+  private MockSystemProducer mockSystemProducer;
+  private SystemStream diagnosticsSystemStream = new SystemStream("kafka", 
"test stream");
+
+  private String jobName = "Testjob";
+  private String jobId = "test job id";
+  private String executionEnvContainerId = "exec container id";
+  private String taskClassVersion = "0.0.1";
+  private String samzaVersion = "1.3.0";
+  private String hostname = "sample host name";
+  private int containerMb = 1024;
+  private int numStoresWithChangelog = 2;
+  private int containerNumCores = 2;
+  private Map<String, ContainerModel> containerModels = 
TestDiagnosticsStreamMessage.getSampleContainerModels();
+  private Collection<DiagnosticsExceptionEvent> exceptionEventList = 
TestDiagnosticsStreamMessage.getExceptionList();
+
+  @Before
+  public void setup() {
+
+    // Mocked system producer for publishing to diagnostics stream
+    mockSystemProducer = new MockSystemProducer();
+
+    // Mocked scheduled executor service which does a synchronous run() on 
scheduling
+    ScheduledExecutorService mockExecutorService = 
Mockito.mock(ScheduledExecutorService.class);
+    Mockito.when(mockExecutorService.scheduleWithFixedDelay(Mockito.any(), 
Mockito.anyLong(), Mockito.anyLong(),
+        Mockito.eq(TimeUnit.SECONDS))).thenAnswer(invocation -> {
+            ((Runnable) invocation.getArguments()[0]).run();
+            return Mockito.mock(ScheduledFuture.class);
+          });
+
+    this.diagnosticsManager =
+        new DiagnosticsManager(jobName, jobId, containerModels, containerMb, 
containerNumCores, numStoresWithChangelog,
+            "0", executionEnvContainerId, taskClassVersion, samzaVersion, 
hostname, diagnosticsSystemStream,
+            mockSystemProducer, Duration.ofSeconds(1), mockExecutorService);
+
+    exceptionEventList.forEach(
+        diagnosticsExceptionEvent -> 
this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent));
+
+    this.diagnosticsManager.addProcessorStopEvent("0", 
executionEnvContainerId, hostname, 101);
+  }
+
+  @Test
+  public void testDiagnosticsStreamFirstMessagePublish() {
+    // invoking start will do a synchronous publish to the stream because of our mocked scheduled exec service
+    this.diagnosticsManager.start();
+    Assert.assertEquals("One message should have been published", 1, 
mockSystemProducer.getEnvelopeList().size());
+    OutgoingMessageEnvelope outgoingMessageEnvelope = 
mockSystemProducer.getEnvelopeList().get(0);
+    validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
+  }
+
+  @Test
+  public void testNoDualPublish() {
+    // Across two successive run() invocations only a single message should be 
published
+    this.diagnosticsManager.start();
+    this.diagnosticsManager.start();
+
+    Assert.assertEquals("One message should have been published", 1, 
mockSystemProducer.getEnvelopeList().size());
+    OutgoingMessageEnvelope outgoingMessageEnvelope = 
mockSystemProducer.getEnvelopeList().get(0);
+    validateMetricsHeader(outgoingMessageEnvelope);
+    validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
+  }
+
+  @Test
+  public void testSecondPublishWithProcessorStopInSecondMessage() {
+    // Across two successive run() invocations two messages should be 
published if stop events are added
+    this.diagnosticsManager.start();
+    this.diagnosticsManager.addProcessorStopEvent("0", 
executionEnvContainerId, hostname, 102);
+    this.diagnosticsManager.start();
+
+    Assert.assertEquals("Two messages should have been published", 2, 
mockSystemProducer.getEnvelopeList().size());
+
+    // Validate the first message
+    OutgoingMessageEnvelope outgoingMessageEnvelope = 
mockSystemProducer.getEnvelopeList().get(0);
+    validateMetricsHeader(outgoingMessageEnvelope);
+    validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
+
+    // Validate the second message's header
+    outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1);
+    validateMetricsHeader(outgoingMessageEnvelope);
+
+    // Validate the second message's body (should be all empty except for the 
processor-stop-event)
+    MetricsSnapshot metricsSnapshot =
+        new MetricsSnapshotSerdeV2().fromBytes((byte[]) 
outgoingMessageEnvelope.getMessage());
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+
+    Assert.assertNull(diagnosticsStreamMessage.getContainerMb());
+    Assert.assertNull(diagnosticsStreamMessage.getExceptionEvents());
+    Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(),
+        Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, 
hostname, 102)));
+    Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
+    Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
+    Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
+  }
+
+  @Test
+  public void testSecondPublishWithExceptionInSecondMessage() {
+    // Across two successive run() invocations two messages should be 
published if stop events are added
+    this.diagnosticsManager.start();
+    DiagnosticsExceptionEvent diagnosticsExceptionEvent = new 
DiagnosticsExceptionEvent(System.currentTimeMillis(), new 
RuntimeException("exception"), new HashMap());
+    this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent);
+    this.diagnosticsManager.start();
+
+    Assert.assertEquals("Two messages should have been published", 2, 
mockSystemProducer.getEnvelopeList().size());
+
+    // Validate the first message
+    OutgoingMessageEnvelope outgoingMessageEnvelope = 
mockSystemProducer.getEnvelopeList().get(0);
+    validateMetricsHeader(outgoingMessageEnvelope);
+    validateOutgoingMessageEnvelope(outgoingMessageEnvelope);
+
+    // Validate the second message's header
+    outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1);
+    validateMetricsHeader(outgoingMessageEnvelope);
+
+    // Validate the second message's body (should be all empty except for the exception event)
+    MetricsSnapshot metricsSnapshot =
+        new MetricsSnapshotSerdeV2().fromBytes((byte[]) 
outgoingMessageEnvelope.getMessage());
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+
+    Assert.assertNull(diagnosticsStreamMessage.getContainerMb());
+    Assert.assertEquals(Arrays.asList(diagnosticsExceptionEvent), 
diagnosticsStreamMessage.getExceptionEvents());
+    Assert.assertNull(diagnosticsStreamMessage.getProcessorStopEvents());
+    Assert.assertNull(diagnosticsStreamMessage.getContainerModels());
+    Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores());
+    Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog());
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.diagnosticsManager.stop();
+  }
+
+  private void validateMetricsHeader(OutgoingMessageEnvelope 
outgoingMessageEnvelope) {
+    // Validate the outgoing message
+
+    
Assert.assertTrue(outgoingMessageEnvelope.getSystemStream().equals(diagnosticsSystemStream));
+    MetricsSnapshot metricsSnapshot =
+        new MetricsSnapshotSerdeV2().fromBytes((byte[]) 
outgoingMessageEnvelope.getMessage());
+
+    // Validate all header fields
+    Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName);
+    Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId);
+    
Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(),
 executionEnvContainerId);
+    Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), 
taskClassVersion);
+    Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), 
samzaVersion);
+    Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname);
+    Assert.assertEquals(metricsSnapshot.getHeader().getSource(), 
DiagnosticsManager.class.getName());
+
+  }
+
+  private void validateOutgoingMessageEnvelope(OutgoingMessageEnvelope 
outgoingMessageEnvelope) {
+    MetricsSnapshot metricsSnapshot =
+        new MetricsSnapshotSerdeV2().fromBytes((byte[]) 
outgoingMessageEnvelope.getMessage());
+
+    // Validate the diagnostics stream message
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+
+    Assert.assertEquals(containerMb, 
diagnosticsStreamMessage.getContainerMb().intValue());
+    Assert.assertEquals(exceptionEventList, 
diagnosticsStreamMessage.getExceptionEvents());
+    Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), 
Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 
101)));
+    Assert.assertEquals(containerModels, 
diagnosticsStreamMessage.getContainerModels());
+    Assert.assertEquals(containerNumCores, 
diagnosticsStreamMessage.getContainerNumCores().intValue());
+    Assert.assertEquals(numStoresWithChangelog, 
diagnosticsStreamMessage.getNumStoresWithChangelog().intValue());
+  }
+
+  private class MockSystemProducer implements SystemProducer {
+
+    private final List<OutgoingMessageEnvelope> envelopeList = new 
ArrayList<>();
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public void register(String source) {
+
+    }
+
+    @Override
+    public void send(String source, OutgoingMessageEnvelope envelope) {
+      envelopeList.add(envelope);
+    }
+
+    @Override
+    public void flush(String source) {
+
+    }
+
+    public List<OutgoingMessageEnvelope> getEnvelopeList() {
+      return this.envelopeList;
+    }
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
new file mode 100644
index 0000000..81bc577
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestDiagnosticsStreamMessage {
+
+  private final String jobName = "Testjob";
+  private final String jobId = "test job id";
+  private final String containerName = "sample container name";
+  private final String executionEnvContainerId = "exec container id";
+  private final String taskClassVersion = "0.0.1";
+  private final String samzaVersion = "1.3.0";
+  private final String hostname = "sample host name";
+  private final long timestamp = System.currentTimeMillis();
+  private final long resetTimestamp = System.currentTimeMillis();
+
+  private DiagnosticsStreamMessage getDiagnosticsStreamMessage() {
+    DiagnosticsStreamMessage diagnosticsStreamMessage =
+        new DiagnosticsStreamMessage(jobName, jobId, containerName, 
executionEnvContainerId, taskClassVersion,
+            samzaVersion, hostname, timestamp, resetTimestamp);
+
+    diagnosticsStreamMessage.addContainerMb(1024);
+    diagnosticsStreamMessage.addContainerNumCores(2);
+    diagnosticsStreamMessage.addNumStoresWithChangelog(3);
+
+    
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
+    return diagnosticsStreamMessage;
+  }
+
+  public static Collection<DiagnosticsExceptionEvent> getExceptionList() {
+    BoundedList boundedList = new 
BoundedList<DiagnosticsExceptionEvent>("exceptions");
+    DiagnosticsExceptionEvent diagnosticsExceptionEvent =
+        new DiagnosticsExceptionEvent(1, new Exception("this is a samza 
exception", new Exception("cause")),
+            new HashMap());
+
+    boundedList.add(diagnosticsExceptionEvent);
+    return boundedList.getValues();
+  }
+
+  public List<ProcessorStopEvent> getProcessorStopEventList() {
+    List<ProcessorStopEvent> stopEventList = new ArrayList<>();
+    stopEventList.add(new ProcessorStopEvent("0", executionEnvContainerId, 
hostname, 101));
+    return stopEventList;
+  }
+
+  public static Map<String, ContainerModel> getSampleContainerModels() {
+    Map<String, ContainerModel> containerModels = new HashMap<>();
+    Map<TaskName, TaskModel> tasks = new HashMap<>();
+
+    Set<SystemStreamPartition> sspsForTask1 = new HashSet<>();
+    sspsForTask1.add(new SystemStreamPartition("kafka", "test-stream", new 
Partition(0)));
+    tasks.put(new TaskName("Partition 0"), new TaskModel(new 
TaskName("Partition 0"), sspsForTask1, new Partition(0)));
+
+    Set<SystemStreamPartition> sspsForTask2 = new HashSet<>();
+    sspsForTask2.add(new SystemStreamPartition("kafka", "test-stream", new 
Partition(1)));
+    tasks.put(new TaskName("Partition 1"), new TaskModel(new 
TaskName("Partition 1"), sspsForTask2, new Partition(1)));
+
+    containerModels.put("0", new ContainerModel("0", tasks));
+    return containerModels;
+  }
+
+  /**
+   * Tests basic operations on {@link DiagnosticsStreamMessage}.
+   */
+  @Test
+  public void basicTest() {
+
+    DiagnosticsStreamMessage diagnosticsStreamMessage = 
getDiagnosticsStreamMessage();
+    Collection<DiagnosticsExceptionEvent> exceptionEventList = 
getExceptionList();
+    diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
+    
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
+    diagnosticsStreamMessage.addContainerModels(getSampleContainerModels());
+
+    Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb());
+    Assert.assertEquals(2, (int) 
diagnosticsStreamMessage.getContainerNumCores());
+    Assert.assertEquals(3, (int) 
diagnosticsStreamMessage.getNumStoresWithChangelog());
+    Assert.assertEquals(exceptionEventList, 
diagnosticsStreamMessage.getExceptionEvents());
+    Assert.assertEquals(getSampleContainerModels(), 
diagnosticsStreamMessage.getContainerModels());
+    Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), 
getProcessorStopEventList());
+  }
+
+  /**
+   * Tests serialization and deserialization of a {@link 
DiagnosticsStreamMessage}
+   */
+  @Test
+  public void serdeTest() {
+    DiagnosticsStreamMessage diagnosticsStreamMessage = 
getDiagnosticsStreamMessage();
+    Collection<DiagnosticsExceptionEvent> exceptionEventList = 
getExceptionList();
+    diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList);
+    
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList());
+    diagnosticsStreamMessage.addContainerModels(getSampleContainerModels());
+
+    MetricsSnapshot metricsSnapshot = 
diagnosticsStreamMessage.convertToMetricsSnapshot();
+    Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName);
+    Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId);
+    
Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(),
 executionEnvContainerId);
+    Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), 
taskClassVersion);
+    Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), 
samzaVersion);
+    Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname);
+    Assert.assertEquals(metricsSnapshot.getHeader().getSource(), 
DiagnosticsManager.class.getName());
+
+    Map<String, Map<String, Object>> metricsMap = 
metricsSnapshot.getMetrics().getAsMap();
+    
Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions"));
+    
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerModels"));
+    
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numStoresWithChangelog"));
+    
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores"));
+    
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb"));
+    
Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents"));
+
+    DiagnosticsStreamMessage convertedDiagnosticsStreamMessage =
+        
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot);
+
+    
Assert.assertTrue(convertedDiagnosticsStreamMessage.equals(diagnosticsStreamMessage));
+  }
+}
