HDDS-199. Implement ReplicationManager to handle underreplication of closed containers. Contributed by Elek Marton.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a9e25ed Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a9e25ed Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a9e25ed Branch: refs/heads/HADOOP-15461 Commit: 3a9e25edf53187f16ec9f9f6075e850b74b3b91f Parents: 84d7bf1 Author: Xiaoyu Yao <x...@apache.org> Authored: Mon Jul 23 10:13:53 2018 -0700 Committer: Xiaoyu Yao <x...@apache.org> Committed: Mon Jul 23 10:28:33 2018 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 7 + .../apache/hadoop/ozone/OzoneConfigKeys.java | 1 + .../common/src/main/resources/ozone-default.xml | 10 + .../container/replication/ReplicationQueue.java | 76 ------ .../replication/ReplicationRequest.java | 106 -------- .../container/replication/package-info.java | 23 -- .../replication/TestReplicationQueue.java | 134 ---------- .../container/replication/package-info.java | 23 -- .../hadoop/hdds/server/events/EventWatcher.java | 4 +- .../hadoop/hdds/server/events/TypedEvent.java | 5 + .../hdds/server/events/TestEventWatcher.java | 6 +- .../algorithms/ContainerPlacementPolicy.java | 5 +- .../placement/algorithms/SCMCommonPolicy.java | 8 +- .../SCMContainerPlacementCapacity.java | 16 +- .../algorithms/SCMContainerPlacementRandom.java | 7 +- .../replication/ReplicationCommandWatcher.java | 56 +++++ .../replication/ReplicationManager.java | 242 +++++++++++++++++++ .../container/replication/ReplicationQueue.java | 73 ++++++ .../replication/ReplicationRequest.java | 107 ++++++++ .../scm/container/replication/package-info.java | 23 ++ .../hadoop/hdds/scm/events/SCMEvents.java | 31 +++ .../scm/server/StorageContainerManager.java | 42 +++- .../TestSCMContainerPlacementCapacity.java | 106 ++++++++ .../TestSCMContainerPlacementRandom.java | 86 +++++++ .../replication/TestReplicationManager.java | 215 ++++++++++++++++ 
.../replication/TestReplicationQueue.java | 134 ++++++++++ .../scm/container/replication/package-info.java | 23 ++ .../placement/TestContainerPlacement.java | 5 +- 28 files changed, 1192 insertions(+), 382 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 6e940ad..e337d2f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -251,6 +251,13 @@ public final class ScmConfigKeys { public static final String OZONE_SCM_CONTAINER_CLOSE_THRESHOLD = "ozone.scm.container.close.threshold"; public static final float OZONE_SCM_CONTAINER_CLOSE_THRESHOLD_DEFAULT = 0.9f; + + public static final String HDDS_SCM_WATCHER_TIMEOUT = + "hdds.scm.watcher.timeout"; + + public static final String HDDS_SCM_WATCHER_TIMEOUT_DEFAULT = + "10m"; + /** * Never constructed. 
*/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index 0273677..92f0c41 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; + import org.apache.ratis.util.TimeDuration; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 84a3e0c..6ddf3c6 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1108,4 +1108,14 @@ </description> </property> + <property> + <name>hdds.scm.watcher.timeout</name> + <value>10m</value> + <tag>OZONE, SCM, MANAGEMENT</tag> + <description> + Timeout for the watchers of the HDDS SCM CommandWatchers. After this + duration the Copy/Delete container commands will be sent again to the + datanode unless the datanode confirms the completion. 
+ </description> + </property> </configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java deleted file mode 100644 index e0a2351..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationQueue.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.util.List; -import java.util.PriorityQueue; -import java.util.Queue; - -/** - * Priority queue to handle under-replicated and over replicated containers - * in ozone. ReplicationManager will consume these messages and decide - * accordingly. 
- */ -public class ReplicationQueue { - - private final Queue<ReplicationRequest> queue; - - ReplicationQueue() { - queue = new PriorityQueue<>(); - } - - public synchronized boolean add(ReplicationRequest repObj) { - if (this.queue.contains(repObj)) { - // Remove the earlier message and insert this one - this.queue.remove(repObj); - } - return this.queue.add(repObj); - } - - public synchronized boolean remove(ReplicationRequest repObj) { - return queue.remove(repObj); - } - - /** - * Retrieves, but does not remove, the head of this queue, - * or returns {@code null} if this queue is empty. - * - * @return the head of this queue, or {@code null} if this queue is empty - */ - public synchronized ReplicationRequest peek() { - return queue.peek(); - } - - /** - * Retrieves and removes the head of this queue, - * or returns {@code null} if this queue is empty. - * - * @return the head of this queue, or {@code null} if this queue is empty - */ - public synchronized ReplicationRequest poll() { - return queue.poll(); - } - - public synchronized boolean removeAll(List<ReplicationRequest> repObjs) { - return queue.removeAll(repObjs); - } - - public int size() { - return queue.size(); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java deleted file mode 100644 index a6ccce1..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationRequest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.io.Serializable; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; - -/** - * Wrapper class for hdds replication queue. Implements its natural - * ordering for priority queue. - */ -public class ReplicationRequest implements Comparable<ReplicationRequest>, - Serializable { - private final long containerId; - private final short replicationCount; - private final short expecReplicationCount; - private final long timestamp; - - public ReplicationRequest(long containerId, short replicationCount, - long timestamp, short expecReplicationCount) { - this.containerId = containerId; - this.replicationCount = replicationCount; - this.timestamp = timestamp; - this.expecReplicationCount = expecReplicationCount; - } - - /** - * Compares this object with the specified object for order. Returns a - * negative integer, zero, or a positive integer as this object is less - * than, equal to, or greater than the specified object. - * @param o the object to be compared. - * @return a negative integer, zero, or a positive integer as this object - * is less than, equal to, or greater than the specified object. 
- * @throws NullPointerException if the specified object is null - * @throws ClassCastException if the specified object's type prevents it - * from being compared to this object. - */ - @Override - public int compareTo(ReplicationRequest o) { - if (o == null) { - return 1; - } - if (this == o) { - return 0; - } - int retVal = Integer - .compare(getReplicationCount() - getExpecReplicationCount(), - o.getReplicationCount() - o.getExpecReplicationCount()); - if (retVal != 0) { - return retVal; - } - return Long.compare(getTimestamp(), o.getTimestamp()); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(91, 1011) - .append(getContainerId()) - .toHashCode(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - ReplicationRequest that = (ReplicationRequest) o; - return new EqualsBuilder().append(getContainerId(), that.getContainerId()) - .isEquals(); - } - - public long getContainerId() { - return containerId; - } - - public short getReplicationCount() { - return replicationCount; - } - - public long getTimestamp() { - return timestamp; - } - - public short getExpecReplicationCount() { - return expecReplicationCount; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java deleted file mode 100644 index 7f335e3..0000000 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.ozone.container.replication; - -/** - * Ozone Container replicaton related classes. - */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java deleted file mode 100644 index 6d74c68..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationQueue.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.ozone.container.replication; - -import java.util.Random; -import java.util.UUID; -import org.apache.hadoop.util.Time; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Test class for ReplicationQueue. - */ -public class TestReplicationQueue { - - private ReplicationQueue replicationQueue; - private Random random; - - @Before - public void setUp() { - replicationQueue = new ReplicationQueue(); - random = new Random(); - } - - @Test - public void testDuplicateAddOp() { - long contId = random.nextLong(); - String nodeId = UUID.randomUUID().toString(); - ReplicationRequest obj1, obj2, obj3; - long time = Time.monotonicNow(); - obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3); - obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3); - obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3); - - replicationQueue.add(obj1); - replicationQueue.add(obj2); - replicationQueue.add(obj3); - Assert.assertEquals("Should add only 1 msg as second one is duplicate", - 1, replicationQueue.size()); - ReplicationRequest temp = replicationQueue.poll(); - Assert.assertEquals(temp, obj3); - } - - @Test - public void testPollOp() { - long contId = random.nextLong(); - String nodeId = UUID.randomUUID().toString(); - ReplicationRequest msg1, msg2, msg3, msg4, msg5; - msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(), - (short) 3); - long time = Time.monotonicNow(); - msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3); - msg3 = 
new ReplicationRequest(contId + 2, (short) 0, time, (short) 3); - msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3); - // Replication message for same container but different nodeId - msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3); - - replicationQueue.add(msg1); - replicationQueue.add(msg2); - replicationQueue.add(msg3); - replicationQueue.add(msg4); - replicationQueue.add(msg5); - Assert.assertEquals("Should have 3 objects", - 3, replicationQueue.size()); - - // Since Priority queue orders messages according to replication count, - // message with lowest replication should be first - ReplicationRequest temp; - temp = replicationQueue.poll(); - Assert.assertEquals("Should have 2 objects", - 2, replicationQueue.size()); - Assert.assertEquals(temp, msg3); - - temp = replicationQueue.poll(); - Assert.assertEquals("Should have 1 objects", - 1, replicationQueue.size()); - Assert.assertEquals(temp, msg5); - - // Message 2 should be ordered before message 5 as both have same replication - // number but message 2 has earlier timestamp. 
- temp = replicationQueue.poll(); - Assert.assertEquals("Should have 0 objects", - replicationQueue.size(), 0); - Assert.assertEquals(temp, msg4); - } - - @Test - public void testRemoveOp() { - long contId = random.nextLong(); - String nodeId = UUID.randomUUID().toString(); - ReplicationRequest obj1, obj2, obj3; - obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(), - (short) 3); - obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(), - (short) 3); - obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(), - (short) 3); - - replicationQueue.add(obj1); - replicationQueue.add(obj2); - replicationQueue.add(obj3); - Assert.assertEquals("Should have 3 objects", - 3, replicationQueue.size()); - - replicationQueue.remove(obj3); - Assert.assertEquals("Should have 2 objects", - 2, replicationQueue.size()); - - replicationQueue.remove(obj2); - Assert.assertEquals("Should have 1 objects", - 1, replicationQueue.size()); - - replicationQueue.remove(obj1); - Assert.assertEquals("Should have 0 objects", - 0, replicationQueue.size()); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java deleted file mode 100644 index 5b1fd0f..0000000 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -/** - * SCM Testing and Mocking Utils. - */ -package org.apache.hadoop.ozone.container.replication; -// Test classes for Replication functionality. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java index 473c152..38386d4 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java @@ -180,9 +180,9 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends } - abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload); + protected abstract void onTimeout(EventPublisher publisher, TIMEOUT_PAYLOAD payload); - abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload); + protected abstract void onFinished(EventPublisher publisher, TIMEOUT_PAYLOAD payload); public List<TIMEOUT_PAYLOAD> getTimeoutEvents( Predicate<? 
super TIMEOUT_PAYLOAD> predicate) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java index c2159ad..62e2419 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/TypedEvent.java @@ -48,4 +48,9 @@ public class TypedEvent<T> implements Event<T> { return name; } + @Override + public String toString() { + return "TypedEvent{" + "payloadType=" + payloadType + ", name='" + name + + '\'' + '}'; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java index 8f18478..786b7b8 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java @@ -216,12 +216,12 @@ public class TestEventWatcher { } @Override - void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) { + protected void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) { publisher.fireEvent(UNDER_REPLICATED, payload); } @Override - void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) { + protected void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) { 
//Good job. We did it. } @@ -231,8 +231,6 @@ public class TestEventWatcher { } } - ; - private static class ReplicationCompletedEvent implements IdentifiableEventPayload { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java index 5d91ac5..3336c8e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java @@ -31,11 +31,14 @@ public interface ContainerPlacementPolicy { /** * Given the replication factor and size required, return set of datanodes * that satisfy the nodes and size requirement. + * + * @param excludedNodes - list of nodes to be excluded. * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return list of datanodes chosen. 
* @throws IOException */ - List<DatanodeDetails> chooseDatanodes(int nodesRequired, long sizeRequired) + List<DatanodeDetails> chooseDatanodes(List<DatanodeDetails> excludedNodes, + int nodesRequired, long sizeRequired) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java index 0a595d5..ba241dc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java @@ -95,16 +95,20 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy { * 3. if a set of containers are requested, we either meet the required * number of nodes or we fail that request. * + * + * @param excludedNodes - datanodes with existing replicas * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return list of datanodes chosen. * @throws SCMException SCM exception. 
*/ - public List<DatanodeDetails> chooseDatanodes(int nodesRequired, final long - sizeRequired) throws SCMException { + public List<DatanodeDetails> chooseDatanodes( + List<DatanodeDetails> excludedNodes, + int nodesRequired, final long sizeRequired) throws SCMException { List<DatanodeDetails> healthyNodes = nodeManager.getNodes(HddsProtos.NodeState.HEALTHY); + healthyNodes.removeAll(excludedNodes); String msg; if (healthyNodes.size() == 0) { msg = "No healthy node found to allocate container."; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java index 85a6b54..8df8f6e 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java @@ -17,17 +17,18 @@ package org.apache.hadoop.hdds.scm.container.placement.algorithms; -import com.google.common.annotations.VisibleForTesting; +import java.util.List; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; + +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - 
/** * Container placement policy that randomly choose datanodes with remaining * space to satisfy the size constraints. @@ -83,6 +84,8 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy { /** * Called by SCM to choose datanodes. * + * + * @param excludedNodes - list of the datanodes to exclude. * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return List of datanodes. @@ -90,9 +93,10 @@ public final class SCMContainerPlacementCapacity extends SCMCommonPolicy { */ @Override public List<DatanodeDetails> chooseDatanodes( - final int nodesRequired, final long sizeRequired) throws SCMException { + List<DatanodeDetails> excludedNodes, final int nodesRequired, + final long sizeRequired) throws SCMException { List<DatanodeDetails> healthyNodes = - super.chooseDatanodes(nodesRequired, sizeRequired); + super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired); if (healthyNodes.size() == nodesRequired) { return healthyNodes; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java index 9903c84..76702d5 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java @@ -56,6 +56,8 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy /** * Choose datanodes called by the SCM to 
choose the datanode. * + * + * @param excludedNodes - list of the datanodes to exclude. * @param nodesRequired - number of datanodes required. * @param sizeRequired - size required for the container or block. * @return List of Datanodes. @@ -63,9 +65,10 @@ public final class SCMContainerPlacementRandom extends SCMCommonPolicy */ @Override public List<DatanodeDetails> chooseDatanodes( - final int nodesRequired, final long sizeRequired) throws SCMException { + List<DatanodeDetails> excludedNodes, final int nodesRequired, + final long sizeRequired) throws SCMException { List<DatanodeDetails> healthyNodes = - super.chooseDatanodes(nodesRequired, sizeRequired); + super.chooseDatanodes(excludedNodes, nodesRequired, sizeRequired); if (healthyNodes.size() == nodesRequired) { return healthyNodes; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java new file mode 100644 index 0000000..03a81a7 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationCommandWatcher.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .ReplicationCompleted; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .ReplicationRequestToRepeat; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.Event; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventWatcher; +import org.apache.hadoop.ozone.lease.LeaseManager; + +/** + * Command watcher to track the replication commands. 
+ */ +public class ReplicationCommandWatcher + extends + EventWatcher<ReplicationManager.ReplicationRequestToRepeat, + ReplicationManager.ReplicationCompleted> { + + public ReplicationCommandWatcher(Event<ReplicationRequestToRepeat> startEvent, + Event<ReplicationCompleted> completionEvent, + LeaseManager<Long> leaseManager) { + super(startEvent, completionEvent, leaseManager); + } + + @Override + protected void onTimeout(EventPublisher publisher, + ReplicationRequestToRepeat payload) { + //timed out: re-fire the original request so that it is queued again + publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER, + payload.getRequest()); + } + + @Override + protected void onFinished(EventPublisher publisher, + ReplicationRequestToRepeat payload) { + //nothing to do: only timed out commands are retried + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java new file mode 100644 index 0000000..5f78722 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ThreadFactory; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import static org.apache.hadoop.hdds.scm.events.SCMEvents + .TRACK_REPLICATE_COMMAND; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Replication Manager manages the replication of the closed container. 
+ */ +public class ReplicationManager implements Runnable { + + private static final Logger LOG = + LoggerFactory.getLogger(ReplicationManager.class); + + private ReplicationQueue replicationQueue; + + private ContainerPlacementPolicy containerPlacement; + + private EventPublisher eventPublisher; + + private ReplicationCommandWatcher replicationCommandWatcher; + + //volatile: written by stop(), read by run() on the replication thread + private volatile boolean running = true; + + private ContainerStateManager containerStateManager; + + public ReplicationManager(ContainerPlacementPolicy containerPlacement, + ContainerStateManager containerStateManager, EventQueue eventQueue, + LeaseManager<Long> commandWatcherLeaseManager) { + + this.containerPlacement = containerPlacement; + this.containerStateManager = containerStateManager; + this.eventPublisher = eventQueue; + + this.replicationCommandWatcher = + new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND, + SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager); + + this.replicationQueue = new ReplicationQueue(); + + eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER, + (replicationRequest, publisher) -> replicationQueue + .add(replicationRequest)); + + this.replicationCommandWatcher.start(eventQueue); + + } + + public void start() { + //run() is executed on a new daemon thread + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Replication Manager").build(); + + threadFactory.newThread(this).start(); + } + + public void run() { + //loop until stop() is called or this thread is interrupted + while (running) { + ReplicationRequest request = null; + try { + //TODO: add throttling here + request = replicationQueue.take(); + + ContainerID containerID = new ContainerID(request.getContainerId()); + ContainerInfo containerInfo = + containerStateManager.getContainer(containerID); + + Preconditions.checkNotNull(containerInfo, + "No information about the container " + request.getContainerId()); + + Preconditions + .checkState(containerInfo.getState() == LifeCycleState.CLOSED, + "Container should be in closed state"); + + //check the current replication
List<DatanodeDetails> datanodesWithReplicas = + getCurrentReplicas(request); + + ReplicationRequest finalRequest = request; + //replicate commands for this container still tracked by the watcher + int inFlightReplications = replicationCommandWatcher.getTimeoutEvents( + e -> e.request.getContainerId() == finalRequest.getContainerId()) + .size(); + + int deficit = + request.getExpecReplicationCount() - datanodesWithReplicas.size() + - inFlightReplications; + + if (deficit > 0) { + + List<DatanodeDetails> selectedDatanodes = containerPlacement + .chooseDatanodes(datanodesWithReplicas, deficit, + containerInfo.getUsedBytes()); + + //send the command + for (DatanodeDetails datanode : selectedDatanodes) { + ReplicateContainerCommand replicateCommand = + new ReplicateContainerCommand(containerID.getId(), + datanodesWithReplicas); + + eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, + new CommandForDatanode<>( + datanode.getUuid(), replicateCommand)); + + ReplicationRequestToRepeat timeoutEvent = + new ReplicationRequestToRepeat(replicateCommand.getId(), + request); + + eventPublisher.fireEvent(TRACK_REPLICATE_COMMAND, timeoutEvent); + } + + } else if (deficit < 0) { + //TODO: too many replicas. Not handled yet. + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); //preserve the interrupt status + running = false; + } catch (Exception e) { + LOG.error("Can't replicate container {}", request, e); + } + } + + } + + @VisibleForTesting + protected List<DatanodeDetails> getCurrentReplicas(ReplicationRequest request) + throws IOException { + //TODO: replication information is not yet available after HDDS-175, + // should be fixed after HDDS-228 + return new ArrayList<>(); + } + + @VisibleForTesting + public ReplicationQueue getReplicationQueue() { + return replicationQueue; + } + + public void stop() { + running = false; //run() re-checks this only after take() returns + } + + /** + * Event for the ReplicationCommandWatcher to repeat the embedded request + * in case of timeout.
+ */ + public static class ReplicationRequestToRepeat + implements IdentifiableEventPayload { + //id of the ReplicateContainerCommand sent to the datanode (see run()) + private final long commandId; + + private final ReplicationRequest request; + + public ReplicationRequestToRepeat(long commandId, + ReplicationRequest request) { + this.commandId = commandId; + this.request = request; + } + + public ReplicationRequest getRequest() { + return request; + } + + @Override + public long getId() { + return commandId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReplicationRequestToRepeat that = (ReplicationRequestToRepeat) o; + return Objects.equals(request, that.request); + } + + @Override + public int hashCode() { + //consistent with equals(): only the request is compared + return Objects.hash(request); + } + } + + public static class ReplicationCompleted implements IdentifiableEventPayload { + //presumably the id of the finished replicate command; TODO confirm + private final long uuid; + + public ReplicationCompleted(long uuid) { + this.uuid = uuid; + } + + @Override + public long getId() { + return uuid; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java new file mode 100644 index 0000000..4ca67be --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationQueue.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.PriorityBlockingQueue; + +/** + * Priority queue of replication requests for under- and over-replicated + * containers in ozone. A request equal to one already queued replaces it; + * ReplicationManager consumes the requests and decides accordingly. + */ +public class ReplicationQueue { + + private final BlockingQueue<ReplicationRequest> queue; + + public ReplicationQueue() { + queue = new PriorityBlockingQueue<>(); + } + + public boolean add(ReplicationRequest repObj) { + if (this.queue.contains(repObj)) { + // Replace the earlier request; contains/remove/add is not atomic. + this.queue.remove(repObj); + } + return this.queue.add(repObj); + } + + public boolean remove(ReplicationRequest repObj) { + return queue.remove(repObj); + } + + /** + * Retrieves, but does not remove, the head of this queue, + * or returns {@code null} if this queue is empty. + * + * @return the head of this queue, or {@code null} if this queue is empty + */ + public ReplicationRequest peek() { + return queue.peek(); + } + + /** + * Retrieves and removes the head of this queue, blocking while it is empty.
+ */ + public ReplicationRequest take() throws InterruptedException { + return queue.take(); + } + + public boolean removeAll(List<ReplicationRequest> repObjs) { + return queue.removeAll(repObjs); + } + + public int size() { + return queue.size(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java new file mode 100644 index 0000000..ef7c546 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationRequest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import java.io.Serializable; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; + +/** + * Wrapper class for hdds replication queue.
Implements its natural + * ordering for priority queue; equality is based on the container id only. + */ +public class ReplicationRequest implements Comparable<ReplicationRequest>, + Serializable { + private final long containerId; + private final short replicationCount; + private final short expecReplicationCount; + private final long timestamp; + + public ReplicationRequest(long containerId, short replicationCount, + long timestamp, short expecReplicationCount) { + this.containerId = containerId; + this.replicationCount = replicationCount; + this.timestamp = timestamp; + this.expecReplicationCount = expecReplicationCount; + } + + /** + * Orders requests by replication deficit (replicationCount minus + * expecReplicationCount, smallest value first, i.e. the most + * under-replicated container first), then by timestamp (smaller first). + * Note: this ordering is not consistent with equals(), which only + * compares the container id. + * @param o the request to compare with; may be null. + * @return a negative integer, zero, or a positive integer as this request + * should be handled before, at the same priority as, or after {@code o}; + * a positive value if {@code o} is null (no NullPointerException is thrown).
+ */ + @Override + public int compareTo(ReplicationRequest o) { + if (o == null) { + return 1; + } + if (this == o) { + return 0; + } + int retVal = Integer + .compare(getReplicationCount() - getExpecReplicationCount(), + o.getReplicationCount() - o.getExpecReplicationCount()); + if (retVal != 0) { + return retVal; + } + return Long.compare(getTimestamp(), o.getTimestamp()); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(91, 1011) + .append(getContainerId()) + .toHashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReplicationRequest that = (ReplicationRequest) o; + return new EqualsBuilder().append(getContainerId(), that.getContainerId()) + .isEquals(); + } + + public long getContainerId() { + return containerId; + } + + public short getReplicationCount() { + return replicationCount; + } + + public long getTimestamp() { + return timestamp; + } + + public short getExpecReplicationCount() { + return expecReplicationCount; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java new file mode 100644 index 0000000..934b01e --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.replication; + +/** + * HDDS (Closed) Container replication related classes. + */ \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java index 46f1588..ad1702b 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java @@ -28,6 +28,10 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .ContainerReportFromDatanode; import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher .NodeReportFromDatanode; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .ReplicationCompleted; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest; import org.apache.hadoop.hdds.server.events.Event; import org.apache.hadoop.hdds.server.events.TypedEvent; @@ -129,6 +133,33 @@ public final class SCMEvents {
"DeleteBlockCommandStatus"); /** + * This is the command for ReplicationManager to handle under/over + * replication. Sent by the ContainerReportHandler after processing the + * heartbeat. + */ + public static final TypedEvent<ReplicationRequest> REPLICATE_CONTAINER = + new TypedEvent<>(ReplicationRequest.class); + + /** + * This event is sent by the ReplicaManager to the + * ReplicationCommandWatcher to track the in-progress replication. + */ + public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat> + TRACK_REPLICATE_COMMAND = + new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class); + /** + * This event comes from the Heartbeat dispatcher (in fact from the + * datanode) to notify the scm that the replication is done. This is + * received by the replicate command watcher to mark in-progress task as + * finished. + <p> + * TODO: Temporary event, should be replaced by specific Heartbeat + * ActionRequred event. + */ + public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE = + new TypedEvent<>(ReplicationCompleted.class); + + /** * Private Ctor. Never Constructed. 
*/ private SCMEvents() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index aba6410..f4cd448 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.block.BlockManager; import org.apache.hadoop.hdds.scm.block.BlockManagerImpl; import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler; @@ -38,7 +39,12 @@ import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler; import org.apache.hadoop.hdds.scm.container.ContainerMapping; import org.apache.hadoop.hdds.scm.container.ContainerReportHandler; import org.apache.hadoop.hdds.scm.container.Mapping; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .SCMContainerPlacementCapacity; import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat; import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics; import 
org.apache.hadoop.hdds.scm.events.SCMEvents; @@ -61,9 +67,13 @@ import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.common.Storage.StorageState; import org.apache.hadoop.ozone.common.StorageInfo; +import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.StringUtils; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .HDDS_SCM_WATCHER_TIMEOUT_DEFAULT; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,6 +163,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl * Key = DatanodeUuid, value = ContainerStat. */ private Cache<String, ContainerStat> containerReportCache; + private final ReplicationManager replicationManager; + private final LeaseManager<Long> commandWatcherLeaseManager; /** * Creates a new StorageContainerManager. Configuration will be updated @@ -207,6 +219,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler); eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler); + long watcherTimeout = + conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, + HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS); + + commandWatcherLeaseManager = new LeaseManager<>(watcherTimeout); + + //TODO: support configurable containerPlacement policy + ContainerPlacementPolicy containerPlacementPolicy = + new SCMContainerPlacementCapacity(scmNodeManager, conf); + + replicationManager = new ReplicationManager(containerPlacementPolicy, + scmContainerManager.getStateManager(), eventQueue, + commandWatcherLeaseManager); + scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys .OZONE_ADMINISTRATORS); scmUsername = UserGroupInformation.getCurrentUser().getUserName(); @@ -552,7 +578,7 @@ public final class 
StorageContainerManager extends ServiceRuntimeInfoImpl httpServer.start(); scmBlockManager.start(); - + replicationManager.start(); setStartTime(); } @@ -562,6 +588,20 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl public void stop() { try { + LOG.info("Stopping Replication Manager Service."); + replicationManager.stop(); + } catch (Exception ex) { + LOG.error("Replication manager service stop failed.", ex); + } + + try { + LOG.info("Stopping Lease Manager of the command watchers"); + commandWatcherLeaseManager.shutdown(); + } catch (Exception ex) { + LOG.error("Lease Manager of the command watchers stop failed"); + } + + try { LOG.info("Stopping datanode service RPC server"); getDatanodeProtocolServer().stop(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java new file mode 100644 index 0000000..5966f2a --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; + +import org.junit.Assert; +import org.junit.Test; +import static org.mockito.Matchers.anyObject; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + +public class TestSCMContainerPlacementCapacity { + @Test + public void chooseDatanodes() throws SCMException { + //given + Configuration conf = new OzoneConfiguration(); + + List<DatanodeDetails> datanodes = new ArrayList<>(); + for (int i = 0; i < 7; i++) { + datanodes.add(TestUtils.getDatanodeDetails()); + } + + NodeManager mockNodeManager = Mockito.mock(NodeManager.class); + when(mockNodeManager.getNodes(NodeState.HEALTHY)) + .thenReturn(new ArrayList<>(datanodes)); + + when(mockNodeManager.getNodeStat(anyObject())) + .thenReturn(new SCMNodeMetric(100L, 0L, 100L)); + when(mockNodeManager.getNodeStat(datanodes.get(2))) + .thenReturn(new SCMNodeMetric(100L, 90L, 10L)); + when(mockNodeManager.getNodeStat(datanodes.get(3))) + .thenReturn(new
SCMNodeMetric(100L, 80L, 20L)); + when(mockNodeManager.getNodeStat(datanodes.get(4))) + .thenReturn(new SCMNodeMetric(100L, 70L, 30L)); + + SCMContainerPlacementCapacity scmContainerPlacementCapacity = + new SCMContainerPlacementCapacity(mockNodeManager, conf); + + List<DatanodeDetails> existingNodes = new ArrayList<>(); + existingNodes.add(datanodes.get(0)); + existingNodes.add(datanodes.get(1)); + + Map<DatanodeDetails, Integer> selectedCount = new HashMap<>(); + for (DatanodeDetails datanode : datanodes) { + selectedCount.put(datanode, 0); + } + + for (int i = 0; i < 1000; i++) { + + //when + List<DatanodeDetails> datanodeDetails = + scmContainerPlacementCapacity.chooseDatanodes(existingNodes, 1, 15); + + //then + Assert.assertEquals(1, datanodeDetails.size()); + DatanodeDetails datanode0Details = datanodeDetails.get(0); + + Assert.assertNotEquals( + "Datanode 0 should not been selected: excluded by parameter", + datanodes.get(0), datanode0Details); + Assert.assertNotEquals( + "Datanode 1 should not been selected: excluded by parameter", + datanodes.get(1), datanode0Details); + Assert.assertNotEquals( + "Datanode 2 should not been selected: not enough space there", + datanodes.get(2), datanode0Details); + + selectedCount + .put(datanode0Details, selectedCount.get(datanode0Details) + 1); + + } + //datanodes 3 and 4 have less free space than datanode 6, yet they are + //asserted to be chosen more often. TODO(review): confirm intended policy.
+ Assert.assertTrue(selectedCount.get(datanodes.get(3)) > selectedCount + .get(datanodes.get(6))); + Assert.assertTrue(selectedCount.get(datanodes.get(4)) > selectedCount + .get(datanodes.get(6))); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java new file mode 100644 index 0000000..430c181 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRandom.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
+ */ +package org.apache.hadoop.hdds.scm.container.placement.algorithms; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.node.NodeManager; + +import org.junit.Assert; +import org.junit.Test; +import static org.mockito.Matchers.anyObject; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + +public class TestSCMContainerPlacementRandom { + //excluded nodes and nodes without enough free space must never be chosen + @Test + public void chooseDatanodes() throws SCMException { + //given + Configuration conf = new OzoneConfiguration(); + + List<DatanodeDetails> datanodes = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + datanodes.add(TestUtils.getDatanodeDetails()); + } + + NodeManager mockNodeManager = Mockito.mock(NodeManager.class); + when(mockNodeManager.getNodes(NodeState.HEALTHY)) + .thenReturn(new ArrayList<>(datanodes)); + + when(mockNodeManager.getNodeStat(anyObject())) + .thenReturn(new SCMNodeMetric(100L, 0L, 100L)); + when(mockNodeManager.getNodeStat(datanodes.get(2))) + .thenReturn(new SCMNodeMetric(100L, 90L, 10L)); + + SCMContainerPlacementRandom scmContainerPlacementRandom = + new SCMContainerPlacementRandom(mockNodeManager, conf); + + List<DatanodeDetails> existingNodes = new ArrayList<>(); + existingNodes.add(datanodes.get(0)); + existingNodes.add(datanodes.get(1)); + + for (int i = 0; i < 100; i++) { + //when + List<DatanodeDetails> datanodeDetails = + scmContainerPlacementRandom.chooseDatanodes(existingNodes, 1, 15); + + //then + Assert.assertEquals(1, datanodeDetails.size()); + DatanodeDetails datanode0Details = datanodeDetails.get(0); +
Assert.assertNotEquals( + "Datanode 0 should not been selected: excluded by parameter", + datanodes.get(0), datanode0Details); + Assert.assertNotEquals( + "Datanode 1 should not been selected: excluded by parameter", + datanodes.get(1), datanode0Details); + Assert.assertNotEquals( + "Datanode 2 should not been selected: not enough space there", + datanodes.get(2), datanode0Details); + + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java new file mode 100644 index 0000000..e3e876b --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License.
+ */ +package org.apache.hadoop.hdds.scm.container.replication; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.UUID; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.ReplicateContainerCommandProto; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.container.ContainerStateManager; +import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; +import org.apache.hadoop.hdds.scm.container.placement.algorithms + .ContainerPlacementPolicy; +import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager + .ReplicationRequestToRepeat; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.server.events.EventQueue; +import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode; + +import com.google.common.base.Preconditions; +import static org.apache.hadoop.hdds.scm.events.SCMEvents + .TRACK_REPLICATE_COMMAND; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.anyObject; +import org.mockito.Mockito; +import static org.mockito.Mockito.when; + +/** + * Test behaviour of the TestReplication. 
+ */ +public class TestReplicationManager { + + private EventQueue queue; + + private List<ReplicationRequestToRepeat> trackReplicationEvents; + + private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents; + + private ContainerStateManager containerStateManager; + + private ContainerPlacementPolicy containerPlacementPolicy; + private List<DatanodeDetails> listOfDatanodeDetails; + + @Before + public void initReplicationManager() throws IOException { + + listOfDatanodeDetails = TestUtils.getListOfDatanodeDetails(5); + + containerPlacementPolicy = + (excludedNodes, nodesRequired, sizeRequired) -> listOfDatanodeDetails + .subList(2, 2 + nodesRequired); + + containerStateManager = Mockito.mock(ContainerStateManager.class); + + //container with 2 replicas + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setState(LifeCycleState.CLOSED) + .build(); + + when(containerStateManager.getContainer(anyObject())) + .thenReturn(containerInfo); + + queue = new EventQueue(); + + trackReplicationEvents = new ArrayList<>(); + queue.addHandler(TRACK_REPLICATE_COMMAND, + (event, publisher) -> trackReplicationEvents.add(event)); + + copyEvents = new ArrayList<>(); + queue.addHandler(SCMEvents.DATANODE_COMMAND, + (event, publisher) -> copyEvents.add(event)); + + } + + @Test + public void testEventSending() throws InterruptedException, IOException { + + + //GIVEN + + LeaseManager<Long> leaseManager = new LeaseManager<>(100000L); + try { + leaseManager.start(); + + ReplicationManager replicationManager = + new ReplicationManager(containerPlacementPolicy, + containerStateManager, + queue, leaseManager) { + @Override + protected List<DatanodeDetails> getCurrentReplicas( + ReplicationRequest request) throws IOException { + return listOfDatanodeDetails.subList(0, 2); + } + }; + replicationManager.start(); + + //WHEN + + queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, + new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(), + (short) 3)); + + 
Thread.sleep(500L); + queue.processAll(1000L); + + //THEN + + Assert.assertEquals(1, trackReplicationEvents.size()); + Assert.assertEquals(1, copyEvents.size()); + } finally { + if (leaseManager != null) { + leaseManager.shutdown(); + } + } + } + + @Test + public void testCommandWatcher() throws InterruptedException, IOException { + + Logger.getRootLogger().setLevel(Level.DEBUG); + LeaseManager<Long> leaseManager = new LeaseManager<>(1000L); + + try { + leaseManager.start(); + + ReplicationManager replicationManager = + new ReplicationManager(containerPlacementPolicy, containerStateManager, + + + queue, leaseManager) { + @Override + protected List<DatanodeDetails> getCurrentReplicas( + ReplicationRequest request) throws IOException { + return listOfDatanodeDetails.subList(0, 2); + } + }; + replicationManager.start(); + + queue.fireEvent(SCMEvents.REPLICATE_CONTAINER, + new ReplicationRequest(1l, (short) 2, System.currentTimeMillis(), + (short) 3)); + + Thread.sleep(500L); + + queue.processAll(1000L); + + Assert.assertEquals(1, trackReplicationEvents.size()); + Assert.assertEquals(1, copyEvents.size()); + + Assert.assertEquals(trackReplicationEvents.get(0).getId(), + copyEvents.get(0).getCommand().getId()); + + //event is timed out + Thread.sleep(1500); + + queue.processAll(1000L); + + //original copy command + retry + Assert.assertEquals(2, trackReplicationEvents.size()); + Assert.assertEquals(2, copyEvents.size()); + + } finally { + if (leaseManager != null) { + leaseManager.shutdown(); + } + } + } + + public static Pipeline createPipeline(Iterable<DatanodeDetails> ids) + throws IOException { + Objects.requireNonNull(ids, "ids == null"); + final Iterator<DatanodeDetails> i = ids.iterator(); + Preconditions.checkArgument(i.hasNext()); + final DatanodeDetails leader = i.next(); + String pipelineName = "TEST-" + UUID.randomUUID().toString().substring(3); + final Pipeline pipeline = + new Pipeline(leader.getUuidString(), LifeCycleState.OPEN, + 
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName); + pipeline.addMember(leader); + for (; i.hasNext(); ) { + pipeline.addMember(i.next()); + } + return pipeline; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java new file mode 100644 index 0000000..a593718 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationQueue.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import java.util.Random; +import java.util.UUID; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Test class for ReplicationQueue. 
+ */ +public class TestReplicationQueue { + + private ReplicationQueue replicationQueue; + private Random random; + + @Before + public void setUp() { + replicationQueue = new ReplicationQueue(); + random = new Random(); + } + + @Test + public void testDuplicateAddOp() throws InterruptedException { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationRequest obj1, obj2, obj3; + long time = Time.monotonicNow(); + obj1 = new ReplicationRequest(contId, (short) 2, time, (short) 3); + obj2 = new ReplicationRequest(contId, (short) 2, time + 1, (short) 3); + obj3 = new ReplicationRequest(contId, (short) 1, time+2, (short) 3); + + replicationQueue.add(obj1); + replicationQueue.add(obj2); + replicationQueue.add(obj3); + Assert.assertEquals("Should add only 1 msg as second one is duplicate", + 1, replicationQueue.size()); + ReplicationRequest temp = replicationQueue.take(); + Assert.assertEquals(temp, obj3); + } + + @Test + public void testPollOp() throws InterruptedException { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationRequest msg1, msg2, msg3, msg4, msg5; + msg1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(), + (short) 3); + long time = Time.monotonicNow(); + msg2 = new ReplicationRequest(contId + 1, (short) 4, time, (short) 3); + msg3 = new ReplicationRequest(contId + 2, (short) 0, time, (short) 3); + msg4 = new ReplicationRequest(contId, (short) 2, time, (short) 3); + // Replication message for same container but different nodeId + msg5 = new ReplicationRequest(contId + 1, (short) 2, time, (short) 3); + + replicationQueue.add(msg1); + replicationQueue.add(msg2); + replicationQueue.add(msg3); + replicationQueue.add(msg4); + replicationQueue.add(msg5); + Assert.assertEquals("Should have 3 objects", + 3, replicationQueue.size()); + + // Since Priority queue orders messages according to replication count, + // message with lowest replication should be first + 
ReplicationRequest temp; + temp = replicationQueue.take(); + Assert.assertEquals("Should have 2 objects", + 2, replicationQueue.size()); + Assert.assertEquals(temp, msg3); + + temp = replicationQueue.take(); + Assert.assertEquals("Should have 1 objects", + 1, replicationQueue.size()); + Assert.assertEquals(temp, msg5); + + // Message 2 should be ordered before message 5 as both have same replication + // number but message 2 has earlier timestamp. + temp = replicationQueue.take(); + Assert.assertEquals("Should have 0 objects", + replicationQueue.size(), 0); + Assert.assertEquals(temp, msg4); + } + + @Test + public void testRemoveOp() { + long contId = random.nextLong(); + String nodeId = UUID.randomUUID().toString(); + ReplicationRequest obj1, obj2, obj3; + obj1 = new ReplicationRequest(contId, (short) 1, Time.monotonicNow(), + (short) 3); + obj2 = new ReplicationRequest(contId + 1, (short) 2, Time.monotonicNow(), + (short) 3); + obj3 = new ReplicationRequest(contId + 2, (short) 3, Time.monotonicNow(), + (short) 3); + + replicationQueue.add(obj1); + replicationQueue.add(obj2); + replicationQueue.add(obj3); + Assert.assertEquals("Should have 3 objects", + 3, replicationQueue.size()); + + replicationQueue.remove(obj3); + Assert.assertEquals("Should have 2 objects", + 2, replicationQueue.size()); + + replicationQueue.remove(obj2); + Assert.assertEquals("Should have 1 objects", + 1, replicationQueue.size()); + + replicationQueue.remove(obj1); + Assert.assertEquals("Should have 0 objects", + 0, replicationQueue.size()); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java new file mode 100644 index 0000000..1423c99 --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/** + * SCM Testing and Mocking Utils. + */ +package org.apache.hadoop.hdds.scm.container.replication; +// Test classes for Replication functionality. 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a9e25ed/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java index 651b776..802f2ef 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/placement/TestContainerPlacement.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ozone.OzoneConsts; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -86,11 +87,11 @@ public class TestContainerPlacement { for (int x = 0; x < opsCount; x++) { long containerSize = random.nextInt(100) * OzoneConsts.GB; List<DatanodeDetails> nodesCapacity = - capacityPlacer.chooseDatanodes(nodesRequired, containerSize); + capacityPlacer.chooseDatanodes(new ArrayList<>(), nodesRequired, containerSize); assertEquals(nodesRequired, nodesCapacity.size()); List<DatanodeDetails> nodesRandom = - randomPlacer.chooseDatanodes(nodesRequired, containerSize); + randomPlacer.chooseDatanodes(nodesCapacity, nodesRequired, containerSize); // One fifth of all calls are delete if (x % 5 == 0) { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org