Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 f746c80b3 -> fb0acd08e


YARN-3461. Consolidate flow name/version/run defaults. (Sangjin Lee via Varun 
Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb0acd08
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb0acd08
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb0acd08

Branch: refs/heads/YARN-2928
Commit: fb0acd08e6f0b030d82eeb7cbfa5404376313e60
Parents: f746c80
Author: Varun Saxena <varunsax...@apache.org>
Authored: Thu Apr 7 22:10:11 2016 +0530
Committer: Varun Saxena <varunsax...@apache.org>
Committed: Thu Apr 7 22:10:11 2016 +0530

----------------------------------------------------------------------
 .../mapred/TestMRTimelineEventHandling.java     | 46 +++++++-------
 hadoop-yarn-project/CHANGES.txt                 |  2 +
 .../distributedshell/TestDistributedShell.java  | 18 ++++--
 .../yarn/util/timeline/TimelineUtils.java       |  8 ++-
 .../resourcemanager/amlauncher/AMLauncher.java  | 67 +++++++++++++++-----
 .../RMTimelineCollectorManager.java             | 36 +++++++++--
 .../TestSystemMetricsPublisherForV2.java        | 20 +++---
 .../collector/AppLevelTimelineCollector.java    | 11 +---
 .../collector/NodeTimelineCollectorManager.java | 12 ++++
 .../collector/TimelineCollectorContext.java     |  5 +-
 10 files changed, 150 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
----------------------------------------------------------------------
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
index a9bbdf5..0481b35 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMRTimelineEventHandling.java
@@ -20,15 +20,12 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
-
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -38,9 +35,9 @@ import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
@@ -48,7 +45,6 @@ import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import 
org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import 
org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -204,7 +200,7 @@ public class TestMRTimelineEventHandling {
       ApplicationReport appReport = apps.get(0);
       firstAppId = appReport.getApplicationId();
 
-      checkNewTimelineEvent(firstAppId);
+      checkNewTimelineEvent(firstAppId, appReport);
 
       LOG.info("Run 2nd job which should be failed.");
       job = UtilsForTests.runJobFail(new JobConf(conf), inDir, outDir);
@@ -213,11 +209,10 @@ public class TestMRTimelineEventHandling {
       
       apps = yarnClient.getApplications(appStates);
       Assert.assertEquals(apps.size(), 2);
-      
-      ApplicationId secAppId = null;
-      secAppId = apps.get(0).getApplicationId() == firstAppId ? 
-          apps.get(1).getApplicationId() : apps.get(0).getApplicationId();
-      checkNewTimelineEvent(firstAppId);
+
+      appReport = apps.get(0).getApplicationId().equals(firstAppId) ?
+          apps.get(0) : apps.get(1);
+      checkNewTimelineEvent(firstAppId, appReport);
 
     } finally {
       if (cluster != null) {
@@ -234,7 +229,8 @@ public class TestMRTimelineEventHandling {
     }
   }
   
-  private void checkNewTimelineEvent(ApplicationId appId) throws IOException {
+  private void checkNewTimelineEvent(ApplicationId appId,
+      ApplicationReport appReport) throws IOException {
     String tmpRoot =
         FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
             + "/entities/";
@@ -242,15 +238,18 @@ public class TestMRTimelineEventHandling {
     File tmpRootFolder = new File(tmpRoot);
     
     Assert.assertTrue(tmpRootFolder.isDirectory());
-    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
-        UserGroupInformation.getCurrentUser().getShortUserName() +
-        "/" + TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) +
-        "/1/1/" + appId.toString();
+    String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID +
+        "/" + UserGroupInformation.getCurrentUser().getShortUserName() +
+        "/" + appReport.getName() +
+        "/" + TimelineUtils.DEFAULT_FLOW_VERSION +
+        "/" + appReport.getStartTime() +
+        "/" + appId.toString();
     // for this test, we expect MAPREDUCE_JOB and MAPREDUCE_TASK dirs
     String outputDirJob = basePath + "/MAPREDUCE_JOB/";
 
     File entityFolder = new File(outputDirJob);
-    Assert.assertTrue("Job output directory: " + outputDirJob + " is not 
exist.",
+    Assert.assertTrue("Job output directory: " + outputDirJob +
+        " does not exist.",
         entityFolder.isDirectory());
 
     // check for job event file
@@ -259,13 +258,15 @@ public class TestMRTimelineEventHandling {
 
     String jobEventFilePath = outputDirJob + jobEventFileName;
     File jobEventFile = new File(jobEventFilePath);
-    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath + " is not 
exist.",
+    Assert.assertTrue("jobEventFilePath: " + jobEventFilePath +
+        " does not exist.",
         jobEventFile.exists());
 
     // check for task event file
     String outputDirTask = basePath + "/MAPREDUCE_TASK/";
     File taskFolder = new File(outputDirTask);
-    Assert.assertTrue("Task output directory: " + outputDirTask + " is not 
exist.",
+    Assert.assertTrue("Task output directory: " + outputDirTask +
+        " does not exist.",
         taskFolder.isDirectory());
     
     String taskEventFileName = appId.toString().replaceAll("application", 
"task")
@@ -273,14 +274,15 @@ public class TestMRTimelineEventHandling {
 
     String taskEventFilePath = outputDirTask + taskEventFileName;
     File taskEventFile = new File(taskEventFilePath);
-    Assert.assertTrue("taskEventFileName: " + taskEventFilePath + " is not 
exist.",
+    Assert.assertTrue("taskEventFileName: " + taskEventFilePath +
+        " does not exist.",
         taskEventFile.exists());
     
     // check for task attempt event file
     String outputDirTaskAttempt = basePath + "/MAPREDUCE_TASK_ATTEMPT/";
     File taskAttemptFolder = new File(outputDirTaskAttempt);
     Assert.assertTrue("TaskAttempt output directory: " + outputDirTaskAttempt 
+ 
-        " is not exist.", taskAttemptFolder.isDirectory());
+        " does not exist.", taskAttemptFolder.isDirectory());
     
     String taskAttemptEventFileName = appId.toString().replaceAll(
         "application", "attempt") + "_m_000000_0" + 
@@ -290,7 +292,7 @@ public class TestMRTimelineEventHandling {
         taskAttemptEventFileName;
     File taskAttemptEventFile = new File(taskAttemptEventFilePath);
     Assert.assertTrue("taskAttemptEventFileName: " + taskAttemptEventFilePath +
-        " is not exist.", taskAttemptEventFile.exists());
+        " does not exist.", taskAttemptEventFile.exists());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d71f7fd..c66a851 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -173,6 +173,8 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3367. Replace starting a separate thread for post entity with event
     loop in TimelineClient (Naganarasimha G R via sjlee)
 
+    YARN-3461. Consolidate flow name/version/run defaults. (Sangjin Lee via 
varunsaxena)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index f141ca2..15e21f6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -283,13 +283,14 @@ public class TestDistributedShell {
     boolean verified = false;
     String errorMessage = "";
     ApplicationId appId = null;
+    ApplicationReport appReport = null;
     while(!verified) {
       List<ApplicationReport> apps = yarnClient.getApplications();
       if (apps.size() == 0 ) {
         Thread.sleep(10);
         continue;
       }
-      ApplicationReport appReport = apps.get(0);
+      appReport = apps.get(0);
       appId = appReport.getApplicationId();
       if(appReport.getHost().equals("N/A")) {
         Thread.sleep(10);
@@ -316,7 +317,7 @@ public class TestDistributedShell {
     if (!isTestingTimelineV2) {
       checkTimelineV1(haveDomain);
     } else {
-      checkTimelineV2(haveDomain, appId, defaultFlow);
+      checkTimelineV2(haveDomain, appId, defaultFlow, appReport);
     }
   }
 
@@ -366,7 +367,7 @@ public class TestDistributedShell {
   }
 
   private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
-      boolean defaultFlow) throws Exception {
+      boolean defaultFlow, ApplicationReport appReport) throws Exception {
     LOG.info("Started checkTimelineV2 ");
     // For PoC check in /tmp/timeline_service_data YARN-3264
     String tmpRoot =
@@ -379,10 +380,13 @@ public class TestDistributedShell {
       String basePath = tmpRoot +
           YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
           UserGroupInformation.getCurrentUser().getShortUserName() +
-          (defaultFlow ? "/" +
-              TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId) +
-              "/1/1/" : "/test_flow_name/test_flow_version/12345678/") +
-              appId.toString();
+          (defaultFlow ?
+              "/" + appReport.getName() + "/" +
+                  TimelineUtils.DEFAULT_FLOW_VERSION +"/" +
+                  appReport.getStartTime() +"/" :
+              "/test_flow_name/test_flow_version/12345678/") +
+          appId.toString();
+      LOG.info("basePath: " + basePath);
       // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
 
       // Verify DS_APP_ATTEMPT entities posted by the client

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
index 53f5af2..ccb7105 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/timeline/TimelineUtils.java
@@ -49,6 +49,7 @@ public class TimelineUtils {
       "TIMELINE_FLOW_VERSION_TAG";
   public static final String FLOW_RUN_ID_TAG_PREFIX =
       "TIMELINE_FLOW_RUN_ID_TAG";
+  public final static String DEFAULT_FLOW_VERSION = "1";
 
   private static ObjectMapper mapper;
 
@@ -127,9 +128,12 @@ public class TimelineUtils {
     return SecurityUtil.buildTokenService(timelineServiceAddr);
   }
 
-  public static String generateDefaultFlowNameBasedOnAppId(
+  public static String generateDefaultFlowName(String appName,
       ApplicationId appId) {
-    return "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
+    return (appName != null &&
+        !appName.equals(YarnConfiguration.DEFAULT_APPLICATION_NAME)) ?
+        appName :
+        "flow_" + appId.getClusterTimestamp() + "_" + appId.getId();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
index 9a7638c..f5ed662 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -195,6 +196,8 @@ public class AMLauncher implements Runnable {
 
     // Finalize the container
     setupTokens(container, containerID);
+    // set the flow context optionally for timeline service v.2
+    setFlowContext(container);
 
     return container;
   }
@@ -216,15 +219,6 @@ public class AMLauncher implements Runnable {
             .get(applicationId)
             .getSubmitTime()));
 
-    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
-      // Set flow context info
-      for (String tag :
-          rmContext.getRMApps().get(applicationId).getApplicationTags()) {
-        setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX, tag);
-        setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX, tag);
-        setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX, tag);
-      }
-    }
     Credentials credentials = new Credentials();
     DataInputByteBuffer dibb = new DataInputByteBuffer();
     ByteBuffer tokens = container.getTokens();
@@ -245,17 +239,58 @@ public class AMLauncher implements Runnable {
     container.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
   }
 
-  private static void setFlowTags(
-      Map<String, String> environment, String tagPrefix, String tag) {
-    if (tag.startsWith(tagPrefix + ":") ||
-        tag.startsWith(tagPrefix.toLowerCase() + ":")) {
-      String value = tag.substring(tagPrefix.length() + 1);
-      if (!value.isEmpty()) {
-        environment.put(tagPrefix, value);
+  private void setFlowContext(ContainerLaunchContext container) {
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      Map<String, String> environment = container.getEnvironment();
+      ApplicationId applicationId =
+          application.getAppAttemptId().getApplicationId();
+      RMApp app = rmContext.getRMApps().get(applicationId);
+
+      // initialize the flow in the environment with default values for those
+      // that do not specify the flow tags
+      // flow name: app name (or app id if app name is missing),
+      // flow version: "1", flow run id: start time
+      setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+          TimelineUtils.generateDefaultFlowName(app.getName(), applicationId));
+      setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+          TimelineUtils.DEFAULT_FLOW_VERSION);
+      setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+          String.valueOf(app.getStartTime()));
+
+      // Set flow context info: the flow context is received via the 
application
+      // tags
+      for (String tag : app.getApplicationTags()) {
+        String[] parts = tag.split(":", 2);
+        if (parts.length != 2 || parts[1].isEmpty()) {
+          continue;
+        }
+        switch (parts[0].toUpperCase()) {
+        case TimelineUtils.FLOW_NAME_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+              parts[1]);
+          break;
+        case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+              parts[1]);
+          break;
+        case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
+          setFlowTags(environment, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+              parts[1]);
+          break;
+        default:
+          break;
+        }
       }
     }
   }
 
+  private static void setFlowTags(
+      Map<String, String> environment, String tagPrefix, String value) {
+    if (!value.isEmpty()) {
+      environment.put(tagPrefix, value);
+    }
+  }
+
   @VisibleForTesting
   protected Token<AMRMTokenIdentifier> createAndSetAMRMToken() {
     Token<AMRMTokenIdentifier> amrmToken =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
index ff055a1..a4f1084 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -25,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
 import 
org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
@@ -35,6 +38,9 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class RMTimelineCollectorManager extends TimelineCollectorManager {
+  private static final Log LOG =
+      LogFactory.getLog(RMTimelineCollectorManager.class);
+
   private RMContext rmContext;
 
   public RMTimelineCollectorManager(RMContext rmContext) {
@@ -51,9 +57,21 @@ public class RMTimelineCollectorManager extends 
TimelineCollectorManager {
           "non-existing app " + appId);
     }
     String userId = app.getUser();
+    TimelineCollectorContext context = collector.getTimelineEntityContext();
     if (userId != null && !userId.isEmpty()) {
-      collector.getTimelineEntityContext().setUserId(userId);
+      context.setUserId(userId);
     }
+
+    // initialize the flow in the environment with default values for those
+    // that do not specify the flow tags
+    // flow name: app name (or app id if app name is missing),
+    // flow version: "1", flow run id: start time
+    context.setFlowName(TimelineUtils.generateDefaultFlowName(
+        app.getName(), appId));
+    context.setFlowVersion(TimelineUtils.DEFAULT_FLOW_VERSION);
+    context.setFlowRunId(app.getStartTime());
+
+    // the flow context is received via the application tags
     for (String tag : app.getApplicationTags()) {
       String[] parts = tag.split(":", 2);
       if (parts.length != 2 || parts[1].isEmpty()) {
@@ -61,14 +79,22 @@ public class RMTimelineCollectorManager extends 
TimelineCollectorManager {
       }
       switch (parts[0].toUpperCase()) {
       case TimelineUtils.FLOW_NAME_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowName(parts[1]);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow name: " + parts[1]);
+        }
+        context.setFlowName(parts[1]);
         break;
       case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowVersion(parts[1]);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow version: " + parts[1]);
+        }
+        context.setFlowVersion(parts[1]);
         break;
       case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
-        collector.getTimelineEntityContext().setFlowRunId(
-            Long.parseLong(parts[1]));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting the flow run id: " + parts[1]);
+        }
+        context.setFlowRunId(Long.parseLong(parts[1]));
         break;
       default:
         break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
index 1a58cb2..7a018ca 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisherForV2.java
@@ -83,8 +83,6 @@ public class TestSystemMetricsPublisherForV2 {
 
   private static TimelineServiceV2Publisher metricsPublisher;
   private static DrainDispatcher dispatcher = new DrainDispatcher();
-  private static final String DEFAULT_FLOW_VERSION = "1";
-  private static final long DEFAULT_FLOW_RUN = 1;
 
   private static ConcurrentMap<ApplicationId, RMApp> rmAppsMapInContext;
 
@@ -316,16 +314,14 @@ public class TestSystemMetricsPublisherForV2 {
 
   private String getTimelineEntityDir(RMApp app) {
     String outputDirApp =
-        testRootDir.getAbsolutePath()+"/"
-            + FileSystemTimelineWriterImpl.ENTITIES_DIR
-            + "/"
-            + YarnConfiguration.DEFAULT_RM_CLUSTER_ID
-            + "/"
-            + app.getUser()
-            + "/"
-            + TimelineUtils.generateDefaultFlowNameBasedOnAppId(app
-                .getApplicationId()) + "/" + DEFAULT_FLOW_VERSION + "/"
-            + DEFAULT_FLOW_RUN + "/" + app.getApplicationId();
+        testRootDir.getAbsolutePath() + "/"
+            + FileSystemTimelineWriterImpl.ENTITIES_DIR + "/"
+            + YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/"
+            + app.getUser() + "/"
+            + app.getName() + "/"
+            + TimelineUtils.DEFAULT_FLOW_VERSION + "/"
+            + app.getStartTime() + "/"
+            + app.getApplicationId();
     return outputDirApp;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
index 36dd7b0..4fe445a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
@@ -18,14 +18,14 @@
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+import com.google.common.base.Preconditions;
 
 /**
  * Service that handles writes to the timeline service and writes them to the
@@ -54,13 +54,6 @@ public class AppLevelTimelineCollector extends 
TimelineCollector {
     // context info from NM.
     // Current user usually is not the app user, but keep this field non-null
     
context.setUserId(UserGroupInformation.getCurrentUser().getShortUserName());
-    // Use app ID to generate a default flow name for orphan app
-    context.setFlowName(
-        TimelineUtils.generateDefaultFlowNameBasedOnAppId(appId));
-    // Set the flow version to string 1 if it's an orphan app
-    context.setFlowVersion("1");
-    // Set the flow run ID to 1 if it's an orphan app
-    context.setFlowRunId(1L);
     context.setAppId(appId.toString());
     super.serviceInit(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
index 785fb19..75557a8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
@@ -164,18 +164,30 @@ public class NodeTimelineCollectorManager extends 
TimelineCollectorManager {
         getNMCollectorService().getTimelineCollectorContext(request);
     String userId = response.getUserId();
     if (userId != null && !userId.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the user in the context: " + userId);
+      }
       collector.getTimelineEntityContext().setUserId(userId);
     }
     String flowName = response.getFlowName();
     if (flowName != null && !flowName.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow name: " + flowName);
+      }
       collector.getTimelineEntityContext().setFlowName(flowName);
     }
     String flowVersion = response.getFlowVersion();
     if (flowVersion != null && !flowVersion.isEmpty()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow version: " + flowVersion);
+      }
       collector.getTimelineEntityContext().setFlowVersion(flowVersion);
     }
     long flowRunId = response.getFlowRunId();
     if (flowRunId != 0L) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Setting the flow run id: " + flowRunId);
+      }
       collector.getTimelineEntityContext().setFlowRunId(flowRunId);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb0acd08/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
index 58d68df..981ee2a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorContext.java
@@ -19,12 +19,12 @@
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 import org.apache.hadoop.yarn.server.timelineservice.TimelineContext;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 /**
  * Encapsulates context information required by collector during a put.
  */
 public class TimelineCollectorContext extends TimelineContext {
-
   private String flowVersion;
 
   public TimelineCollectorContext() {
@@ -34,7 +34,8 @@ public class TimelineCollectorContext extends TimelineContext 
{
   public TimelineCollectorContext(String clusterId, String userId,
       String flowName, String flowVersion, Long flowRunId, String appId) {
     super(clusterId, userId, flowName, flowRunId, appId);
-    this.flowVersion = flowVersion;
+    this.flowVersion = flowVersion == null ?
+        TimelineUtils.DEFAULT_FLOW_VERSION : flowVersion;
   }
 
   @Override

Reply via email to