Revert "HDDS-194. Remove NodePoolManager and node pool handling from SCM. Contributed by Elek Marton"
This reverts commit aaf03cc459a34af284f9735453aefd4ddb430d67.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0d6fe5f3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0d6fe5f3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0d6fe5f3

Branch: refs/heads/trunk
Commit: 0d6fe5f36be5b19aab89d995866e526c5feec758
Parents: aaf03cc
Author: Xiaoyu Yao <[email protected]>
Authored: Wed Jun 27 13:25:45 2018 -0700
Committer: Xiaoyu Yao <[email protected]>
Committed: Wed Jun 27 13:25:45 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |  26 ++
 .../org/apache/hadoop/ozone/OzoneConsts.java    |   1 +
 .../common/src/main/resources/ozone-default.xml |  47 +++
 .../container/replication/ReplicationQueue.java |  78 -----
 .../replication/ReplicationReqMsg.java          | 107 ------
 .../container/replication/package-info.java     |  23 --
 .../replication/TestReplicationQueue.java       | 134 --------
 .../container/replication/package-info.java     |  23 --
 .../hdds/scm/container/ContainerMapping.java    |  10 +-
 .../replication/ContainerSupervisor.java        | 340 +++++++++++++++++++
 .../container/replication/InProgressPool.java   | 255 ++++++++++++++
 .../scm/container/replication/PeriodicPool.java | 119 +++++++
 .../scm/container/replication/package-info.java |  23 ++
 .../hadoop/hdds/scm/node/NodeManager.java       |   6 +
 .../hadoop/hdds/scm/node/NodePoolManager.java   |  71 ++++
 .../hadoop/hdds/scm/node/SCMNodeManager.java    |  23 ++
 .../hdds/scm/node/SCMNodePoolManager.java       | 269 +++++++++++++++
 .../hdds/scm/container/MockNodeManager.java     |   6 +
 .../hdds/scm/node/TestSCMNodePoolManager.java   | 160 +++++++++
 .../testutils/ReplicationNodeManagerMock.java   |   5 +
 .../ReplicationNodePoolManagerMock.java         | 133 ++++++++
 .../hadoop/ozone/scm/TestContainerSQLCli.java   |  31 ++
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java |  74 ++++
 23 files changed, 1596 insertions(+), 368 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index df6fbf0..85407e6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -243,6 +243,32 @@ public final class ScmConfigKeys {
   public static final String
       OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
+  /**
+   * Don't start processing a pool until a minimum number of seconds has
+   * elapsed since the last time the pool was processed.
+   */
+  public static final String OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL =
+      "ozone.scm.container.report.processing.interval";
+  public static final String
+      OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT = "60s";
+
+  /**
+   * This determines the total number of pools to be processed in parallel.
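+   * <p>
+   * A minimal sketch of how a key/default pair like this is consumed
+   * (ContainerSupervisor later in this patch performs the equivalent lookup):
+   * <pre>
+   *   int maxPools = conf.getInt(
+   *       OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
+   *       OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
+   * </pre>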
+   */
+  public static final String OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS =
+      "ozone.scm.max.nodepool.processing.threads";
+  public static final int OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT = 1;
+  /**
+   * These two settings control the number of threads in the executor pool
+   * and the timeout for the container reports from all nodes.
+   */
+  public static final String OZONE_SCM_MAX_CONTAINER_REPORT_THREADS =
+      "ozone.scm.max.container.report.threads";
+  public static final int OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT = 100;
+  public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT =
+      "ozone.scm.container.reports.wait.timeout";
+  public static final String OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT =
+      "5m";
 
   public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
       "ozone.scm.block.deletion.max.retry";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 08a5ffd..c40dc8e 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -91,6 +91,7 @@ public final class OzoneConsts {
   public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX;
   public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
   public static final String BLOCK_DB = "block.db";
+  public static final String NODEPOOL_DB = "nodepool.db";
   public static final String OPEN_CONTAINERS_DB = "openContainers.db";
   public static final String DELETED_BLOCK_DB = "deletedBlock.db";
   public static final String KSM_DB_NAME = "ksm.db";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 25365c8..7a91610 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -572,6 +572,25 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.container.report.processing.interval</name>
+    <value>60s</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>Time interval for scm to process container reports
+      for a node pool. Scm handles node pool reports in a cyclic-clock
+      manner; it fetches pools periodically at this time interval.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.container.reports.wait.timeout</name>
+    <value>300s</value>
+    <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
+    <description>Maximum time to wait in seconds for processing all container
+      reports from a node pool. It determines the timeout for a node pool
+      report.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.container.size.gb</name>
     <value>5</value>
     <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
@@ -774,6 +793,17 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.max.container.report.threads</name>
+    <value>100</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      Maximum number of threads to process container reports in scm.
+ Each container report from a data node is processed by scm in a worker + thread, fetched from a thread pool. This property is used to control the + maximum size of the thread pool. + </description> + </property> + <property> <name>ozone.scm.max.hb.count.to.process</name> <value>5000</value> <tag>OZONE, MANAGEMENT, PERFORMANCE</tag> @@ -785,6 +815,14 @@ </description> </property> <property> + <name>ozone.scm.max.nodepool.processing.threads</name> + <value>1</value> + <tag>OZONE, MANAGEMENT, PERFORMANCE</tag> + <description> + Number of node pools to process in parallel. + </description> + </property> + <property> <name>ozone.scm.names</name> <value/> <tag>OZONE</tag> @@ -806,6 +844,15 @@ </description> </property> <property> + <name>ozone.scm.max.nodepool.processing.threads</name> + <value>1</value> + <tag>OZONE, SCM</tag> + <description> + Controls the number of node pools that can be processed in parallel by + Container Supervisor. + </description> + </property> + <property> <name>ozone.trace.enabled</name> <value>false</value> <tag>OZONE, DEBUG</tag> http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java deleted file mode 100644 index b83ecf1..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.util.List; -import java.util.PriorityQueue; -import java.util.Queue; - -/** - * Priority queue to handle under-replicated and over replicated containers - * in ozone. ReplicationManager will consume these messages and decide - * accordingly. - */ -public class ReplicationQueue { - - private final Queue<ReplicationReqMsg> queue; - - ReplicationQueue() { - queue = new PriorityQueue<>(); - } - - public synchronized boolean add(ReplicationReqMsg repObj) { - if (this.queue.contains(repObj)) { - // Remove the earlier message and insert this one - this.queue.remove(repObj); - return this.queue.add(repObj); - } else { - return this.queue.add(repObj); - } - } - - public synchronized boolean remove(ReplicationReqMsg repObj) { - return queue.remove(repObj); - } - - /** - * Retrieves, but does not remove, the head of this queue, - * or returns {@code null} if this queue is empty. 
- * - * @return the head of this queue, or {@code null} if this queue is empty - */ - public synchronized ReplicationReqMsg peek() { - return queue.peek(); - } - - /** - * Retrieves and removes the head of this queue, - * or returns {@code null} if this queue is empty. - * - * @return the head of this queue, or {@code null} if this queue is empty - */ - public synchronized ReplicationReqMsg poll() { - return queue.poll(); - } - - public synchronized boolean removeAll(List<ReplicationReqMsg> repObjs) { - return queue.removeAll(repObjs); - } - - public int size() { - return queue.size(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java deleted file mode 100644 index 8d26fc3..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationReqMsg.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.io.Serializable; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.commons.lang3.math.NumberUtils; - -/** - * Wrapper class for hdds replication queue. Implements its natural - * ordering for priority queue. - */ -public class ReplicationReqMsg implements Comparable<ReplicationReqMsg>, - Serializable { - private final long containerId; - private final short replicationCount; - private final short expecReplicationCount; - private final long timestamp; - - public ReplicationReqMsg(long containerId, short replicationCount, - long timestamp, short expecReplicationCount) { - this.containerId = containerId; - this.replicationCount = replicationCount; - this.timestamp = timestamp; - this.expecReplicationCount = expecReplicationCount; - } - - /** - * Compares this object with the specified object for order. Returns a - * negative integer, zero, or a positive integer as this object is less - * than, equal to, or greater than the specified object. - * @param o the object to be compared. - * @return a negative integer, zero, or a positive integer as this object - * is less than, equal to, or greater than the specified object. 
- * @throws NullPointerException if the specified object is null - * @throws ClassCastException if the specified object's type prevents it - * from being compared to this object. - */ - @Override - public int compareTo(ReplicationReqMsg o) { - if (this == o) { - return 0; - } - if (o == null) { - return 1; - } - int retVal = NumberUtils - .compare(getReplicationCount() - getExpecReplicationCount(), - o.getReplicationCount() - o.getExpecReplicationCount()); - if (retVal != 0) { - return retVal; - } - return NumberUtils.compare(getTimestamp(), o.getTimestamp()); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(91, 1011) - .append(getContainerId()) - .toHashCode(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ReplicationReqMsg that = (ReplicationReqMsg) o; - return new EqualsBuilder().append(getContainerId(), that.getContainerId()) - .isEquals(); - } - - public long getContainerId() { - return containerId; - } - - public short getReplicationCount() { - return replicationCount; - } - - public long getTimestamp() { - return timestamp; - } - - public short getExpecReplicationCount() { - return expecReplicationCount; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java deleted file mode 100644 index 7f335e3..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.replication; - -/** - * Ozone Container replicaton related classes. 
- */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java deleted file mode 100644 index 39c61d3..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.util.Random; -import java.util.UUID; -import org.apache.hadoop.util.Time; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Test class for ReplicationQueue. 
- */ -public class TestReplicationQueue { - - private ReplicationQueue replicationQueue; - private Random random; - - @Before - public void setUp() { - replicationQueue = new ReplicationQueue(); - random = new Random(); - } - - @Test - public void testDuplicateAddOp() { - long contId = random.nextLong(); - String nodeId = UUID.randomUUID().toString(); - ReplicationReqMsg obj1, obj2, obj3; - long time = Time.monotonicNow(); - obj1 = new ReplicationReqMsg(contId, (short) 2, time, (short) 3); - obj2 = new ReplicationReqMsg(contId, (short) 2, time + 1, (short) 3); - obj3 = new ReplicationReqMsg(contId, (short) 1, time+2, (short) 3); - - replicationQueue.add(obj1); - replicationQueue.add(obj2); - replicationQueue.add(obj3); - Assert.assertEquals("Should add only 1 msg as second one is duplicate", - 1, replicationQueue.size()); - ReplicationReqMsg temp = replicationQueue.poll(); - Assert.assertEquals(temp, obj3); - } - - @Test - public void testPollOp() { - long contId = random.nextLong(); - String nodeId = UUID.randomUUID().toString(); - ReplicationReqMsg msg1, msg2, msg3, msg4, msg5; - msg1 = new ReplicationReqMsg(contId, (short) 1, Time.monotonicNow(), - (short) 3); - long time = Time.monotonicNow(); - msg2 = new ReplicationReqMsg(contId + 1, (short) 4, time, (short) 3); - msg3 = new ReplicationReqMsg(contId + 2, (short) 0, time, (short) 3); - msg4 = new ReplicationReqMsg(contId, (short) 2, time, (short) 3); - // Replication message for same container but different nodeId - msg5 = new ReplicationReqMsg(contId + 1, (short) 2, time, (short) 3); - - replicationQueue.add(msg1); - replicationQueue.add(msg2); - replicationQueue.add(msg3); - replicationQueue.add(msg4); - replicationQueue.add(msg5); - Assert.assertEquals("Should have 3 objects", - 3, replicationQueue.size()); - - // Since Priority queue orders messages according to replication count, - // message with lowest replication should be first - ReplicationReqMsg temp; - temp = replicationQueue.poll(); - Assert.assertEquals("Should have 2 objects", - 2, replicationQueue.size()); - Assert.assertEquals(temp, msg3); - - temp = replicationQueue.poll(); - Assert.assertEquals("Should have 1 objects", - 1, replicationQueue.size()); - Assert.assertEquals(temp, msg5); - - // Message 2 should be ordered before message 5 as both have same replication - // number but message 2 has earlier timestamp. 
- temp = replicationQueue.poll(); - Assert.assertEquals("Should have 0 objects", - replicationQueue.size(), 0); - Assert.assertEquals(temp, msg4); - } - - @Test - public void testRemoveOp() { - long contId = random.nextLong(); - String nodeId = UUID.randomUUID().toString(); - ReplicationReqMsg obj1, obj2, obj3; - obj1 = new ReplicationReqMsg(contId, (short) 1, Time.monotonicNow(), - (short) 3); - obj2 = new ReplicationReqMsg(contId + 1, (short) 2, Time.monotonicNow(), - (short) 3); - obj3 = new ReplicationReqMsg(contId + 2, (short) 3, Time.monotonicNow(), - (short) 3); - - replicationQueue.add(obj1); - replicationQueue.add(obj2); - replicationQueue.add(obj3); - Assert.assertEquals("Should have 3 objects", - 3, replicationQueue.size()); - - replicationQueue.remove(obj3); - Assert.assertEquals("Should have 2 objects", - 2, replicationQueue.size()); - - replicationQueue.remove(obj2); - Assert.assertEquals("Should have 1 objects", - 1, replicationQueue.size()); - - replicationQueue.remove(obj1); - Assert.assertEquals("Should have 0 objects", - 0, replicationQueue.size()); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java deleted file mode 100644 index 5b1fd0f..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -/** - * SCM Testing and Mocking Utils. - */ -package org.apache.hadoop.ozone.container.replication; -// Test classes for Replication functionality. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java index 9fd30f2..b563e90 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector; @@ -79,6 +80,7 @@ public class ContainerMapping implements Mapping { private final PipelineSelector pipelineSelector; private final ContainerStateManager containerStateManager; private final LeaseManager<ContainerInfo> containerLeaseManager; + private final ContainerSupervisor containerSupervisor; private final float containerCloseThreshold; private final ContainerCloser closer; private final long size; @@ -125,7 +127,9 @@ public class ContainerMapping implements Mapping { OZONE_SCM_CONTAINER_SIZE_DEFAULT) * 1024 * 1024 * 1024; this.containerStateManager = new ContainerStateManager(conf, this); - + this.containerSupervisor = + new ContainerSupervisor(conf, nodeManager, + nodeManager.getNodePoolManager()); this.containerCloseThreshold = conf.getFloat( ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD, ScmConfigKeys.OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT); @@ -403,8 +407,8 @@ public class ContainerMapping implements Mapping { throws IOException { List<StorageContainerDatanodeProtocolProtos.ContainerInfo> containerInfos = reports.getReportsList(); - - for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : + containerSupervisor.handleContainerReport(datanodeDetails, reports); + for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState : containerInfos) { byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID()); lock.lock(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java new file mode 100644 index 0000000..5bd0574 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerSupervisor.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. 
The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodePoolManager; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ContainerReportsProto; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static com.google.common.util.concurrent.Uninterruptibles + .sleepUninterruptibly; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT; + +/** + * This class takes a set of container reports that belong to a pool and then + * computes the replication levels for each container. 
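+ * <p>
+ * A sketch of how this class is wired up, assuming the ContainerMapping
+ * usage shown earlier in this patch:
+ * <pre>
+ *   ContainerSupervisor supervisor = new ContainerSupervisor(
+ *       conf, nodeManager, nodeManager.getNodePoolManager());
+ *   supervisor.handleContainerReport(datanodeDetails, reports);
+ *   // ... and on shutdown:
+ *   supervisor.close();
+ * </pre>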
+ */
+public class ContainerSupervisor implements Closeable {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ContainerSupervisor.class);
+
+  private final NodePoolManager poolManager;
+  private final HashSet<String> poolNames;
+  private final PriorityQueue<PeriodicPool> poolQueue;
+  private final NodeManager nodeManager;
+  private final long containerProcessingLag;
+  private final AtomicBoolean runnable;
+  private final ExecutorService executorService;
+  private final long maxPoolWait;
+  private long poolProcessCount;
+  private final List<InProgressPool> inProgressPoolList;
+  private final AtomicInteger threadFaultCount;
+  private final int inProgressPoolMaxCount;
+
+  private final ReadWriteLock inProgressPoolListLock;
+
+  /**
+   * Returns the number of times we have processed pools.
+   * @return long
+   */
+  public long getPoolProcessCount() {
+    return poolProcessCount;
+  }
+
+  /**
+   * Constructs a class that computes Replication Levels.
+   *
+   * @param conf - OzoneConfiguration
+   * @param nodeManager - Node Manager
+   * @param poolManager - Pool Manager
+   */
+  public ContainerSupervisor(Configuration conf, NodeManager nodeManager,
+                             NodePoolManager poolManager) {
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(nodeManager);
+    this.containerProcessingLag =
+        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL,
+            OZONE_SCM_CONTAINER_REPORT_PROCESSING_INTERVAL_DEFAULT,
+            TimeUnit.SECONDS
+        ) * 1000;
+    int maxContainerReportThreads =
+        conf.getInt(OZONE_SCM_MAX_CONTAINER_REPORT_THREADS,
+            OZONE_SCM_MAX_CONTAINER_REPORT_THREADS_DEFAULT
+        );
+    this.maxPoolWait =
+        conf.getTimeDuration(OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT,
+            OZONE_SCM_CONTAINER_REPORTS_WAIT_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    this.inProgressPoolMaxCount = conf.getInt(
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS,
+        OZONE_SCM_MAX_NODEPOOL_PROCESSING_THREADS_DEFAULT);
+    this.poolManager = poolManager;
+    this.nodeManager = nodeManager;
+    this.poolNames = new HashSet<>();
+    this.poolQueue = new PriorityQueue<>();
+    this.runnable = new AtomicBoolean(true);
+    this.threadFaultCount = new AtomicInteger(0);
+    this.executorService = newCachedThreadPool(
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("Container Reports Processing Thread - %d")
+            .build(), maxContainerReportThreads);
+    this.inProgressPoolList = new LinkedList<>();
+    this.inProgressPoolListLock = new ReentrantReadWriteLock();
+
+    initPoolProcessThread();
+  }
+
+  private ExecutorService newCachedThreadPool(ThreadFactory threadFactory,
+      int maxThreads) {
+    return new HadoopThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(), threadFactory);
+  }
+
+  /**
+   * Returns the number of pools that are under process right now.
+   * @return int - Number of pools that are in process.
+   */
+  public int getInProgressPoolCount() {
+    return inProgressPoolList.size();
+  }
+
+  /**
+   * Exits the background thread.
+   */
+  public void setExit() {
+    this.runnable.set(false);
+  }
+
+  /**
+   * Adds or removes pools from the set of pool names that we need to
+   * process.
+   *
+   * There are two different cases that we need to handle: pools being
+   * added and pools being removed.
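+   * <p>
+   * For example (hypothetical pool names): if we currently track {poolA,
+   * poolB} and the pool manager now reports {poolB, poolC}, then
+   * removedPools is {poolA} and addedPools is {poolC}.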
+   */
+  private void refreshPools() {
+    List<String> pools = this.poolManager.getNodePools();
+    if (pools != null) {
+
+      HashSet<String> removedPools =
+          computePoolDifference(this.poolNames, new HashSet<>(pools));
+
+      HashSet<String> addedPools =
+          computePoolDifference(new HashSet<>(pools), this.poolNames);
+      // TODO: Support remove pool API in pool manager so that this code
+      // path can be tested. This never happens in the current code base.
+      for (String poolName : removedPools) {
+        for (PeriodicPool periodicPool : poolQueue) {
+          if (periodicPool.getPoolName().compareTo(poolName) == 0) {
+            poolQueue.remove(periodicPool);
+          }
+        }
+      }
+      // Remove the pool names that we have in the list.
+      this.poolNames.removeAll(removedPools);
+
+      for (String poolName : addedPools) {
+        poolQueue.add(new PeriodicPool(poolName));
+      }
+
+      // Add to the pool names we are tracking.
+      poolNames.addAll(addedPools);
+    }
+
+  }
+
+  /**
+   * Computes the set difference between two pool name sets, i.e. the pool
+   * names that are present in newPools but not in oldPool.
+   *
+   * @param newPools - new pools list.
+   * @param oldPool - old pools list.
+   * @return the pool names in newPools that are not in oldPool.
+   */
+  private HashSet<String> computePoolDifference(HashSet<String> newPools,
+      Set<String> oldPool) {
+    Preconditions.checkNotNull(newPools);
+    Preconditions.checkNotNull(oldPool);
+    HashSet<String> newSet = new HashSet<>(newPools);
+    newSet.removeAll(oldPool);
+    return newSet;
+  }
+
+  private void initPoolProcessThread() {
+
+    /*
+     * Task that runs to check if we need to start a pool processing job.
+     * If so, we create a pool reconciliation job and find out if all the
+     * expected containers are on the nodes.
+     */
+    Runnable processPools = () -> {
+      while (runnable.get()) {
+        // Make sure that we pick up any newly added or removed pools.
+        refreshPools();
+        while (inProgressPoolList.size() < inProgressPoolMaxCount) {
+          PeriodicPool pool = poolQueue.poll();
+          if (pool != null) {
+            if (pool.getLastProcessedTime() + this.containerProcessingLag >
+                Time.monotonicNow()) {
+              LOG.debug("Not within the time window for processing: {}",
+                  pool.getPoolName());
+              // we might over sleep here, not a big deal.
+              sleepUninterruptibly(this.containerProcessingLag,
+                  TimeUnit.MILLISECONDS);
+            }
+            LOG.debug("Adding pool {} to container processing queue",
+                pool.getPoolName());
+            InProgressPool inProgressPool = new InProgressPool(maxPoolWait,
+                pool, this.nodeManager, this.poolManager, this.executorService);
+            inProgressPool.startReconciliation();
+            inProgressPoolListLock.writeLock().lock();
+            try {
+              inProgressPoolList.add(inProgressPool);
+            } finally {
+              inProgressPoolListLock.writeLock().unlock();
+            }
+            poolProcessCount++;
+          } else {
+            break;
+          }
+        }
+        sleepUninterruptibly(this.maxPoolWait, TimeUnit.MILLISECONDS);
+        inProgressPoolListLock.readLock().lock();
+        try {
+          for (InProgressPool inProgressPool : inProgressPoolList) {
+            inProgressPool.finalizeReconciliation();
+            poolQueue.add(inProgressPool.getPool());
+          }
+        } finally {
+          inProgressPoolListLock.readLock().unlock();
+        }
+        inProgressPoolListLock.writeLock().lock();
+        try {
+          inProgressPoolList.clear();
+        } finally {
+          inProgressPoolListLock.writeLock().unlock();
+        }
+      }
+    };
+
+    // We will have only one thread for pool processing.
+    Thread poolProcessThread = new Thread(processPools);
+    poolProcessThread.setDaemon(true);
+    poolProcessThread.setName("Pool replica thread");
+    poolProcessThread.setUncaughtExceptionHandler((Thread t, Throwable e) -> {
+      // Let us just restart this thread after logging a critical error.
+      // If this thread is not running, we cannot handle commands from SCM.
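+      // Note that a Thread object whose run() has completed cannot be
+      // started again; a real restart would need a fresh Thread instance
+      // rather than another start() call on this one.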
+ LOG.error("Critical Error : Pool replica thread encountered an " + + "error. Thread: {} Error Count : {}", t.toString(), e, + threadFaultCount.incrementAndGet()); + poolProcessThread.start(); + // TODO : Add a config to restrict how many times we will restart this + // thread in a single session. + }); + poolProcessThread.start(); + } + + /** + * Adds a container report to appropriate inProgress Pool. + * @param containerReport -- Container report for a specific container from + * a datanode. + */ + public void handleContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto containerReport) { + inProgressPoolListLock.readLock().lock(); + try { + String poolName = poolManager.getNodePool(datanodeDetails); + for (InProgressPool ppool : inProgressPoolList) { + if (ppool.getPoolName().equalsIgnoreCase(poolName)) { + ppool.handleContainerReport(datanodeDetails, containerReport); + return; + } + } + // TODO: Decide if we can do anything else with this report. + LOG.debug("Discarding the container report for pool {}. " + + "That pool is not currently in the pool reconciliation process." + + " Container Name: {}", poolName, datanodeDetails); + } catch (SCMException e) { + LOG.warn("Skipping processing container report from datanode {}, " + + "cause: failed to get the corresponding node pool", + datanodeDetails.toString(), e); + } finally { + inProgressPoolListLock.readLock().unlock(); + } + } + + /** + * Get in process pool list, used for testing. + * @return List of InProgressPool + */ + @VisibleForTesting + public List<InProgressPool> getInProcessPoolList() { + return inProgressPoolList; + } + + /** + * Shutdown the Container Replication Manager. + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + setExit(); + HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java new file mode 100644 index 0000000..4b54731 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/InProgressPool.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * These are pools that are actively checking for replication status of the
+ * containers.
+ */
+public final class InProgressPool {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(InProgressPool.class);
+
+  private final PeriodicPool pool;
+  private final NodeManager nodeManager;
+  private final NodePoolManager poolManager;
+  private final ExecutorService executorService;
+  private final Map<Long, Integer> containerCountMap;
+  private final Map<UUID, Boolean> processedNodeSet;
+  private final long startTime;
+  private ProgressStatus status;
+  private AtomicInteger nodeCount;
+  private AtomicInteger nodeProcessed;
+  private AtomicInteger containerProcessedCount;
+  private long maxWaitTime;
+
+  /**
+   * Constructs a pool that is being processed.
+   * @param maxWaitTime - Maximum wait time in milliseconds.
+   * @param pool - Pool that we are working against
+   * @param nodeManager - NodeManager
+   * @param poolManager - pool manager
+   * @param executorService - Shared Executor service.
+   */
+  InProgressPool(long maxWaitTime, PeriodicPool pool,
+      NodeManager nodeManager, NodePoolManager poolManager,
+      ExecutorService executorService) {
+    Preconditions.checkNotNull(pool);
+    Preconditions.checkNotNull(nodeManager);
+    Preconditions.checkNotNull(poolManager);
+    Preconditions.checkNotNull(executorService);
+    Preconditions.checkArgument(maxWaitTime > 0);
+    this.pool = pool;
+    this.nodeManager = nodeManager;
+    this.poolManager = poolManager;
+    this.executorService = executorService;
+    this.containerCountMap = new ConcurrentHashMap<>();
+    this.processedNodeSet = new ConcurrentHashMap<>();
+    this.maxWaitTime = maxWaitTime;
+    startTime = Time.monotonicNow();
+  }
+
+  /**
+   * Returns periodic pool.
+   *
+   * @return PeriodicPool
+   */
+  public PeriodicPool getPool() {
+    return pool;
+  }
+
+  /**
+   * We are done if we have got reports from all nodes or we are done
+   * waiting for the specified maximum time.
+   *
+   * @return true if we are done, false otherwise.
+   */
+  public boolean isDone() {
+    return (nodeCount.get() == nodeProcessed.get()) ||
+        (Time.monotonicNow() > (this.startTime + this.maxWaitTime));
+  }
+
+  /**
+   * Gets the number of containers processed.
+   *
+   * @return int
+   */
+  public int getContainerProcessedCount() {
+    return containerProcessedCount.get();
+  }
+
+  /**
+   * Returns the start time in milliseconds.
+   *
+   * @return - Start Time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Get the number of nodes in this pool.
+ * + * @return - node count + */ + public int getNodeCount() { + return nodeCount.get(); + } + + /** + * Get the number of nodes that we have already processed container reports + * from. + * + * @return - Processed count. + */ + public int getNodeProcessed() { + return nodeProcessed.get(); + } + + /** + * Returns the current status. + * + * @return Status + */ + public ProgressStatus getStatus() { + return status; + } + + /** + * Starts the reconciliation process for all the nodes in the pool. + */ + public void startReconciliation() { + List<DatanodeDetails> datanodeDetailsList = + this.poolManager.getNodes(pool.getPoolName()); + if (datanodeDetailsList.size() == 0) { + LOG.error("Datanode list for {} is Empty. Pool with no nodes ? ", + pool.getPoolName()); + this.status = ProgressStatus.Error; + return; + } + + nodeProcessed = new AtomicInteger(0); + containerProcessedCount = new AtomicInteger(0); + nodeCount = new AtomicInteger(0); + this.status = ProgressStatus.InProgress; + this.getPool().setLastProcessedTime(Time.monotonicNow()); + } + + /** + * Queues a container Report for handling. This is done in a worker thread + * since decoding a container report might be compute intensive . We don't + * want to block since we have asked for bunch of container reports + * from a set of datanodes. + * + * @param containerReport - ContainerReport + */ + public void handleContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto containerReport) { + if (status == ProgressStatus.InProgress) { + executorService.submit(processContainerReport(datanodeDetails, + containerReport)); + } else { + LOG.debug("Cannot handle container report when the pool is in {} status.", + status); + } + } + + private Runnable processContainerReport(DatanodeDetails datanodeDetails, + ContainerReportsProto reports) { + return () -> { + if (processedNodeSet.computeIfAbsent(datanodeDetails.getUuid(), + (k) -> true)) { + nodeProcessed.incrementAndGet(); + LOG.debug("Total Nodes processed : {} Node Name: {} ", nodeProcessed, + datanodeDetails.getUuid()); + for (ContainerInfo info : reports.getReportsList()) { + containerProcessedCount.incrementAndGet(); + LOG.debug("Total Containers processed: {} Container Name: {}", + containerProcessedCount.get(), info.getContainerID()); + + // Update the container map with count + 1 if the key exists or + // update the map with 1. Since this is a concurrentMap the + // computation and update is atomic. + containerCountMap.merge(info.getContainerID(), 1, Integer::sum); + } + } + }; + } + + /** + * Filter the containers based on specific rules. + * + * @param predicate -- Predicate to filter by + * @return A list of map entries. + */ + public List<Map.Entry<Long, Integer>> filterContainer( + Predicate<Map.Entry<Long, Integer>> predicate) { + return containerCountMap.entrySet().stream() + .filter(predicate).collect(Collectors.toList()); + } + + /** + * Used only for testing, calling this will abort container report + * processing. This is very dangerous call and should not be made by any users + */ + @VisibleForTesting + public void setDoneProcessing() { + nodeProcessed.set(nodeCount.get()); + } + + /** + * Returns the pool name. + * + * @return Name of the pool. + */ + String getPoolName() { + return pool.getPoolName(); + } + + public void finalizeReconciliation() { + status = ProgressStatus.Done; + //TODO: Add finalizing logic. This is where actual reconciliation happens. + } + + /** + * Current status of the computing replication status. 
+   */
+  public enum ProgressStatus {
+    InProgress, Done, Error
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
new file mode 100644
index 0000000..ef28aa7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/PeriodicPool.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Periodic pool is a pool with a time stamp; this allows us to process pools
+ * based on a cyclic clock.
+ */
+public class PeriodicPool implements Comparable<PeriodicPool> {
+  private final String poolName;
+  private long lastProcessedTime;
+  private AtomicLong totalProcessedCount;
+
+  /**
+   * Constructs a periodic pool.
+   *
+   * @param poolName - Name of the pool
+   */
+  public PeriodicPool(String poolName) {
+    this.poolName = poolName;
+    lastProcessedTime = 0;
+    totalProcessedCount = new AtomicLong(0);
+  }
+
+  /**
+   * Gets the pool name.
+   * @return PoolName
+   */
+  public String getPoolName() {
+    return poolName;
+  }
+
+  /**
+   * Compares this object with the specified object for order. Returns a
+   * negative integer, zero, or a positive integer as this object is less
+   * than, equal to, or greater than the specified object.
+   *
+   * @param o the object to be compared.
+   * @return a negative integer, zero, or a positive integer as this object is
+   * less than, equal to, or greater than the specified object.
+   * @throws NullPointerException if the specified object is null
+   * @throws ClassCastException if the specified object's type prevents it
+   * from being compared to this object.
+   */
+  @Override
+  public int compareTo(PeriodicPool o) {
+    return Long.compare(this.lastProcessedTime, o.lastProcessedTime);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    PeriodicPool that = (PeriodicPool) o;
+
+    return poolName.equals(that.poolName);
+  }
+
+  @Override
+  public int hashCode() {
+    return poolName.hashCode();
+  }
+
+  /**
+   * Returns the total number of times we have processed this pool.
+   *
+   * @return processed count.
+   */
+  public long getTotalProcessedCount() {
+    return totalProcessedCount.get();
+  }
+
+  /**
+   * Gets the last time we processed this pool.
+ * @return time in milliseconds + */ + public long getLastProcessedTime() { + return this.lastProcessedTime; + } + + + /** + * Sets the last processed time. + * + * @param lastProcessedTime - Long in milliseconds. + */ + + public void setLastProcessedTime(long lastProcessedTime) { + this.lastProcessedTime = lastProcessedTime; + } + + /* + * Increments the total processed count. + */ + public void incTotalProcessedCount() { + this.totalProcessedCount.incrementAndGet(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java new file mode 100644 index 0000000..7bbe2ef --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; +/* + This package contains routines that manage replication of a container. This + relies on container reports to understand the replication level of a + container - UnderReplicated, Replicated, OverReplicated -- and manages the + replication level based on that. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java index 72d7e94..4392633 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java @@ -124,6 +124,12 @@ public interface NodeManager extends StorageContainerNodeProtocol, SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails); /** + * Returns the NodePoolManager associated with the NodeManager. + * @return NodePoolManager + */ + NodePoolManager getNodePoolManager(); + + /** * Wait for the heartbeat is processed by NodeManager. * @return true if heartbeat has been processed. 
 */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
new file mode 100644
index 0000000..46faf9ca
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodePoolManager.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface that defines SCM NodePoolManager.
+ */
+public interface NodePoolManager extends Closeable {
+
+  /**
+   * Add a node to a node pool.
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   */
+  void addNode(String pool, DatanodeDetails node) throws IOException;
+
+  /**
+   * Remove a node from a node pool.
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   * @throws SCMException
+   */
+  void removeNode(String pool, DatanodeDetails node)
+      throws SCMException;
+
+  /**
+   * Get a list of known node pools.
+   * @return a list of known node pool names or an empty list if no node pool
+   * is defined.
+   */
+  List<String> getNodePools();
+
+  /**
+   * Get all nodes of a node pool given the name of the node pool.
+   * @param pool - name of the node pool.
+   * @return a list of datanode ids or an empty list if the node pool was not
+   * found.
+   */
+  List<DatanodeDetails> getNodes(String pool);
+
+  /**
+   * Get the node pool name if the node has been added to a node pool.
+   * @param datanodeDetails - datanode ID.
+   * @return node pool name if it has been assigned;
+   * null if the node has not been assigned to any node pool yet.
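+   * <p>
+   * A sketch of typical use ({@code dn} is a hypothetical DatanodeDetails;
+   * the SCMNodeManager registration path below applies this pattern):
+   * <pre>
+   *   if (poolManager.getNodePool(dn) == null) {
+   *     poolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, dn);
+   *   }
+   * </pre>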
+ */ + String getNodePool(DatanodeDetails datanodeDetails) throws SCMException; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index adca8ea..fc8b013 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -53,6 +53,7 @@ import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.concurrent.HadoopExecutors; +import com.google.protobuf.GeneratedMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -158,6 +159,7 @@ public class SCMNodeManager private ObjectName nmInfoBean; // Node pool manager. + private final SCMNodePoolManager nodePoolManager; private final StorageContainerManager scmManager; public static final Event<CommandForDatanode> DATANODE_COMMAND = @@ -208,6 +210,7 @@ public class SCMNodeManager registerMXBean(); + this.nodePoolManager = new SCMNodePoolManager(conf); this.scmManager = scmManager; } @@ -679,6 +682,7 @@ public class SCMNodeManager @Override public void close() throws IOException { unregisterMXBean(); + nodePoolManager.close(); executorService.shutdown(); try { if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { @@ -756,6 +760,20 @@ public class SCMNodeManager LOG.info("Leaving startup chill mode."); } + // TODO: define node pool policy for non-default node pool. + // For now, all nodes are added to the "DefaultNodePool" upon registration + // if it has not been added to any node pool yet. + try { + if (nodePoolManager.getNodePool(datanodeDetails) == null) { + nodePoolManager.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, + datanodeDetails); + } + } catch (IOException e) { + // TODO: make sure registration failure is handled correctly. + return RegisteredCommand.newBuilder() + .setErrorCode(ErrorCode.errorNodeNotPermitted) + .build(); + } // Updating Node Report, as registration is successful updateNodeStat(datanodeDetails.getUuid(), nodeReport); LOG.info("Data node with ID: {} Registered.", @@ -842,6 +860,11 @@ public class SCMNodeManager } @Override + public NodePoolManager getNodePoolManager() { + return nodePoolManager; + } + + @Override public Map<String, Integer> getNodeCount() { Map<String, Integer> nodeCountMap = new HashMap<String, Integer>(); for(NodeState state : NodeState.values()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java new file mode 100644 index 0000000..faf330e --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodePoolManager.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_FIND_NODE_IN_POOL;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_LOAD_NODEPOOL;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB;
+
+/**
+ * SCM node pool manager that manages node pools.
+ */
+public final class SCMNodePoolManager implements NodePoolManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMNodePoolManager.class);
+  private static final List<DatanodeDetails> EMPTY_NODE_LIST =
+      new ArrayList<>();
+  private static final List<String> EMPTY_NODEPOOL_LIST = new ArrayList<>();
+  public static final String DEFAULT_NODEPOOL = "DefaultNodePool";
+
+  // DB that saves the node to node pool mapping.
+  private MetadataStore nodePoolStore;
+
+  // In-memory node pool to nodes mapping
+  private HashMap<String, Set<DatanodeDetails>> nodePools;
+
+  // Read-write lock for nodepool operations
+  private ReadWriteLock lock;
+
+  /**
+   * Construct SCMNodePoolManager class that manages node to node pool mapping.
+   * @param conf - configuration.
+   * @throws IOException
+   */
+  public SCMNodePoolManager(final OzoneConfiguration conf)
+      throws IOException {
+    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    File metaDir = getOzoneMetaDirPath(conf);
+    String scmMetaDataDir = metaDir.getPath();
+    File nodePoolDBPath = new File(scmMetaDataDir, NODEPOOL_DB);
+    nodePoolStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setDbFile(nodePoolDBPath)
+        .setCacheSize(cacheSize * OzoneConsts.MB)
+        .build();
+    nodePools = new HashMap<>();
+    lock = new ReentrantReadWriteLock();
+    init();
+  }
+
+  /**
+   * Initialize the in-memory store from the persisted LevelDB store.
+   * No lock is needed as init() is only invoked by the constructor.
+   * @throws SCMException
+   */
+  private void init() throws SCMException {
+    try {
+      nodePoolStore.iterate(null, (key, value) -> {
+        try {
+          DatanodeDetails nodeId = DatanodeDetails.getFromProtoBuf(
+              HddsProtos.DatanodeDetailsProto.PARSER.parseFrom(key));
+          String poolName = DFSUtil.bytes2String(value);
+
+          Set<DatanodeDetails> nodePool = null;
+          if (nodePools.containsKey(poolName)) {
+            nodePool = nodePools.get(poolName);
+          } else {
+            nodePool = new HashSet<>();
+            nodePools.put(poolName, nodePool);
+          }
+          nodePool.add(nodeId);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Adding node: {} to node pool: {}",
+                nodeId, poolName);
+          }
+        } catch (IOException e) {
+          LOG.warn("Can't add a datanode to node pool, continue next...");
+        }
+        return true;
+      });
+    } catch (IOException e) {
+      LOG.error("Loading node pool error " + e);
+      throw new SCMException("Failed to load node pool",
+          FAILED_TO_LOAD_NODEPOOL);
+    }
+  }
+
+  /**
+   * Add a datanode to a node pool.
+   * @param pool - name of the node pool.
+   * @param node - the datanode.
+   */
+  @Override
+  public void addNode(final String pool, final DatanodeDetails node)
+      throws IOException {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    Preconditions.checkNotNull(node, "node is null");
+    lock.writeLock().lock();
+    try {
+      // add to the persistent store
+      nodePoolStore.put(node.getProtoBufMessage().toByteArray(),
+          DFSUtil.string2Bytes(pool));
+
+      // add to the in-memory store
+      Set<DatanodeDetails> nodePool = null;
+      if (nodePools.containsKey(pool)) {
+        nodePool = nodePools.get(pool);
+      } else {
+        nodePool = new HashSet<DatanodeDetails>();
+        nodePools.put(pool, nodePool);
+      }
+      nodePool.add(node);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Remove a datanode from a node pool.
+   * @param pool - name of the node pool.
+   * @param node - datanode id.
+   * @throws SCMException
+   */
+  @Override
+  public void removeNode(final String pool, final DatanodeDetails node)
+      throws SCMException {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    Preconditions.checkNotNull(node, "node is null");
+    lock.writeLock().lock();
+    try {
+      // Remove from the persistent store
+      byte[] kName = node.getProtoBufMessage().toByteArray();
+      byte[] kData = nodePoolStore.get(kName);
+      if (kData == null) {
+        throw new SCMException(String.format("Unable to find node %s from" +
+            " pool %s in DB.", DFSUtil.bytes2String(kName), pool),
+            FAILED_TO_FIND_NODE_IN_POOL);
+      }
+      nodePoolStore.delete(kName);
+
+      // Remove from the in-memory store
+      if (nodePools.containsKey(pool)) {
+        Set<DatanodeDetails> nodePool = nodePools.get(pool);
+        nodePool.remove(node);
+      } else {
+        throw new SCMException(String.format("Unable to find node %s from" +
+            " pool %s in MAP.", DFSUtil.bytes2String(kName), pool),
+            FAILED_TO_FIND_NODE_IN_POOL);
+      }
+    } catch (IOException e) {
+      throw new SCMException("Failed to remove node " + node.toString() +
+          " from node pool " + pool, e,
+          SCMException.ResultCodes.IO_EXCEPTION);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Get all the node pools.
+   * @return all the node pools.
+   */
+  @Override
+  public List<String> getNodePools() {
+    lock.readLock().lock();
+    try {
+      if (!nodePools.isEmpty()) {
+        return nodePools.keySet().stream().collect(Collectors.toList());
+      } else {
+        return EMPTY_NODEPOOL_LIST;
+      }
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get all datanodes of a specific node pool.
+   * @param pool - name of the node pool.
+   * @return all datanodes of the specified node pool.
+   */
+  @Override
+  public List<DatanodeDetails> getNodes(final String pool) {
+    Preconditions.checkNotNull(pool, "pool name is null");
+    if (nodePools.containsKey(pool)) {
+      return nodePools.get(pool).stream().collect(Collectors.toList());
+    } else {
+      return EMPTY_NODE_LIST;
+    }
+  }
+
+  /**
+   * Get the node pool name if the node has been added to a node pool.
+   * @param datanodeDetails - datanode ID.
+   * @return node pool name if it has been assigned.
+   * null if the node has not been assigned to any node pool yet.
+   * TODO: Put this in an in-memory map if performance is an issue.
+   */
+  @Override
+  public String getNodePool(final DatanodeDetails datanodeDetails)
+      throws SCMException {
+    Preconditions.checkNotNull(datanodeDetails, "node is null");
+    try {
+      byte[] result = nodePoolStore.get(
+          datanodeDetails.getProtoBufMessage().toByteArray());
+      return result == null ? null : DFSUtil.bytes2String(result);
+    } catch (IOException e) {
+      throw new SCMException("Failed to get node pool for node " +
+          datanodeDetails.toString(), e,
+          SCMException.ResultCodes.IO_EXCEPTION);
+    }
+  }
+
+  /**
+   * Close the node pool LevelDB store.
+ * @throws IOException + */ + @Override + public void close() throws IOException { + nodePoolStore.close(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java index 80b5d6e..8c59462 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.container; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodePoolManager; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -272,6 +273,11 @@ public class MockNodeManager implements NodeManager { return new SCMNodeMetric(nodeMetricMap.get(datanodeDetails.getUuid())); } + @Override + public NodePoolManager getNodePoolManager() { + return Mockito.mock(NodePoolManager.class); + } + /** * Used for testing. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java new file mode 100644 index 0000000..8f412de --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodePoolManager.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.node; + +import org.apache.commons.collections.ListUtils; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .SCMContainerPlacementCapacity; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.test.PathUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test for SCM node pool manager. + */ +public class TestSCMNodePoolManager { + private static final Logger LOG = + LoggerFactory.getLogger(TestSCMNodePoolManager.class); + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private final File testDir = PathUtils.getTestDir( + TestSCMNodePoolManager.class); + + SCMNodePoolManager createNodePoolManager(OzoneConfiguration conf) + throws IOException { + conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, + testDir.getAbsolutePath()); + conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY, + SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class); + return new SCMNodePoolManager(conf); + } + + /** + * Test default node pool. + * + * @throws IOException + */ + @Test + public void testDefaultNodePool() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + try { + final String defaultPool = "DefaultPool"; + NodePoolManager npMgr = createNodePoolManager(conf); + + final int nodeCount = 4; + final List<DatanodeDetails> nodes = TestUtils + .getListOfDatanodeDetails(nodeCount); + assertEquals(0, npMgr.getNodePools().size()); + for (DatanodeDetails node: nodes) { + npMgr.addNode(defaultPool, node); + } + List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + + DatanodeDetails nodeRemoved = nodes.remove(2); + npMgr.removeNode(defaultPool, nodeRemoved); + List<DatanodeDetails> nodesAfterRemove = npMgr.getNodes(defaultPool); + assertTwoDatanodeListsEqual(nodes, nodesAfterRemove); + + List<DatanodeDetails> nonExistSet = npMgr.getNodes("NonExistSet"); + assertEquals(0, nonExistSet.size()); + } finally { + FileUtil.fullyDelete(testDir); + } + } + + + /** + * Test default node pool reload. 
+ * + * @throws IOException + */ + @Test + public void testDefaultNodePoolReload() throws IOException { + OzoneConfiguration conf = new OzoneConfiguration(); + final String defaultPool = "DefaultPool"; + final int nodeCount = 4; + final List<DatanodeDetails> nodes = TestUtils + .getListOfDatanodeDetails(nodeCount); + + try { + try { + SCMNodePoolManager npMgr = createNodePoolManager(conf); + assertEquals(0, npMgr.getNodePools().size()); + for (DatanodeDetails node : nodes) { + npMgr.addNode(defaultPool, node); + } + List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + npMgr.close(); + } finally { + LOG.info("testDefaultNodePoolReload: Finish adding nodes to pool" + + " and close."); + } + + // try reload with a new NodePoolManager instance + try { + SCMNodePoolManager npMgr = createNodePoolManager(conf); + List<DatanodeDetails> nodesRetrieved = npMgr.getNodes(defaultPool); + assertEquals(nodeCount, nodesRetrieved.size()); + assertTwoDatanodeListsEqual(nodes, nodesRetrieved); + } finally { + LOG.info("testDefaultNodePoolReload: Finish reloading node pool."); + } + } finally { + FileUtil.fullyDelete(testDir); + } + } + + /** + * Compare and verify that two datanode lists are equal. + * @param list1 - datanode list 1. + * @param list2 - datanode list 2. + */ + private void assertTwoDatanodeListsEqual(List<DatanodeDetails> list1, + List<DatanodeDetails> list2) { + assertEquals(list1.size(), list2.size()); + Collections.sort(list1); + Collections.sort(list2); + assertTrue(ListUtils.isEqualList(list1, list2)); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java index 1a4dcd7..072d821 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat; import org.apache.hadoop.hdds.scm.node.CommandQueue; import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodePoolManager; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; @@ -200,6 +201,10 @@ public class ReplicationNodeManagerMock implements NodeManager { return null; } + @Override + public NodePoolManager getNodePoolManager() { + return Mockito.mock(NodePoolManager.class); + } /** * Wait for the heartbeat is processed by NodeManager. 
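For reference, the restored pieces above compose roughly as in the following sketch. This is illustrative only and not code from this commit: it reuses the TestUtils.getListOfDatanodeDetails helper seen in TestSCMNodePoolManager, and the metadata directory value is a placeholder.

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.node.NodePoolManager;
import org.apache.hadoop.hdds.scm.node.SCMNodePoolManager;
import org.apache.hadoop.ozone.OzoneConfigKeys;

import java.io.IOException;

public final class NodePoolSketch {
  public static void main(String[] args) throws IOException {
    OzoneConfiguration conf = new OzoneConfiguration();
    // SCMNodePoolManager opens nodepool.db under the Ozone metadata
    // directory, so this must point at a writable location (placeholder).
    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, "/tmp/scm-meta");

    // NodePoolManager extends Closeable; try-with-resources closes the
    // backing LevelDB store.
    try (NodePoolManager poolMgr = new SCMNodePoolManager(conf)) {
      DatanodeDetails node = TestUtils.getListOfDatanodeDetails(1).get(0);

      // Mirrors the registration path in SCMNodeManager: a node that is
      // not yet in any pool is assigned to the default pool.
      if (poolMgr.getNodePool(node) == null) {
        poolMgr.addNode(SCMNodePoolManager.DEFAULT_NODEPOOL, node);
      }

      System.out.println("pools:   " + poolMgr.getNodePools());
      System.out.println("members: "
          + poolMgr.getNodes(SCMNodePoolManager.DEFAULT_NODEPOOL));
    }
  }
}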
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java
new file mode 100644
index 0000000..ffcd752
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodePoolManagerMock.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.testutils;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodePoolManager;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Pool Manager replication mock.
+ */
+public class ReplicationNodePoolManagerMock implements NodePoolManager {
+
+  private final Map<DatanodeDetails, String> nodeMemberShip;
+
+  /**
+   * A node pool manager for testing.
+   */
+  public ReplicationNodePoolManagerMock() {
+    nodeMemberShip = new HashMap<>();
+  }
+
+  /**
+   * Add a node to a node pool.
+   *
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   */
+  @Override
+  public void addNode(String pool, DatanodeDetails node) {
+    nodeMemberShip.put(node, pool);
+  }
+
+  /**
+   * Remove a node from a node pool.
+   *
+   * @param pool - name of the node pool.
+   * @param node - data node.
+   * @throws SCMException
+   */
+  @Override
+  public void removeNode(String pool, DatanodeDetails node)
+      throws SCMException {
+    nodeMemberShip.remove(node);
+
+  }
+
+  /**
+   * Get a list of known node pools.
+   *
+   * @return a list of known node pool names or an empty list if no node pool
+   * is defined.
+   */
+  @Override
+  public List<String> getNodePools() {
+    Set<String> poolSet = new HashSet<>();
+    for (Map.Entry<DatanodeDetails, String> entry : nodeMemberShip.entrySet()) {
+      poolSet.add(entry.getValue());
+    }
+    return new ArrayList<>(poolSet);
+
+  }
+
+  /**
+   * Get all nodes of a node pool given the name of the node pool.
+   *
+   * @param pool - name of the node pool.
+   * @return a list of datanode ids or an empty list if the node pool was not
+   * found.
+ */ + @Override + public List<DatanodeDetails> getNodes(String pool) { + Set<DatanodeDetails> datanodeSet = new HashSet<>(); + for (Map.Entry<DatanodeDetails, String> entry : nodeMemberShip.entrySet()) { + if (entry.getValue().equals(pool)) { + datanodeSet.add(entry.getKey()); + } + } + return new ArrayList<>(datanodeSet); + } + + /** + * Get the node pool name if the node has been added to a node pool. + * + * @param datanodeDetails DatanodeDetails. + * @return node pool name if it has been assigned. null if the node has not + * been assigned to any node pool yet. + */ + @Override + public String getNodePool(DatanodeDetails datanodeDetails) { + return nodeMemberShip.get(datanodeDetails); + } + + /** + * Closes this stream and releases any system resources associated + * with it. If the stream is already closed then invoking this + * method has no effect. + * <p> + * <p> As noted in {@link AutoCloseable#close()}, cases where the + * close may fail require careful attention. It is strongly advised + * to relinquish the underlying resources and to internally + * <em>mark</em> the {@code Closeable} as closed, prior to throwing + * the {@code IOException}. + * + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/0d6fe5f3/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java index b4ed2b1..4d70af8 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSQLCli.java @@ -51,9 +51,12 @@ import java.util.Collection; import java.util.HashMap; import java.util.UUID; +import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB; import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB; import static org.apache.hadoop.ozone.OzoneConsts.KB; +import static org.apache.hadoop.ozone.OzoneConsts.NODEPOOL_DB; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * This class tests the CLI that transforms container into SQLite DB files. 
@@ -174,6 +177,34 @@ public class TestContainerSQLCli {
   }
 
   @Test
+  public void testConvertNodepoolDB() throws Exception {
+    String dbOutPath = GenericTestUtils.getTempPath(
+        UUID.randomUUID() + "/out_sql.db");
+    String dbRootPath = conf.get(OzoneConfigKeys.OZONE_METADATA_DIRS);
+    String dbPath = dbRootPath + "/" + NODEPOOL_DB;
+    String[] args = {"-p", dbPath, "-o", dbOutPath};
+
+    cli.run(args);
+
+    // verify the sqlite db
+    HashMap<String, String> expectedPool = new HashMap<>();
+    for (DatanodeDetails dnid : nodeManager.getAllNodes()) {
+      expectedPool.put(dnid.getUuidString(), "DefaultNodePool");
+    }
+    Connection conn = connectDB(dbOutPath);
+    String sql = "SELECT * FROM nodePool";
+    ResultSet rs = executeQuery(conn, sql);
+    while(rs.next()) {
+      String datanodeUUID = rs.getString("datanodeUUID");
+      String poolName = rs.getString("poolName");
+      assertTrue(expectedPool.remove(datanodeUUID).equals(poolName));
+    }
+    assertEquals(0, expectedPool.size());
+
+    Files.delete(Paths.get(dbOutPath));
+  }
+
+  @Test
   public void testConvertContainerDB() throws Exception {
     String dbOutPath = GenericTestUtils.getTempPath(
         UUID.randomUUID() + "/out_sql.db");
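The nodePool table that testConvertNodepoolDB reads back can also be inspected by hand. Below is a minimal standalone sketch, assuming a SQLite JDBC driver on the classpath; the database path is a placeholder for whatever was passed to SQLCLI with -o.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public final class NodePoolDbInspect {
  public static void main(String[] args) throws Exception {
    // Placeholder path: the -o argument that was given to SQLCLI.
    String dbOutPath = args.length > 0 ? args[0] : "/tmp/out_sql.db";
    try (Connection conn =
             DriverManager.getConnection("jdbc:sqlite:" + dbOutPath);
         Statement stmt = conn.createStatement();
         // Same query and column names the test reads back.
         ResultSet rs = stmt.executeQuery("SELECT * FROM nodePool")) {
      while (rs.next()) {
        System.out.printf("%s -> %s%n",
            rs.getString("datanodeUUID"), rs.getString("poolName"));
      }
    }
  }
}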
