YARN-4412. Create ClusterMonitor to compute ordered list of preferred NMs for OPPORTUNISTIC containers. (asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/341888a0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/341888a0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/341888a0 Branch: refs/heads/HADOOP-12930 Commit: 341888a0aa23f24458b4e6e34868794b9735c06a Parents: 68b4564 Author: Arun Suresh <asur...@apache.org> Authored: Tue Apr 26 20:12:12 2016 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Tue Apr 26 20:12:12 2016 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 5 + .../yarn/sls/scheduler/RMNodeWrapper.java | 5 + .../hadoop/yarn/conf/YarnConfiguration.java | 17 ++ .../yarn/conf/TestYarnConfigurationFields.java | 25 +++ .../hadoop/yarn/event/EventDispatcher.java | 137 ++++++++++++ .../yarn/server/api/records/NodeStatus.java | 9 + .../api/records/QueuedContainersStatus.java | 45 ++++ .../api/records/impl/pb/NodeStatusPBImpl.java | 40 +++- .../impl/pb/QueuedContainersStatusPBImpl.java | 80 +++++++ .../main/proto/yarn_server_common_protos.proto | 6 + .../protocolrecords/TestProtocolRecords.java | 30 +++ .../nodemanager/NodeStatusUpdaterImpl.java | 8 + .../server/resourcemanager/ClusterMonitor.java | 36 +++ .../DistributedSchedulingService.java | 151 ++++++++++++- .../server/resourcemanager/ResourceManager.java | 122 ++-------- .../server/resourcemanager/rmnode/RMNode.java | 4 + .../resourcemanager/rmnode/RMNodeImpl.java | 29 ++- .../rmnode/RMNodeStatusEvent.java | 7 + .../scheduler/distributed/TopKNodeSelector.java | 223 +++++++++++++++++++ .../yarn/server/resourcemanager/MockNodes.java | 6 +- .../resourcemanager/TestApplicationCleanup.java | 7 +- .../TestDistributedSchedulingService.java | 5 + .../resourcemanager/TestRMDispatcher.java | 6 +- .../TestResourceTrackerService.java | 4 +- .../TestAMRMRPCNodeUpdates.java | 6 +- .../applicationsmanager/TestAMRestart.java | 6 +- 
.../distributed/TestTopKNodeSelector.java | 147 ++++++++++++ 27 files changed, 1034 insertions(+), 132 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 92d586b..85096ba 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -190,6 +191,10 @@ public class NodeInfo { return null; } + public QueuedContainersStatus getQueuedContainersStatus() { + return null; + } + @Override public ResourceUtilization getAggregatedContainersUtilization() { return null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 2e9cccb..ab82e66 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode @@ -179,6 +180,10 @@ public class RMNodeWrapper implements RMNode { return Collections.EMPTY_LIST; } + public QueuedContainersStatus getQueuedContainersStatus() { + return null; + } + @Override public ResourceUtilization getAggregatedContainersUtilization() { return node.getAggregatedContainersUtilization(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index d8a5b71..cdfaaf1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -338,6 +338,23 @@ public class YarnConfiguration 
extends Configuration { public static final int DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS_DEFAULT = 600000; + /** K least loaded nodes to be provided to the LocalScheduler of a + * NodeManager for Distributed Scheduling */ + public static final String DIST_SCHEDULING_TOP_K = + YARN_PREFIX + "distributed-scheduling.top-k"; + public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10; + + /** Frequency for computing Top K Best Nodes */ + public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS = + YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms"; + public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000; + + /** Comparator for determining Node Load for Distributed Scheduling */ + public static final String DIST_SCHEDULING_TOP_K_COMPARATOR = + YARN_PREFIX + "distributed-scheduling.top-k-comparator"; + public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT = + "QUEUE_LENGTH"; + /** * Enable/disable intermediate-data encryption at YARN level. 
For now, this * only is used by the FileSystemRMStateStore to setup right file-system http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 529d63b..c92a276 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -114,6 +114,31 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { configurationPrefixToSkipCompare .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED); + // Ignore Distributed Scheduling Related Configurations. 
+ // Since it is still a "work in progress" feature + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_ENABLED); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_INCR_MEMORY); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_INCR_VCORES); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_MAX_MEMORY); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_MAX_VCORES); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_CONTAINER_TOKEN_EXPIRY_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_MIN_MEMORY); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_MIN_VCORES); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_TOP_K); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS); + configurationPrefixToSkipCompare + .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR); + // Set by container-executor.cfg configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR); http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java new file mode 100644 index 0000000..8a5ad92 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.event; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +/** + * This is a specialized EventHandler to be used by Services that are expected + * handle a large number of events efficiently by ensuring that the caller + * thread is not blocked. 
Events are immediately stored in a BlockingQueue and + * a separate dedicated Thread consumes events from the queue and handles + * appropriately + * @param <T> Type of Event + */ +public class EventDispatcher<T extends Event> extends + AbstractService implements EventHandler<T> { + + private final EventHandler<T> handler; + private final BlockingQueue<T> eventQueue = + new LinkedBlockingDeque<>(); + private final Thread eventProcessor; + private volatile boolean stopped = false; + private boolean shouldExitOnError = false; + + private static final Log LOG = LogFactory.getLog(EventDispatcher.class); + + private final class EventProcessor implements Runnable { + @Override + public void run() { + + T event; + + while (!stopped && !Thread.currentThread().isInterrupted()) { + try { + event = eventQueue.take(); + } catch (InterruptedException e) { + LOG.error("Returning, interrupted : " + e); + return; // TODO: Kill RM. + } + + try { + handler.handle(event); + } catch (Throwable t) { + // An error occurred, but we are shutting down anyway. + // If it was an InterruptedException, the very act of + // shutdown could have caused it and is probably harmless. 
+ if (stopped) { + LOG.warn("Exception during shutdown: ", t); + break; + } + LOG.fatal("Error in handling event type " + event.getType() + + " to the Event Dispatcher", t); + if (shouldExitOnError + && !ShutdownHookManager.get().isShutdownInProgress()) { + LOG.info("Exiting, bbye.."); + System.exit(-1); + } + } + } + } + } + + public EventDispatcher(EventHandler<T> handler, String name) { + super(name); + this.handler = handler; + this.eventProcessor = new Thread(new EventProcessor()); + this.eventProcessor.setName(getName() + ":Event Processor"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + this.shouldExitOnError = + conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, + Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + this.eventProcessor.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + this.stopped = true; + this.eventProcessor.interrupt(); + try { + this.eventProcessor.join(); + } catch (InterruptedException e) { + throw new YarnRuntimeException(e); + } + super.serviceStop(); + } + + @Override + public void handle(T event) { + try { + int qSize = eventQueue.size(); + if (qSize !=0 && qSize %1000 == 0) { + LOG.info("Size of " + getName() + " event-queue is " + qSize); + } + int remCapacity = eventQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.info("Very low remaining capacity on " + getName() + "" + + "event queue: " + remCapacity); + } + this.eventQueue.put(event); + } catch (InterruptedException e) { + LOG.info("Interrupted. 
Trying to exit gracefully."); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 836cd4b..89e054b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -122,4 +122,13 @@ public abstract class NodeStatus { @Unstable public abstract void setIncreasedContainers( List<Container> increasedContainers); + + @Private + @Unstable + public abstract QueuedContainersStatus getQueuedContainersStatus(); + + @Private + @Unstable + public abstract void setQueuedContainersStatus( + QueuedContainersStatus queuedContainersStatus); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java new file mode 100644 
index 0000000..a7f0ece --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/QueuedContainersStatus.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +/** + * <p> + * <code>QueuedContainersStatus</code> captures information pertaining to the + * state of execution of the Queueable containers within a node. 
+ * </p> + */ +@Private +@Evolving +public abstract class QueuedContainersStatus { + public static QueuedContainersStatus newInstance() { + return Records.newRecord(QueuedContainersStatus.class); + } + + public abstract int getEstimatedQueueWaitTime(); + + public abstract void setEstimatedQueueWaitTime(int queueWaitTime); + + public abstract int getWaitQueueLength(); + + public abstract void setWaitQueueLength(int queueWaitTime); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8dd4832..9a9a83a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -33,14 +33,17 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceUtilizationPBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import 
org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; -import org.apache.hadoop.yarn.proto.YarnProtos.ResourceUtilizationProto; + +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -400,6 +403,27 @@ public class NodeStatusPBImpl extends NodeStatus { this.increasedContainers = increasedContainers; } + @Override + public QueuedContainersStatus getQueuedContainersStatus() { + NodeStatusProtoOrBuilder p = + this.viaProto ? this.proto : this.builder; + if (!p.hasQueuedContainerStatus()) { + return null; + } + return convertFromProtoFormat(p.getQueuedContainerStatus()); + } + + @Override + public void setQueuedContainersStatus(QueuedContainersStatus queuedContainersStatus) { + maybeInitBuilder(); + if (queuedContainersStatus == null) { + this.builder.clearQueuedContainerStatus(); + return; + } + this.builder.setQueuedContainerStatus( + convertToProtoFormat(queuedContainersStatus)); + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -433,15 +457,25 @@ public class NodeStatusPBImpl extends NodeStatus { return ((ApplicationIdPBImpl)c).getProto(); } - private ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) { + private YarnProtos.ResourceUtilizationProto convertToProtoFormat(ResourceUtilization r) { return ((ResourceUtilizationPBImpl) r).getProto(); } private ResourceUtilizationPBImpl convertFromProtoFormat( - ResourceUtilizationProto p) { + YarnProtos.ResourceUtilizationProto p) { return new ResourceUtilizationPBImpl(p); } + private 
YarnServerCommonProtos.QueuedContainersStatusProto convertToProtoFormat( + QueuedContainersStatus r) { + return ((QueuedContainersStatusPBImpl) r).getProto(); + } + + private QueuedContainersStatus convertFromProtoFormat( + YarnServerCommonProtos.QueuedContainersStatusProto p) { + return new QueuedContainersStatusPBImpl(p); + } + private ContainerPBImpl convertFromProtoFormat( ContainerProto c) { return new ContainerPBImpl(c); http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java new file mode 100644 index 0000000..54470f4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/QueuedContainersStatusPBImpl.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.api.records.impl.pb; + +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; + +public class QueuedContainersStatusPBImpl extends QueuedContainersStatus { + + private YarnServerCommonProtos.QueuedContainersStatusProto proto = + YarnServerCommonProtos.QueuedContainersStatusProto.getDefaultInstance(); + private YarnServerCommonProtos.QueuedContainersStatusProto.Builder builder = null; + private boolean viaProto = false; + + public QueuedContainersStatusPBImpl() { + builder = YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(); + } + + public QueuedContainersStatusPBImpl(YarnServerCommonProtos + .QueuedContainersStatusProto proto) { + this.proto = proto; + viaProto = true; + } + + public YarnServerCommonProtos.QueuedContainersStatusProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = + YarnServerCommonProtos.QueuedContainersStatusProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getEstimatedQueueWaitTime() { + YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p = + viaProto ? 
proto : builder; + return p.getEstimatedQueueWaitTime(); + } + + @Override + public void setEstimatedQueueWaitTime(int queueWaitTime) { + maybeInitBuilder(); + builder.setEstimatedQueueWaitTime(queueWaitTime); + } + + @Override + public int getWaitQueueLength() { + YarnServerCommonProtos.QueuedContainersStatusProtoOrBuilder p = + viaProto ? proto : builder; + return p.getWaitQueueLength(); + } + + @Override + public void setWaitQueueLength(int waitQueueLength) { + maybeInitBuilder(); + builder.setWaitQueueLength(waitQueueLength); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 77064a0..c23d557 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -39,6 +39,12 @@ message NodeStatusProto { optional ResourceUtilizationProto containers_utilization = 6; optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; + optional QueuedContainersStatusProto queued_container_status = 9; +} + +message QueuedContainersStatusProto { + optional int32 estimated_queue_wait_time = 1; + optional int32 wait_queue_length = 2; } message MasterKeyProto { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java index 86e49f0..27bdfff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java @@ -39,8 +39,14 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl; + +import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb + .NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; + +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; import org.junit.Test; @@ -131,4 +137,28 @@ public class TestProtocolRecords { ((NodeHeartbeatResponsePBImpl) record).getProto()); Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps()); } + + @Test + 
public void testNodeHeartBeatRequest() throws IOException { + NodeHeartbeatRequest record = + Records.newRecord(NodeHeartbeatRequest.class); + NodeStatus nodeStatus = + Records.newRecord(NodeStatus.class); + QueuedContainersStatus queuedContainersStatus = Records.newRecord + (QueuedContainersStatus.class); + queuedContainersStatus.setEstimatedQueueWaitTime(123); + queuedContainersStatus.setWaitQueueLength(321); + nodeStatus.setQueuedContainersStatus(queuedContainersStatus); + record.setNodeStatus(nodeStatus); + + NodeHeartbeatRequestPBImpl pb = new + NodeHeartbeatRequestPBImpl( + ((NodeHeartbeatRequestPBImpl) record).getProto()); + + Assert.assertEquals(123, + pb.getNodeStatus() + .getQueuedContainersStatus().getEstimatedQueueWaitTime()); + Assert.assertEquals(321, + pb.getNodeStatus().getQueuedContainersStatus().getWaitQueueLength()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 72769bf..c0f02e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; @@ -449,9 +450,16 @@ public class NodeStatusUpdaterImpl extends AbstractService implements createKeepAliveApplicationList(), nodeHealthStatus, containersUtilization, nodeUtilization, increasedContainers); + nodeStatus.setQueuedContainersStatus(getQueuedContainerStatus()); return nodeStatus; } + private QueuedContainersStatus getQueuedContainerStatus() { + QueuedContainersStatus status = QueuedContainersStatus.newInstance(); + status.setWaitQueueLength( + this.context.getQueuingContext().getQueuedContainers().size()); + return status; + } /** * Get the aggregated utilization of the containers in this node. * @return Resource utilization of all the containers. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java new file mode 100644 index 0000000..4fd62d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.yarn.api.records.ResourceOption; +import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; + +import java.util.List; + +public interface ClusterMonitor { + + void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode); + + void removeNode(RMNode removedRMNode); + + void nodeUpdate(RMNode rmNode); + + void updateNodeResource(RMNode rmNode, ResourceOption resourceOption); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java index 5210f7f..170d91a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java @@ -18,10 +18,14 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB; 
+import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl; @@ -40,20 +44,62 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.security - .AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed + .TopKNodeSelector; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +/** + * The DistributedSchedulingService is started instead of the + * ApplicationMasterService if DistributedScheduling is enabled for the YARN + * cluster. + * It extends the functionality of the ApplicationMasterService by servicing + * clients (AMs and AMRMProxy request interceptors) that understand the + * DistributedSchedulingProtocol. 
+ */ public class DistributedSchedulingService extends ApplicationMasterService - implements DistributedSchedulerProtocol { + implements DistributedSchedulerProtocol, EventHandler<SchedulerEvent> { + + private static final Log LOG = + LogFactory.getLog(DistributedSchedulingService.class); + + private final TopKNodeSelector clusterMonitor; + + private final ConcurrentHashMap<String, Set<NodeId>> rackToNode = + new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, Set<NodeId>> hostToNode = + new ConcurrentHashMap<>(); public DistributedSchedulingService(RMContext rmContext, YarnScheduler scheduler) { super(DistributedSchedulingService.class.getName(), rmContext, scheduler); + int k = rmContext.getYarnConfiguration().getInt( + YarnConfiguration.DIST_SCHEDULING_TOP_K, + YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT); + long topKComputationInterval = rmContext.getYarnConfiguration().getLong( + YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS, + YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT); + TopKNodeSelector.TopKComparator comparator = + TopKNodeSelector.TopKComparator.valueOf( + rmContext.getYarnConfiguration().get( + YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR, + YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT)); + TopKNodeSelector topKSelector = + new TopKNodeSelector(k, topKComputationInterval, comparator); + this.clusterMonitor = topKSelector; } @Override @@ -63,8 +109,9 @@ public class DistributedSchedulingService extends ApplicationMasterService addr, serverConf, secretManager, serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT)); - // To support application running no NMs that DO NOT support - // Dist Scheduling... + // To support application running on NMs that DO NOT support + // Dist Scheduling... 
The server multiplexes both the + // ApplicationMasterProtocol as well as the DistributedSchedulingProtocol ((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, ApplicationMasterProtocolPB.class, ApplicationMasterProtocolService.newReflectiveBlockingService( @@ -141,10 +188,8 @@ public class DistributedSchedulingService extends ApplicationMasterService this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT); // Set nodes to be used for scheduling - // TODO: The actual computation of the list will happen in YARN-4412 - // TODO: Till then, send the complete list dsResp.setNodesForScheduling( - new ArrayList<>(this.rmContext.getRMNodes().keySet())); + new ArrayList<>(this.clusterMonitor.selectNodes())); return dsResp; } @@ -156,7 +201,95 @@ public class DistributedSchedulingService extends ApplicationMasterService (DistSchedAllocateResponse.class); dsResp.setAllocateResponse(response); dsResp.setNodesForScheduling( - new ArrayList<>(this.rmContext.getRMNodes().keySet())); + new ArrayList<>(this.clusterMonitor.selectNodes())); return dsResp; } + + private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> mapping, + String rackName, NodeId nodeId) { + if (rackName != null) { + mapping.putIfAbsent(rackName, new HashSet<NodeId>()); + Set<NodeId> nodeIds = mapping.get(rackName); + synchronized (nodeIds) { + nodeIds.add(nodeId); + } + } + } + + private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> mapping, + String rackName, NodeId nodeId) { + if (rackName != null) { + Set<NodeId> nodeIds = mapping.get(rackName); + synchronized (nodeIds) { + nodeIds.remove(nodeId); + } + } + } + + @Override + public void handle(SchedulerEvent event) { + switch (event.getType()) { + case NODE_ADDED: + if (!(event instanceof NodeAddedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; + 
clusterMonitor.addNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(), + nodeAddedEvent.getAddedRMNode().getNodeID()); + break; + case NODE_REMOVED: + if (!(event instanceof NodeRemovedSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeRemovedSchedulerEvent nodeRemovedEvent = + (NodeRemovedSchedulerEvent)event; + clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode()); + removeFromMapping(rackToNode, + nodeRemovedEvent.getRemovedRMNode().getRackName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + removeFromMapping(hostToNode, + nodeRemovedEvent.getRemovedRMNode().getHostName(), + nodeRemovedEvent.getRemovedRMNode().getNodeID()); + break; + case NODE_UPDATE: + if (!(event instanceof NodeUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; + clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode()); + break; + case NODE_RESOURCE_UPDATE: + if (!(event instanceof NodeResourceUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = + (NodeResourceUpdateSchedulerEvent)event; + clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(), + nodeResourceUpdatedEvent.getResourceOption()); + break; + + // <-- IGNORED EVENTS : START --> + case APP_ADDED: + break; + case APP_REMOVED: + break; + case APP_ATTEMPT_ADDED: + break; + case APP_ATTEMPT_REMOVED: + break; + case CONTAINER_EXPIRED: + break; + case NODE_LABELS_UPDATE: + break; + // <-- IGNORED EVENTS : END --> + default: + LOG.error("Unknown event arrived at DistributedSchedulingService: " + + event.toString()); + } + } + 
} http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 2fc940b..2af060f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -39,7 +39,6 @@ import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler; import org.apache.hadoop.security.authorize.ProxyUsers; -import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ExitUtil; @@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; @@ -118,8 +118,6 @@ import 
java.security.PrivilegedExceptionAction; import java.security.SecureRandom; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; /** * The ResourceManager is the main class that is a set of components. @@ -370,7 +368,7 @@ public class ResourceManager extends CompositeService implements Recoverable { } protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler); + return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher"); } protected Dispatcher createDispatcher() { @@ -726,108 +724,6 @@ public class ResourceManager extends CompositeService implements Recoverable { } @Private - public static class SchedulerEventDispatcher extends AbstractService - implements EventHandler<SchedulerEvent> { - - private final ResourceScheduler scheduler; - private final BlockingQueue<SchedulerEvent> eventQueue = - new LinkedBlockingQueue<SchedulerEvent>(); - private volatile int lastEventQueueSizeLogged = 0; - private final Thread eventProcessor; - private volatile boolean stopped = false; - private boolean shouldExitOnError = false; - - public SchedulerEventDispatcher(ResourceScheduler scheduler) { - super(SchedulerEventDispatcher.class.getName()); - this.scheduler = scheduler; - this.eventProcessor = new Thread(new EventProcessor()); - this.eventProcessor.setName("ResourceManager Event Processor"); - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - this.shouldExitOnError = - conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, - Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); - super.serviceInit(conf); - } - - @Override - protected void serviceStart() throws Exception { - this.eventProcessor.start(); - super.serviceStart(); - } - - private final class EventProcessor implements Runnable { - @Override - public void run() { - - SchedulerEvent event; - - while (!stopped && 
!Thread.currentThread().isInterrupted()) { - try { - event = eventQueue.take(); - } catch (InterruptedException e) { - LOG.error("Returning, interrupted : " + e); - return; // TODO: Kill RM. - } - - try { - scheduler.handle(event); - } catch (Throwable t) { - // An error occurred, but we are shutting down anyway. - // If it was an InterruptedException, the very act of - // shutdown could have caused it and is probably harmless. - if (stopped) { - LOG.warn("Exception during shutdown: ", t); - break; - } - LOG.fatal("Error in handling event type " + event.getType() - + " to the scheduler", t); - if (shouldExitOnError - && !ShutdownHookManager.get().isShutdownInProgress()) { - LOG.info("Exiting, bbye.."); - System.exit(-1); - } - } - } - } - } - - @Override - protected void serviceStop() throws Exception { - this.stopped = true; - this.eventProcessor.interrupt(); - try { - this.eventProcessor.join(); - } catch (InterruptedException e) { - throw new YarnRuntimeException(e); - } - super.serviceStop(); - } - - @Override - public void handle(SchedulerEvent event) { - try { - int qSize = eventQueue.size(); - if (qSize != 0 && qSize % 1000 == 0 - && lastEventQueueSizeLogged != qSize) { - lastEventQueueSizeLogged = qSize; - LOG.info("Size of scheduler event-queue is " + qSize); - } - int remCapacity = eventQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.info("Very low remaining capacity on scheduler event queue: " - + remCapacity); - } - this.eventQueue.put(event); - } catch (InterruptedException e) { - LOG.info("Interrupted. 
Trying to exit gracefully."); - } - } - } - - @Private public static class RMFatalEventDispatcher implements EventHandler<RMFatalEvent> { @@ -1234,7 +1130,19 @@ public class ResourceManager extends CompositeService implements Recoverable { if (this.rmContext.getYarnConfiguration().getBoolean( YarnConfiguration.DIST_SCHEDULING_ENABLED, YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT)) { - return new DistributedSchedulingService(this.rmContext, scheduler); + DistributedSchedulingService distributedSchedulingService = new + DistributedSchedulingService(this.rmContext, scheduler); + EventDispatcher distSchedulerEventDispatcher = + new EventDispatcher(distributedSchedulingService, + DistributedSchedulingService.class.getName()); + // Add an event dispoatcher for the DistributedSchedulingService + // to handle node updates/additions and removals. + // Since the SchedulerEvent is currently a super set of theses, + // we register interest for it.. + addService(distSchedulerEventDispatcher); + rmDispatcher.register(SchedulerEventType.class, + distSchedulerEventDispatcher); + return distributedSchedulingService; } return new ApplicationMasterService(this.rmContext, scheduler); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index d8df9f1..3bf9538 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; /** * Node managers information on available resources @@ -168,4 +169,7 @@ public interface RMNode { NodeHeartbeatResponse response); public List<Container> pullNewlyIncreasedContainers(); + + public QueuedContainersStatus getQueuedContainersStatus(); + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 9b80716..3179169 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -57,6 +57,7 @@ import 
org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; @@ -125,6 +126,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { /* Resource utilization for the node. */ private ResourceUtilization nodeUtilization; + /* Container Queue Information for the node.. Used by Distributed Scheduler */ + private QueuedContainersStatus queuedContainersStatus; + private final ContainerAllocationExpirer containerAllocationExpirer; /* set of containers that have just launched */ private final Set<ContainerId> launchedContainers = @@ -1121,7 +1125,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; - + rmNode.setQueuedContainersStatus(statusEvent.getContainerQueueInfo()); NodeHealthStatus remoteNodeHealthStatus = updateRMNodeFromStatusEvents( rmNode, statusEvent); NodeState initialState = rmNode.getState(); @@ -1383,4 +1387,25 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { public Resource getOriginalTotalCapability() { return this.originalTotalCapability; } - } + + public QueuedContainersStatus getQueuedContainersStatus() { + this.readLock.lock(); + + try { + return this.queuedContainersStatus; + } finally { + this.readLock.unlock(); + } + } + + public void setQueuedContainersStatus(QueuedContainersStatus + queuedContainersStatus) { + this.writeLock.lock(); + + try { + 
this.queuedContainersStatus = queuedContainersStatus; + } finally { + this.writeLock.unlock(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index ba6ac9b..5eeaabe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -79,6 +80,10 @@ public class RMNodeStatusEvent extends RMNodeEvent { return this.logAggregationReportsForApps; } + public QueuedContainersStatus getContainerQueueInfo() { + return this.nodeStatus.getQueuedContainersStatus(); + } + public void setLogAggregationReportsForApps( List<LogAggregationReport> 
logAggregationReportsForApps) { this.logAggregationReportsForApps = logAggregationReportsForApps; @@ -89,4 +94,6 @@ public class RMNodeStatusEvent extends RMNodeEvent { return this.nodeStatus.getIncreasedContainers() == null ? Collections.EMPTY_LIST : this.nodeStatus.getIncreasedContainers(); } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java new file mode 100644 index 0000000..7e24687 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * A {@link ClusterMonitor} that tracks the queue wait time and wait queue
 * length last reported for each node and keeps a list of node ids ranked by
 * the configured {@link TopKComparator}.
 *
 * <p>Per-node state lives in {@code clusterNodes} and is guarded by that
 * map's monitor. The ranked list lives in {@code topKNodes} and is guarded by
 * that list's monitor. The ranked list is only refreshed when
 * {@link #computeTask} runs: periodically when the instance was built with a
 * computation interval, or when the task is invoked explicitly.
 */
public class TopKNodeSelector implements ClusterMonitor {

  final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);

  /**
   * Orderings that can be used to rank nodes. The constant selects which
   * per-node metric is compared; ties on the metric are broken by timestamp.
   */
  public enum TopKComparator implements Comparator<ClusterNode> {
    WAIT_TIME,
    QUEUE_LENGTH;

    /**
     * Orders nodes by ascending metric; among nodes with an equal metric the
     * one with the larger (more recent) timestamp sorts first.
     *
     * <p>Returns 0 when both metric and timestamp are equal so that the
     * {@link Comparator} contract holds (in particular
     * {@code compare(x, x) == 0} and
     * {@code sgn(compare(x, y)) == -sgn(compare(y, x))}). Sorting with a
     * comparator that breaks this contract may fail with
     * {@link IllegalArgumentException}.
     */
    @Override
    public int compare(ClusterNode o1, ClusterNode o2) {
      int byMetric = Integer.compare(getQuant(o1), getQuant(o2));
      if (byMetric != 0) {
        return byMetric;
      }
      // Arguments are swapped on purpose: larger timestamp sorts first.
      return Double.compare(o2.timestamp, o1.timestamp);
    }

    /** Returns the metric of {@code c} that this constant ranks by. */
    private int getQuant(ClusterNode c) {
      return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
    }
  }

  /**
   * Mutable per-node record: the last reported queue wait time and wait
   * queue length, plus the time the record was created or last refreshed.
   * Within this class, instances stored in {@code clusterNodes} are read and
   * written while holding that map's monitor.
   */
  static class ClusterNode {
    // -1 is the value nodeUpdate() treats as "no wait time estimate".
    int queueTime = -1;
    int waitQueueLength = 0;
    // Value of System.currentTimeMillis() at creation / last refresh.
    double timestamp;
    final NodeId nodeId;

    public ClusterNode(NodeId nodeId) {
      this.nodeId = nodeId;
      updateTimestamp();
    }

    /** Sets the queue wait time and returns {@code this} for chaining. */
    public ClusterNode setQueueTime(int queueTime) {
      this.queueTime = queueTime;
      return this;
    }

    /** Sets the wait queue length and returns {@code this} for chaining. */
    public ClusterNode setWaitQueueLength(int queueLength) {
      this.waitQueueLength = queueLength;
      return this;
    }

    /** Stamps the record with the current time and returns {@code this}. */
    public ClusterNode updateTimestamp() {
      this.timestamp = System.currentTimeMillis();
      return this;
    }
  }

  private final int k;
  private final List<NodeId> topKNodes;
  private final ScheduledExecutorService scheduledExecutor;
  private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
  private final Comparator<ClusterNode> comparator;

  /**
   * Recomputes the ranking and replaces the contents of {@code topKNodes}
   * with it.
   */
  Runnable computeTask = new Runnable() {
    @Override
    public void run() {
      // Rank first (under the clusterNodes monitor only) so that readers of
      // topKNodes are not blocked for the duration of the sort.
      List<NodeId> ranked = computeTopKNodes();
      synchronized (topKNodes) {
        topKNodes.clear();
        topKNodes.addAll(ranked);
      }
    }
  };

  /**
   * Creates a selector without a background executor; the ranking is only
   * refreshed when {@link #computeTask} is run by the caller.
   *
   * @param k maximum number of node ids returned by {@link #selectNodes()}
   * @param comparator ordering used to rank the nodes
   */
  @VisibleForTesting
  TopKNodeSelector(int k, TopKComparator comparator) {
    this.k = k;
    this.topKNodes = new ArrayList<>();
    this.comparator = comparator;
    this.scheduledExecutor = null;
  }

  /**
   * Creates a selector whose ranking is refreshed at a fixed rate by a
   * single-threaded scheduled executor.
   *
   * @param k maximum number of node ids returned by {@link #selectNodes()}
   * @param nodeComputationInterval initial delay and period, in
   *     milliseconds, between two refreshes of the ranking
   * @param comparator ordering used to rank the nodes
   */
  public TopKNodeSelector(int k, long nodeComputationInterval,
      TopKComparator comparator) {
    this.k = k;
    this.topKNodes = new ArrayList<>();
    this.scheduledExecutor = Executors.newScheduledThreadPool(1);
    this.comparator = comparator;
    this.scheduledExecutor.scheduleAtFixedRate(computeTask,
        nodeComputationInterval, nodeComputationInterval,
        TimeUnit.MILLISECONDS);
  }

  /**
   * Only logs the event; no state is created for the node here.
   */
  @Override
  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
      rmNode) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node added event from: " + rmNode.getNode().getName());
    }
    // Ignoring this currently : at least one NODE_UPDATE heartbeat is
    // required to ensure node eligibility.
  }

  /**
   * Drops the record kept for the removed node, if there is one.
   */
  @Override
  public void removeNode(RMNode removedRMNode) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node delete event for: "
          + removedRMNode.getNode().getName());
    }
    synchronized (this.clusterNodes) {
      // Stored values are never null, so a non-null result means the node
      // was tracked and has now been removed.
      if (this.clusterNodes.remove(removedRMNode.getNodeID()) != null) {
        LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID());
      } else {
        LOG.debug("Node not in list!");
      }
    }
  }

  /**
   * Records the queue wait time and wait queue length reported by the node.
   *
   * <p>A report is usable when its estimated queue wait time is not -1, or
   * when the selector ranks by {@link TopKComparator#QUEUE_LENGTH}. A usable
   * report inserts or refreshes the node's record; an unusable report
   * removes an existing record and is otherwise ignored. When the node
   * exposes no queued containers status at all, the update is skipped and
   * any existing record is left untouched.
   */
  @Override
  public void nodeUpdate(RMNode rmNode) {
    NodeId nodeId = rmNode.getNodeID();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node update event from: " + nodeId);
    }
    QueuedContainersStatus queuedContainersStatus =
        rmNode.getQueuedContainersStatus();
    if (queuedContainersStatus == null) {
      // RMNode implementations may return null here; without a status there
      // is nothing to rank the node by.
      if (LOG.isDebugEnabled()) {
        LOG.debug("No queued containers status available for: " + nodeId
            + ", skipping update");
      }
      return;
    }
    int estimatedQueueWaitTime =
        queuedContainersStatus.getEstimatedQueueWaitTime();
    int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
    // Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node
    // UNLESS comparator is based on queue length, in which case, we should add
    boolean usable = estimatedQueueWaitTime != -1
        || comparator == TopKComparator.QUEUE_LENGTH;
    synchronized (this.clusterNodes) {
      ClusterNode currentNode = this.clusterNodes.get(nodeId);
      if (currentNode == null) {
        if (usable) {
          this.clusterNodes.put(nodeId,
              new ClusterNode(nodeId)
                  .setQueueTime(estimatedQueueWaitTime)
                  .setWaitQueueLength(waitQueueLength));
          LOG.info("Inserting ClusterNode [" + nodeId + "]" +
              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
              "wait queue length [" + waitQueueLength + "]");
        } else {
          LOG.warn("IGNORING ClusterNode [" + nodeId + "]" +
              "with queue wait time [" + estimatedQueueWaitTime + "] and " +
              "wait queue length [" + waitQueueLength + "]");
        }
      } else if (usable) {
        currentNode
            .setQueueTime(estimatedQueueWaitTime)
            .setWaitQueueLength(waitQueueLength)
            .updateTimestamp();
        LOG.info("Updating ClusterNode [" + nodeId + "]" +
            "with queue wait time [" + estimatedQueueWaitTime + "] and " +
            "wait queue length [" + waitQueueLength + "]");
      } else {
        this.clusterNodes.remove(nodeId);
        LOG.info("Deleting ClusterNode [" + nodeId + "]" +
            "with queue wait time [" + currentNode.queueTime + "] and " +
            "wait queue length [" + currentNode.waitQueueLength + "]");
      }
    }
  }

  /**
   * Only logs the event; resource changes do not affect the ranking.
   */
  @Override
  public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node resource update event from: " + rmNode.getNodeID());
    }
    // Ignoring this currently...
  }

  /**
   * Returns a copy of the first {@code k} entries of the most recently
   * computed ranking, or of the whole ranking when it holds fewer than
   * {@code k} entries.
   *
   * @return a new list that is independent of this selector's state
   */
  public List<NodeId> selectNodes() {
    synchronized (this.topKNodes) {
      int limit = Math.min(this.k, this.topKNodes.size());
      // Copy only the requested prefix so the result does not keep a full
      // copy of the ranking alive behind a subList view.
      return new ArrayList<>(this.topKNodes.subList(0, limit));
    }
  }

  /**
   * Sorts a snapshot of all tracked nodes with the configured comparator
   * and returns their ids in that order.
   */
  private List<NodeId> computeTopKNodes() {
    synchronized (this.clusterNodes) {
      // Sort while holding the monitor: the records are mutated in place by
      // nodeUpdate(), and the keys must not change during the sort.
      ClusterNode[] nodes = this.clusterNodes.values()
          .toArray(new ClusterNode[this.clusterNodes.size()]);
      Arrays.sort(nodes, comparator);
      List<NodeId> retList = new ArrayList<>(nodes.length);
      for (ClusterNode node : nodes) {
        retList.add(node.nodeId);
      }
      return retList;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index 89aff29..f5b61a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -260,6 +260,10 @@ public class MockNodes { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + public QueuedContainersStatus getQueuedContainersStatus() { + return null; + } }; private static RMNode
buildRMNode(int rack, final Resource perNode, http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 3fa377e..c45fba8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -42,6 +42,8 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -167,10 +169,11 @@ public class TestApplicationCleanup { MockRM rm = new MockRM() { @Override protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { + return new EventDispatcher<SchedulerEvent>(this.scheduler, + this.scheduler.getClass().getName()) { @Override public void 
handle(SchedulerEvent event) { - scheduler.handle(event); + super.handle(event); } }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 262fd5a..35914c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -72,6 +72,11 @@ public class TestDistributedSchedulingService { public AMLivelinessMonitor getAMLivelinessMonitor() { return null; } + + @Override + public Configuration getYarnConfiguration() { + return new YarnConfiguration(); + } }; DistributedSchedulingService service = new DistributedSchedulingService(rmContext, null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index c944752..3be439d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -27,7 +27,7 @@ import static org.mockito.Mockito.verify; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; @@ -44,8 +44,8 @@ public class TestRMDispatcher { AsyncDispatcher rmDispatcher = new AsyncDispatcher(); CapacityScheduler sched = spy(new CapacityScheduler()); YarnConfiguration conf = new YarnConfiguration(); - SchedulerEventDispatcher schedulerDispatcher = - new SchedulerEventDispatcher(sched); + EventDispatcher schedulerDispatcher = + new EventDispatcher(sched, sched.getClass().getName()); rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); rmDispatcher.init(conf); rmDispatcher.start(); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 4259e6b..f2f71ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; @@ -985,7 +986,8 @@ public class TestResourceTrackerService extends NodeLabelTestBase { rm = new MockRM() { @Override protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { + return new EventDispatcher<SchedulerEvent>(this.scheduler, + this.scheduler.getClass().getName()) { @Override public void handle(SchedulerEvent event) { scheduler.handle(event); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java index f4cb3b3..458f94d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager; import java.security.PrivilegedExceptionAction; import java.util.List; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -65,10 +66,11 @@ public class TestAMRMRPCNodeUpdates { } @Override protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { + return new EventDispatcher<SchedulerEvent>(this.scheduler, + this.scheduler.getClass().getName()) { @Override public void handle(SchedulerEvent event) { - scheduler.handle(event); + super.handle(event); } }; } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/341888a0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index 6c31a96..555c4d7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; @@ -423,10 +424,11 @@ public class TestAMRestart { MockRM rm1 = new MockRM(conf, memStore) { @Override protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { - return new SchedulerEventDispatcher(this.scheduler) { + return new EventDispatcher<SchedulerEvent>(this.scheduler, + this.scheduler.getClass().getName()) { @Override public void handle(SchedulerEvent event) { - 
scheduler.handle(event); + super.handle(event); } }; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org