This is an automated email from the ASF dual-hosted git repository.

siddhant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a81773bafd HDDS-9353. ReplicationManager: Ignore any Datanodes that are not in-service and healthy when finding unique origins (#5650)
a81773bafd is described below

commit a81773bafdfe405e740936b10b743f1013942e60
Author: Siddhant Sangwan <[email protected]>
AuthorDate: Thu Nov 23 12:30:57 2023 +0530

    HDDS-9353. ReplicationManager: Ignore any Datanodes that are not in-service and healthy when finding unique origins (#5650)
---
 .../replication/LegacyReplicationManager.java      | 43 ++++--------
 .../replication/RatisOverReplicationHandler.java   | 56 +++++++---------
 .../replication/RatisUnderReplicationHandler.java  |  6 ++
 .../replication/ReplicationManagerUtil.java        | 78 +++++++++++++++++++---
 .../replication/TestLegacyReplicationManager.java  |  5 +-
 .../TestRatisOverReplicationHandler.java           | 32 ++++++++-
 6 files changed, 147 insertions(+), 73 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index fc27260520..9a51537028 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -87,7 +87,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1267,6 +1266,8 @@ public class LegacyReplicationManager {
               try {
                 return nodeManager.getNodeStatus(dnd);
               } catch (NodeNotFoundException e) {
+                LOG.warn("Exception while finding an unhealthy replica to " +
+                    "delete for container {}.", container, e);
                 return null;
               }
             });
@@ -2354,38 +2355,20 @@ public class LegacyReplicationManager {
     // by an existing replica.
     // TODO topology handling must be improved to make an optimal
     //  choice as to which replica to keep.
-
-    // Gather the origin node IDs of replicas which are not candidates for
-    // deletion.
-    Set<UUID> existingOriginNodeIDs = allReplicas.stream()
-        .filter(r -> !deleteCandidates.contains(r))
-        .filter(
-            r -> {
+    Set<ContainerReplica> allReplicasSet = new HashSet<>(allReplicas);
+    List<ContainerReplica> nonUniqueDeleteCandidates =
+        ReplicationManagerUtil.findNonUniqueDeleteCandidates(allReplicasSet,
+            deleteCandidates, (dnd) -> {
               try {
-                return nodeManager.getNodeStatus(r.getDatanodeDetails())
-                    .isHealthy();
+                return nodeManager.getNodeStatus(dnd);
               } catch (NodeNotFoundException e) {
-                LOG.warn("Exception when checking replica {} for container {}" +
-                    " while deleting excess UNHEALTHY.", r, container, e);
-                return false;
+                LOG.warn(
+                    "Exception while finding excess unhealthy replicas to " +
+                        "delete for container {} with replicas {}.", container,
+                    allReplicas, e);
+                return null;
               }
-            })
-        .filter(r -> r.getDatanodeDetails().getPersistedOpState()
-            .equals(IN_SERVICE))
-        .map(ContainerReplica::getOriginDatanodeId)
-        .collect(Collectors.toSet());
-
-    List<ContainerReplica> nonUniqueDeleteCandidates = new ArrayList<>();
-    for (ContainerReplica replica: deleteCandidates) {
-      if (existingOriginNodeIDs.contains(replica.getOriginDatanodeId())) {
-        nonUniqueDeleteCandidates.add(replica);
-      } else {
-        // Spare this replica with this new origin node ID from deletion.
-        // delete candidates seen later in the loop with this same origin
-        // node ID can be deleted.
-        existingOriginNodeIDs.add(replica.getOriginDatanodeId());
-      }
-    }
+            });
 
     if (LOG.isDebugEnabled() && nonUniqueDeleteCandidates.size() < excess) {
       LOG.debug("Unable to delete {} excess replicas of container {}. Only {}" +
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
index dd00e01ba7..7974e72bac 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.container.replication;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -34,11 +33,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -165,13 +161,6 @@ public class RatisOverReplicationHandler
         sortReplicas(replicaCount.getReplicas(),
             replicaCount.getHealthyReplicaCount() == 0);
 
-    // retain one replica per unique origin datanode if the container is not
-    // closed
-    if (replicaCount.getContainer().getState() !=
-        HddsProtos.LifeCycleState.CLOSED) {
-      saveReplicasWithUniqueOrigins(eligibleReplicas);
-    }
-
     Set<DatanodeDetails> pendingDeletion = new HashSet<>();
     // collect the DNs that are going to have their container replica deleted
     for (ContainerReplicaOp op : pendingOps) {
@@ -187,6 +176,14 @@ public class RatisOverReplicationHandler
             HddsProtos.NodeOperationalState.IN_SERVICE ||
             pendingDeletion.contains(replica.getDatanodeDetails()));
 
+    // retain one replica per unique origin datanode if the container is not
+    // closed
+    if (replicaCount.getContainer().getState() !=
+        HddsProtos.LifeCycleState.CLOSED) {
+      saveReplicasWithUniqueOrigins(replicaCount.getContainer(),
+          eligibleReplicas);
+    }
+
     return eligibleReplicas;
   }
 
@@ -199,29 +196,26 @@ public class RatisOverReplicationHandler
    * @param eligibleReplicas List of replicas that are eligible to be deleted
    * and from which replicas with unique origin node ID need to be saved
    */
-  private void saveReplicasWithUniqueOrigins(
+  private void saveReplicasWithUniqueOrigins(ContainerInfo container,
       List<ContainerReplica> eligibleReplicas) {
-    final Map<UUID, ContainerReplica> uniqueOrigins = new LinkedHashMap<>();
-    eligibleReplicas.stream()
-        // get unique origin nodes of healthy replicas
-        .filter(r -> r.getState() != ContainerReplicaProto.State.UNHEALTHY)
-        .forEach(r -> uniqueOrigins.putIfAbsent(r.getOriginDatanodeId(), r));
-
-    /*
-     Now that we've checked healthy replicas, see if some unhealthy replicas
-     need to be saved. For example, in the case of {QUASI_CLOSED,
-     QUASI_CLOSED, QUASI_CLOSED, UNHEALTHY}, if both the first and last
-     replicas have the same origin node ID (and no other replicas have it), we
-     prefer saving the QUASI_CLOSED replica and deleting the UNHEALTHY one.
-     */
-    for (ContainerReplica replica : eligibleReplicas) {
-      if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY) {
-        uniqueOrigins.putIfAbsent(replica.getOriginDatanodeId(), replica);
-      }
-    }
+    List<ContainerReplica> nonUniqueDeleteCandidates =
+        ReplicationManagerUtil.findNonUniqueDeleteCandidates(
+            new HashSet<>(eligibleReplicas),
+            eligibleReplicas, (dnd) -> {
+              try {
+                return replicationManager.getNodeStatus(dnd);
+              } catch (NodeNotFoundException e) {
+                LOG.warn(
+                    "Exception while finding excess unhealthy replicas to " +
+                        "delete for container {} with eligible replicas {}.",
+                    container, eligibleReplicas, e);
+                return null;
+              }
+            });
 
     // note that this preserves order of the List
-    eligibleReplicas.removeAll(uniqueOrigins.values());
+    eligibleReplicas.removeIf(
+        replica -> !nonUniqueDeleteCandidates.contains(replica));
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 5af7a16df2..98c19d16ff 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -154,6 +154,9 @@ public class RatisUnderReplicationHandler
   private void removeUnhealthyReplicaIfPossible(ContainerInfo containerInfo,
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps)
       throws NotLeaderException {
+    LOG.info("Finding an unhealthy replica to delete for container {} with " +
+        "replicas {} to unblock under replication handling.", containerInfo,
+        replicas);
     int pendingDeletes = 0;
     for (ContainerReplicaOp op : pendingOps) {
       if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
@@ -167,6 +170,9 @@ public class RatisUnderReplicationHandler
               try {
                 return replicationManager.getNodeStatus(dnd);
               } catch (NodeNotFoundException e) {
+                LOG.warn("Exception while finding an unhealthy replica to " +
+                    "delete for container {} with replicas {}.", containerInfo,
+                    replicas, e);
                 return null;
               }
             });
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
index f355030599..076a81e69b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -266,35 +266,93 @@ public final class ReplicationManagerUtil {
 
     if (containerInfo.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED) {
       List<ContainerReplica> nonUniqueOrigins =
-          findNonUniqueDeleteCandidates(replicas, deleteCandidates);
+          findNonUniqueDeleteCandidates(replicas, deleteCandidates,
+              nodeStatusFn);
       return nonUniqueOrigins.size() > 0 ? nonUniqueOrigins.get(0) : null;
     }
     return null;
   }
 
-  private static List<ContainerReplica> findNonUniqueDeleteCandidates(
+  /**
+   * Given a list of all replicas (including deleteCandidates), finds and
+   * returns replicas which don't have unique origin node IDs. This method
+   * preserves the order of the passed deleteCandidates list. This means that
+   * the order of the returned list depends on the order of deleteCandidates;
+   * if the same deleteCandidates with the same order is passed into this
+   * method on different invocations, the order of the returned list will be
+   * the same on each invocation. To protect against different SCMs
+   * (old leader and new leader) selecting different replicas to delete,
+   * callers of this method are responsible for ensuring consistent ordering.
+   * @see
+   * <a href="https://issues.apache.org/jira/browse/HDDS-4589">HDDS-4589</a>
+   * @param allReplicas all replicas of this container including
+   * deleteCandidates
+   * @param deleteCandidates replicas that are being considered for deletion
+   * @param nodeStatusFn a Function that can be called to check the
+   * NodeStatus of each DN
+   * @return a List of replicas that can be deleted because they do not have
+   * unique origin node IDs
+   */
+  static List<ContainerReplica> findNonUniqueDeleteCandidates(
       Set<ContainerReplica> allReplicas,
-      List<ContainerReplica> deleteCandidates) {
+      List<ContainerReplica> deleteCandidates,
+      Function<DatanodeDetails, NodeStatus> nodeStatusFn) {
     // Gather the origin node IDs of replicas which are not candidates for
     // deletion.
     Set<UUID> existingOriginNodeIDs = allReplicas.stream()
         .filter(r -> !deleteCandidates.contains(r))
+        .filter(r -> {
+          NodeStatus status = nodeStatusFn.apply(r.getDatanodeDetails());
+          /*
+           Replicas on datanodes that are not in-service, healthy are not
+           valid because it's likely they will be gone soon or are already
+           lost. See https://issues.apache.org/jira/browse/HDDS-9352. This
+           means that these replicas don't count as having unique origin IDs.
+           */
+          return status != null && status.isHealthy() && status.isInService();
+        })
         .map(ContainerReplica::getOriginDatanodeId)
         .collect(Collectors.toSet());
 
+    /*
+    In the case of {QUASI_CLOSED, QUASI_CLOSED, QUASI_CLOSED, UNHEALTHY}, if
+    both the first and last replicas have the same origin node ID (and no
+    other replicas have it), we prefer saving the QUASI_CLOSED replica and
+    deleting the UNHEALTHY one. So, we'll first loop through healthy replicas
+    and check for uniqueness.
+     */
     List<ContainerReplica> nonUniqueDeleteCandidates = new ArrayList<>();
     for (ContainerReplica replica : deleteCandidates) {
-      if (existingOriginNodeIDs.contains(replica.getOriginDatanodeId())) {
-        nonUniqueDeleteCandidates.add(replica);
-      } else {
-        // Spare this replica with this new origin node ID from deletion.
-        // delete candidates seen later in the loop with this same origin
-        // node ID can be deleted.
-        existingOriginNodeIDs.add(replica.getOriginDatanodeId());
+      if (replica.getState() == ContainerReplicaProto.State.UNHEALTHY) {
+        continue;
       }
+      checkUniqueness(existingOriginNodeIDs, nonUniqueDeleteCandidates,
+          replica);
+    }
+
+    // now, see which UNHEALTHY replicas are not unique and can be deleted
+    for (ContainerReplica replica : deleteCandidates) {
+      if (replica.getState() != ContainerReplicaProto.State.UNHEALTHY) {
+        continue;
+      }
+      checkUniqueness(existingOriginNodeIDs, nonUniqueDeleteCandidates,
+          replica);
     }
 
     return nonUniqueDeleteCandidates;
   }
 
+  private static void checkUniqueness(Set<UUID> existingOriginNodeIDs,
+      List<ContainerReplica> nonUniqueDeleteCandidates,
+      ContainerReplica replica) {
+    if (existingOriginNodeIDs.contains(replica.getOriginDatanodeId())) {
+      nonUniqueDeleteCandidates.add(replica);
+    } else {
+      // Spare this replica with this new origin node ID from deletion.
+      // delete candidates seen later with this same origin node ID can be
+      // deleted.
+      existingOriginNodeIDs.add(replica.getOriginDatanodeId());
+    }
+  }
+
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 561a4c2895..43a2fb3263 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -941,7 +941,8 @@ public class TestLegacyReplicationManager {
       final ContainerReplica replica5 = getReplicas(
           id, UNHEALTHY, 1000L, replica4.getOriginDatanodeId(),
           randomDatanodeDetails());
-      replica5.getDatanodeDetails().setPersistedOpState(DECOMMISSIONING);
+      nodeManager.register(replica5.getDatanodeDetails(),
+          new NodeStatus(DECOMMISSIONING, HEALTHY));
       DatanodeDetails deadNode = randomDatanodeDetails();
       nodeManager.register(deadNode, NodeStatus.inServiceDead());
       final ContainerReplica replica6 = getReplicas(
@@ -1006,6 +1007,8 @@ public class TestLegacyReplicationManager {
       DatanodeDetails decommissioning =
           MockDatanodeDetails.randomDatanodeDetails();
       decommissioning.setPersistedOpState(DECOMMISSIONING);
+      nodeManager.register(decommissioning,
+          new NodeStatus(DECOMMISSIONING, HEALTHY));
       final ContainerReplica replica4 = getReplicas(
           id, UNHEALTHY, sequenceID, decommissioning.getUuid(),
           decommissioning);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
index 5a9fd20288..0e2070aac0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestRatisOverReplicationHandler.java
@@ -51,6 +51,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
 import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainer;
 import static org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil.createContainerReplica;
@@ -164,7 +165,7 @@ public class TestRatisOverReplicationHandler {
    */
   @Test
   public void testOverReplicatedQuasiClosedContainerWithDifferentOrigins()
-      throws IOException {
+      throws IOException, NodeNotFoundException {
     container = createContainer(HddsProtos.LifeCycleState.QUASI_CLOSED,
         RATIS_REPLICATION_CONFIG);
     Set<ContainerReplica> replicas = createReplicas(container.containerID(),
@@ -181,6 +182,35 @@ public class TestRatisOverReplicationHandler {
 
     testProcessing(replicas, Collections.emptyList(),
         getOverReplicatedHealthResult(), 0);
+
+    /*
+    Now, introduce two UNHEALTHY replicas that share the same origin node as
+    the existing UNHEALTHY replica. They're on decommissioning and stale
+    nodes, respectively. Still no replica should be deleted, because these are
+    likely going away soon anyway.
+     */
+    replicas.add(
+        createContainerReplica(container.containerID(), 0, DECOMMISSIONING,
+            State.UNHEALTHY, container.getNumberOfKeys(),
+            container.getUsedBytes(),
+            MockDatanodeDetails.randomDatanodeDetails(),
+            unhealthyReplica.getOriginDatanodeId()));
+    DatanodeDetails staleNode =
+        MockDatanodeDetails.randomDatanodeDetails();
+    replicas.add(
+        createContainerReplica(container.containerID(), 0, IN_SERVICE,
+            State.UNHEALTHY, container.getNumberOfKeys(),
+            container.getUsedBytes(), staleNode,
+            unhealthyReplica.getOriginDatanodeId()));
+    Mockito.when(replicationManager.getNodeStatus(eq(staleNode)))
+        .thenAnswer(invocation -> {
+          DatanodeDetails dd = invocation.getArgument(0);
+          return new NodeStatus(dd.getPersistedOpState(),
+              HddsProtos.NodeState.STALE, 0);
+        });
+
+    testProcessing(replicas, Collections.emptyList(),
+        getOverReplicatedHealthResult(), 0);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to