This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 8621db6 SAMZA-2239 : Using jsonserde to write metadata file (#1071)
8621db6 is described below
commit 8621db68b64f3d4f2c1a2aa986e3b1ea0dd80969
Author: rmatharu <[email protected]>
AuthorDate: Fri Jun 7 11:43:32 2019 -0700
SAMZA-2239 : Using jsonserde to write metadata file (#1071)
* Using jsonserde to write metadata file
* Getting rid of public static class
---
.../java/org/apache/samza/util/DiagnosticsUtil.java | 21 +++++++++++++++------
.../src/main/scala/org/apache/samza/util/Util.scala | 4 ++--
2 files changed, 17 insertions(+), 8 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 6060720..0b680c4 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -37,6 +37,7 @@ import org.apache.samza.metrics.reporter.MetricsHeader;
import org.apache.samza.metrics.reporter.MetricsSnapshot;
import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.runtime.LocalContainerRunner;
+import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
@@ -49,26 +50,34 @@ import scala.Option;
public class DiagnosticsUtil {
private static final Logger log =
LoggerFactory.getLogger(DiagnosticsUtil.class);
+
// Write a file in the samza.log.dir named {exec-env-container-id}.metadata
that contains
// metadata about the container such as containerId, jobName, jobId,
hostname, timestamp, version info, and others.
+ // The file contents are serialized using {@link JsonSerde}.
public static void writeMetadataFile(String jobName, String jobId, String
containerId,
Optional<String> execEnvContainerId, Config config) {
Option<File> metadataFile =
JobConfig.getMetadataFile(Option.apply(execEnvContainerId.orElse(null)));
if (metadataFile.isDefined()) {
-
- StringBuilder metadata = new StringBuilder("Version: 1");
- metadata.append(System.lineSeparator());
MetricsHeader metricsHeader =
new MetricsHeader(jobName, jobId, "samza-container-" + containerId,
execEnvContainerId.orElse(""), LocalContainerRunner.class.getName(),
Util.getTaskClassVersion(config), Util.getSamzaVersion(),
Util.getLocalHost().getHostName(),
System.currentTimeMillis(), System.currentTimeMillis());
+ class MetadataFileContents {
+ public final String version;
+ public final String metricsSnapshot;
+
+ public MetadataFileContents(String version, String metricsSnapshot) {
+ this.version = version;
+ this.metricsSnapshot = metricsSnapshot;
+ }
+ }
+
MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new
Metrics());
- metadata.append("ContainerMetadata: ");
- metadata.append(new String(new
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
- FileUtil.writeToTextFile(metadataFile.get(), metadata.toString(), false);
+ MetadataFileContents metadataFileContents = new
MetadataFileContents("1", new String(new
MetricsSnapshotSerdeV2().toBytes(metricsSnapshot)));
+ FileUtil.writeToTextFile(metadataFile.get(), new String(new
JsonSerde<>().toBytes(metadataFileContents)), false);
} else {
log.info("Skipping writing metadata file.");
}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 7ee96de..c1e952d 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -81,11 +81,11 @@ object Util extends Logging {
try {
val appClass = Option(new ApplicationConfig(config).getAppClass)
if (appClass.isDefined) {
- Class.forName(appClass.get).getPackage.getImplementationVersion
+
Option.apply(Class.forName(appClass.get).getPackage.getImplementationVersion).getOrElse(FALLBACK_VERSION)
} else {
val taskClass = new TaskConfig(config).getTaskClass
if (taskClass.isPresent) {
- Class.forName(taskClass.get()).getPackage.getImplementationVersion
+
Option.apply(Class.forName(taskClass.get()).getPackage.getImplementationVersion).getOrElse(FALLBACK_VERSION)
} else {
warn("Unable to find app class or task class. Defaulting to %s"
format FALLBACK_VERSION)
FALLBACK_VERSION