YARN-6555. Store application flow context in NM state store for work-preserving 
restart. (Rohith Sharma K S via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47474fff
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47474fff
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47474fff

Branch: refs/heads/YARN-1011
Commit: 47474fffac085e0e5ea46336bf80ccd0677017a3
Parents: 2b5ad48
Author: Haibo Chen <haiboc...@apache.org>
Authored: Thu May 25 21:15:27 2017 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Thu May 25 21:15:27 2017 -0700

----------------------------------------------------------------------
 .../containermanager/ContainerManagerImpl.java  | 71 +++++++++++++-------
 .../application/ApplicationImpl.java            | 27 ++++++--
 .../yarn_server_nodemanager_recovery.proto      |  7 ++
 .../TestContainerManagerRecovery.java           | 40 +++++++++--
 4 files changed, 111 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/47474fff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index f65f1ac..50268b9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationACLMapProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -381,10 +382,20 @@ public class ContainerManagerImpl extends 
CompositeService implements
           new LogAggregationContextPBImpl(p.getLogAggregationContext());
     }
 
+    FlowContext fc = null;
+    if (p.getFlowContext() != null) {
+      FlowContextProto fcp = p.getFlowContext();
+      fc = new FlowContext(fcp.getFlowName(), fcp.getFlowVersion(),
+          fcp.getFlowRunId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Recovering Flow context: " + fc + " for an application " + appId);
+      }
+    }
+
     LOG.info("Recovering application " + appId);
-    //TODO: Recover flow and flow run ID
-    ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
-        creds, context, p.getAppLogAggregationInitedTime());
+    ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
+        appId, creds, context, p.getAppLogAggregationInitedTime());
     context.getApplications().put(appId, app);
     app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
   }
@@ -936,7 +947,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
   private ContainerManagerApplicationProto buildAppProto(ApplicationId appId,
       String user, Credentials credentials,
       Map<ApplicationAccessType, String> appAcls,
-      LogAggregationContext logAggregationContext) {
+      LogAggregationContext logAggregationContext, FlowContext flowContext) {
 
     ContainerManagerApplicationProto.Builder builder =
         ContainerManagerApplicationProto.newBuilder();
@@ -971,6 +982,16 @@ public class ContainerManagerImpl extends CompositeService 
implements
       }
     }
 
+    builder.clearFlowContext();
+    if (flowContext != null && flowContext.getFlowName() != null
+        && flowContext.getFlowVersion() != null) {
+      FlowContextProto fcp =
+          FlowContextProto.newBuilder().setFlowName(flowContext.getFlowName())
+              .setFlowVersion(flowContext.getFlowVersion())
+              .setFlowRunId(flowContext.getFlowRunId()).build();
+      builder.setFlowContext(fcp);
+    }
+
     return builder.build();
   }
 
@@ -1016,25 +1037,29 @@ public class ContainerManagerImpl extends 
CompositeService implements
     this.readLock.lock();
     try {
       if (!isServiceStopped()) {
-        // Create the application
-        // populate the flow context from the launch context if the timeline
-        // service v.2 is enabled
-        FlowContext flowContext = null;
-        if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-          String flowName = launchContext.getEnvironment().get(
-              TimelineUtils.FLOW_NAME_TAG_PREFIX);
-          String flowVersion = launchContext.getEnvironment().get(
-              TimelineUtils.FLOW_VERSION_TAG_PREFIX);
-          String flowRunIdStr = launchContext.getEnvironment().get(
-              TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
-          long flowRunId = 0L;
-          if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
-            flowRunId = Long.parseLong(flowRunIdStr);
-          }
-          flowContext =
-              new FlowContext(flowName, flowVersion, flowRunId);
-        }
         if (!context.getApplications().containsKey(applicationID)) {
+          // Create the application
+          // populate the flow context from the launch context if the timeline
+          // service v.2 is enabled
+          FlowContext flowContext = null;
+          if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+            String flowName = launchContext.getEnvironment()
+                .get(TimelineUtils.FLOW_NAME_TAG_PREFIX);
+            String flowVersion = launchContext.getEnvironment()
+                .get(TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+            String flowRunIdStr = launchContext.getEnvironment()
+                .get(TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+            long flowRunId = 0L;
+            if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+              flowRunId = Long.parseLong(flowRunIdStr);
+            }
+            flowContext = new FlowContext(flowName, flowVersion, flowRunId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Flow context: " + flowContext
+                  + " created for an application " + applicationID);
+            }
+          }
+
           Application application =
               new ApplicationImpl(dispatcher, user, flowContext,
                   applicationID, credentials, context);
@@ -1048,7 +1073,7 @@ public class ContainerManagerImpl extends 
CompositeService implements
                 container.getLaunchContext().getApplicationACLs();
             context.getNMStateStore().storeApplication(applicationID,
                 buildAppProto(applicationID, user, credentials, appAcls,
-                    logAggregationContext));
+                    logAggregationContext, flowContext));
             dispatcher.getEventHandler().handle(new ApplicationInitEvent(
                 applicationID, appAcls, logAggregationContext));
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47474fff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index 444581c..80863a1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -106,13 +107,6 @@ public class ApplicationImpl implements Application {
   }
 
   public ApplicationImpl(Dispatcher dispatcher, String user,
-      ApplicationId appId, Credentials credentials, Context context,
-      long recoveredLogInitedTime) {
-    this(dispatcher, user, null, appId, credentials, context,
-      recoveredLogInitedTime);
-  }
-
-  public ApplicationImpl(Dispatcher dispatcher, String user,
       FlowContext flowContext, ApplicationId appId, Credentials credentials,
       Context context, long recoveredLogInitedTime) {
     this.dispatcher = dispatcher;
@@ -171,6 +165,15 @@ public class ApplicationImpl implements Application {
     public long getFlowRunId() {
       return flowRunId;
     }
+
+    @Override
+    public String toString() { // readable form, e.g. for debug logging
+      StringBuilder sb = new StringBuilder("{");
+      sb.append("Flow Name=").append(getFlowName());
+      sb.append(" Flow Version=").append(getFlowVersion());
+      sb.append(" Flow Run Id=").append(getFlowRunId()).append(" }");
+      return sb.toString();
+    }
   }
 
   @Override
@@ -390,6 +393,16 @@ public class ApplicationImpl implements Application {
 
     builder.setAppLogAggregationInitedTime(app.applicationLogInitedTimestamp);
 
+    builder.clearFlowContext();
+    if (app.flowContext != null && app.flowContext.getFlowName() != null
+        && app.flowContext.getFlowVersion() != null) {
+      FlowContextProto fcp = FlowContextProto.newBuilder()
+          .setFlowName(app.flowContext.getFlowName())
+          .setFlowVersion(app.flowContext.getFlowVersion())
+          .setFlowRunId(app.flowContext.getFlowRunId()).build();
+      builder.setFlowContext(fcp);
+    }
+
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47474fff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
index 0dfa20e..7831711 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/proto/yarn_server_nodemanager_recovery.proto
@@ -31,6 +31,7 @@ message ContainerManagerApplicationProto {
   repeated ApplicationACLMapProto acls = 4;
   optional LogAggregationContextProto log_aggregation_context = 5;
   optional int64 appLogAggregationInitedTime = 6 [ default = -1 ];
+  optional FlowContextProto flowContext = 7;
 }
 
 message DeletionServiceDeleteTaskProto {
@@ -52,3 +53,9 @@ message LogDeleterProto {
   optional string user = 1;
   optional int64 deletionTime = 2;
 }
+
+message FlowContextProto { // app flow context persisted for NM recovery
+  optional string flowName = 1; // written only when name and version set
+  optional string flowVersion = 2;
+  optional int64 flowRunId = 3; // 0 if the launch env had no run id
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47474fff/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 633bb6d..075d857 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -107,6 +107,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecret
 import 
org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
 import 
org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -136,6 +137,11 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, 
remoteLogDir.getAbsolutePath());
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+
+    // enable atsv2 by default in test
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+
     // Default delSrvc
     delSrvc = createDeletionService();
     delSrvc.init(conf);
@@ -144,6 +150,7 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     nodeHealthChecker = new NodeHealthCheckerService(
         NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
     nodeHealthChecker.init(conf);
+
   }
 
   @Test
@@ -161,6 +168,7 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     cm.start();
 
     // add an application by starting a container
+    String appName = "app_name1";
     String appUser = "app_user1";
     String modUser = "modify_user1";
     String viewUser = "view_user1";
@@ -170,7 +178,8 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
     Map<String, LocalResource> localResources = Collections.emptyMap();
-    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, appName, appId);
     List<String> containerCmds = Collections.emptyList();
     Map<String, ByteBuffer> serviceData = Collections.emptyMap();
     Credentials containerCreds = new Credentials();
@@ -318,7 +327,8 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
     Map<String, LocalResource> localResources = Collections.emptyMap();
-    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, "app_name1", appId);
     List<String> containerCmds = Collections.emptyList();
     Map<String, ByteBuffer> serviceData = Collections.emptyMap();
 
@@ -400,7 +410,8 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     ApplicationAttemptId attemptId =
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
-    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, "app_name1", appId);
     Map<String, ByteBuffer> serviceData = Collections.emptyMap();
     Credentials containerCreds = new Credentials();
     DataOutputBuffer dob = new DataOutputBuffer();
@@ -476,7 +487,8 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
         ApplicationAttemptId.newInstance(appId, 1);
     ContainerId cid = ContainerId.newContainerId(attemptId, 1);
     Map<String, LocalResource> localResources = Collections.emptyMap();
-    Map<String, String> containerEnv = Collections.emptyMap();
+    Map<String, String> containerEnv = new HashMap<>();
+    setFlowContext(containerEnv, "app_name1", appId);
     List<String> containerCmds = Collections.emptyList();
     Map<String, ByteBuffer> serviceData = Collections.emptyMap();
     Credentials containerCreds = new Credentials();
@@ -760,4 +772,24 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
     containerManager.dispatcher.disableExitOnDispatchException();
     return containerManager;
   }
+
+  private void setFlowContext(Map<String, String> containerEnv, String appName,
+      ApplicationId appId) { // adds ATSv2 flow tags to containerEnv
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) { // else no-op
+      setFlowTags(containerEnv, TimelineUtils.FLOW_NAME_TAG_PREFIX,
+          TimelineUtils.generateDefaultFlowName(appName, appId));
+      setFlowTags(containerEnv, TimelineUtils.FLOW_VERSION_TAG_PREFIX,
+          TimelineUtils.DEFAULT_FLOW_VERSION);
+      setFlowTags(containerEnv, TimelineUtils.FLOW_RUN_ID_TAG_PREFIX,
+          String.valueOf(System.currentTimeMillis())); // run id = wall clock
+    }
+  }
+
+  private static void setFlowTags(Map<String, String> environment,
+      String tagPrefix, String value) { // tagPrefix is used as the env key
+    if (!value.isEmpty()) { // empty values are skipped; null value -> NPE
+      environment.put(tagPrefix, value);
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to