YARN-5110. Fix OpportunisticContainerAllocator to insert complete HostAddress 
in issued ContainerTokenIds. (Konstantinos Karanasos via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/15976306
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/15976306
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/15976306

Branch: refs/heads/YARN-4757
Commit: 1597630681c784a3d59f5605b87e96197b8139d7
Parents: 010e6ac
Author: Arun Suresh <asur...@apache.org>
Authored: Wed May 18 18:46:00 2016 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Wed May 18 18:46:00 2016 -0700

----------------------------------------------------------------------
 .../api/records/QueuedContainersStatus.java     |  2 +-
 .../queuing/QueuingContainerManagerImpl.java    |  1 +
 .../nodemanager/scheduler/LocalScheduler.java   |  6 +-
 .../OpportunisticContainerAllocator.java        | 17 +++---
 .../scheduler/TestLocalScheduler.java           |  9 ++-
 .../distributed/NodeQueueLoadMonitor.java       | 14 +++--
 .../hadoop/yarn/server/MiniYARNCluster.java     | 62 +++++++++++++++++++-
 7 files changed, 91 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
index a7f0ece..fb567d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
@@ -41,5 +41,5 @@ public abstract class QueuedContainersStatus {
 
   public abstract int getWaitQueueLength();
 
-  public abstract void setWaitQueueLength(int queueWaitTime);
+  public abstract void setWaitQueueLength(int waitQueueLength);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
index 4f9d5a3..707051f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
@@ -200,6 +200,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
         .getContainerTokenIdentifier().getContainerID();
     this.context.getQueuingContext().getQueuedContainers().remove(containerId);
     try {
+      LOG.info("Starting container [" + containerId + "]");
       super.startContainerInternal(
           allocatedContainerInfo.getContainerTokenIdentifier(),
           allocatedContainerInfo.getStartRequest());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index 42c1dcd..fca814b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -337,8 +337,10 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
   @Override
   public DistSchedAllocateResponse allocateForDistributedScheduling
       (AllocateRequest request) throws YarnException, IOException {
-    LOG.info("Forwarding allocate request to the" +
-        "Distributed Scheduler Service on YARN RM");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Forwarding allocate request to the" +
+          "Distributed Scheduler Service on YARN RM");
+    }
     // Partition requests into GUARANTEED and OPPORTUNISTIC reqs
     PartitionedResourceRequests partitionedAsks = partitionAskList(request
         .getAskList());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
index 03ba61d..e33c389 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java
@@ -84,14 +84,14 @@ public class OpportunisticContainerAllocator {
       Map<String, NodeId> allNodes, String userName) throws YarnException {
     Map<Resource, List<Container>> containers = new HashMap<>();
     Set<String> nodesAllocated = new HashSet<>();
-    int numAsks = resourceAsks.size();
     for (ResourceRequest anyAsk : resourceAsks) {
       allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
           allNodes, userName, containers, nodesAllocated, anyAsk);
-    }
-    if (numAsks > 0) {
-      LOG.info("Opportunistic allocation requested for: " + numAsks
-          + " containers; allocated = " + containers.size());
+      LOG.info("Opportunistic allocation requested for ["
+          + "priority=" + anyAsk.getPriority()
+          + ", num_containers=" + anyAsk.getNumContainers()
+          + ", capability=" + anyAsk.getCapability() + "]"
+          + " allocated = " + containers.get(anyAsk.getCapability()).size());
     }
     return containers;
   }
@@ -129,8 +129,9 @@ public class OpportunisticContainerAllocator {
       }
       cList.add(container);
       numAllocated++;
-      LOG.info("Allocated " + numAllocated + " opportunistic containers.");
+      LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
     }
+    LOG.info("Allocated " + numAllocated + " opportunistic containers.");
   }
 
   private Container buildContainer(DistSchedulerParams appParams,
@@ -146,8 +147,8 @@ public class OpportunisticContainerAllocator {
     long currTime = System.currentTimeMillis();
     ContainerTokenIdentifier containerTokenIdentifier =
         new ContainerTokenIdentifier(
-            cId, nodeId.getHost(), userName, capability,
-            currTime + appParams.containerTokenExpiryInterval,
+            cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
+            capability, currTime + appParams.containerTokenExpiryInterval,
             context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
             nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
             null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
index efc682a..e987e79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.scheduler;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -45,6 +46,7 @@ import org.apache.hadoop.yarn.server.nodemanager.security
     .NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security
     .NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -196,9 +198,14 @@ public class TestLocalScheduler {
   }
 
   private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
-      allocateResponse) {
+      allocateResponse) throws Exception {
     Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
     for (Container c : allocateResponse.getAllocatedContainers()) {
+      ContainerTokenIdentifier cTokId = BuilderUtils
+          .newContainerTokenIdentifier(c.getContainerToken());
+      Assert.assertEquals(
+          c.getNodeId().getHost() + ":" + c.getNodeId().getPort(),
+          cTokId.getNmHostAddress());
       List<ContainerId> cIds = allocs.get(c.getNodeId());
       if (cIds == null) {
         cIds = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
index 21f4f6e..017a256 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
@@ -195,11 +195,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
               new ClusterNode(rmNode.getNodeID())
                   .setQueueWaitTime(estimatedQueueWaitTime)
                   .setQueueLength(waitQueueLength));
-          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
+          LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
               "wait queue length [" + waitQueueLength + "]");
         } else {
-          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
+          LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
               "wait queue length [" + waitQueueLength + "]");
         }
@@ -210,12 +210,14 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
               .setQueueWaitTime(estimatedQueueWaitTime)
               .setQueueLength(waitQueueLength)
               .updateTimestamp();
-          LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
-              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
-              "wait queue length [" + waitQueueLength + "]");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " +
+                "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+                "wait queue length [" + waitQueueLength + "]");
+          }
         } else {
           this.clusterNodes.remove(rmNode.getNodeID());
-          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
+          LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
               "with queue wait time [" + currentNode.queueWaitTime + "] and " +
               "wait queue length [" + currentNode.queueLength + "]");
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/15976306/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
index 2372ea2..de4e22d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
@@ -75,6 +75,9 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
@@ -713,8 +716,14 @@ public class MiniYARNCluster extends CompositeService {
         ContainerExecutor exec, DeletionService del,
         NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
         LocalDirsHandlerService dirsHandler) {
-      return new CustomContainerManagerImpl(context, exec, del,
-          nodeStatusUpdater, metrics, dirsHandler);
+      if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
+          YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
+        return new CustomQueueingContainerManagerImpl(context, exec, del,
+            nodeStatusUpdater, metrics, dirsHandler);
+      } else {
+        return new CustomContainerManagerImpl(context, exec, del,
+            nodeStatusUpdater, metrics, dirsHandler);
+      }
     }
   }
 
@@ -846,6 +855,55 @@ public class MiniYARNCluster extends CompositeService {
     }
   }
 
+  private class CustomQueueingContainerManagerImpl extends
+      QueuingContainerManagerImpl {
+
+    public CustomQueueingContainerManagerImpl(Context context,
+        ContainerExecutor exec, DeletionService del, NodeStatusUpdater
+        nodeStatusUpdater, NodeManagerMetrics metrics,
+        LocalDirsHandlerService dirsHandler) {
+      super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
+    }
+
+    @Override
+    protected ContainersMonitor createContainersMonitor(ContainerExecutor
+        exec) {
+      return new ContainersMonitorImpl(exec, dispatcher, this.context) {
+
+        @Override
+        public void increaseContainersAllocation(ProcessTreeInfo pti) { }
+
+        @Override
+        public void decreaseContainersAllocation(ProcessTreeInfo pti) { }
+
+        @Override
+        public boolean hasResourcesAvailable(
+            ContainersMonitorImpl.ProcessTreeInfo pti) {
+          return true;
+        }
+      };
+    }
+
+    @Override
+    protected void createAMRMProxyService(Configuration conf) {
+      this.amrmProxyEnabled =
+          conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
+              YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
+
+      if (this.amrmProxyEnabled) {
+        LOG.info("CustomAMRMProxyService is enabled. "
+            + "All the AM->RM requests will be intercepted by the proxy");
+        AMRMProxyService amrmProxyService =
+            useRpc ? new AMRMProxyService(getContext(), dispatcher)
+                : new ShortCircuitedAMRMProxy(getContext(), dispatcher);
+        this.setAMRMProxyService(amrmProxyService);
+        addService(this.getAMRMProxyService());
+      } else {
+        LOG.info("CustomAMRMProxyService is disabled");
+      }
+    }
+  }
+
   private class ShortCircuitedAMRMProxy extends AMRMProxyService {
 
     public ShortCircuitedAMRMProxy(Context context,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to