This is an automated email from the ASF dual-hosted git repository. sodonnell pushed a commit to branch HDDS-1880-Decom in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1880-Decom by this push: new 42d53bc HDDS-4323. Add integration tests for putting nodes into maintenance and fix any issues uncovered in the tests (#1501) 42d53bc is described below commit 42d53bce288db8d617fe4d3c82eb4c0f35fba135 Author: Stephen O'Donnell <stephen.odonn...@gmail.com> AuthorDate: Wed Oct 21 15:04:22 2020 +0100 HDDS-4323. Add integration tests for putting nodes into maintenance and fix any issues uncovered in the tests (#1501) --- .../hadoop/hdds/scm/node/DatanodeAdminMonitor.java | 3 + .../hdds/scm/node/DatanodeAdminMonitorImpl.java | 17 +- .../hdds/scm/node/NodeDecommissionManager.java | 12 +- .../hdds/scm/server/StorageContainerManager.java | 7 + .../scm/node/TestDecommissionAndMaintenance.java | 300 ++++++++++++++++++++- 5 files changed, 319 insertions(+), 20 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java index 903b816..3466547 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitor.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.node; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import java.util.Set; + /** * Interface used by the DatanodeAdminMonitor, which can be used to * decommission or recommission nodes and take them in and out of maintenance. 
@@ -27,5 +29,6 @@ public interface DatanodeAdminMonitor extends Runnable { void startMonitoring(DatanodeDetails dn); void stopMonitoring(DatanodeDetails dn); + Set<DatanodeDetails> getTrackedNodes(); } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java index ee90cba..247a307 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeAdminMonitorImpl.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hdds.scm.node; -import com.google.common.annotations.VisibleForTesting; - import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; @@ -108,6 +106,16 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor { } /** + * Get the set of nodes which are currently tracked in the decommissioned + * and maintenance workflow. + * @return An unmodifiable snapshot copy of the tracked nodes. + */ + @Override + public synchronized Set<DatanodeDetails> getTrackedNodes() { + return Collections.unmodifiableSet(new HashSet<>(trackedNodes)); + } + + /** * Run an iteration of the monitor.
This is the main run loop, and performs * the following checks: * <p> @@ -148,11 +156,6 @@ public class DatanodeAdminMonitorImpl implements DatanodeAdminMonitor { return trackedNodes.size(); } - @VisibleForTesting - public Set<DatanodeDetails> getTrackedNodes() { - return trackedNodes; - } - private void processCancelledNodes() { while (!cancelledNodes.isEmpty()) { DatanodeDetails dn = cancelledNodes.poll(); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java index cc81473..30cae10 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeDecommissionManager.java @@ -244,7 +244,8 @@ public class NodeDecommissionManager { throws NodeNotFoundException { NodeOperationalState opState = getNodeStatus(dn).getOperationalState(); if (opState == NodeOperationalState.DECOMMISSIONING - || opState == NodeOperationalState.ENTERING_MAINTENANCE) { + || opState == NodeOperationalState.ENTERING_MAINTENANCE + || opState == NodeOperationalState.IN_MAINTENANCE) { monitor.startMonitoring(dn); } } @@ -351,6 +352,15 @@ public class NodeDecommissionManager { } } + /** + * Stops the decommission monitor from running when SCM is shutdown. 
+ */ + public void stop() { + if (executor != null) { + executor.shutdown(); + } + } + private NodeStatus getNodeStatus(DatanodeDetails dn) throws NodeNotFoundException { return nodeManager.getNodeStatus(dn); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index e26d521..2e9f576 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -845,6 +845,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl } try { + LOG.info("Stopping the Datanode Admin Monitor."); + scmDecommissionManager.stop(); + } catch (Exception ex) { + LOG.error("The Datanode Admin Monitor failed to stop", ex); + } + + try { LOG.info("Stopping Lease Manager of the command watchers"); commandWatcherLeaseManager.shutdown(); } catch (Exception ex) { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java index 8f02878..42b4cd8d 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo; import org.apache.hadoop.hdds.scm.container.ContainerManager; import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException; import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ContainerReplicaCount; import 
org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration; import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; @@ -46,15 +47,18 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static java.util.concurrent.TimeUnit.SECONDS; import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertTrue; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL; @@ -62,6 +66,7 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.DEAD; import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY; @@ -138,16 +143,11 @@ public class TestDecommissionAndMaintenance { throws Exception { // Generate some data on the empty cluster to create some containers generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); + // Locate any container and find its open pipeline - final ContainerInfo 
container = - scmClient.listContainer(0, 1).get(0); + final ContainerInfo container = waitForAndReturnContainer(); Pipeline pipeline = pm.getPipeline(container.getPipelineID()); assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState()); - - // Ensure all 3 replicas of the container have been reported via ICR - GenericTestUtils.waitFor( - () -> getContainerReplicas(container).size() == 3, - 200, 30000); Set<ContainerReplica> replicas = getContainerReplicas(container); final DatanodeDetails toDecommission = getOneDNHostingReplica(replicas); @@ -215,11 +215,7 @@ public class TestDecommissionAndMaintenance { // container. This ensures it will not decommission immediately due to // having no containers. generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); - final ContainerInfo container = - scmClient.listContainer(0, 1).get(0); - GenericTestUtils.waitFor( - () -> getContainerReplicas(container).size() == 3, - 200, 30000); + final ContainerInfo container = waitForAndReturnContainer(); final DatanodeDetails dn = getOneDNHostingReplica(getContainerReplicas(container)); scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn))); @@ -246,6 +242,7 @@ public class TestDecommissionAndMaintenance { throws Exception { // Decommission node and wait for it to be DECOMMISSIONED generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); + DatanodeDetails dn = nm.getAllNodes().get(0); scmClient.decommissionNodes(Arrays.asList(getDNHostAndPort(dn))); waitForDnToReachOpState(dn, DECOMMISSIONED); @@ -288,6 +285,270 @@ public class TestDecommissionAndMaintenance { waitForDnToReachPersistedOpState(dn, IN_SERVICE); } + @Test + // When putting a single node into maintenance, its pipelines should be closed + // but no new replicas should be create and the node should transition into + // maintenance + public void testSingleNodeWithOpenPipelineCanGotoMaintenance() + throws Exception { + // Generate some data on the empty cluster to create 
some containers + generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); + + // Locate any container and find its open pipeline + final ContainerInfo container = waitForAndReturnContainer(); + Pipeline pipeline = pm.getPipeline(container.getPipelineID()); + assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState()); + Set<ContainerReplica> replicas = getContainerReplicas(container); + + final DatanodeDetails dn = getOneDNHostingReplica(replicas); + scmClient.startMaintenanceNodes(Arrays.asList( + getDNHostAndPort(dn)), 0); + + waitForDnToReachOpState(dn, IN_MAINTENANCE); + waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE); + + // Should still be 3 replicas online as no replication should happen for + // maintenance + Set<ContainerReplica> newReplicas = + cm.getContainerReplicas(container.containerID()); + assertEquals(3, newReplicas.size()); + + // Stop the maintenance DN + cluster.shutdownHddsDatanode(dn); + waitForDnToReachHealthState(dn, DEAD); + + // Now the maintenance node is dead, we should still have + // 3 replicas as we don't purge the replicas for a dead maintenance node + newReplicas = cm.getContainerReplicas(container.containerID()); + assertEquals(3, newReplicas.size()); + + // Restart the DN and it should keep the IN_MAINTENANCE state + cluster.restartHddsDatanode(dn, true); + DatanodeDetails newDN = nm.getNodeByUuid(dn.getUuid().toString()); + waitForDnToReachHealthState(newDN, HEALTHY); + waitForDnToReachPersistedOpState(newDN, IN_MAINTENANCE); + } + + @Test + // After a node enters maintenance and is stopped, it can be recommissioned in + // SCM. Then when it is restarted, it should go back to IN_SERVICE and have + // that persisted on the DN. 
+ public void testStoppedMaintenanceNodeTakesScmStateOnRestart() + throws Exception { + // Put a node into maintenance and wait for it to complete + generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); + DatanodeDetails dn = nm.getAllNodes().get(0); + scmClient.startMaintenanceNodes(Arrays.asList(getDNHostAndPort(dn)), 0); + waitForDnToReachOpState(dn, IN_MAINTENANCE); + waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE); + + int dnIndex = cluster.getHddsDatanodeIndex(dn); + cluster.shutdownHddsDatanode(dnIndex); + waitForDnToReachHealthState(dn, DEAD); + + // Datanode is shutdown and dead. Now recommission it in SCM + scmClient.recommissionNodes(Arrays.asList(getDNHostAndPort(dn))); + + // Now restart it and ensure it remains IN_SERVICE + cluster.restartHddsDatanode(dnIndex, true); + DatanodeDetails newDn = nm.getNodeByUuid(dn.getUuid().toString()); + + // As this is not an initial registration since SCM was started, the DN + // should report its operational state and if it differs from what SCM + // has, then the SCM state should be used and the DN state updated. + waitForDnToReachHealthState(newDn, HEALTHY); + waitForDnToReachOpState(newDn, IN_SERVICE); + waitForDnToReachPersistedOpState(dn, IN_SERVICE); + } + + @Test + // By default a node can enter maintenance if there are two replicas left + // available when the maintenance nodes are stopped. Therefore putting all + // nodes hosting a replica to maintenance should cause new replicas to get + // created before the nodes can enter maintenance. When the maintenance nodes + // return, the excess replicas should be removed. 
+ public void testContainerIsReplicatedWhenAllNodesGotoMaintenance() + throws Exception { + // Generate some data on the empty cluster to create some containers + generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); + // Locate any container and wait for all 3 of its replicas to be reported + final ContainerInfo container = waitForAndReturnContainer(); + Set<ContainerReplica> replicas = getContainerReplicas(container); + + List<DatanodeDetails> forMaintenance = new ArrayList<>(); + replicas.forEach(r ->forMaintenance.add(r.getDatanodeDetails())); + + scmClient.startMaintenanceNodes(forMaintenance.stream() + .map(d -> getDNHostAndPort(d)) + .collect(Collectors.toList()), 0); + + // Ensure all 3 DNs go to maintenance + for(DatanodeDetails dn : forMaintenance) { + waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE); + } + + // There should now be 5 replicas of the container we are tracking + Set<ContainerReplica> newReplicas = + cm.getContainerReplicas(container.containerID()); + assertEquals(5, newReplicas.size()); + + scmClient.recommissionNodes(forMaintenance.stream() + .map(d -> getDNHostAndPort(d)) + .collect(Collectors.toList())); + + // Ensure all 3 DNs return to IN_SERVICE + for(DatanodeDetails dn : forMaintenance) { + waitForDnToReachOpState(dn, IN_SERVICE); + } + + // The excess replicas should now be removed, leaving 3 again + GenericTestUtils.waitFor(() -> getContainerReplicas(container).size() == 3, + 200, 30000); + } + + @Test + // If SCM is restarted when a node is ENTERING_MAINTENANCE, then when the node + // re-registers, it should continue to enter maintenance.
+ public void testEnteringMaintenanceNodeCompletesAfterSCMRestart() + throws Exception { + // Stop Replication Manager to ensure no containers are replicated. As all + // 3 replica holders are put into maintenance, they cannot complete the + // transition until new replicas are created, so they remain + // ENTERING_MAINTENANCE until SCM is restarted. + scm.getReplicationManager().stop(); + // Generate some data on the empty cluster to create some containers + generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); + // Locate any container and wait for all 3 of its replicas to be reported + final ContainerInfo container = waitForAndReturnContainer(); + Set<ContainerReplica> replicas = getContainerReplicas(container); + + List<DatanodeDetails> forMaintenance = new ArrayList<>(); + replicas.forEach(r ->forMaintenance.add(r.getDatanodeDetails())); + + scmClient.startMaintenanceNodes(forMaintenance.stream() + .map(d -> getDNHostAndPort(d)) + .collect(Collectors.toList()), 0); + + // Ensure all 3 DNs have persisted ENTERING_MAINTENANCE before the restart. + // Waiting for IN_MAINTENANCE here cannot succeed with replication stopped. + for(DatanodeDetails dn : forMaintenance) { + waitForDnToReachPersistedOpState(dn, + org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState + .ENTERING_MAINTENANCE); + } + + cluster.restartStorageContainerManager(true); + // Refresh the cached SCM instance variables so nm/cm use the new SCM. + setManagers(); + + List<DatanodeDetails> newDns = new ArrayList<>(); + for(DatanodeDetails dn : forMaintenance) { + newDns.add(nm.getNodeByUuid(dn.getUuid().toString())); + } + + // Ensure all 3 DNs complete maintenance on the restarted SCM + for(DatanodeDetails dn : newDns) { + waitForDnToReachOpState(dn, IN_MAINTENANCE); + } + + // There should now be 5 replicas of the container we are tracking + Set<ContainerReplica> newReplicas = + cm.getContainerReplicas(container.containerID()); + assertEquals(5, newReplicas.size()); + } + + @Test + // For a node which is online the maintenance should end automatically when + // maintenance expires and the node should go back into service. + // If the node is dead when maintenance expires, its replicas will be purged + // and new replicas created.
+ public void testMaintenanceEndsAutomaticallyAtTimeout() + throws Exception { + // Generate some data on the empty cluster to create some containers + generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); + ContainerInfo container = waitForAndReturnContainer(); + DatanodeDetails dn = + getOneDNHostingReplica(getContainerReplicas(container)); + + scmClient.startMaintenanceNodes(Arrays.asList(getDNHostAndPort(dn)), 0); + waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE); + + long newEndTime = System.currentTimeMillis() / 1000 + 5; + // Update the maintenance end time via NM manually. As the current + // decommission interface only allows us to specify hours from now as the + // end time, that is not really suitable for a test like this. + nm.setNodeOperationalState(dn, IN_MAINTENANCE, newEndTime); + waitForDnToReachOpState(dn, IN_SERVICE); + waitForDnToReachPersistedOpState(dn, IN_SERVICE); + + // Put the node back into maintenance and then stop it and wait for it to + // go dead + scmClient.startMaintenanceNodes(Arrays.asList(getDNHostAndPort(dn)), 0); + waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE); + cluster.shutdownHddsDatanode(dn); + waitForDnToReachHealthState(dn, DEAD); + + newEndTime = System.currentTimeMillis() / 1000 + 5; + nm.setNodeOperationalState(dn, IN_MAINTENANCE, newEndTime); + waitForDnToReachOpState(dn, IN_SERVICE); + // Ensure there are 3 replicas not including the dead node, indicating a new + // replica was created + GenericTestUtils.waitFor(() -> getContainerReplicas(container) + .stream() + .filter(r -> !r.getDatanodeDetails().equals(dn)) + .count() == 3, + 200, 30000); + } + + @Test + // If is SCM is Restarted when a maintenance node is dead, then we lose all + // the replicas associated with it, as the dead node cannot report them back + // in. If that happens, SCM has no choice except to replicate the containers. 
+ public void testSCMHandlesRestartForMaintenanceNode() + throws Exception { + // Generate some data on the empty cluster to create some containers + generateData(20, "key", ReplicationFactor.THREE, ReplicationType.RATIS); + ContainerInfo container = waitForAndReturnContainer(); + DatanodeDetails dn = + getOneDNHostingReplica(getContainerReplicas(container)); + + scmClient.startMaintenanceNodes(Arrays.asList(getDNHostAndPort(dn)), 0); + waitForDnToReachPersistedOpState(dn, IN_MAINTENANCE); + + cluster.restartStorageContainerManager(true); + setManagers(); + + // Ensure there are 3 replicas with one in maintenance indicating no new + // replicas were created + final ContainerInfo newContainer = cm.getContainer(container.containerID()); + GenericTestUtils.waitFor(() -> + getContainerReplicas(newContainer).size() == 3, 200, 30000); + + ContainerReplicaCount counts = + scm.getReplicationManager().getContainerReplicaCount(newContainer); + assertEquals(1, counts.getMaintenanceCount()); + assertTrue(counts.isSufficientlyReplicated()); + + // The node should be added back to the decommission monitor to ensure + // maintenance end time is correctly tracked. + GenericTestUtils.waitFor(() -> scm.getScmDecommissionManager().getMonitor() + .getTrackedNodes().size() == 1, 200, 30000); + + // Now let the node go dead and repeat the test. This time ensure a new + // replica is created. 
+ cluster.shutdownHddsDatanode(dn); + waitForDnToReachHealthState(dn, DEAD); + + cluster.restartStorageContainerManager(false); + setManagers(); + + GenericTestUtils.waitFor(() + -> nm.getNodeCount(IN_SERVICE, null) == 5, 200, 30000); + + // Ensure there are 3 replicas not including the dead node, indicating a new + // replica was created + final ContainerInfo nextContainer + = cm.getContainer(container.containerID()); + GenericTestUtils.waitFor(() -> + getContainerReplicas(nextContainer).size() == 3, 200, 30000); + // There should be no IN_MAINTENANCE node: + assertEquals(0, nm.getNodeCount(IN_MAINTENANCE, null)); + // Use the container read from the restarted SCM; newContainer was read + // from the previous SCM instance. + counts = scm.getReplicationManager() + .getContainerReplicaCount(nextContainer); + assertEquals(0, counts.getMaintenanceCount()); + assertTrue(counts.isSufficientlyReplicated()); + } + /** * Sets the instance variables to the values for the current MiniCluster. */ @@ -416,4 +677,19 @@ public class TestDecommissionAndMaintenance { 200, 30000); } + /** + * Get any container present in the cluster and wait to ensure 3 replicas + * have been reported before returning the container. + * @return A single container present on the cluster + * @throws Exception + */ + private ContainerInfo waitForAndReturnContainer() throws Exception { + final ContainerInfo container = cm.getContainers().get(0); + // Ensure all 3 replicas of the container have been reported via ICR + GenericTestUtils.waitFor( + () -> getContainerReplicas(container).size() == 3, + 200, 30000); + return container; + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org