http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java deleted file mode 100644 index 7e24687..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.ResourceOption; -import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; -import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; -import org.apache.hadoop.yarn.server.resourcemanager.ClusterMonitor; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class TopKNodeSelector implements ClusterMonitor { - - final static Log LOG = LogFactory.getLog(TopKNodeSelector.class); - - public enum TopKComparator implements Comparator<ClusterNode> { - WAIT_TIME, - QUEUE_LENGTH; - - @Override - public int compare(ClusterNode o1, ClusterNode o2) { - if (getQuant(o1) == getQuant(o2)) { - return o1.timestamp < o2.timestamp ? +1 : -1; - } - return getQuant(o1) > getQuant(o2) ? +1 : -1; - } - - private int getQuant(ClusterNode c) { - return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength; - } - } - - static class ClusterNode { - int queueTime = -1; - int waitQueueLength = 0; - double timestamp; - final NodeId nodeId; - - public ClusterNode(NodeId nodeId) { - this.nodeId = nodeId; - updateTimestamp(); - } - - public ClusterNode setQueueTime(int queueTime) { - this.queueTime = queueTime; - return this; - } - - public ClusterNode setWaitQueueLength(int queueLength) { - this.waitQueueLength = queueLength; - return this; - } - - public ClusterNode updateTimestamp() { - this.timestamp = System.currentTimeMillis(); - return this; - } - } - - private final int k; - private final List<NodeId> topKNodes; - private final ScheduledExecutorService scheduledExecutor; - private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>(); - private final Comparator<ClusterNode> comparator; - - Runnable computeTask = new Runnable() { - @Override - public void run() { - synchronized (topKNodes) { - topKNodes.clear(); - topKNodes.addAll(computeTopKNodes()); - } - } - }; - - @VisibleForTesting - TopKNodeSelector(int k, TopKComparator comparator) { - this.k = k; - this.topKNodes = new ArrayList<>(); - this.comparator = comparator; - this.scheduledExecutor = null; - } - - public TopKNodeSelector(int k, long nodeComputationInterval, - TopKComparator comparator) { - this.k = k; - this.topKNodes = new ArrayList<>(); - this.scheduledExecutor = Executors.newScheduledThreadPool(1); - this.comparator = comparator; - this.scheduledExecutor.scheduleAtFixedRate(computeTask, - nodeComputationInterval, nodeComputationInterval, - TimeUnit.MILLISECONDS); - } - - - @Override - public void addNode(List<NMContainerStatus> containerStatuses, RMNode - rmNode) { - LOG.debug("Node added event from: " + rmNode.getNode().getName()); - // Ignoring this currently : atleast one NODE_UPDATE heartbeat is - // required to ensure node eligibility. - } - - @Override - public void removeNode(RMNode removedRMNode) { - LOG.debug("Node delete event for: " + removedRMNode.getNode().getName()); - synchronized (this.clusterNodes) { - if (this.clusterNodes.containsKey(removedRMNode.getNodeID())) { - this.clusterNodes.remove(removedRMNode.getNodeID()); - LOG.debug("Delete ClusterNode: " + removedRMNode.getNodeID()); - } else { - LOG.debug("Node not in list!"); - } - } - } - - @Override - public void nodeUpdate(RMNode rmNode) { - LOG.debug("Node update event from: " + rmNode.getNodeID()); - QueuedContainersStatus queuedContainersStatus = - rmNode.getQueuedContainersStatus(); - int estimatedQueueWaitTime = - queuedContainersStatus.getEstimatedQueueWaitTime(); - int waitQueueLength = queuedContainersStatus.getWaitQueueLength(); - // Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node - // UNLESS comparator is based on queue length, in which case, we should add - synchronized (this.clusterNodes) { - ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID()); - if (currentNode == null) { - if (estimatedQueueWaitTime != -1 - || comparator == TopKComparator.QUEUE_LENGTH) { - this.clusterNodes.put(rmNode.getNodeID(), - new ClusterNode(rmNode.getNodeID()) - .setQueueTime(estimatedQueueWaitTime) - .setWaitQueueLength(waitQueueLength)); - LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } else { - LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } - } else { - if (estimatedQueueWaitTime != -1 - || comparator == TopKComparator.QUEUE_LENGTH) { - currentNode - .setQueueTime(estimatedQueueWaitTime) - .setWaitQueueLength(waitQueueLength) - .updateTimestamp(); - LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + estimatedQueueWaitTime + "] and " + - "wait queue length [" + waitQueueLength + "]"); - } else { - this.clusterNodes.remove(rmNode.getNodeID()); - LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" + - "with queue wait time [" + currentNode.queueTime + "] and " + - "wait queue length [" + currentNode.waitQueueLength + "]"); - } - } - } - } - - @Override - public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) { - LOG.debug("Node resource update event from: " + rmNode.getNodeID()); - // Ignoring this currently... - } - - public List<NodeId> selectNodes() { - synchronized (this.topKNodes) { - return this.k < this.topKNodes.size() ? - new ArrayList<>(this.topKNodes).subList(0, this.k) : - new ArrayList<>(this.topKNodes); - } - } - - private List<NodeId> computeTopKNodes() { - synchronized (this.clusterNodes) { - ArrayList aList = new ArrayList<>(this.clusterNodes.values()); - List<NodeId> retList = new ArrayList<>(); - Object[] nodes = aList.toArray(); - // Collections.sort would do something similar by calling Arrays.sort - // internally but would finally iterate through the input list (aList) - // to reset the value of each element.. Since we don't really care about - // 'aList', we can use the iteration to create the list of nodeIds which - // is what we ultimately care about. - Arrays.sort(nodes, (Comparator)comparator); - for (int j=0; j < nodes.length; j++) { - retList.add(((ClusterNode)nodes[j]).nodeId); - } - return retList; - } - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java new file mode 100644 index 0000000..5f63923 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; +import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.List; + +/** + * Unit tests for NodeQueueLoadMonitor. + */ +public class TestNodeQueueLoadMonitor { + + static class FakeNodeId extends NodeId { + final String host; + final int port; + + public FakeNodeId(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getPort() { + return port; + } + + @Override + protected void setHost(String host) {} + @Override + protected void setPort(int port) {} + @Override + protected void build() {} + + @Override + public String toString() { + return host + ":" + port; + } + } + + @Test + public void testWaitTimeSort() { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_WAIT_TIME); + selector.updateNode(createRMNode("h1", 1, 15, 10)); + selector.updateNode(createRMNode("h2", 2, 5, 10)); + selector.updateNode(createRMNode("h3", 3, 10, 10)); + selector.computeTask.run(); + List<NodeId> nodeIds = selector.selectNodes(); + System.out.println("1-> " + nodeIds); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h3:3", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now update node3 + selector.updateNode(createRMNode("h3", 3, 2, 10)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("2-> "+ nodeIds); + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now send update with -1 wait time + selector.updateNode(createRMNode("h4", 4, -1, 10)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("3-> "+ nodeIds); + // No change + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + } + + @Test + public void testQueueLengthSort() { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + selector.updateNode(createRMNode("h1", 1, -1, 15)); + selector.updateNode(createRMNode("h2", 2, -1, 5)); + selector.updateNode(createRMNode("h3", 3, -1, 10)); + selector.computeTask.run(); + List<NodeId> nodeIds = selector.selectNodes(); + System.out.println("1-> " + nodeIds); + Assert.assertEquals("h2:2", nodeIds.get(0).toString()); + Assert.assertEquals("h3:3", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now update node3 + selector.updateNode(createRMNode("h3", 3, -1, 2)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("2-> "+ nodeIds); + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + + // Now send update with -1 wait time but valid length + selector.updateNode(createRMNode("h4", 4, -1, 20)); + selector.computeTask.run(); + nodeIds = selector.selectNodes(); + System.out.println("3-> "+ nodeIds); + // No change + Assert.assertEquals("h3:3", nodeIds.get(0).toString()); + Assert.assertEquals("h2:2", nodeIds.get(1).toString()); + Assert.assertEquals("h1:1", nodeIds.get(2).toString()); + Assert.assertEquals("h4:4", nodeIds.get(3).toString()); + } + + @Test + public void testContainerQueuingLimit() { + NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor( + NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH); + selector.updateNode(createRMNode("h1", 1, -1, 15)); + selector.updateNode(createRMNode("h2", 2, -1, 5)); + selector.updateNode(createRMNode("h3", 3, -1, 10)); + + // Test Mean Calculation + selector.initThresholdCalculator(0, 6, 100); + QueueLimitCalculator calculator = selector.getThresholdCalculator(); + ContainerQueuingLimit containerQueuingLimit = calculator + .createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs()); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength()); + Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs()); + + // Test Limits do not exceed specified max + selector.updateNode(createRMNode("h1", 1, -1, 110)); + selector.updateNode(createRMNode("h2", 2, -1, 120)); + selector.updateNode(createRMNode("h3", 3, -1, 130)); + selector.updateNode(createRMNode("h4", 4, -1, 140)); + selector.updateNode(createRMNode("h5", 5, -1, 150)); + selector.updateNode(createRMNode("h6", 6, -1, 160)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength()); + + // Test Limits do not go below specified min + selector.updateNode(createRMNode("h1", 1, -1, 1)); + selector.updateNode(createRMNode("h2", 2, -1, 2)); + selector.updateNode(createRMNode("h3", 3, -1, 3)); + selector.updateNode(createRMNode("h4", 4, -1, 4)); + selector.updateNode(createRMNode("h5", 5, -1, 5)); + selector.updateNode(createRMNode("h6", 6, -1, 6)); + selector.computeTask.run(); + containerQueuingLimit = calculator.createContainerQueuingLimit(); + Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength()); + + } + + private RMNode createRMNode(String host, int port, + int waitTime, int queueLength) { + RMNode node1 = Mockito.mock(RMNode.class); + NodeId nID1 = new FakeNodeId(host, port); + Mockito.when(node1.getNodeID()).thenReturn(nID1); + QueuedContainersStatus status1 = + Mockito.mock(QueuedContainersStatus.class); + Mockito.when(status1.getEstimatedQueueWaitTime()) + .thenReturn(waitTime); + Mockito.when(status1.getWaitQueueLength()) + .thenReturn(queueLength); + Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1); + return node1; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/f0ac18d0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java deleted file mode 100644 index aec4e86..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed; - -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.junit.Assert; -import org.junit.Test; -import org.mockito.Mockito; - -import java.util.List; - -public class TestTopKNodeSelector { - - static class FakeNodeId extends NodeId { - final String host; - final int port; - - public FakeNodeId(String host, int port) { - this.host = host; - this.port = port; - } - - @Override - public String getHost() { - return host; - } - - @Override - public int getPort() { - return port; - } - - @Override - protected void setHost(String host) {} - @Override - protected void setPort(int port) {} - @Override - protected void build() {} - - @Override - public String toString() { - return host + ":" + port; - } - } - - @Test - public void testQueueTimeSort() { - TopKNodeSelector selector = new TopKNodeSelector(5, - TopKNodeSelector.TopKComparator.WAIT_TIME); - selector.nodeUpdate(createRMNode("h1", 1, 15, 10)); - selector.nodeUpdate(createRMNode("h2", 2, 5, 10)); - selector.nodeUpdate(createRMNode("h3", 3, 10, 10)); - selector.computeTask.run(); - List<NodeId> nodeIds = selector.selectNodes(); - System.out.println("1-> " + nodeIds); - Assert.assertEquals("h2:2", nodeIds.get(0).toString()); - Assert.assertEquals("h3:3", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - - // Now update node3 - selector.nodeUpdate(createRMNode("h3", 3, 2, 10)); - selector.computeTask.run(); - nodeIds = selector.selectNodes(); - System.out.println("2-> "+ nodeIds); - Assert.assertEquals("h3:3", nodeIds.get(0).toString()); - Assert.assertEquals("h2:2", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - - // Now send update with -1 wait time - selector.nodeUpdate(createRMNode("h4", 4, -1, 10)); - selector.computeTask.run(); - nodeIds = selector.selectNodes(); - System.out.println("3-> "+ nodeIds); - // No change - Assert.assertEquals("h3:3", nodeIds.get(0).toString()); - Assert.assertEquals("h2:2", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - } - - @Test - public void testQueueLengthSort() { - TopKNodeSelector selector = new TopKNodeSelector(5, - TopKNodeSelector.TopKComparator.QUEUE_LENGTH); - selector.nodeUpdate(createRMNode("h1", 1, -1, 15)); - selector.nodeUpdate(createRMNode("h2", 2, -1, 5)); - selector.nodeUpdate(createRMNode("h3", 3, -1, 10)); - selector.computeTask.run(); - List<NodeId> nodeIds = selector.selectNodes(); - System.out.println("1-> " + nodeIds); - Assert.assertEquals("h2:2", nodeIds.get(0).toString()); - Assert.assertEquals("h3:3", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - - // Now update node3 - selector.nodeUpdate(createRMNode("h3", 3, -1, 2)); - selector.computeTask.run(); - nodeIds = selector.selectNodes(); - System.out.println("2-> "+ nodeIds); - Assert.assertEquals("h3:3", nodeIds.get(0).toString()); - Assert.assertEquals("h2:2", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - - // Now send update with -1 wait time but valid length - selector.nodeUpdate(createRMNode("h4", 4, -1, 20)); - selector.computeTask.run(); - nodeIds = selector.selectNodes(); - System.out.println("3-> "+ nodeIds); - // No change - Assert.assertEquals("h3:3", nodeIds.get(0).toString()); - Assert.assertEquals("h2:2", nodeIds.get(1).toString()); - Assert.assertEquals("h1:1", nodeIds.get(2).toString()); - Assert.assertEquals("h4:4", nodeIds.get(3).toString()); - } - - private RMNode createRMNode(String host, int port, - int waitTime, int queueLength) { - RMNode node1 = Mockito.mock(RMNode.class); - NodeId nID1 = new FakeNodeId(host, port); - Mockito.when(node1.getNodeID()).thenReturn(nID1); - QueuedContainersStatus status1 = - Mockito.mock(QueuedContainersStatus.class); - Mockito.when(status1.getEstimatedQueueWaitTime()) - .thenReturn(waitTime); - Mockito.when(status1.getWaitQueueLength()) - .thenReturn(queueLength); - Mockito.when(node1.getQueuedContainersStatus()).thenReturn(status1); - return node1; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org