This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8621db6  SAMZA-2239 : Using jsonserde to write metadata file (#1071)
8621db6 is described below

commit 8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969
Author: rmatharu <[email protected]>
AuthorDate: Fri Jun 7 11:43:32 2019 -0700

    SAMZA-2239 : Using jsonserde to write metadata file (#1071)
    
    * Using jsonserde to write metadata file
    
    * Getting rid of public static class
---
 .../java/org/apache/samza/util/DiagnosticsUtil.java | 21 +++++++++++++++------
 .../src/main/scala/org/apache/samza/util/Util.scala |  4 ++--
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 6060720..0b680c4 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -37,6 +37,7 @@ import org.apache.samza.metrics.reporter.MetricsHeader;
 import org.apache.samza.metrics.reporter.MetricsSnapshot;
 import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.runtime.LocalContainerRunner;
+import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
@@ -49,26 +50,34 @@ import scala.Option;
 public class DiagnosticsUtil {
   private static final Logger log = 
LoggerFactory.getLogger(DiagnosticsUtil.class);
 
+
   // Write a file in the samza.log.dir named {exec-env-container-id}.metadata that contains
   // metadata about the container such as containerId, jobName, jobId, hostname, timestamp, version info, and others.
+  // The file contents are serialized using {@link JsonSerde}.
   public static void writeMetadataFile(String jobName, String jobId, String 
containerId,
       Optional<String> execEnvContainerId, Config config) {
 
     Option<File> metadataFile = 
JobConfig.getMetadataFile(Option.apply(execEnvContainerId.orElse(null)));
 
     if (metadataFile.isDefined()) {
-
-      StringBuilder metadata = new StringBuilder("Version: 1");
-      metadata.append(System.lineSeparator());
       MetricsHeader metricsHeader =
           new MetricsHeader(jobName, jobId, "samza-container-" + containerId, 
execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(),
               Util.getTaskClassVersion(config), Util.getSamzaVersion(), 
Util.getLocalHost().getHostName(),
               System.currentTimeMillis(), System.currentTimeMillis());
 
+      class MetadataFileContents {
+        public final String version;
+        public final String metricsSnapshot;
+
+        public MetadataFileContents(String version, String metricsSnapshot) {
+          this.version = version;
+          this.metricsSnapshot = metricsSnapshot;
+        }
+      }
+
       MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new 
Metrics());
-      metadata.append("ContainerMetadata: ");
-      metadata.append(new String(new 
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
-      FileUtil.writeToTextFile(metadataFile.get(), metadata.toString(), false);
+      MetadataFileContents metadataFileContents = new 
MetadataFileContents("1", new String(new 
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+      FileUtil.writeToTextFile(metadataFile.get(), new String(new 
JsonSerde<>().toBytes(metadataFileContents)), false);
     } else {
       log.info("Skipping writing metadata file.");
     }
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 7ee96de..c1e952d 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -81,11 +81,11 @@ object Util extends Logging {
     try {
       val appClass = Option(new ApplicationConfig(config).getAppClass)
       if (appClass.isDefined) {
-        Class.forName(appClass.get).getPackage.getImplementationVersion
+        
Option.apply(Class.forName(appClass.get).getPackage.getImplementationVersion).getOrElse(FALLBACK_VERSION)
       } else {
         val taskClass = new TaskConfig(config).getTaskClass
         if (taskClass.isPresent) {
-          Class.forName(taskClass.get()).getPackage.getImplementationVersion
+          
Option.apply(Class.forName(taskClass.get()).getPackage.getImplementationVersion).getOrElse(FALLBACK_VERSION)
         } else {
           warn("Unable to find app class or task class. Defaulting to %s" 
format FALLBACK_VERSION)
           FALLBACK_VERSION

Reply via email to