This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fb11477eeb HDDS-9321. LegacyReplicationManager: Save UNHEALTHY
replicas with highest BCSID for a QUASI_CLOSED container (#5366)
fb11477eeb is described below
commit fb11477eeb95f646a27341d798bb15b1d5b65fec
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Thu Sep 28 15:09:13 2023 +0530
HDDS-9321. LegacyReplicationManager: Save UNHEALTHY replicas with highest
BCSID for a QUASI_CLOSED container (#5366)
---
.../replication/LegacyReplicationManager.java | 86 +++++++++
.../replication/RatisContainerReplicaCount.java | 83 +++++++++
.../replication/TestLegacyReplicationManager.java | 207 +++++++++++++++++++++
3 files changed, 376 insertions(+)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 3154b2e52f..ee3a442967 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container.replication;
+import com.google.common.collect.ImmutableList;
import com.google.protobuf.Message;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -533,6 +534,21 @@ public class LegacyReplicationManager {
return;
}
+ /*
+ * A QUASI_CLOSED container may have some UNHEALTHY replicas with the
+ * same Sequence ID as the container. RM should try to maintain one
+ * copy of such replicas when there are no healthy replicas that
+ * match the container's Sequence ID.
+ */
+ List<ContainerReplica> vulnerableUnhealthy =
+ replicaSet.getVulnerableUnhealthyReplicas(nodeManager);
+ if (!vulnerableUnhealthy.isEmpty()) {
+ report.incrementAndSample(HealthState.UNDER_REPLICATED,
+ container.containerID());
+ handleVulnerableUnhealthyReplicas(replicaSet, vulnerableUnhealthy);
+ return;
+ }
+
/*
* Check if the container is over replicated and take appropriate
* action.
@@ -563,6 +579,37 @@ public class LegacyReplicationManager {
}
}
+ /**
+ * Sends a replicate command for each replica specified in
+ * vulnerableUnhealthy.
+ * @param replicaCount RatisContainerReplicaCount for this container
+ * @param vulnerableUnhealthy List of UNHEALTHY replicas that need to be
+ * replicated
+ */
+ private void handleVulnerableUnhealthyReplicas(
+ RatisContainerReplicaCount replicaCount,
+ List<ContainerReplica> vulnerableUnhealthy) {
+ ContainerInfo container = replicaCount.getContainer();
+ LOG.debug("Handling vulnerable UNHEALTHY replicas {} for container {}.",
+ vulnerableUnhealthy, container);
+ int pendingAdds = getInflightAdd(container.containerID());
+ if (pendingAdds >= vulnerableUnhealthy.size()) {
+ LOG.debug("There are {} pending adds for container {}, while " +
+ "the number of UNHEALTHY replicas is {}.", pendingAdds,
+ container.containerID(), vulnerableUnhealthy.size());
+ return;
+ }
+
+ /*
+ Since we're replicating UNHEALTHY replicas, it's possible that
+ replication keeps on failing. Shuffling gives other replicas a chance to be
+ replicated since there's a limit on inflight adds.
+ */
+ Collections.shuffle(vulnerableUnhealthy);
+ replicateEachSource(container, vulnerableUnhealthy,
+ replicaCount.getReplicas());
+ }
+
private void updateCompletedReplicationMetrics(ContainerInfo container,
InflightAction action) {
metrics.incrReplicasCreatedTotal();
@@ -2383,6 +2430,45 @@ public class LegacyReplicationManager {
}
}
+ /**
+ * Replicates each of the ContainerReplica specified in sources to new
+ * Datanodes. Will not consider Datanodes hosting existing replicas and
+ * Datanodes pending adds as targets. Note that this method simply skips
+ * the replica if there's an exception.
+ * @param container Container whose replicas are specified as sources
+ * @param sources List containing replicas, each will be replicated
+ * @param allReplicas all existing replicas of this container
+ */
+ private void replicateEachSource(ContainerInfo container,
+ List<ContainerReplica> sources, List<ContainerReplica> allReplicas) {
+ final List<DatanodeDetails> excludeList = allReplicas.stream()
+ .map(ContainerReplica::getDatanodeDetails)
+ .collect(Collectors.toList());
+
+ for (ContainerReplica replica : sources) {
+ // also exclude any DNs pending to receive a replica of this container
+ final List<DatanodeDetails> replicationInFlight
+ = inflightReplication.getDatanodeDetails(container.containerID());
+ for (DatanodeDetails dn : replicationInFlight) {
+ if (!excludeList.contains(dn)) {
+ excludeList.add(dn);
+ }
+ }
+
+ try {
+ final List<DatanodeDetails> target =
+ ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
+ 1, null, excludeList, currentContainerSize,
+ container);
+ sendReplicateCommand(container, target.iterator().next(),
+ ImmutableList.of(replica.getDatanodeDetails()));
+ } catch (SCMException e) {
+ LOG.warn("Exception while trying to replicate {} of container {}.",
+ replica, container, e);
+ }
+ }
+ }
+
private void closeEmptyContainer(ContainerInfo containerInfo) {
/*
* We should wait for sometime before moving the container to CLOSED state.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
index e8ccaebfbf..0c6052ef35 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
@@ -24,18 +24,23 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
import
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.compareState;
/**
@@ -416,6 +421,84 @@ public class RatisContainerReplicaCount implements
ContainerReplicaCount {
return isSufficientlyReplicated();
}
+ /**
+ * QUASI_CLOSED containers that have a mix of healthy and UNHEALTHY
+ * replicas require special treatment. If the healthy replicas don't have
+ * the same BCSID as the container, but the UNHEALTHY ones do, then we need
+ * to save at least one copy of each such UNHEALTHY replica. This method
+ * finds such UNHEALTHY replicas.
+ *
+ * @param nodeManager an instance of NodeManager
+ * @return List of UNHEALTHY replicas with the greatest Sequence ID that
+ * need to be replicated to other nodes. Empty list if this container is not
+ * QUASI_CLOSED, doesn't have a mix of healthy and UNHEALTHY replicas, or
+ * if there are no replicas that need to be saved.
+ */
+ List<ContainerReplica> getVulnerableUnhealthyReplicas(
+ NodeManager nodeManager) {
+ if (container.getState() != HddsProtos.LifeCycleState.QUASI_CLOSED) {
+ // this method is only relevant for QUASI_CLOSED containers
+ return Collections.emptyList();
+ }
+
+ boolean foundHealthy = false;
+ List<ContainerReplica> unhealthyReplicas = new ArrayList<>();
+ for (ContainerReplica replica : replicas) {
+ if (replica.getState() != ContainerReplicaProto.State.UNHEALTHY) {
+ foundHealthy = true;
+ }
+
+ if (replica.getSequenceId() == container.getSequenceId()) {
+ if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY) {
+ unhealthyReplicas.add(replica);
+ } else if (replica.getState() ==
+ ContainerReplicaProto.State.QUASI_CLOSED) {
+ // don't need to save UNHEALTHY replicas if there's a QUASI_CLOSED
+ // replica with the greatest Sequence ID.
+ return Collections.emptyList();
+ }
+ }
+ }
+ if (!foundHealthy) {
+ // this method is only relevant when there's a mix of healthy and
+ // unhealthy replicas
+ return Collections.emptyList();
+ }
+
+ unhealthyReplicas.removeIf(
+ replica -> {
+ try {
+ return !nodeManager.getNodeStatus(replica.getDatanodeDetails())
+ .isHealthy();
+ } catch (NodeNotFoundException e) {
+ return true;
+ }
+ });
+ /*
+ At this point, the list of unhealthyReplicas contains all UNHEALTHY
+ replicas with the greatest Sequence ID that are on healthy Datanodes.
+ Note that this also includes multiple copies of the same UNHEALTHY
+ replica, that is, replicas with the same Origin ID. We need to consider
+ the fact that replicas can be uniquely unhealthy. That is, 2 UNHEALTHY
+ replicas with different Origin IDs need not be exact copies of each other.
+
+ Replicas that don't have at least one instance (multiple instances of a
+ replica will have the same Origin ID) on an IN_SERVICE node are
+ vulnerable and need to be saved.
+ */
+ // TODO should we also consider pending deletes?
+ Set<UUID> originsOfInServiceReplicas = new HashSet<>();
+ for (ContainerReplica replica : unhealthyReplicas) {
+ if (replica.getDatanodeDetails().getPersistedOpState()
+ .equals(IN_SERVICE)) {
+ originsOfInServiceReplicas.add(replica.getOriginDatanodeId());
+ }
+ }
+ unhealthyReplicas.removeIf(replica -> originsOfInServiceReplicas.contains(
+ replica.getOriginDatanodeId()));
+ return unhealthyReplicas;
+ }
+
/**
* Return true if the container is sufficiently replicated. Decommissioning
* and Decommissioned containers are ignored in this check, assuming they
will
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 4919adb4bb..dfbbc13f0d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -950,6 +950,207 @@ public class TestLegacyReplicationManager {
SCMCommandProto.Type.deleteContainerCommand));
}
+ /**
+ * Situation: QUASI_CLOSED container with 3 QUASI_CLOSED replicas
+ * whose Sequence ID is smaller than the container's. There are 2
+ * UNHEALTHY replicas with Sequence ID same as the container's. One of
+ * them is on a decommissioning node.
+ *
+ * Expectation: Replication command should be sent for the UNHEALTHY
+ * replica on the decommissioning node.
+ */
+ @Test
+ public void testQuasiClosedHavingUnhealthyReplicaWithGreatestBCSID()
+ throws IOException {
+ final ContainerInfo container =
getContainer(LifeCycleState.QUASI_CLOSED);
+ container.setUsedBytes(100);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ long sequenceID = container.getSequenceId();
+ final ContainerReplica replica1 =
+ getReplicas(id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+ randomDatanodeDetails());
+ final ContainerReplica replica2 = getReplicas(
+ id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+ randomDatanodeDetails());
+ final ContainerReplica replica3 = getReplicas(
+ id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+ randomDatanodeDetails());
+ DatanodeDetails decommissioning =
+ MockDatanodeDetails.randomDatanodeDetails();
+ decommissioning.setPersistedOpState(DECOMMISSIONING);
+ final ContainerReplica replica4 = getReplicas(
+ id, UNHEALTHY, sequenceID, decommissioning.getUuid(),
+ decommissioning);
+ final ContainerReplica replica5 = getReplicas(
+ id, UNHEALTHY, sequenceID, randomDatanodeDetails().getUuid(),
+ randomDatanodeDetails());
+
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id, replica1);
+ containerStateManager.updateContainerReplica(id, replica2);
+ containerStateManager.updateContainerReplica(id, replica3);
+ containerStateManager.updateContainerReplica(id, replica4);
+ containerStateManager.updateContainerReplica(id, replica5);
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ // 1 replicate command should have been sent
+ Assertions.assertEquals(1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ Assertions.assertEquals(1,
+ replicationManager.getContainerReport().getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+
+ // the following code asserts that replicate was sent for the UNHEALTHY
+ // replica on the decommissioning node
+ CommandForDatanode command =
+ datanodeCommandHandler.getReceivedCommands().iterator().next();
+ Assertions.assertEquals(SCMCommandProto.Type.replicateContainerCommand,
+ command.getCommand().getType());
+ ReplicateContainerCommand replicateCommand =
+ (ReplicateContainerCommand) command.getCommand();
+ Assertions.assertEquals(1, replicateCommand.getSourceDatanodes().size());
+ Assertions.assertEquals(replica4.getDatanodeDetails(),
+ replicateCommand.getSourceDatanodes().iterator().next());
+
+ Assertions.assertEquals(0,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+
+ // If we don't complete the pending add by the next iteration, it's
+ // expected that another replicate command is not sent.
+ replicationManager.processAll();
+ eventQueue.processAll(100);
+
+ // that 1 command is the one RM sent in the last iteration
+ Assertions.assertEquals(1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ Assertions.assertEquals(0,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+
+ // Now, we will complete the add. Expectation is that no new commands
+ // should be sent in the next iteration.
+ ContainerReplica newReplica =
+ getReplicas(container.containerID(), UNHEALTHY,
+ container.getSequenceId(), replica4.getOriginDatanodeId(),
+ MockDatanodeDetails.createDatanodeDetails(
+ command.getDatanodeId()));
+ containerStateManager.updateContainerReplica(container.containerID(),
+ newReplica);
+
+ replicationManager.processAll();
+ eventQueue.processAll(100);
+
+ // that 1 command is the one RM sent in the last iteration
+ Assertions.assertEquals(1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ Assertions.assertEquals(0,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+ }
+
+ /**
+ * Situation: QUASI_CLOSED container with 2 QUASI_CLOSED replicas
+ * whose Sequence ID is smaller than the container's. There's 1
+ * UNHEALTHY replica with Sequence ID same as the container's and on a
+ * decommissioning node.
+ * <p>
+ * Expectation: First, one of the QUASI_CLOSED should be replicated to
+ * get 3 of them. In the next iteration, the UNHEALTHY replica should be
+ * replicated. This also verifies that HDDS-9321 does not introduce a
+ * regression in the under replication flow.
+ */
+ @Test
+ public void testUnderRepQuasiClosedHavingUnhealthyWithGreatestBCSID()
+ throws IOException {
+ final ContainerInfo container =
getContainer(LifeCycleState.QUASI_CLOSED);
+ container.setUsedBytes(100);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ long sequenceID = container.getSequenceId();
+ final ContainerReplica replica1 =
+ getReplicas(id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+ randomDatanodeDetails());
+ final ContainerReplica replica2 = getReplicas(
+ id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+ randomDatanodeDetails());
+ DatanodeDetails decommissioning =
+ MockDatanodeDetails.randomDatanodeDetails();
+ decommissioning.setPersistedOpState(DECOMMISSIONING);
+ final ContainerReplica unhealthyReplica = getReplicas(
+ id, UNHEALTHY, sequenceID, decommissioning.getUuid(),
+ decommissioning);
+
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id, replica1);
+ containerStateManager.updateContainerReplica(id, replica2);
+ containerStateManager.updateContainerReplica(id, unhealthyReplica);
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ Assertions.assertEquals(1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ Assertions.assertEquals(1,
+ replicationManager.getContainerReport().getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+
+ // the following code asserts that the replicate command was sent for
+ // either of the QUASI_CLOSED replicas
+ CommandForDatanode command =
+ datanodeCommandHandler.getReceivedCommands().iterator().next();
+ Assertions.assertEquals(SCMCommandProto.Type.replicateContainerCommand,
+ command.getCommand().getType());
+ ReplicateContainerCommand replicateCommand =
+ (ReplicateContainerCommand) command.getCommand();
+ List<DatanodeDetails> sourceDatanodes =
+ replicateCommand.getSourceDatanodes();
+ Assertions.assertEquals(2, sourceDatanodes.size());
+ Assertions.assertFalse(
+ sourceDatanodes.contains(unhealthyReplica.getDatanodeDetails()));
+ Assertions.assertEquals(0,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+
+ // now, add a QUASI_CLOSED replica, which is a copy of replica1
+ ContainerReplica newReplica =
+ getReplicas(container.containerID(), QUASI_CLOSED,
+ container.getSequenceId() - 1, replica1.getOriginDatanodeId(),
+ MockDatanodeDetails.createDatanodeDetails(
+ command.getDatanodeId()));
+ containerStateManager.updateContainerReplica(container.containerID(),
+ newReplica);
+ datanodeCommandHandler.clearState();
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ Assertions.assertEquals(1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ Assertions.assertEquals(1,
+ replicationManager.getContainerReport().getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assertions.assertEquals(1,
+ datanodeCommandHandler.getReceivedCommands().size());
+ command = datanodeCommandHandler.getReceivedCommands().iterator().next();
+ Assertions.assertEquals(SCMCommandProto.Type.replicateContainerCommand,
+ command.getCommand().getType());
+ replicateCommand = (ReplicateContainerCommand) command.getCommand();
+ Assertions.assertEquals(1, replicateCommand.getSourceDatanodes().size());
+ Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(),
+ replicateCommand.getSourceDatanodes().iterator().next());
+ Assertions.assertEquals(0,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+ }
+
/**
* 2 closed replica
* 1 quasi-closed replicas
@@ -3182,5 +3383,11 @@ public class TestLegacyReplicationManager {
dc.getCommand().getType().equals(type) &&
dc.getDatanodeId().equals(datanode.getUuid()));
}
+
+ private void clearState() {
+ commands.clear();
+ invocation.set(0);
+ commandInvocation.clear();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]