YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for 
OPPORTUNISTIC containers. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/341888a0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/341888a0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/341888a0

Branch: refs/heads/HADOOP-12930
Commit: 341888a0aa23f24458b4e6e34868794b9735c06a
Parents: 68b4564
Author: Arun Suresh <asur...@apache.org>
Authored: Tue Apr 26 20:12:12 2016 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Tue Apr 26 20:12:12 2016 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   5 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   5 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |  17 ++
 .../yarn/conf/TestYarnConfigurationFields.java  |  25 +++
 .../hadoop/yarn/event/EventDispatcher.java      | 137 ++++++++++++
 .../yarn/server/api/records/NodeStatus.java     |   9 +
 .../api/records/QueuedContainersStatus.java     |  45 ++++
 .../api/records/impl/pb/NodeStatusPBImpl.java   |  40 +++-
 .../impl/pb/QueuedContainersStatusPBImpl.java   |  80 +++++++
 .../main/proto/yarn_server_common_protos.proto  |   6 +
 .../protocolrecords/TestProtocolRecords.java    |  30 +++
 .../nodemanager/NodeStatusUpdaterImpl.java      |   8 +
 .../server/resourcemanager/ClusterMonitor.java  |  36 +++
 .../DistributedSchedulingService.java           | 151 ++++++++++++-
 .../server/resourcemanager/ResourceManager.java | 122 ++--------
 .../server/resourcemanager/rmnode/RMNode.java   |   4 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  29 ++-
 .../rmnode/RMNodeStatusEvent.java               |   7 +
 .../scheduler/distributed/TopKNodeSelector.java | 223 +++++++++++++++++++
 .../yarn/server/resourcemanager/MockNodes.java  |   6 +-
 .../resourcemanager/TestApplicationCleanup.java |   7 +-
 .../TestDistributedSchedulingService.java       |   5 +
 .../resourcemanager/TestRMDispatcher.java       |   6 +-
 .../TestResourceTrackerService.java             |   4 +-
 .../TestAMRMRPCNodeUpdates.java                 |   6 +-
 .../applicationsmanager/TestAMRestart.java      |   6 +-
 .../distributed/TestTopKNodeSelector.java       | 147 ++++++++++++
 27 files changed, 1034 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 92d586b..85096ba 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -190,6 +191,10 @@ public class NodeInfo {
       return null;
     }
 
+    public QueuedContainersStatus getQueuedContainersStatus() {
+      return null;
+    }
+
     @Override
     public ResourceUtilization getAggregatedContainersUtilization() {
       return null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 2e9cccb..ab82e66 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -179,6 +180,10 @@ public class RMNodeWrapper implements RMNode {
     return Collections.EMPTY_LIST;
   }
 
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    return null;
+  }
+
   @Override
   public ResourceUtilization getAggregatedContainersUtilization() {
     return node.getAggregatedContainersUtilization();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d8a5b71..cdfaaf1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -338,6 +338,23 @@ public class YarnConfiguration extends Configuration {
   public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT =
       600000;
 
+  /** K least loaded nodes to be provided to the LocalScheduler of a
+   * NodeManager for Distributed Scheduling */
+  public static final String DIST_SCHEDULING_TOP_K =
+      YARN_PREFIX + "distributed-scheduling.top-k";
+  public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
+
+  /** Interval (ms) at which the Top K best nodes are recomputed. */
+  public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
+      YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
+  public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
+
+  /** Comparator for determining Node Load for Distributed Scheduling */
+  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
+      YARN_PREFIX + "distributed-scheduling.top-k-comparator";
+  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
+      "QUEUE_LENGTH";
+
   /**
    * Enable/disable intermediate-data encryption at YARN level. For now, this
    * only is used by the FileSystemRMStateStore to setup right file-system

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 529d63b..c92a276 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -114,6 +114,31 @@ public class TestYarnConfigurationFields extends 
TestConfigurationFieldsBase {
     configurationPrefixToSkipCompare
         .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
 
+    // Ignore Distributed Scheduling related configurations,
+    // since it is still a "work in progress" feature.
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_ENABLED);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR);
+
     // Set by container-executor.cfg
     configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
new file mode 100644
index 0000000..8a5ad92
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.event;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+/**
+ * This is a specialized EventHandler to be used by Services that are expected
+ * to handle a large number of events efficiently by ensuring that the caller
+ * thread is not blocked. Events are immediately stored in a BlockingQueue and
+ * a separate dedicated Thread consumes events from the queue and handles
+ * them appropriately.
+ * @param <T> Type of Event
+ */
+public class EventDispatcher<T extends Event> extends
+    AbstractService implements EventHandler<T> {
+
+  private final EventHandler<T> handler;
+  private final BlockingQueue<T> eventQueue =
+      new LinkedBlockingDeque<>();
+  private final Thread eventProcessor;
+  private volatile boolean stopped = false;
+  private boolean shouldExitOnError = false;
+
+  private static final Log LOG = LogFactory.getLog(EventDispatcher.class);
+
+  private final class EventProcessor implements Runnable {
+    @Override
+    public void run() {
+
+      T event;
+
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        try {
+          event = eventQueue.take();
+        } catch (InterruptedException e) {
+          LOG.error("Returning, interrupted : " + e);
+          return; // TODO: Kill RM.
+        }
+
+        try {
+          handler.handle(event);
+        } catch (Throwable t) {
+          // An error occurred, but we are shutting down anyway.
+          // If it was an InterruptedException, the very act of
+          // shutdown could have caused it and is probably harmless.
+          if (stopped) {
+            LOG.warn("Exception during shutdown: ", t);
+            break;
+          }
+          LOG.fatal("Error in handling event type " + event.getType()
+              + " to the Event Dispatcher", t);
+          if (shouldExitOnError
+              && !ShutdownHookManager.get().isShutdownInProgress()) {
+            LOG.info("Exiting, bbye..");
+            System.exit(-1);
+          }
+        }
+      }
+    }
+  }
+
+  public EventDispatcher(EventHandler<T> handler, String name) {
+    super(name);
+    this.handler = handler;
+    this.eventProcessor = new Thread(new EventProcessor());
+    this.eventProcessor.setName(getName() + ":Event Processor");
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.shouldExitOnError =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    this.eventProcessor.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    this.stopped = true;
+    this.eventProcessor.interrupt();
+    try {
+      this.eventProcessor.join();
+    } catch (InterruptedException e) {
+      throw new YarnRuntimeException(e);
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void handle(T event) {
+    try {
+      int qSize = eventQueue.size();
+      if (qSize !=0 && qSize %1000 == 0) {
+        LOG.info("Size of " + getName() + " event-queue is " + qSize);
+      }
+      int remCapacity = eventQueue.remainingCapacity();
+      if (remCapacity < 1000) {
+        LOG.info("Very low remaining capacity on " + getName() + "" +
+            "event queue: " + remCapacity);
+      }
+      this.eventQueue.put(event);
+    } catch (InterruptedException e) {
+      LOG.info("Interrupted. Trying to exit gracefully.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 836cd4b..89e054b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -122,4 +122,13 @@ public abstract class NodeStatus {
   @Unstable
   public abstract void setIncreasedContainers(
       List<Container> increasedContainers);
+
+  @Private
+  @Unstable
+  public abstract QueuedContainersStatus getQueuedContainersStatus();
+
+  @Private
+  @Unstable
+  public abstract void setQueuedContainersStatus(
+      QueuedContainersStatus queuedContainersStatus);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
new file mode 100644
index 0000000..a7f0ece
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * <p>
+ * <code>QueuedContainersStatus</code> captures information pertaining to the
+ * state of execution of the Queueable containers within a node.
+ * </p>
+ */
+@Private
+@Evolving
+public abstract class QueuedContainersStatus {
+  public static QueuedContainersStatus newInstance() {
+    return Records.newRecord(QueuedContainersStatus.class);
+  }
+
+  public abstract int getEstimatedQueueWaitTime();
+
+  public abstract void setEstimatedQueueWaitTime(int queueWaitTime);
+
+  public abstract int getWaitQueueLength();
+
+  public abstract void setWaitQueueLength(int queueWaitTime);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 8dd4832..9a9a83a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -33,14 +33,17 @@ import 
org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto;
+
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
@@ -400,6 +403,27 @@ public class NodeStatusPBImpl extends NodeStatus {
     this.increasedContainers = increasedContainers;
   }
 
+  @Override
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    NodeStatusProtoOrBuilder p =
+        this.viaProto ? this.proto : this.builder;
+    if (!p.hasQueuedContainerStatus()) {
+      return null;
+    }
+    return convertFromProtoFormat(p.getQueuedContainerStatus());
+  }
+
+  @Override
+  public void setQueuedContainersStatus(QueuedContainersStatus 
queuedContainersStatus) {
+    maybeInitBuilder();
+    if (queuedContainersStatus == null) {
+      this.builder.clearQueuedContainerStatus();
+      return;
+    }
+    this.builder.setQueuedContainerStatus(
+        convertToProtoFormat(queuedContainersStatus));
+  }
+
   private NodeIdProto convertToProtoFormat(NodeId nodeId) {
     return ((NodeIdPBImpl)nodeId).getProto();
   }
@@ -433,15 +457,25 @@ public class NodeStatusPBImpl extends NodeStatus {
     return ((ApplicationIdPBImpl)c).getProto();
   }
 
-  private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) 
{
+  private YarnProtos.ResourceUtilizationProto 
convertToProtoFormat(ResourceUtilization r) {
     return ((ResourceUtilizationPBImpl) r).getProto();
   }
 
   private ResourceUtilizationPBImpl convertFromProtoFormat(
-      ResourceUtilizationProto p) {
+      YarnProtos.ResourceUtilizationProto p) {
     return new ResourceUtilizationPBImpl(p);
   }
 
+  private YarnServerCommonProtos.QueuedContainersStatusProto 
convertToProtoFormat(
+      QueuedContainersStatus r) {
+    return ((QueuedContainersStatusPBImpl) r).getProto();
+  }
+
+  private QueuedContainersStatus convertFromProtoFormat(
+      YarnServerCommonProtos.QueuedContainersStatusProto p) {
+    return new QueuedContainersStatusPBImpl(p);
+  }
+
   private ContainerPBImpl convertFromProtoFormat(
       ContainerProto c) {
     return new ContainerPBImpl(c);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
new file mode 100644
index 0000000..54470f4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+
+public class QueuedContainersStatusPBImpl extends QueuedContainersStatus {
+
+  private YarnServerCommonProtos.QueuedContainersStatusProto proto =
+      YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance();
+  private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder = 
null;
+  private boolean viaProto = false;
+
+  public QueuedContainersStatusPBImpl() {
+    builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder();
+  }
+
+  public QueuedContainersStatusPBImpl(YarnServerCommonProtos
+      .QueuedContainersStatusProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public YarnServerCommonProtos.QueuedContainersStatusProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder =
+          YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public int getEstimatedQueueWaitTime() {
+    YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getEstimatedQueueWaitTime();
+  }
+
+  @Override
+  public void setEstimatedQueueWaitTime(int queueWaitTime) {
+    maybeInitBuilder();
+    builder.setEstimatedQueueWaitTime(queueWaitTime);
+  }
+
+  @Override
+  public int getWaitQueueLength() {
+    YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getWaitQueueLength();
+  }
+
+  @Override
+  public void setWaitQueueLength(int waitQueueLength) {
+    maybeInitBuilder();
+    builder.setWaitQueueLength(waitQueueLength);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 77064a0..c23d557 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -39,6 +39,12 @@ message NodeStatusProto {
   optional ResourceUtilizationProto containers_utilization = 6;
   optional ResourceUtilizationProto node_utilization = 7;
   repeated ContainerProto increased_containers = 8;
+  optional QueuedContainersStatusProto queued_container_status = 9;
+}
+
+message QueuedContainersStatusProto {
+  optional int32 estimated_queue_wait_time = 1;
+  optional int32 wait_queue_length = 2;
 }
 
 message MasterKeyProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
index 86e49f0..27bdfff 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
@@ -39,8 +39,14 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
+
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb
+    .NodeHeartbeatRequestPBImpl;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -131,4 +137,28 @@ public class TestProtocolRecords {
           ((NodeHeartbeatResponsePBImpl) record).getProto());
     Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
   }
+
+  @Test
+  public void testNodeHeartBeatRequest() throws IOException {
+    NodeHeartbeatRequest record =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    NodeStatus nodeStatus =
+        Records.newRecord(NodeStatus.class);
+    QueuedContainersStatus queuedContainersStatus = Records.newRecord
+        (QueuedContainersStatus.class);
+    queuedContainersStatus.setEstimatedQueueWaitTime(123);
+    queuedContainersStatus.setWaitQueueLength(321);
+    nodeStatus.setQueuedContainersStatus(queuedContainersStatus);
+    record.setNodeStatus(nodeStatus);
+
+    NodeHeartbeatRequestPBImpl pb = new
+        NodeHeartbeatRequestPBImpl(
+        ((NodeHeartbeatRequestPBImpl) record).getProto());
+
+    Assert.assertEquals(123,
+        pb.getNodeStatus()
+            .getQueuedContainersStatus().getEstimatedQueueWaitTime());
+    Assert.assertEquals(321,
+        pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 72769bf..c0f02e9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -71,6 +71,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@@ -449,9 +450,16 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
           createKeepAliveApplicationList(), nodeHealthStatus,
           containersUtilization, nodeUtilization, increasedContainers);
 
+    nodeStatus.setQueuedContainersStatus(getQueuedContainerStatus());
     return nodeStatus;
   }
 
+  /**
+   * Builds the queue status reported with the node status: a fresh
+   * QueuedContainersStatus whose wait-queue length is the number of
+   * containers currently held by the queuing context.
+   *
+   * @return the newly created queue status
+   */
+  private QueuedContainersStatus getQueuedContainerStatus() {
+    final QueuedContainersStatus queueStatus =
+        QueuedContainersStatus.newInstance();
+    final int waitQueueLength =
+        this.context.getQueuingContext().getQueuedContainers().size();
+    queueStatus.setWaitQueueLength(waitQueueLength);
+    return queueStatus;
+  }
   /**
    * Get the aggregated utilization of the containers in this node.
    * @return Resource utilization of all the containers.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
new file mode 100644
index 0000000..4fd62d0
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.List;
+
+/**
+ * Callback interface for components that track the nodes of the cluster.
+ * In this change it is implemented by TopKNodeSelector and driven by
+ * DistributedSchedulingService, which forwards node-related scheduler events
+ * to these methods.
+ */
+public interface ClusterMonitor {
+
+  /**
+   * Called when a node has been added.
+   *
+   * @param containerStatuses container statuses supplied with the node-added
+   *                          event
+   * @param rmNode the node that was added
+   */
+  void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
+
+  /**
+   * Called when a node has been removed.
+   *
+   * @param removedRMNode the node that was removed
+   */
+  void removeNode(RMNode removedRMNode);
+
+  /**
+   * Called when a node update event is received for a node.
+   *
+   * @param rmNode the node the update refers to
+   */
+  void nodeUpdate(RMNode rmNode);
+
+  /**
+   * Called when a node resource update event is received for a node.
+   *
+   * @param rmNode the node whose resource is being updated
+   * @param resourceOption the resource option carried by the event
+   */
+  void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
index 5210f7f..170d91a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
@@ -18,10 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
 import 
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
 
@@ -40,20 +44,62 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import 
org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.security
-    .AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
+    .TopKNodeSelector;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
+/**
+ * The DistributedSchedulingService is started instead of the
+ * ApplicationMasterService if DistributedScheduling is enabled for the YARN
+ * cluster.
+ * It extends the functionality of the ApplicationMasterService by servicing
+ * clients (AMs and AMRMProxy request interceptors) that understand the
+ * DistributedSchedulingProtocol.
+ */
 public class DistributedSchedulingService extends ApplicationMasterService
-    implements DistributedSchedulerProtocol {
+    implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> {
+
+  private static final Log LOG =
+      LogFactory.getLog(DistributedSchedulingService.class);
+
+  private final TopKNodeSelector clusterMonitor;
+
+  private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
+      new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
+      new ConcurrentHashMap<>();
 
   public DistributedSchedulingService(RMContext rmContext,
       YarnScheduler scheduler) {
     super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
+    // All top-K selection settings come from the RM's YARN configuration.
+    Configuration conf = rmContext.getYarnConfiguration();
+    int topK = conf.getInt(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
+    long computeInterval = conf.getLong(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
+    String comparatorName = conf.get(
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
+        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT);
+    // Enum.valueOf throws IllegalArgumentException when the configured name
+    // is not one of the TopKComparator constants.
+    this.clusterMonitor = new TopKNodeSelector(topK, computeInterval,
+        TopKNodeSelector.TopKComparator.valueOf(comparatorName));
   }
 
   @Override
@@ -63,8 +109,9 @@ public class DistributedSchedulingService extends 
ApplicationMasterService
         addr, serverConf, secretManager,
         serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
             YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
-    // To support application running no NMs that DO NOT support
-    // Dist Scheduling...
+    // To support application running on NMs that DO NOT support
+    // Dist Scheduling... The server multiplexes both the
+    // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
     ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
         ApplicationMasterProtocolPB.class,
         ApplicationMasterProtocolService.newReflectiveBlockingService(
@@ -141,10 +188,8 @@ public class DistributedSchedulingService extends 
ApplicationMasterService
         this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
 
     // Set nodes to be used for scheduling
-    // TODO: The actual computation of the list will happen in YARN-4412
-    // TODO: Till then, send the complete list
     dsResp.setNodesForScheduling(
-        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
+        new ArrayList<>(this.clusterMonitor.selectNodes()));
     return dsResp;
   }
 
@@ -156,7 +201,95 @@ public class DistributedSchedulingService extends 
ApplicationMasterService
         (DistSchedAllocateResponse.class);
     dsResp.setAllocateResponse(response);
     dsResp.setNodesForScheduling(
-        new ArrayList<>(this.rmContext.getRMNodes().keySet()));
+        new ArrayList<>(this.clusterMonitor.selectNodes()));
     return dsResp;
   }
+
+  /**
+   * Registers a node under the given key, creating the key's node set on
+   * first use. handle() calls this with both rack names and host names as
+   * the key. A null key is ignored.
+   *
+   * @param mapping the key-to-nodes map to update
+   * @param rackName the key (rack or host name); may be null
+   * @param nodeId the node to register
+   */
+  private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping,
+      String rackName, NodeId nodeId) {
+    if (rackName == null) {
+      return;
+    }
+    Set<NodeId> created = new HashSet<NodeId>();
+    // putIfAbsent returns the set already mapped to the key, or null when
+    // the freshly created set was installed.
+    Set<NodeId> present = mapping.putIfAbsent(rackName, created);
+    Set<NodeId> target = (present != null) ? present : created;
+    // The per-key sets are plain HashSets, so guard each mutation.
+    synchronized (target) {
+      target.add(nodeId);
+    }
+  }
+
+  /**
+   * Removes a node from the set registered under the given key. handle()
+   * calls this with both rack names and host names as the key. A null key,
+   * or a key under which nothing was ever registered, is a no-op.
+   *
+   * @param mapping the key-to-nodes map to update
+   * @param rackName the key (rack or host name); may be null
+   * @param nodeId the node to remove
+   */
+  private void removeFromMapping(
+      ConcurrentHashMap<String, Set<NodeId>> mapping,
+      String rackName, NodeId nodeId) {
+    if (rackName == null) {
+      return;
+    }
+    Set<NodeId> nodeIds = mapping.get(rackName);
+    if (nodeIds == null) {
+      // Nothing was registered under this key (for example a removal event
+      // for a node that was never added); synchronizing on null would throw
+      // a NullPointerException.
+      return;
+    }
+    synchronized (nodeIds) {
+      nodeIds.remove(nodeId);
+    }
+  }
+
+  /**
+   * Forwards node-related scheduler events to the cluster monitor and keeps
+   * the rack and host mappings in step with node additions and removals.
+   * Application, application-attempt, container-expiry and node-label events
+   * are accepted and ignored; any other event type is logged as an error.
+   *
+   * @param event the scheduler event to process
+   * @throws RuntimeException if the event object is not an instance of the
+   *         class expected for its type
+   */
+  @Override
+  public void handle(SchedulerEvent event) {
+    switch (event.getType()) {
+      case NODE_ADDED:
+        if (!(event instanceof NodeAddedSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeAddedSchedulerEvent added = (NodeAddedSchedulerEvent) event;
+        clusterMonitor.addNode(
+            added.getContainerReports(), added.getAddedRMNode());
+        addToMapping(rackToNode,
+            added.getAddedRMNode().getRackName(),
+            added.getAddedRMNode().getNodeID());
+        addToMapping(hostToNode,
+            added.getAddedRMNode().getHostName(),
+            added.getAddedRMNode().getNodeID());
+        break;
+      case NODE_REMOVED:
+        if (!(event instanceof NodeRemovedSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeRemovedSchedulerEvent removed = (NodeRemovedSchedulerEvent) event;
+        clusterMonitor.removeNode(removed.getRemovedRMNode());
+        removeFromMapping(rackToNode,
+            removed.getRemovedRMNode().getRackName(),
+            removed.getRemovedRMNode().getNodeID());
+        removeFromMapping(hostToNode,
+            removed.getRemovedRMNode().getHostName(),
+            removed.getRemovedRMNode().getNodeID());
+        break;
+      case NODE_UPDATE:
+        if (!(event instanceof NodeUpdateSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeUpdateSchedulerEvent updated = (NodeUpdateSchedulerEvent) event;
+        clusterMonitor.nodeUpdate(updated.getRMNode());
+        break;
+      case NODE_RESOURCE_UPDATE:
+        if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+          throw new RuntimeException("Unexpected event type: " + event);
+        }
+        NodeResourceUpdateSchedulerEvent resourceUpdated =
+            (NodeResourceUpdateSchedulerEvent) event;
+        clusterMonitor.updateNodeResource(
+            resourceUpdated.getRMNode(),
+            resourceUpdated.getResourceOption());
+        break;
+
+      // Event types this service deliberately ignores.
+      case APP_ADDED:
+      case APP_REMOVED:
+      case APP_ATTEMPT_ADDED:
+      case APP_ATTEMPT_REMOVED:
+      case CONTAINER_EXPIRED:
+      case NODE_LABELS_UPDATE:
+        break;
+
+      default:
+        LOG.error("Unknown event arrived at DistributedSchedulingService: "
+            + event.toString());
+        break;
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 2fc940b..2af060f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -39,7 +39,6 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import 
org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ExitUtil;
@@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import 
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
@@ -118,8 +118,6 @@ import java.security.PrivilegedExceptionAction;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * The ResourceManager is the main class that is a set of components.
@@ -370,7 +368,7 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
   }
 
   protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-    return new SchedulerEventDispatcher(this.scheduler);
+    return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
   }
 
   protected Dispatcher createDispatcher() {
@@ -726,108 +724,6 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
   }
 
   @Private
-  public static class SchedulerEventDispatcher extends AbstractService
-      implements EventHandler<SchedulerEvent> {
-
-    private final ResourceScheduler scheduler;
-    private final BlockingQueue<SchedulerEvent> eventQueue =
-      new LinkedBlockingQueue<SchedulerEvent>();
-    private volatile int lastEventQueueSizeLogged = 0;
-    private final Thread eventProcessor;
-    private volatile boolean stopped = false;
-    private boolean shouldExitOnError = false;
-
-    public SchedulerEventDispatcher(ResourceScheduler scheduler) {
-      super(SchedulerEventDispatcher.class.getName());
-      this.scheduler = scheduler;
-      this.eventProcessor = new Thread(new EventProcessor());
-      this.eventProcessor.setName("ResourceManager Event Processor");
-    }
-
-    @Override
-    protected void serviceInit(Configuration conf) throws Exception {
-      this.shouldExitOnError =
-          conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
-            Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
-      super.serviceInit(conf);
-    }
-
-    @Override
-    protected void serviceStart() throws Exception {
-      this.eventProcessor.start();
-      super.serviceStart();
-    }
-
-    private final class EventProcessor implements Runnable {
-      @Override
-      public void run() {
-
-        SchedulerEvent event;
-
-        while (!stopped && !Thread.currentThread().isInterrupted()) {
-          try {
-            event = eventQueue.take();
-          } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
-            return; // TODO: Kill RM.
-          }
-
-          try {
-            scheduler.handle(event);
-          } catch (Throwable t) {
-            // An error occurred, but we are shutting down anyway.
-            // If it was an InterruptedException, the very act of 
-            // shutdown could have caused it and is probably harmless.
-            if (stopped) {
-              LOG.warn("Exception during shutdown: ", t);
-              break;
-            }
-            LOG.fatal("Error in handling event type " + event.getType()
-                + " to the scheduler", t);
-            if (shouldExitOnError
-                && !ShutdownHookManager.get().isShutdownInProgress()) {
-              LOG.info("Exiting, bbye..");
-              System.exit(-1);
-            }
-          }
-        }
-      }
-    }
-
-    @Override
-    protected void serviceStop() throws Exception {
-      this.stopped = true;
-      this.eventProcessor.interrupt();
-      try {
-        this.eventProcessor.join();
-      } catch (InterruptedException e) {
-        throw new YarnRuntimeException(e);
-      }
-      super.serviceStop();
-    }
-
-    @Override
-    public void handle(SchedulerEvent event) {
-      try {
-        int qSize = eventQueue.size();
-        if (qSize != 0 && qSize % 1000 == 0
-            && lastEventQueueSizeLogged != qSize) {
-          lastEventQueueSizeLogged = qSize;
-          LOG.info("Size of scheduler event-queue is " + qSize);
-        }
-        int remCapacity = eventQueue.remainingCapacity();
-        if (remCapacity < 1000) {
-          LOG.info("Very low remaining capacity on scheduler event queue: "
-              + remCapacity);
-        }
-        this.eventQueue.put(event);
-      } catch (InterruptedException e) {
-        LOG.info("Interrupted. Trying to exit gracefully.");
-      }
-    }
-  }
-
-  @Private
   public static class RMFatalEventDispatcher
       implements EventHandler<RMFatalEvent> {
 
@@ -1234,7 +1130,19 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
     if (this.rmContext.getYarnConfiguration().getBoolean(
         YarnConfiguration.DIST_SCHEDULING_ENABLED,
         YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) {
-      return new DistributedSchedulingService(this.rmContext, scheduler);
+      DistributedSchedulingService distributedSchedulingService = new
+          DistributedSchedulingService(this.rmContext, scheduler);
+      EventDispatcher distSchedulerEventDispatcher =
+          new EventDispatcher(distributedSchedulingService,
+              DistributedSchedulingService.class.getName());
+      // Add an event dispatcher for the DistributedSchedulingService
+      // to handle node updates/additions and removals.
+      // Since the SchedulerEvent is currently a superset of these,
+      // we register interest for it.
+      addService(distSchedulerEventDispatcher);
+      rmDispatcher.register(SchedulerEventType.class,
+          distSchedulerEventDispatcher);
+      return distributedSchedulingService;
     }
     return new ApplicationMasterService(this.rmContext, scheduler);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index d8df9f1..3bf9538 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 
 /**
  * Node managers information on available resources 
@@ -168,4 +169,7 @@ public interface RMNode {
       NodeHeartbeatResponse response);
   
   public List<Container> pullNewlyIncreasedContainers();
+
+  public QueuedContainersStatus getQueuedContainersStatus();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 9b80716..3179169 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -57,6 +57,7 @@ import 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
@@ -125,6 +126,9 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
   /* Resource utilization for the node. */
   private ResourceUtilization nodeUtilization;
 
+  /* Container Queue Information for the node.. Used by Distributed Scheduler 
*/
+  private QueuedContainersStatus queuedContainersStatus;
+
   private final ContainerAllocationExpirer containerAllocationExpirer;
   /* set of containers that have just launched */
   private final Set<ContainerId> launchedContainers =
@@ -1121,7 +1125,7 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
     public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
 
       RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
-
+      rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo());
       NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents(
           rmNode, statusEvent);
       NodeState initialState = rmNode.getState();
@@ -1383,4 +1387,25 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
   public Resource getOriginalTotalCapability() {
     return this.originalTotalCapability;
   }
- }
+
+  /**
+   * Returns the container queue information last stored for this node by
+   * {@link #setQueuedContainersStatus(QueuedContainersStatus)}, read under
+   * the node's read lock.
+   *
+   * @return the stored queue status; presumably null until a status has been
+   *         set, since the field has no initializer (confirm against the
+   *         constructor)
+   */
+  @Override
+  public QueuedContainersStatus getQueuedContainersStatus() {
+    this.readLock.lock();
+
+    try {
+      return this.queuedContainersStatus;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  /**
+   * Stores the container queue information for this node, under the node's
+   * write lock. The status-update transition calls this with the value
+   * carried by the incoming RMNodeStatusEvent.
+   *
+   * @param queuedContainersStatus the queue status to store
+   */
+  public void setQueuedContainersStatus(QueuedContainersStatus
+      queuedContainersStatus) {
+    this.writeLock.lock();
+
+    try {
+      this.queuedContainersStatus = queuedContainersStatus;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index ba6ac9b..5eeaabe 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 
@@ -79,6 +80,10 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.logAggregationReportsForApps;
   }
 
+  /**
+   * Returns the queued-containers status carried by this event's node
+   * status.
+   *
+   * @return the value of {@code nodeStatus.getQueuedContainersStatus()}
+   */
+  public QueuedContainersStatus getContainerQueueInfo() {
+    return this.nodeStatus.getQueuedContainersStatus();
+  }
+
   public void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationReportsForApps) {
     this.logAggregationReportsForApps = logAggregationReportsForApps;
@@ -89,4 +94,6 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.nodeStatus.getIncreasedContainers() == null ?
         Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers();
   }
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
new file mode 100644
index 0000000..7e24687
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class TopKNodeSelector implements ClusterMonitor {
+
+  final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
+
+  public enum TopKComparator implements Comparator<ClusterNode> {
+    WAIT_TIME,
+    QUEUE_LENGTH;
+
+    @Override
+    public int compare(ClusterNode o1, ClusterNode o2) {
+      if (getQuant(o1) == getQuant(o2)) {
+        return o1.timestamp < o2.timestamp ? +1 : -1;
+      }
+      return getQuant(o1) > getQuant(o2) ? +1 : -1;
+    }
+
+    private int getQuant(ClusterNode c) {
+      return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
+    }
+  }
+
+  /**
+   * Mutable load entry for a single node: its queue wait time, its wait
+   * queue length and the time the entry was last stamped. Setters return
+   * the entry itself so calls can be chained.
+   */
+  static class ClusterNode {
+    final NodeId nodeId;
+    // Starts at -1 until a wait time is set.
+    int queueTime = -1;
+    int waitQueueLength = 0;
+    // System.currentTimeMillis() value of the latest stamp.
+    double timestamp;
+
+    /** Creates an entry for the given node, stamped with the current time. */
+    public ClusterNode(NodeId nodeId) {
+      this.nodeId = nodeId;
+      this.timestamp = System.currentTimeMillis();
+    }
+
+    /** Stores the queue wait time and returns this entry. */
+    public ClusterNode setQueueTime(int queueTime) {
+      this.queueTime = queueTime;
+      return this;
+    }
+
+    /** Stores the wait queue length and returns this entry. */
+    public ClusterNode setWaitQueueLength(int queueLength) {
+      this.waitQueueLength = queueLength;
+      return this;
+    }
+
+    /** Re-stamps the entry with the current time and returns it. */
+    public ClusterNode updateTimestamp() {
+      this.timestamp = System.currentTimeMillis();
+      return this;
+    }
+  }
+
+  // Upper bound on the number of nodes handed out by selectNodes().
+  private final int k;
+  // Most recent ranking written by computeTask; also used as the lock
+  // guarding that ranking.
+  private final List<NodeId> topKNodes;
+  // Runs computeTask periodically; null when the test-only constructor
+  // is used.
+  private final ScheduledExecutorService scheduledExecutor;
+  // Load entries keyed by node id; accessed while synchronized on the map.
+  private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
+  // Ordering applied when ranking the entries.
+  private final Comparator<ClusterNode> comparator;
+
+  // Replaces the contents of topKNodes with a freshly computed ranking.
+  // The topKNodes lock is held while computeTopKNodes() takes the
+  // clusterNodes lock, and the old ranking is cleared before the new one
+  // is computed.
+  Runnable computeTask = new Runnable() {
+    @Override
+    public void run() {
+      synchronized (topKNodes) {
+        topKNodes.clear();
+        topKNodes.addAll(computeTopKNodes());
+      }
+    }
+  };
+
+  /**
+   * Builds a selector without a background executor; the ranking is only
+   * refreshed when {@code computeTask} is run by the caller.
+   *
+   * @param k maximum number of nodes returned by {@code selectNodes()}
+   * @param comparator ordering used to rank nodes
+   */
+  @VisibleForTesting
+  TopKNodeSelector(int k, TopKComparator comparator) {
+    this.scheduledExecutor = null;
+    this.comparator = comparator;
+    this.topKNodes = new ArrayList<>();
+    this.k = k;
+  }
+
+  /**
+   * Builds a selector whose ranking is recomputed at a fixed rate on a
+   * single-threaded scheduled executor.
+   *
+   * @param k maximum number of nodes returned by {@code selectNodes()}
+   * @param nodeComputationInterval initial delay and period between
+   *        recomputations, in milliseconds
+   * @param comparator ordering used to rank nodes
+   */
+  public TopKNodeSelector(int k, long nodeComputationInterval,
+      TopKComparator comparator) {
+    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    this.k = k;
+    this.comparator = comparator;
+    this.topKNodes = new ArrayList<>();
+    this.scheduledExecutor = executor;
+    // All fields are assigned before the first run can be scheduled.
+    executor.scheduleAtFixedRate(computeTask, nodeComputationInterval,
+        nodeComputationInterval, TimeUnit.MILLISECONDS);
+  }
+
+
+  @Override
+  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
+      rmNode) {
+    LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    // Ignoring this currently : atleast one NODE_UPDATE heartbeat is
+    // required to ensure node eligibility.
+  }
+
+  @Override
+  public void removeNode(RMNode removedRMNode) {
+    LOG.debug("Node delete event for: " + removedRMNode.getNode().getName());
+    synchronized (this.clusterNodes) {
+      if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) {
+        this.clusterNodes.remove(removedRMNode.getNodeID());
+        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
+      } else {
+        LOG.debug("Node not in list!");
+      }
+    }
+  }
+
+  /**
+   * Records the queue load a node reported in its latest update.
+   *
+   * A node is tracked when its estimated queue wait time is not -1, or
+   * when nodes are ranked by queue length. A tracked node that no longer
+   * qualifies is dropped. A node that reports no queued containers status
+   * is skipped and any existing entry for it is left as is.
+   *
+   * @param rmNode node whose queued containers status is read
+   */
+  @Override
+  public void nodeUpdate(RMNode rmNode) {
+    NodeId nodeId = rmNode.getNodeID();
+    LOG.debug("Node update event from: " + nodeId);
+    QueuedContainersStatus queuedContainersStatus =
+        rmNode.getQueuedContainersStatus();
+    if (queuedContainersStatus == null) {
+      // Nothing to rank the node by; skip the update instead of failing
+      // with a NullPointerException.
+      LOG.debug("No queued containers status from: " + nodeId);
+      return;
+    }
+    int estimatedQueueWaitTime =
+        queuedContainersStatus.getEstimatedQueueWaitTime();
+    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
+    // A wait time of -1 disqualifies the node, unless the ranking is by
+    // queue length, in which case the node is tracked regardless.
+    boolean eligible = estimatedQueueWaitTime != -1
+        || comparator == TopKComparator.QUEUE_LENGTH;
+    synchronized (this.clusterNodes) {
+      ClusterNode currentNode = this.clusterNodes.get(nodeId);
+      if (currentNode == null) {
+        if (eligible) {
+          this.clusterNodes.put(nodeId,
+              new ClusterNode(nodeId)
+                  .setQueueTime(estimatedQueueWaitTime)
+                  .setWaitQueueLength(waitQueueLength));
+          LOG.info("Inserting ClusterNode [" + nodeId + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        } else {
+          LOG.warn("IGNORING ClusterNode [" + nodeId + "]" +
+              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+              "wait queue length [" + waitQueueLength + "]");
+        }
+      } else if (eligible) {
+        currentNode
+            .setQueueTime(estimatedQueueWaitTime)
+            .setWaitQueueLength(waitQueueLength)
+            .updateTimestamp();
+        LOG.info("Updating ClusterNode [" + nodeId + "]" +
+            "with queue wait time [" + estimatedQueueWaitTime + "] and " +
+            "wait queue length [" + waitQueueLength + "]");
+      } else {
+        // The log line reports the values held before this update.
+        this.clusterNodes.remove(nodeId);
+        LOG.info("Deleting ClusterNode [" + nodeId + "]" +
+            "with queue wait time [" + currentNode.queueTime + "] and " +
+            "wait queue length [" + currentNode.waitQueueLength + "]");
+      }
+    }
+  }
+
+  @Override
+  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) 
{
+    LOG.debug("Node resource update event from: " + rmNode.getNodeID());
+    // Ignoring this currently...
+  }
+
+  public List<NodeId> selectNodes() {
+    synchronized (this.topKNodes) {
+      return this.k < this.topKNodes.size() ?
+          new ArrayList<>(this.topKNodes).subList(0, this.k) :
+          new ArrayList<>(this.topKNodes);
+    }
+  }
+
+  /**
+   * Ranks every tracked node with the configured comparator.
+   *
+   * @return ids of all tracked nodes, best first; the list is not
+   *         truncated to {@code k} here
+   */
+  private List<NodeId> computeTopKNodes() {
+    synchronized (this.clusterNodes) {
+      ClusterNode[] nodes =
+          this.clusterNodes.values().toArray(new ClusterNode[0]);
+      // Sort an array instead of a list: the sorted order is only needed
+      // to build the list of node ids, which one pass over the array does.
+      Arrays.sort(nodes, comparator);
+      List<NodeId> retList = new ArrayList<>(nodes.length);
+      for (ClusterNode node : nodes) {
+        retList.add(node.nodeId);
+      }
+      return retList;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 89aff29..f5b61a3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
@@ -260,6 +260,10 @@ public class MockNodes {
     public ResourceUtilization getNodeUtilization() {
       return this.nodeUtilization;
     }
+
+    /**
+     * This mock carries no queued containers status and always returns
+     * {@code null}.
+     */
+    public QueuedContainersStatus getQueuedContainersStatus() {
+      return null;
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
index 3fa377e..c45fba8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -167,10 +169,11 @@ public class TestApplicationCleanup {
     MockRM rm = new MockRM() {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 262fd5a..35914c6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -72,6 +72,11 @@ public class TestDistributedSchedulingService {
       public AMLivelinessMonitor getAMLivelinessMonitor() {
         return null;
       }
+
+      @Override
+      public Configuration getYarnConfiguration() {
+        return new YarnConfiguration();
+      }
     };
     DistributedSchedulingService service =
         new DistributedSchedulingService(rmContext, null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
index c944752..3be439d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java
@@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import 
org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@@ -44,8 +44,8 @@ public class TestRMDispatcher {
     AsyncDispatcher rmDispatcher = new AsyncDispatcher();
     CapacityScheduler sched = spy(new CapacityScheduler());
     YarnConfiguration conf = new YarnConfiguration();
-    SchedulerEventDispatcher schedulerDispatcher =
-        new SchedulerEventDispatcher(sched);
+    EventDispatcher schedulerDispatcher =
+        new EventDispatcher(sched, sched.getClass().getName());
     rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
     rmDispatcher.init(conf);
     rmDispatcher.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
index 4259e6b..f2f71ce 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@@ -985,7 +986,8 @@ public class TestResourceTrackerService extends 
NodeLabelTestBase {
     rm = new MockRM() {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
             scheduler.handle(event);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index f4cb3b3..458f94d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
 import java.security.PrivilegedExceptionAction;
 import java.util.List;
 
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.junit.Assert;
 
 import org.apache.hadoop.conf.Configuration;
@@ -65,10 +66,11 @@ public class TestAMRMRPCNodeUpdates {
       }
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index 6c31a96..555c4d7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
@@ -423,10 +424,11 @@ public class TestAMRestart {
     MockRM rm1 = new MockRM(conf, memStore) {
       @Override
       protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
-        return new SchedulerEventDispatcher(this.scheduler) {
+        return new EventDispatcher<SchedulerEvent>(this.scheduler,
+            this.scheduler.getClass().getName()) {
           @Override
           public void handle(SchedulerEvent event) {
-            scheduler.handle(event);
+            super.handle(event);
           }
         };
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to