This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new fb11477eeb HDDS-9321. LegacyReplicationManager: Save UNHEALTHY 
replicas with highest BCSID for a QUASI_CLOSED container (#5366)
fb11477eeb is described below

commit fb11477eeb95f646a27341d798bb15b1d5b65fec
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Thu Sep 28 15:09:13 2023 +0530

    HDDS-9321. LegacyReplicationManager: Save UNHEALTHY replicas with highest 
BCSID for a QUASI_CLOSED container (#5366)
---
 .../replication/LegacyReplicationManager.java      |  86 +++++++++
 .../replication/RatisContainerReplicaCount.java    |  83 +++++++++
 .../replication/TestLegacyReplicationManager.java  | 207 +++++++++++++++++++++
 3 files changed, 376 insertions(+)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 3154b2e52f..ee3a442967 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.container.replication;
 
+import com.google.common.collect.ImmutableList;
 import com.google.protobuf.Message;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -533,6 +534,21 @@ public class LegacyReplicationManager {
           return;
         }
 
+        /*
+         * A QUASI_CLOSED container may have some UNHEALTHY replicas with the
+         * same Sequence ID as the container. RM should try to maintain one
+         * copy of such replicas when there are no healthy replicas that
+         * match the container's Sequence ID.
+         */
+        List<ContainerReplica> vulnerableUnhealthy =
+            replicaSet.getVulnerableUnhealthyReplicas(nodeManager);
+        if (!vulnerableUnhealthy.isEmpty()) {
+          report.incrementAndSample(HealthState.UNDER_REPLICATED,
+              container.containerID());
+          handleVulnerableUnhealthyReplicas(replicaSet, vulnerableUnhealthy);
+          return;
+        }
+
         /*
          * Check if the container is over replicated and take appropriate
          * action.
@@ -563,6 +579,37 @@ public class LegacyReplicationManager {
     }
   }
 
+  /**
+   * Sends a replicate command for each replica specified in
+   * vulnerableUnhealthy.
+   * @param replicaCount RatisContainerReplicaCount for this container
+   * @param vulnerableUnhealthy List of UNHEALTHY replicas that need to be
+   * replicated
+   */
+  private void handleVulnerableUnhealthyReplicas(
+      RatisContainerReplicaCount replicaCount,
+      List<ContainerReplica> vulnerableUnhealthy) {
+    ContainerInfo container = replicaCount.getContainer();
+    LOG.debug("Handling vulnerable UNHEALTHY replicas {} for container {}.",
+        vulnerableUnhealthy, container);
+    int pendingAdds = getInflightAdd(container.containerID());
+    if (pendingAdds >= vulnerableUnhealthy.size()) {
+      LOG.debug("There are {} pending adds for container {}, while " +
+              "the number of UNHEALTHY replicas is {}.", pendingAdds,
+          container.containerID(), vulnerableUnhealthy.size());
+      return;
+    }
+
+    /*
+    Since we're replicating UNHEALTHY replicas, it's possible that
+    replication keeps on failing. Shuffling gives other replicas a chance to be
+    replicated since there's a limit on inflight adds.
+     */
+    Collections.shuffle(vulnerableUnhealthy);
+    replicateEachSource(container, vulnerableUnhealthy,
+        replicaCount.getReplicas());
+  }
+
   private void updateCompletedReplicationMetrics(ContainerInfo container,
       InflightAction action) {
     metrics.incrReplicasCreatedTotal();
@@ -2383,6 +2430,45 @@ public class LegacyReplicationManager {
     }
   }
 
+  /**
+   * Replicates each of the ContainerReplica specified in sources to new
+   * Datanodes. Will not consider Datanodes hosting existing replicas and
+   * Datanodes pending adds as targets. Note that this method simply skips
+   * the replica if there's an exception.
+   * @param container Container whose replicas are specified as sources
+   * @param sources List containing replicas, each will be replicated
+   * @param allReplicas all existing replicas of this container
+   */
+  private void replicateEachSource(ContainerInfo container,
+      List<ContainerReplica> sources, List<ContainerReplica> allReplicas) {
+    final List<DatanodeDetails> excludeList = allReplicas.stream()
+        .map(ContainerReplica::getDatanodeDetails)
+        .collect(Collectors.toList());
+
+    for (ContainerReplica replica : sources) {
+      // also exclude any DNs pending to receive a replica of this container
+      final List<DatanodeDetails> replicationInFlight
+          = inflightReplication.getDatanodeDetails(container.containerID());
+      for (DatanodeDetails dn : replicationInFlight) {
+        if (!excludeList.contains(dn)) {
+          excludeList.add(dn);
+        }
+      }
+
+      try {
+        final List<DatanodeDetails> target =
+            ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
+                1, null, excludeList, currentContainerSize,
+                container);
+        sendReplicateCommand(container, target.iterator().next(),
+            ImmutableList.of(replica.getDatanodeDetails()));
+      } catch (SCMException e) {
+        LOG.warn("Exception while trying to replicate {} of container {}.",
+            replica, container, e);
+      }
+    }
+  }
+
   private void closeEmptyContainer(ContainerInfo containerInfo) {
     /*
      * We should wait for sometime before moving the container to CLOSED state.
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
index e8ccaebfbf..0c6052ef35 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisContainerReplicaCount.java
@@ -24,18 +24,23 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.OverReplicatedHealthResult;
 import 
org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult.UnderReplicatedHealthResult;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.compareState;
 
 /**
@@ -416,6 +421,84 @@ public class RatisContainerReplicaCount implements 
ContainerReplicaCount {
     return isSufficientlyReplicated();
   }
 
+  /**
+   * QUASI_CLOSED containers that have a mix of healthy and UNHEALTHY
+   * replicas require special treatment. If the healthy replicas don't have
+   * the same BCSID as the container, but the UNHEALTHY ones do, then we need
+   * to save at least one copy of each such UNHEALTHY replica. This method
+   * finds such UNHEALTHY replicas.
+   *
+   * @param nodeManager an instance of NodeManager
+   * @return List of UNHEALTHY replicas with the greatest Sequence ID that
+   * need to be replicated to other nodes. Empty list if this container is not
+   * QUASI_CLOSED, doesn't have a mix of healthy and UNHEALTHY replicas, or
+   * if there are no replicas that need to be saved.
+   */
+  List<ContainerReplica> getVulnerableUnhealthyReplicas(
+      NodeManager nodeManager) {
+    if (container.getState() != HddsProtos.LifeCycleState.QUASI_CLOSED) {
+      // this method is only relevant for QUASI_CLOSED containers
+      return Collections.emptyList();
+    }
+
+    boolean foundHealthy = false;
+    List<ContainerReplica> unhealthyReplicas = new ArrayList<>();
+    for (ContainerReplica replica : replicas) {
+      if (replica.getState() != ContainerReplicaProto.State.UNHEALTHY) {
+        foundHealthy = true;
+      }
+
+      if (replica.getSequenceId() == container.getSequenceId()) {
+        if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY) {
+          unhealthyReplicas.add(replica);
+        } else if (replica.getState() ==
+            ContainerReplicaProto.State.QUASI_CLOSED) {
+          // don't need to save UNHEALTHY replicas if there's a QUASI_CLOSED
+          // replica with the greatest Sequence ID.
+          return Collections.emptyList();
+        }
+      }
+    }
+    if (!foundHealthy) {
+      // this method is only relevant when there's a mix of healthy and
+      // unhealthy replicas
+      return Collections.emptyList();
+    }
+
+    unhealthyReplicas.removeIf(
+        replica -> {
+          try {
+            return !nodeManager.getNodeStatus(replica.getDatanodeDetails())
+                .isHealthy();
+          } catch (NodeNotFoundException e) {
+            return true;
+          }
+        });
+    /*
+    At this point, the list of unhealthyReplicas contains all UNHEALTHY
+    replicas with the greatest Sequence ID that are on healthy Datanodes.
+    Note that this also includes multiple copies of the same UNHEALTHY
+    replica, that is, replicas with the same Origin ID. We need to consider
+    the fact that replicas can be uniquely unhealthy. That is, 2 UNHEALTHY
+    replicas will difference Origin ID need not be exact copies of each other.
+
+    Replicas that don't have at least one instance (multiple instances of a
+    replica will have the same Origin ID) on an IN_SERVICE node are
+    vulnerable and need to be saved.
+     */
+    // TODO should we also consider pending deletes?
+    Set<UUID> originsOfInServiceReplicas = new HashSet<>();
+    for (ContainerReplica replica : unhealthyReplicas) {
+      if (replica.getDatanodeDetails().getPersistedOpState()
+          .equals(IN_SERVICE)) {
+        originsOfInServiceReplicas.add(replica.getOriginDatanodeId());
+      }
+    }
+    unhealthyReplicas.removeIf(replica -> originsOfInServiceReplicas.contains(
+        replica.getOriginDatanodeId()));
+    return unhealthyReplicas;
+  }
+
   /**
    * Return true if the container is sufficiently replicated. Decommissioning
    * and Decommissioned containers are ignored in this check, assuming they 
will
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 4919adb4bb..dfbbc13f0d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -950,6 +950,207 @@ public class TestLegacyReplicationManager {
               SCMCommandProto.Type.deleteContainerCommand));
     }
 
+    /**
+     * Situation: QUASI_CLOSED container with 3 QUASI_CLOSED replicas
+     * whose Sequence ID is smaller than the container's. There are 2
+     * UNHEALTHY replicas with Sequence ID same as the container's. One of
+     * them is on a decommissioning node.
+     *
+     * Expectation: Replication command should be sent for the UNHEALTHY
+     * replica on the decommissioning node.
+     */
+    @Test
+    public void testQuasiClosedHavingUnhealthyReplicaWithGreatestBCSID()
+        throws IOException {
+      final ContainerInfo container = 
getContainer(LifeCycleState.QUASI_CLOSED);
+      container.setUsedBytes(100);
+      final ContainerID id = container.containerID();
+      final UUID originNodeId = UUID.randomUUID();
+      long sequenceID = container.getSequenceId();
+      final ContainerReplica replica1 =
+          getReplicas(id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+              randomDatanodeDetails());
+      final ContainerReplica replica2 = getReplicas(
+          id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+          randomDatanodeDetails());
+      final ContainerReplica replica3 = getReplicas(
+          id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+          randomDatanodeDetails());
+      DatanodeDetails decommissioning =
+          MockDatanodeDetails.randomDatanodeDetails();
+      decommissioning.setPersistedOpState(DECOMMISSIONING);
+      final ContainerReplica replica4 = getReplicas(
+          id, UNHEALTHY, sequenceID, decommissioning.getUuid(),
+          decommissioning);
+      final ContainerReplica replica5 = getReplicas(
+          id, UNHEALTHY, sequenceID, randomDatanodeDetails().getUuid(),
+          randomDatanodeDetails());
+
+      containerStateManager.addContainer(container.getProtobuf());
+      containerStateManager.updateContainerReplica(id, replica1);
+      containerStateManager.updateContainerReplica(id, replica2);
+      containerStateManager.updateContainerReplica(id, replica3);
+      containerStateManager.updateContainerReplica(id, replica4);
+      containerStateManager.updateContainerReplica(id, replica5);
+
+      replicationManager.processAll();
+      eventQueue.processAll(1000);
+
+      // 1 replicate command should have been sent
+      Assertions.assertEquals(1,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.replicateContainerCommand));
+      Assertions.assertEquals(1,
+          replicationManager.getContainerReport().getStat(
+              ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+
+      // the following code asserts that replicate was sent for the UNHEALTHY
+      // replica on the decommissioning node
+      CommandForDatanode command =
+          datanodeCommandHandler.getReceivedCommands().iterator().next();
+      Assertions.assertEquals(SCMCommandProto.Type.replicateContainerCommand,
+          command.getCommand().getType());
+      ReplicateContainerCommand replicateCommand =
+          (ReplicateContainerCommand) command.getCommand();
+      Assertions.assertEquals(1, replicateCommand.getSourceDatanodes().size());
+      Assertions.assertEquals(replica4.getDatanodeDetails(),
+          replicateCommand.getSourceDatanodes().iterator().next());
+
+      Assertions.assertEquals(0,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.deleteContainerCommand));
+
+      // If we don't complete the pending add by the next iteration, it's
+      // expected that another replicate command is not sent.
+      replicationManager.processAll();
+      eventQueue.processAll(100);
+
+      // that 1 command is the one RM sent in the last iteration
+      Assertions.assertEquals(1,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.replicateContainerCommand));
+      Assertions.assertEquals(0,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.deleteContainerCommand));
+
+      // Now, we will complete the add. Expectation is that no new commands
+      // should be sent in the next iteration.
+      ContainerReplica newReplica =
+          getReplicas(container.containerID(), UNHEALTHY,
+              container.getSequenceId(), replica4.getOriginDatanodeId(),
+              MockDatanodeDetails.createDatanodeDetails(
+                  command.getDatanodeId()));
+      containerStateManager.updateContainerReplica(container.containerID(),
+          newReplica);
+
+      replicationManager.processAll();
+      eventQueue.processAll(100);
+
+      // that 1 command is the one RM sent in the last iteration
+      Assertions.assertEquals(1,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.replicateContainerCommand));
+      Assertions.assertEquals(0,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.deleteContainerCommand));
+    }
+
+    /**
+     * Situation: QUASI_CLOSED container with 2 QUASI_CLOSED replicas
+     * whose Sequence ID is smaller than the container's. There's 1
+     * UNHEALTHY replica with Sequence ID same as the container's and on a
+     * decommissioning node.
+     * <p>
+     * Expectation: First, one of the QUASI_CLOSED should be replicated to
+     * get 3 of them. In the next iteration, the UNHEALTHY replica should be
+     * replicated. This also verifies that HDDS-9321 does not introduce a
+     * regression in the under replication flow.
+     */
+    @Test
+    public void testUnderRepQuasiClosedHavingUnhealthyWithGreatestBCSID()
+        throws IOException {
+      final ContainerInfo container = 
getContainer(LifeCycleState.QUASI_CLOSED);
+      container.setUsedBytes(100);
+      final ContainerID id = container.containerID();
+      final UUID originNodeId = UUID.randomUUID();
+      long sequenceID = container.getSequenceId();
+      final ContainerReplica replica1 =
+          getReplicas(id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+              randomDatanodeDetails());
+      final ContainerReplica replica2 = getReplicas(
+          id, QUASI_CLOSED, sequenceID - 1, originNodeId,
+          randomDatanodeDetails());
+      DatanodeDetails decommissioning =
+          MockDatanodeDetails.randomDatanodeDetails();
+      decommissioning.setPersistedOpState(DECOMMISSIONING);
+      final ContainerReplica unhealthyReplica = getReplicas(
+          id, UNHEALTHY, sequenceID, decommissioning.getUuid(),
+          decommissioning);
+
+      containerStateManager.addContainer(container.getProtobuf());
+      containerStateManager.updateContainerReplica(id, replica1);
+      containerStateManager.updateContainerReplica(id, replica2);
+      containerStateManager.updateContainerReplica(id, unhealthyReplica);
+
+      replicationManager.processAll();
+      eventQueue.processAll(1000);
+
+      Assertions.assertEquals(1,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.replicateContainerCommand));
+      Assertions.assertEquals(1,
+          replicationManager.getContainerReport().getStat(
+              ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+
+      // the following code asserts that the replicate command was sent for
+      // either of the QUASI_CLOSED replicas
+      CommandForDatanode command =
+          datanodeCommandHandler.getReceivedCommands().iterator().next();
+      Assertions.assertEquals(SCMCommandProto.Type.replicateContainerCommand,
+          command.getCommand().getType());
+      ReplicateContainerCommand replicateCommand =
+          (ReplicateContainerCommand) command.getCommand();
+      List<DatanodeDetails> sourceDatanodes =
+          replicateCommand.getSourceDatanodes();
+      Assertions.assertEquals(2, sourceDatanodes.size());
+      Assertions.assertFalse(
+          sourceDatanodes.contains(unhealthyReplica.getDatanodeDetails()));
+      Assertions.assertEquals(0,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.deleteContainerCommand));
+
+      // now, add a QUASI_CLOSED replica, which is a copy of replica1
+      ContainerReplica newReplica =
+          getReplicas(container.containerID(), QUASI_CLOSED,
+              container.getSequenceId() - 1, replica1.getOriginDatanodeId(),
+              MockDatanodeDetails.createDatanodeDetails(
+                  command.getDatanodeId()));
+      containerStateManager.updateContainerReplica(container.containerID(),
+          newReplica);
+      datanodeCommandHandler.clearState();
+      replicationManager.processAll();
+      eventQueue.processAll(1000);
+
+      Assertions.assertEquals(1,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.replicateContainerCommand));
+      Assertions.assertEquals(1,
+          replicationManager.getContainerReport().getStat(
+              ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+      Assertions.assertEquals(1,
+          datanodeCommandHandler.getReceivedCommands().size());
+      command = datanodeCommandHandler.getReceivedCommands().iterator().next();
+      Assertions.assertEquals(SCMCommandProto.Type.replicateContainerCommand,
+          command.getCommand().getType());
+      replicateCommand = (ReplicateContainerCommand) command.getCommand();
+      Assertions.assertEquals(1, replicateCommand.getSourceDatanodes().size());
+      Assertions.assertEquals(unhealthyReplica.getDatanodeDetails(),
+          replicateCommand.getSourceDatanodes().iterator().next());
+      Assertions.assertEquals(0,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.deleteContainerCommand));
+    }
+
     /**
      * 2 closed replica
      * 1 quasi-closed replicas
@@ -3182,5 +3383,11 @@ public class TestLegacyReplicationManager {
           dc.getCommand().getType().equals(type) &&
               dc.getDatanodeId().equals(datanode.getUuid()));
     }
+
+    private void clearState() {
+      commands.clear();
+      invocation.set(0);
+      commandInvocation.clear();
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to