This is an automated email from the ASF dual-hosted git repository. jagadish pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push: new 6b0c20a SAMZA-2251: Minor diagnostics manager change to emit additional job details 6b0c20a is described below commit 6b0c20ae63f9f15da89f35dbb15189300cfab371 Author: Ray Matharu <rmath...@linkedin.com> AuthorDate: Mon Jun 24 17:40:28 2019 -0700 SAMZA-2251: Minor diagnostics manager change to emit additional job details Author: Ray Matharu <rmath...@linkedin.com> Reviewers: Cameron Lee<ca...@linkedin.com> Closes #1083 from rmatharu/test-diagnostics-improvements --- .../clustermanager/ContainerProcessManager.java | 8 +- .../org/apache/samza/config/StorageConfig.java | 8 + .../apache/samza/processor/StreamProcessor.java | 2 +- .../apache/samza/runtime/ContainerLaunchUtil.java | 2 +- .../org/apache/samza/util/DiagnosticsUtil.java | 35 ++- .../samza/diagnostics/DiagnosticsManager.java | 118 ++++++--- .../diagnostics/DiagnosticsStreamMessage.java | 284 +++++++++++++++++++++ .../samza/diagnostics/ProcessorStopEvent.java | 63 +++++ .../samza/diagnostics/TestDiagnosticsManager.java | 245 ++++++++++++++++++ .../diagnostics/TestDiagnosticsStreamMessage.java | 148 +++++++++++ 10 files changed, 862 insertions(+), 51 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index f75e217..b2cf6b9 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -146,7 +146,7 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback String jobId = new JobConfig(config).getJobId(); Optional<String> execEnvContainerId = Optional.ofNullable(System.getenv(EXEC_ENV_CONTAINER_ID_SYS_PROPERTY)); Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = - 
DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, METRICS_SOURCE_NAME, execEnvContainerId, config); + DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, state.jobModelManager.jobModel(), METRICS_SOURCE_NAME, execEnvContainerId, config); if (diagnosticsManagerReporterPair.isPresent()) { diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey()); @@ -308,11 +308,13 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback public void onResourceCompleted(SamzaResourceStatus resourceStatus) { String containerId = resourceStatus.getContainerId(); String processorId = null; + String hostName = null; for (Map.Entry<String, SamzaResource> entry: state.runningProcessors.entrySet()) { if (entry.getValue().getContainerId().equals(resourceStatus.getContainerId())) { log.info("Container ID: {} matched running Processor ID: {} on host: {}", containerId, entry.getKey(), entry.getValue().getHost()); processorId = entry.getKey(); + hostName = entry.getValue().getHost(); break; } } @@ -435,6 +437,10 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback } } + + if (this.diagnosticsManager.isDefined()) { + this.diagnosticsManager.get().addProcessorStopEvent(processorId, resourceStatus.getContainerId(), hostName, exitStatus); + } } @Override diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java index bc38e56..32e0d8e 100644 --- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java @@ -205,4 +205,12 @@ public class StorageConfig extends MapConfig { Config subConfig = subset(STORE_PREFIX, true); return subConfig.keySet().stream().anyMatch(key -> key.endsWith(CHANGELOG_SUFFIX)); } + + /** + * Helper method to get the number of stores configured with a changelog. 
+ */ + public int getNumStoresWithChangelog() { + Config subConfig = subset(STORE_PREFIX, true); + return new Long(subConfig.keySet().stream().filter(key -> key.endsWith(CHANGELOG_SUFFIX)).count()).intValue(); + } } diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index c90c8f4..75dc62d 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -341,7 +341,7 @@ public class StreamProcessor { String jobName = new JobConfig(config).getName().get(); String jobId = new JobConfig(config).getJobId(); Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = - DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, processorId, Optional.empty(), config); + DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, processorId, Optional.empty(), config); Option<DiagnosticsManager> diagnosticsManager = Option.empty(); if (diagnosticsManagerReporterPair.isPresent()) { diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey()); diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java index e748dd2..c84a4a1 100644 --- a/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java +++ b/samza-core/src/main/java/org/apache/samza/runtime/ContainerLaunchUtil.java @@ -99,7 +99,7 @@ public class ContainerLaunchUtil { Map<String, MetricsReporter> metricsReporters = loadMetricsReporters(appDesc, containerId, config); // Creating diagnostics manager and reporter, and wiring it respectively - Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, containerId, execEnvContainerId, config); + 
Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = DiagnosticsUtil.buildDiagnosticsManager(jobName, jobId, jobModel, containerId, execEnvContainerId, config); Option<DiagnosticsManager> diagnosticsManager = Option.empty(); if (diagnosticsManagerReporterPair.isPresent()) { diagnosticsManager = Option.apply(diagnosticsManagerReporterPair.get().getKey()); diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java index c290e6c..637c01b 100644 --- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java +++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java @@ -24,13 +24,16 @@ import java.util.Optional; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.samza.SamzaException; +import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; import org.apache.samza.config.MetricsConfig; +import org.apache.samza.config.StorageConfig; import org.apache.samza.config.SystemConfig; import org.apache.samza.config.TaskConfig; import org.apache.samza.diagnostics.DiagnosticsManager; +import org.apache.samza.job.model.JobModel; import org.apache.samza.metrics.MetricsRegistryMap; import org.apache.samza.metrics.reporter.Metrics; import org.apache.samza.metrics.reporter.MetricsHeader; @@ -50,7 +53,6 @@ import scala.Option; public class DiagnosticsUtil { private static final Logger log = LoggerFactory.getLogger(DiagnosticsUtil.class); - // Write a file in the samza.log.dir named {exec-env-container-id}.metadata that contains // metadata about the container such as containerId, jobName, jobId, hostname, timestamp, version info, and others. // The file contents are serialized using {@link JsonSerde}. 
@@ -61,9 +63,9 @@ public class DiagnosticsUtil { if (metadataFile.isDefined()) { MetricsHeader metricsHeader = - new MetricsHeader(jobName, jobId, "samza-container-" + containerId, execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(), - Util.getTaskClassVersion(config), Util.getSamzaVersion(), Util.getLocalHost().getHostName(), - System.currentTimeMillis(), System.currentTimeMillis()); + new MetricsHeader(jobName, jobId, "samza-container-" + containerId, execEnvContainerId.orElse(""), + LocalContainerRunner.class.getName(), Util.getTaskClassVersion(config), Util.getSamzaVersion(), + Util.getLocalHost().getHostName(), System.currentTimeMillis(), System.currentTimeMillis()); class MetadataFileContents { public final String version; @@ -76,26 +78,30 @@ public class DiagnosticsUtil { } MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics()); - MetadataFileContents metadataFileContents = new MetadataFileContents("1", new String(new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot))); + MetadataFileContents metadataFileContents = + new MetadataFileContents("1", new String(new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot))); FileUtil.writeToTextFile(metadataFile.get(), new String(new JsonSerde<>().toBytes(metadataFileContents)), false); } else { log.info("Skipping writing metadata file."); } } - /** * Create a pair of DiagnosticsManager and Reporter for the given jobName, jobId, containerId, and execEnvContainerId, * if diagnostics is enabled. * execEnvContainerId is the ID assigned to the container by the cluster manager (e.g., YARN). 
*/ - public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildDiagnosticsManager(String jobName, String jobId, - String containerId, Optional<String> execEnvContainerId, Config config) { + public static Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> buildDiagnosticsManager(String jobName, + String jobId, JobModel jobModel, String containerId, Optional<String> execEnvContainerId, Config config) { Optional<Pair<DiagnosticsManager, MetricsSnapshotReporter>> diagnosticsManagerReporterPair = Optional.empty(); if (new JobConfig(config).getDiagnosticsEnabled()) { + ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); + int containerMemoryMb = clusterManagerConfig.getContainerMemoryMb(); + int containerNumCores = clusterManagerConfig.getNumCores(); + // Diagnostic stream, producer, and reporter related parameters String diagnosticsReporterName = MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS; MetricsConfig metricsConfig = new MetricsConfig(config); @@ -111,16 +117,21 @@ public class DiagnosticsUtil { SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsReporterStreamName.get()); - Optional<String> diagnosticsSystemFactoryName = new SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem()); + Optional<String> diagnosticsSystemFactoryName = + new SystemConfig(config).getSystemFactory(diagnosticsSystemStream.getSystem()); if (!diagnosticsSystemFactoryName.isPresent()) { throw new SamzaException("Missing factory in config for system " + diagnosticsSystemStream.getSystem()); } // Create a systemProducer for giving to diagnostic-reporter and diagnosticsManager SystemFactory systemFactory = Util.getObj(diagnosticsSystemFactoryName.get(), SystemFactory.class); - SystemProducer systemProducer = systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap()); - DiagnosticsManager diagnosticsManager = new 
DiagnosticsManager(jobName, jobId, containerId, execEnvContainerId.orElse(""), taskClassVersion, - samzaVersion, hostName, diagnosticsSystemStream, systemProducer, Duration.ofMillis(new TaskConfig(config).getShutdownMs())); + SystemProducer systemProducer = + systemFactory.getProducer(diagnosticsSystemStream.getSystem(), config, new MetricsRegistryMap()); + DiagnosticsManager diagnosticsManager = + new DiagnosticsManager(jobName, jobId, jobModel.getContainers(), containerMemoryMb, containerNumCores, + new StorageConfig(config).getNumStoresWithChangelog(), containerId, execEnvContainerId.orElse(""), + taskClassVersion, samzaVersion, hostName, diagnosticsSystemStream, systemProducer, + Duration.ofMillis(new TaskConfig(config).getShutdownMs())); Option<String> blacklist = ScalaJavaUtil.JavaOptionals$.MODULE$.toRichOptional( metricsConfig.getMetricsSnapshotReporterBlacklist(diagnosticsReporterName)).toOption(); diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java index daaefe4..aa41940 100644 --- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java @@ -19,25 +19,23 @@ package org.apache.samza.diagnostics; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.time.Instant; -import java.util.Collection; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.samza.metrics.reporter.Metrics; -import 
org.apache.samza.metrics.reporter.MetricsHeader; -import org.apache.samza.metrics.reporter.MetricsSnapshot; +import org.apache.samza.job.model.ContainerModel; import org.apache.samza.serializers.MetricsSnapshotSerdeV2; import org.apache.samza.system.OutgoingMessageEnvelope; import org.apache.samza.system.SystemProducer; import org.apache.samza.system.SystemStream; -import org.apache.samza.util.DiagnosticsUtil; import org.apache.samza.util.Util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,8 +53,6 @@ public class DiagnosticsManager { // Period size for pushing data to the diagnostic stream private static final String PUBLISH_THREAD_NAME = "DiagnosticsManager Thread-%d"; - private static final String METRICS_GROUP_NAME = "org.apache.samza.container.SamzaContainerMetrics"; - // Using SamzaContainerMetrics as the group name to maintain compatibility with existing diagnostics // Parameters used for populating the MetricHeader when sending diagnostic-stream messages private final String jobName; @@ -68,31 +64,58 @@ public class DiagnosticsManager { private final String hostname; private final Instant resetTime; - private SystemProducer systemProducer; // SystemProducer for writing diagnostics data - private BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList for storing DiagnosticExceptionEvent + // Job-related params + private final Integer containerMemoryMb; + private final Integer containerNumCores; + private final Integer numStoresWithChangelog; + private final Map<String, ContainerModel> containerModels; + private boolean jobParamsEmitted = false; + + private final SystemProducer systemProducer; // SystemProducer for writing diagnostics data + private final BoundedList<DiagnosticsExceptionEvent> exceptions; // A BoundedList for storing DiagnosticExceptionEvent + private final ConcurrentLinkedQueue<ProcessorStopEvent> processorStopEvents; + // A BoundedList for storing DiagnosticExceptionEvent private final ScheduledExecutorService 
scheduler; // Scheduler for pushing data to the diagnostic stream private final Duration terminationDuration; // duration to wait when terminating the scheduler private final SystemStream diagnosticSystemStream; - public DiagnosticsManager(String jobName, String jobId, String containerId, String executionEnvContainerId, - String taskClassVersion, String samzaVersion, String hostname, SystemStream diagnosticSystemStream, - SystemProducer systemProducer, Duration terminationDuration) { + public DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel> containerModels, + Integer containerMemoryMb, Integer containerNumCores, Integer numStoresWithChangelog, String containerId, + String executionEnvContainerId, String taskClassVersion, String samzaVersion, String hostname, + SystemStream diagnosticSystemStream, SystemProducer systemProducer, Duration terminationDuration) { + + this(jobName, jobId, containerModels, containerMemoryMb, containerNumCores, numStoresWithChangelog, containerId, + executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticSystemStream, systemProducer, + terminationDuration, Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build())); + } + + @VisibleForTesting + DiagnosticsManager(String jobName, String jobId, Map<String, ContainerModel> containerModels, + int containerMemoryMb, int containerNumCores, int numStoresWithChangelog, String containerId, + String executionEnvContainerId, String taskClassVersion, String samzaVersion, String hostname, + SystemStream diagnosticSystemStream, SystemProducer systemProducer, Duration terminationDuration, + ScheduledExecutorService executorService) { this.jobName = jobName; this.jobId = jobId; + this.containerModels = containerModels; + this.containerMemoryMb = containerMemoryMb; + this.containerNumCores = containerNumCores; + this.numStoresWithChangelog = numStoresWithChangelog; this.containerId = 
containerId; this.executionEnvContainerId = executionEnvContainerId; this.taskClassVersion = taskClassVersion; this.samzaVersion = samzaVersion; this.hostname = hostname; - resetTime = Instant.now(); - - this.systemProducer = systemProducer; this.diagnosticSystemStream = diagnosticSystemStream; + this.systemProducer = systemProducer; + this.terminationDuration = terminationDuration; + this.processorStopEvents = new ConcurrentLinkedQueue<>(); this.exceptions = new BoundedList<>("exceptions"); // Create a BoundedList with default size and time parameters - this.scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder().setNameFormat(PUBLISH_THREAD_NAME).setDaemon(true).build()); - this.terminationDuration = terminationDuration; + this.scheduler = executorService; + + resetTime = Instant.now(); try { @@ -141,36 +164,59 @@ public class DiagnosticsManager { this.exceptions.add(diagnosticsExceptionEvent); } + public void addProcessorStopEvent(String processorId, String resourceId, String host, int exitStatus) { + this.processorStopEvents.add(new ProcessorStopEvent(processorId, resourceId, host, exitStatus)); + LOG.info("Added stop event for Container Id: {}, resource Id: {}, host: {}, exitStatus: {}", processorId, + resourceId, host, exitStatus); + } + private class DiagnosticsStreamPublisher implements Runnable { @Override public void run() { - // Publish exception events if there are any - Collection<DiagnosticsExceptionEvent> exceptionList = exceptions.getValues(); + try { + DiagnosticsStreamMessage diagnosticsStreamMessage = + new DiagnosticsStreamMessage(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId, + taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), resetTime.toEpochMilli()); + + // Add job-related params to the message (if not already published) + if (!jobParamsEmitted) { + diagnosticsStreamMessage.addContainerMb(containerMemoryMb); + 
diagnosticsStreamMessage.addContainerNumCores(containerNumCores); + diagnosticsStreamMessage.addNumStoresWithChangelog(numStoresWithChangelog); + diagnosticsStreamMessage.addContainerModels(containerModels); + } - if (!exceptionList.isEmpty()) { + // Add stop event list to the message + diagnosticsStreamMessage.addProcessorStopEvents(new ArrayList(processorStopEvents)); - // Create the metricHeader - MetricsHeader metricsHeader = new MetricsHeader(jobName, jobId, "samza-container-" + containerId, executionEnvContainerId, - DiagnosticsUtil.class.getName(), taskClassVersion, samzaVersion, hostname, System.currentTimeMillis(), - resetTime.toEpochMilli()); + // Add exception events to the message + diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptions.getValues()); - Map<String, Map<String, Object>> metricsMessage = new HashMap<>(); - metricsMessage.putIfAbsent(METRICS_GROUP_NAME, new HashMap<>()); - metricsMessage.get(METRICS_GROUP_NAME).put(exceptions.getName(), exceptionList); - MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricsMessage)); + if (!diagnosticsStreamMessage.isEmpty()) { - try { systemProducer.send(DiagnosticsManager.class.getName(), - new OutgoingMessageEnvelope(diagnosticSystemStream, metricsHeader.getHost(), null, - new MetricsSnapshotSerdeV2().toBytes(metricsSnapshot))); + new OutgoingMessageEnvelope(diagnosticSystemStream, hostname, null, + new MetricsSnapshotSerdeV2().toBytes(diagnosticsStreamMessage.convertToMetricsSnapshot()))); + systemProducer.flush(DiagnosticsManager.class.getName()); + + // Remove stop events from list after successful publish + if (diagnosticsStreamMessage.getProcessorStopEvents() != null) { + processorStopEvents.removeAll(diagnosticsStreamMessage.getProcessorStopEvents()); + } // Remove exceptions from list after successful publish to diagnostics stream - exceptions.remove(exceptionList); - } catch (Exception e) { - LOG.error("Exception when flushing exceptions", e); + if 
(diagnosticsStreamMessage.getExceptionEvents() != null) { + exceptions.remove(diagnosticsStreamMessage.getExceptionEvents()); + } + + // Emit jobParams once + jobParamsEmitted = true; } + } catch (Exception e) { + LOG.error("Exception when flushing diagnosticsStreamMessage", e); } } } + } diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java new file mode 100644 index 0000000..6840912 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsStreamMessage.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.diagnostics; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.metrics.reporter.Metrics; +import org.apache.samza.metrics.reporter.MetricsHeader; +import org.apache.samza.metrics.reporter.MetricsSnapshot; +import org.apache.samza.serializers.model.SamzaObjectMapper; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Defines the contents for any message emitted to the diagnostic stream by the {@link DiagnosticsManager}. + * All contents are stored in a {@link MetricsHeader} and a metricsMessage map which combine to get a {@link MetricsSnapshot}, + * which can be serialized using serdes ({@link org.apache.samza.serializers.MetricsSnapshotSerdeV2}). + * This class serializes {@link ContainerModel} using {@link SamzaObjectMapper} before adding to the metrics message. 
+ * + */ +public class DiagnosticsStreamMessage { + private static final Logger LOG = LoggerFactory.getLogger(DiagnosticsStreamMessage.class); + + public static final String GROUP_NAME_FOR_DIAGNOSTICS_MANAGER = DiagnosticsManager.class.getName(); + // Using DiagnosticsManager as the group name for processor-stop-events, job-related params, and container model + + private static final String SAMZACONTAINER_METRICS_GROUP_NAME = "org.apache.samza.container.SamzaContainerMetrics"; + // Using SamzaContainerMetrics as the group name for exceptions to maintain compatibility with existing diagnostics + private static final String EXCEPTION_LIST_METRIC_NAME = "exceptions"; + + private static final String STOP_EVENT_LIST_METRIC_NAME = "stopEvents"; + private static final String CONTAINER_MB_METRIC_NAME = "containerMemoryMb"; + private static final String CONTAINER_NUM_CORES_METRIC_NAME = "containerNumCores"; + public static final String CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME = "numStoresWithChangelog"; + private static final String CONTAINER_MODELS_METRIC_NAME = "containerModels"; + + private final MetricsHeader metricsHeader; + private final Map<String, Map<String, Object>> metricsMessage; + + public DiagnosticsStreamMessage(String jobName, String jobId, String containerName, String executionEnvContainerId, + String taskClassVersion, String samzaVersion, String hostname, long timestamp, long resetTimestamp) { + + // Create the metricHeader + metricsHeader = + new MetricsHeader(jobName, jobId, containerName, executionEnvContainerId, DiagnosticsManager.class.getName(), + taskClassVersion, samzaVersion, hostname, timestamp, resetTimestamp); + + this.metricsMessage = new HashMap<>(); + } + + /** + * Add the container memory mb parameter to the message. + * @param containerMemoryMb the memory mb parameter value. 
+ */ + public void addContainerMb(Integer containerMemoryMb) { + addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MB_METRIC_NAME, containerMemoryMb); + } + + /** + * Add the container num cores parameter to the message. + * @param containerNumCores the num core parameter value. + */ + public void addContainerNumCores(Integer containerNumCores) { + addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_CORES_METRIC_NAME, containerNumCores); + } + + /** + * Add the num stores with changelog parameter to the message. + * @param numStoresWithChangelog the parameter value. + */ + public void addNumStoresWithChangelog(Integer numStoresWithChangelog) { + addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME, + numStoresWithChangelog); + } + + /** + * Add a map of container models (indexed by containerID) to the message. + * @param containerModelMap the container models map + */ + public void addContainerModels(Map<String, ContainerModel> containerModelMap) { + if (containerModelMap != null && !containerModelMap.isEmpty()) { + addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MODELS_METRIC_NAME, + serializeContainerModelMap(containerModelMap)); + } + } + + /** + * Add a list of {@link DiagnosticsExceptionEvent}s to the message. + * @param exceptionList the list to add. + */ + public void addDiagnosticsExceptionEvents(Collection<DiagnosticsExceptionEvent> exceptionList) { + if (exceptionList != null && !exceptionList.isEmpty()) { + addToMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME, EXCEPTION_LIST_METRIC_NAME, exceptionList); + } + } + + /** + * Add a list of {@link org.apache.samza.diagnostics.ProcessorStopEvent}s to the message. + * @param stopEventList the list to add. 
+ */ + public void addProcessorStopEvents(List<ProcessorStopEvent> stopEventList) { + if (stopEventList != null && !stopEventList.isEmpty()) { + addToMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, STOP_EVENT_LIST_METRIC_NAME, stopEventList); + } + } + + /** + * Convert this message into a {@link MetricsSnapshot}, useful for serde-deserde using {@link org.apache.samza.serializers.MetricsSnapshotSerde}. + * @return + */ + public MetricsSnapshot convertToMetricsSnapshot() { + MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricsMessage)); + return metricsSnapshot; + } + + /** + * Check if the message has no contents. + * @return True if the message is empty, false otherwise. + */ + public boolean isEmpty() { + return metricsMessage.isEmpty(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DiagnosticsStreamMessage that = (DiagnosticsStreamMessage) o; + return metricsHeader.getAsMap().equals(that.metricsHeader.getAsMap()) && metricsMessage.equals(that.metricsMessage); + } + + @Override + public int hashCode() { + return Objects.hash(metricsHeader, metricsMessage); + } + + public Collection<ProcessorStopEvent> getProcessorStopEvents() { + return (Collection<ProcessorStopEvent>) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, + STOP_EVENT_LIST_METRIC_NAME); + } + + public Collection<DiagnosticsExceptionEvent> getExceptionEvents() { + return (Collection<DiagnosticsExceptionEvent>) getFromMetricsMessage(SAMZACONTAINER_METRICS_GROUP_NAME, + EXCEPTION_LIST_METRIC_NAME); + } + + public Integer getContainerMb() { + return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MB_METRIC_NAME); + } + + public Integer getContainerNumCores() { + return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_NUM_CORES_METRIC_NAME); + } + + public Integer 
getNumStoresWithChangelog() { + return (Integer) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, + CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME); + } + + public Map<String, ContainerModel> getContainerModels() { + return deserializeContainerModelMap((String) getFromMetricsMessage(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER, CONTAINER_MODELS_METRIC_NAME)); + } + + // Helper method to get a {@link DiagnosticsStreamMessage} from a {@link MetricsSnapshot}. + // * This is typically used when deserializing messages from a diagnostics-stream. + // * @param metricsSnapshot + public static DiagnosticsStreamMessage convertToDiagnosticsStreamMessage(MetricsSnapshot metricsSnapshot) { + DiagnosticsStreamMessage diagnosticsStreamMessage = + new DiagnosticsStreamMessage(metricsSnapshot.getHeader().getJobName(), metricsSnapshot.getHeader().getJobId(), + metricsSnapshot.getHeader().getContainerName(), metricsSnapshot.getHeader().getExecEnvironmentContainerId(), + metricsSnapshot.getHeader().getVersion(), metricsSnapshot.getHeader().getSamzaVersion(), + metricsSnapshot.getHeader().getHost(), metricsSnapshot.getHeader().getTime(), + metricsSnapshot.getHeader().getResetTime()); + + Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap(); + Map<String, Object> diagnosticsManagerGroupMap = metricsMap.get(GROUP_NAME_FOR_DIAGNOSTICS_MANAGER); + Map<String, Object> containerMetricsGroupMap = metricsMap.get(SAMZACONTAINER_METRICS_GROUP_NAME); + + if (diagnosticsManagerGroupMap != null) { + + diagnosticsStreamMessage.addContainerNumCores((Integer) diagnosticsManagerGroupMap.get(CONTAINER_NUM_CORES_METRIC_NAME)); + diagnosticsStreamMessage.addContainerMb((Integer) diagnosticsManagerGroupMap.get(CONTAINER_MB_METRIC_NAME)); + diagnosticsStreamMessage.addNumStoresWithChangelog((Integer) diagnosticsManagerGroupMap.get(CONTAINER_NUM_STORES_WITH_CHANGELOG_METRIC_NAME)); + diagnosticsStreamMessage.addContainerModels(deserializeContainerModelMap((String) 
diagnosticsManagerGroupMap.get(CONTAINER_MODELS_METRIC_NAME))); + + diagnosticsStreamMessage.addProcessorStopEvents((List<ProcessorStopEvent>) diagnosticsManagerGroupMap.get(STOP_EVENT_LIST_METRIC_NAME)); + } + + if (containerMetricsGroupMap != null && containerMetricsGroupMap.containsKey(EXCEPTION_LIST_METRIC_NAME)) { + diagnosticsStreamMessage.addDiagnosticsExceptionEvents( + (Collection<DiagnosticsExceptionEvent>) containerMetricsGroupMap.get(EXCEPTION_LIST_METRIC_NAME)); + } + + return diagnosticsStreamMessage; + } + + /** + * Helper method to use {@link SamzaObjectMapper} to serialize {@link ContainerModel}s. + * We use SamzaObjectMapper for ContainerModels, rather than using ObjectMapper (in MetricsSnapshotSerdeV2) + * because MetricsSnapshotSerdeV2 enables default typing, which writes type information for all containerModel (and + * underlying) classes, deserializing which requires a large number of jackson related changes to those classes + * (annotations and/or mixins). We cannot disable default typing to avoid backward incompatibility. This is why + * we serde-deserde ContainerModel explicitly using SamzaObjectMapper (which is also used for reads-writes to coordinator + * stream). + * {@link SamzaObjectMapper} provides several conventions and optimizations for serializing containerModels. + * @param containerModelMap map of container models to serialize. + * @return + */ + private static String serializeContainerModelMap(Map<String, ContainerModel> containerModelMap) { + ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper(); + try { + return samzaObjectMapper.writeValueAsString(containerModelMap); + } catch (IOException e) { + LOG.error("Exception in serializing container model ", e); + } + + return null; + } + + /** + * Helper method to use {@link SamzaObjectMapper} to deserialize {@link ContainerModel}s. + * {@link SamzaObjectMapper} provides several conventions and optimizations for deserializing containerModels. 
+ * @return + */ + private static Map<String, ContainerModel> deserializeContainerModelMap( + String serializedContainerModel) { + Map<String, ContainerModel> containerModelMap = null; + ObjectMapper samzaObjectMapper = SamzaObjectMapper.getObjectMapper(); + + try { + if (serializedContainerModel != null) { + containerModelMap = samzaObjectMapper.readValue(serializedContainerModel, new TypeReference<Map<String, ContainerModel>>() { + }); + } + } catch (IOException e) { + LOG.error("Exception in deserializing container model ", e); + } + + return containerModelMap; + } + + private void addToMetricsMessage(String groupName, String metricName, Object value) { + if (value != null) { + metricsMessage.putIfAbsent(groupName, new HashMap<>()); + metricsMessage.get(groupName).put(metricName, value); + } + } + + private Object getFromMetricsMessage(String groupName, String metricName) { + if (metricsMessage.containsKey(groupName) && metricsMessage.get(groupName) != null) { + return metricsMessage.get(groupName).get(metricName); + } else { + return null; + } + } +} diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java new file mode 100644 index 0000000..786fb78 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/ProcessorStopEvent.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.diagnostics; + +import java.util.Objects; + + +/** + * Encapsulates information (emitted to diagnostics stream) about the stopping of a processor. + * Information emitted includes, processorId, resourceId, exit status and host. + */ +public class ProcessorStopEvent { + public final String processorId; + public final String resourceId; + public final String host; + public final int exitStatus; + + // Default constructor, required for deserialization with jackson + private ProcessorStopEvent() { + this("", "", "", -1); + } + + public ProcessorStopEvent(String processorId, String resourceId, String host, int exitStatus) { + this.processorId = processorId; + this.resourceId = resourceId; + this.host = host; + this.exitStatus = exitStatus; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ProcessorStopEvent that = (ProcessorStopEvent) o; + return exitStatus == that.exitStatus && Objects.equals(processorId, that.processorId) && Objects.equals( + resourceId, that.resourceId) && Objects.equals(host, that.host); + } + + @Override + public int hashCode() { + return Objects.hash(processorId, resourceId, host, exitStatus); + } +} diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java new file mode 100644 index 0000000..c69b278 --- /dev/null +++ 
b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsManager.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.diagnostics; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.metrics.reporter.MetricsSnapshot; +import org.apache.samza.serializers.MetricsSnapshotSerdeV2; +import org.apache.samza.system.OutgoingMessageEnvelope; +import org.apache.samza.system.SystemProducer; +import org.apache.samza.system.SystemStream; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + + +public class TestDiagnosticsManager { + private DiagnosticsManager diagnosticsManager; + private MockSystemProducer mockSystemProducer; + private SystemStream diagnosticsSystemStream = new SystemStream("kafka", "test stream"); + + private String 
jobName = "Testjob"; + private String jobId = "test job id"; + private String executionEnvContainerId = "exec container id"; + private String taskClassVersion = "0.0.1"; + private String samzaVersion = "1.3.0"; + private String hostname = "sample host name"; + private int containerMb = 1024; + private int numStoresWithChangelog = 2; + private int containerNumCores = 2; + private Map<String, ContainerModel> containerModels = TestDiagnosticsStreamMessage.getSampleContainerModels(); + private Collection<DiagnosticsExceptionEvent> exceptionEventList = TestDiagnosticsStreamMessage.getExceptionList(); + + @Before + public void setup() { + + // Mocked system producer for publishing to diagnostics stream + mockSystemProducer = new MockSystemProducer(); + + // Mocked scheduled executor service which does a synchronous run() on scheduling + ScheduledExecutorService mockExecutorService = Mockito.mock(ScheduledExecutorService.class); + Mockito.when(mockExecutorService.scheduleWithFixedDelay(Mockito.any(), Mockito.anyLong(), Mockito.anyLong(), + Mockito.eq(TimeUnit.SECONDS))).thenAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return Mockito.mock(ScheduledFuture.class); + }); + + this.diagnosticsManager = + new DiagnosticsManager(jobName, jobId, containerModels, containerMb, containerNumCores, numStoresWithChangelog, + "0", executionEnvContainerId, taskClassVersion, samzaVersion, hostname, diagnosticsSystemStream, + mockSystemProducer, Duration.ofSeconds(1), mockExecutorService); + + exceptionEventList.forEach( + diagnosticsExceptionEvent -> this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent)); + + this.diagnosticsManager.addProcessorStopEvent("0", executionEnvContainerId, hostname, 101); + } + + @Test + public void testDiagnosticsStreamFirstMessagePublish() { + // invoking start will do a synchronous publish to the stream because of our mocked scheduled exec service + this.diagnosticsManager.start(); + Assert.assertEquals("One
message should have been published", 1, mockSystemProducer.getEnvelopeList().size()); + OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0); + validateOutgoingMessageEnvelope(outgoingMessageEnvelope); + } + + @Test + public void testNoDualPublish() { + // Across two successive run() invocations only a single message should be published + this.diagnosticsManager.start(); + this.diagnosticsManager.start(); + + Assert.assertEquals("One message should have been published", 1, mockSystemProducer.getEnvelopeList().size()); + OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0); + validateMetricsHeader(outgoingMessageEnvelope); + validateOutgoingMessageEnvelope(outgoingMessageEnvelope); + } + + @Test + public void testSecondPublishWithProcessorStopInSecondMessage() { + // Across two successive run() invocations two messages should be published if stop events are added + this.diagnosticsManager.start(); + this.diagnosticsManager.addProcessorStopEvent("0", executionEnvContainerId, hostname, 102); + this.diagnosticsManager.start(); + + Assert.assertEquals("Two messages should have been published", 2, mockSystemProducer.getEnvelopeList().size()); + + // Validate the first message + OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0); + validateMetricsHeader(outgoingMessageEnvelope); + validateOutgoingMessageEnvelope(outgoingMessageEnvelope); + + // Validate the second message's header + outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1); + validateMetricsHeader(outgoingMessageEnvelope); + + // Validate the second message's body (should be all empty except for the processor-stop-event) + MetricsSnapshot metricsSnapshot = + new MetricsSnapshotSerdeV2().fromBytes((byte[]) outgoingMessageEnvelope.getMessage()); + DiagnosticsStreamMessage diagnosticsStreamMessage = + 
DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot); + + Assert.assertNull(diagnosticsStreamMessage.getContainerMb()); + Assert.assertNull(diagnosticsStreamMessage.getExceptionEvents()); + Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), + Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 102))); + Assert.assertNull(diagnosticsStreamMessage.getContainerModels()); + Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores()); + Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog()); + } + + @Test + public void testSecondPublishWithExceptionInSecondMessage() { + // Across two successive run() invocations two messages should be published if exception events are added + this.diagnosticsManager.start(); + DiagnosticsExceptionEvent diagnosticsExceptionEvent = new DiagnosticsExceptionEvent(System.currentTimeMillis(), new RuntimeException("exception"), new HashMap()); + this.diagnosticsManager.addExceptionEvent(diagnosticsExceptionEvent); + this.diagnosticsManager.start(); + + Assert.assertEquals("Two messages should have been published", 2, mockSystemProducer.getEnvelopeList().size()); + + // Validate the first message + OutgoingMessageEnvelope outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(0); + validateMetricsHeader(outgoingMessageEnvelope); + validateOutgoingMessageEnvelope(outgoingMessageEnvelope); + + // Validate the second message's header + outgoingMessageEnvelope = mockSystemProducer.getEnvelopeList().get(1); + validateMetricsHeader(outgoingMessageEnvelope); + + // Validate the second message's body (should be all empty except for the exception event) + MetricsSnapshot metricsSnapshot = + new MetricsSnapshotSerdeV2().fromBytes((byte[]) outgoingMessageEnvelope.getMessage()); + DiagnosticsStreamMessage diagnosticsStreamMessage = + DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot); + +
Assert.assertNull(diagnosticsStreamMessage.getContainerMb()); + Assert.assertEquals(Arrays.asList(diagnosticsExceptionEvent), diagnosticsStreamMessage.getExceptionEvents()); + Assert.assertNull(diagnosticsStreamMessage.getProcessorStopEvents()); + Assert.assertNull(diagnosticsStreamMessage.getContainerModels()); + Assert.assertNull(diagnosticsStreamMessage.getContainerNumCores()); + Assert.assertNull(diagnosticsStreamMessage.getNumStoresWithChangelog()); + } + + @After + public void teardown() throws Exception { + this.diagnosticsManager.stop(); + } + + private void validateMetricsHeader(OutgoingMessageEnvelope outgoingMessageEnvelope) { + // Validate the outgoing message + + Assert.assertTrue(outgoingMessageEnvelope.getSystemStream().equals(diagnosticsSystemStream)); + MetricsSnapshot metricsSnapshot = + new MetricsSnapshotSerdeV2().fromBytes((byte[]) outgoingMessageEnvelope.getMessage()); + + // Validate all header fields + Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName); + Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId); + Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(), executionEnvContainerId); + Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), taskClassVersion); + Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion); + Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname); + Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName()); + + } + + private void validateOutgoingMessageEnvelope(OutgoingMessageEnvelope outgoingMessageEnvelope) { + MetricsSnapshot metricsSnapshot = + new MetricsSnapshotSerdeV2().fromBytes((byte[]) outgoingMessageEnvelope.getMessage()); + + // Validate the diagnostics stream message + DiagnosticsStreamMessage diagnosticsStreamMessage = + DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot); + + Assert.assertEquals(containerMb, 
diagnosticsStreamMessage.getContainerMb().intValue()); + Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents()); + Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), Arrays.asList(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101))); + Assert.assertEquals(containerModels, diagnosticsStreamMessage.getContainerModels()); + Assert.assertEquals(containerNumCores, diagnosticsStreamMessage.getContainerNumCores().intValue()); + Assert.assertEquals(numStoresWithChangelog, diagnosticsStreamMessage.getNumStoresWithChangelog().intValue()); + } + + private class MockSystemProducer implements SystemProducer { + + private final List<OutgoingMessageEnvelope> envelopeList = new ArrayList<>(); + + @Override + public void start() { + + } + + @Override + public void stop() { + + } + + @Override + public void register(String source) { + + } + + @Override + public void send(String source, OutgoingMessageEnvelope envelope) { + envelopeList.add(envelope); + } + + @Override + public void flush(String source) { + + } + + public List<OutgoingMessageEnvelope> getEnvelopeList() { + return this.envelopeList; + } + } +} diff --git a/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java new file mode 100644 index 0000000..81bc577 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/diagnostics/TestDiagnosticsStreamMessage.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.diagnostics; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.samza.Partition; +import org.apache.samza.container.TaskName; +import org.apache.samza.job.model.ContainerModel; +import org.apache.samza.job.model.TaskModel; +import org.apache.samza.metrics.reporter.MetricsSnapshot; +import org.apache.samza.system.SystemStreamPartition; +import org.junit.Assert; +import org.junit.Test; + + +public class TestDiagnosticsStreamMessage { + + private final String jobName = "Testjob"; + private final String jobId = "test job id"; + private final String containerName = "sample container name"; + private final String executionEnvContainerId = "exec container id"; + private final String taskClassVersion = "0.0.1"; + private final String samzaVersion = "1.3.0"; + private final String hostname = "sample host name"; + private final long timestamp = System.currentTimeMillis(); + private final long resetTimestamp = System.currentTimeMillis(); + + private DiagnosticsStreamMessage getDiagnosticsStreamMessage() { + DiagnosticsStreamMessage diagnosticsStreamMessage = + new DiagnosticsStreamMessage(jobName, jobId, containerName, executionEnvContainerId, taskClassVersion, + samzaVersion, hostname, timestamp, resetTimestamp); + + diagnosticsStreamMessage.addContainerMb(1024); + diagnosticsStreamMessage.addContainerNumCores(2); + diagnosticsStreamMessage.addNumStoresWithChangelog(3); + + 
diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList()); + return diagnosticsStreamMessage; + } + + public static Collection<DiagnosticsExceptionEvent> getExceptionList() { + BoundedList boundedList = new BoundedList<DiagnosticsExceptionEvent>("exceptions"); + DiagnosticsExceptionEvent diagnosticsExceptionEvent = + new DiagnosticsExceptionEvent(1, new Exception("this is a samza exception", new Exception("cause")), + new HashMap()); + + boundedList.add(diagnosticsExceptionEvent); + return boundedList.getValues(); + } + + public List<ProcessorStopEvent> getProcessorStopEventList() { + List<ProcessorStopEvent> stopEventList = new ArrayList<>(); + stopEventList.add(new ProcessorStopEvent("0", executionEnvContainerId, hostname, 101)); + return stopEventList; + } + + public static Map<String, ContainerModel> getSampleContainerModels() { + Map<String, ContainerModel> containerModels = new HashMap<>(); + Map<TaskName, TaskModel> tasks = new HashMap<>(); + + Set<SystemStreamPartition> sspsForTask1 = new HashSet<>(); + sspsForTask1.add(new SystemStreamPartition("kafka", "test-stream", new Partition(0))); + tasks.put(new TaskName("Partition 0"), new TaskModel(new TaskName("Partition 0"), sspsForTask1, new Partition(0))); + + Set<SystemStreamPartition> sspsForTask2 = new HashSet<>(); + sspsForTask2.add(new SystemStreamPartition("kafka", "test-stream", new Partition(1))); + tasks.put(new TaskName("Partition 1"), new TaskModel(new TaskName("Partition 1"), sspsForTask2, new Partition(1))); + + containerModels.put("0", new ContainerModel("0", tasks)); + return containerModels; + } + + /** + * Tests basic operations on {@link DiagnosticsStreamMessage}. 
+ */ + @Test + public void basicTest() { + + DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage(); + Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList(); + diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList); + diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList()); + diagnosticsStreamMessage.addContainerModels(getSampleContainerModels()); + + Assert.assertEquals(1024, (int) diagnosticsStreamMessage.getContainerMb()); + Assert.assertEquals(2, (int) diagnosticsStreamMessage.getContainerNumCores()); + Assert.assertEquals(3, (int) diagnosticsStreamMessage.getNumStoresWithChangelog()); + Assert.assertEquals(exceptionEventList, diagnosticsStreamMessage.getExceptionEvents()); + Assert.assertEquals(getSampleContainerModels(), diagnosticsStreamMessage.getContainerModels()); + Assert.assertEquals(diagnosticsStreamMessage.getProcessorStopEvents(), getProcessorStopEventList()); + } + + /** + * Tests serialization and deserialization of a {@link DiagnosticsStreamMessage} + */ + @Test + public void serdeTest() { + DiagnosticsStreamMessage diagnosticsStreamMessage = getDiagnosticsStreamMessage(); + Collection<DiagnosticsExceptionEvent> exceptionEventList = getExceptionList(); + diagnosticsStreamMessage.addDiagnosticsExceptionEvents(exceptionEventList); + diagnosticsStreamMessage.addProcessorStopEvents(getProcessorStopEventList()); + diagnosticsStreamMessage.addContainerModels(getSampleContainerModels()); + + MetricsSnapshot metricsSnapshot = diagnosticsStreamMessage.convertToMetricsSnapshot(); + Assert.assertEquals(metricsSnapshot.getHeader().getJobName(), jobName); + Assert.assertEquals(metricsSnapshot.getHeader().getJobId(), jobId); + Assert.assertEquals(metricsSnapshot.getHeader().getExecEnvironmentContainerId(), executionEnvContainerId); + Assert.assertEquals(metricsSnapshot.getHeader().getVersion(), taskClassVersion); + 
Assert.assertEquals(metricsSnapshot.getHeader().getSamzaVersion(), samzaVersion); + Assert.assertEquals(metricsSnapshot.getHeader().getHost(), hostname); + Assert.assertEquals(metricsSnapshot.getHeader().getSource(), DiagnosticsManager.class.getName()); + + Map<String, Map<String, Object>> metricsMap = metricsSnapshot.getMetrics().getAsMap(); + Assert.assertTrue(metricsMap.get("org.apache.samza.container.SamzaContainerMetrics").containsKey("exceptions")); + Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerModels")); + Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("numStoresWithChangelog")); + Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerNumCores")); + Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("containerMemoryMb")); + Assert.assertTrue(metricsMap.get(DiagnosticsManager.class.getName()).containsKey("stopEvents")); + + DiagnosticsStreamMessage convertedDiagnosticsStreamMessage = + DiagnosticsStreamMessage.convertToDiagnosticsStreamMessage(metricsSnapshot); + + Assert.assertTrue(convertedDiagnosticsStreamMessage.equals(diagnosticsStreamMessage)); + } +}