This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ce118c635 HDDS-9254. Legacy replication manager uses mismatched 
replicas as replication sources (#5257)
1ce118c635 is described below

commit 1ce118c635020a52fb174f596b65def19303f348
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Sep 19 14:08:30 2023 -0700

    HDDS-9254. Legacy replication manager uses mismatched replicas as 
replication sources (#5257)
---
 .../replication/LegacyReplicationManager.java      |  10 +-
 .../replication/TestLegacyReplicationManager.java  | 184 ++++++++++++++++++++-
 2 files changed, 192 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 82b9d04b0d..a142dfff8d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -1138,8 +1138,16 @@ public class LegacyReplicationManager {
           container.containerID());
     }
 
+    State matchingReplicaState = State.CLOSED;
+    if (container.getState() == LifeCycleState.QUASI_CLOSED) {
+      // If we are replicating quasi closed replicas, they should have the
+      // same origin node ID and therefore the same BCSID. If they have
+      // different origin node IDs, then we have 2/3 containers and it should
+      // have been closed before replicating.
+      matchingReplicaState = State.QUASI_CLOSED;
+    }
     List<ContainerReplica> replicationSources = 
getReplicationSources(container,
-        replicaSet.getReplicas(), State.CLOSED, State.QUASI_CLOSED);
+        replicaSet.getReplicas(), matchingReplicaState);
     // This method will handle topology even if replicasNeeded <= 0.
     try {
       replicateAnyWithTopology(container, replicationSources,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 4afe59dc0d..8dc816e13c 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -66,8 +66,9 @@ import org.apache.hadoop.hdds.utils.db.LongCodec;
 import 
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.TestClock;
 import org.junit.jupiter.api.Disabled;
@@ -838,6 +839,104 @@ public class TestLegacyReplicationManager {
               SCMCommandProto.Type.deleteContainerCommand));
     }
 
+    /**
+     * 2 closed replica
+     * 1 quasi-closed replicas
+     * SCM state is closed.
+     * Expectation: The replicate container command should only contain the
+     * closed replicas as sources.
+     */
+    @Test
+    public void testOnlyMatchingClosedReplicasReplicated()
+        throws IOException {
+      final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+      final ContainerID id = container.containerID();
+      final UUID originNodeId = UUID.randomUUID();
+      final ContainerReplica quasiReplica = getReplicas(
+          id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+      final ContainerReplica closedReplica1 = getReplicas(
+          id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+      final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+      final ContainerReplica closedReplica2 = getReplicas(
+          id, State.CLOSED, 1000L, datanodeDetails.getUuid(), datanodeDetails);
+
+      containerStateManager.addContainer(container.getProtobuf());
+      containerStateManager.updateContainerReplica(id, quasiReplica);
+      containerStateManager.updateContainerReplica(id, closedReplica1);
+      containerStateManager.updateContainerReplica(id, closedReplica2);
+
+      replicationManager.processAll();
+      eventQueue.processAll(1000);
+      Assertions.assertEquals(1,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.replicateContainerCommand));
+
+      Optional<CommandForDatanode> cmdOptional =
+          datanodeCommandHandler.getReceivedCommands().stream().findFirst();
+      Assertions.assertTrue(cmdOptional.isPresent());
+      SCMCommand<?> scmCmd = cmdOptional.get().getCommand();
+      Assertions.assertTrue(scmCmd instanceof ReplicateContainerCommand);
+      ReplicateContainerCommand repCmd = (ReplicateContainerCommand) scmCmd;
+
+      // Only the closed replicas should have been used as sources.
+      List<DatanodeDetails> repSources = repCmd.getSourceDatanodes();
+      Assertions.assertEquals(2, repSources.size());
+      Assertions.assertTrue(repSources.containsAll(
+          Arrays.asList(closedReplica1.getDatanodeDetails(),
+              closedReplica2.getDatanodeDetails())));
+      Assertions.assertFalse(
+          repSources.contains(quasiReplica.getDatanodeDetails()));
+    }
+
+    /**
+     * 2 quasi-closed replicas
+     * 1 unhealthy replica
+     * SCM state is quasi-closed.
+     * Expectation: The replicate container command should only contain the
+     * quasi-closed replicas as sources.
+     */
+    @Test
+    public void testOnlyMatchingQuasiClosedReplicasReplicated()
+        throws IOException {
+      final ContainerInfo container = 
getContainer(LifeCycleState.QUASI_CLOSED);
+      final ContainerID id = container.containerID();
+      final UUID originNodeId = UUID.randomUUID();
+      final ContainerReplica quasiReplica1 = getReplicas(
+          id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+      final ContainerReplica quasiReplica2 = getReplicas(
+          id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+      final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+      final ContainerReplica unhealthyReplica = getReplicas(
+          id, UNHEALTHY, 1000L, datanodeDetails.getUuid(), datanodeDetails);
+
+      containerStateManager.addContainer(container.getProtobuf());
+      containerStateManager.updateContainerReplica(id, quasiReplica1);
+      containerStateManager.updateContainerReplica(id, quasiReplica2);
+      containerStateManager.updateContainerReplica(id, unhealthyReplica);
+
+      replicationManager.processAll();
+      eventQueue.processAll(1000);
+      Assertions.assertEquals(1,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.replicateContainerCommand));
+
+      Optional<CommandForDatanode> cmdOptional =
+          datanodeCommandHandler.getReceivedCommands().stream().findFirst();
+      Assertions.assertTrue(cmdOptional.isPresent());
+      SCMCommand<?> scmCmd = cmdOptional.get().getCommand();
+      Assertions.assertTrue(scmCmd instanceof ReplicateContainerCommand);
+      ReplicateContainerCommand repCmd = (ReplicateContainerCommand) scmCmd;
+
+      // Only the quasi closed replicas should have been used as a sources.
+      List<DatanodeDetails> repSources = repCmd.getSourceDatanodes();
+      Assertions.assertEquals(2, repSources.size());
+      Assertions.assertTrue(repSources.containsAll(
+          Arrays.asList(quasiReplica1.getDatanodeDetails(),
+              quasiReplica2.getDatanodeDetails())));
+      Assertions.assertFalse(
+          repSources.contains(unhealthyReplica.getDatanodeDetails()));
+    }
+
     /**
      * Container is closed.
      * 2 quasi-closed replicas.
@@ -1766,6 +1865,89 @@ public class TestLegacyReplicationManager {
       Assertions.assertEquals(0, report.getStat(
           ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
     }
+
+    /**
+     * 2 quasi-closed replicas.
+     * 2 unique origin IDs with different BCSIDs.
+     * Expectation: Container is closed, then replicated on the next iteration.
+     */
+    @Test
+    public void testCloseableIsClosedBeforeReplication() throws IOException {
+      final ContainerInfo container = 
getContainer(LifeCycleState.QUASI_CLOSED);
+      final ContainerID id = container.containerID();
+      final ContainerReplica replicaOne = getReplicas(
+          id, QUASI_CLOSED, 1000L, UUID.randomUUID(), randomDatanodeDetails());
+      final ContainerReplica replicaTwo = getReplicas(
+          id, QUASI_CLOSED, 900L, UUID.randomUUID(), randomDatanodeDetails());
+      containerStateManager.addContainer(container.getProtobuf());
+      containerStateManager.updateContainerReplica(id, replicaOne);
+      containerStateManager.updateContainerReplica(id, replicaTwo);
+
+      // Since we have 2/3 replicas, the container should be closed on this
+      // iteration. The replica with the higher BCSID can be closed.
+      int currentCloseCommandCount = datanodeCommandHandler
+          .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+
+      replicationManager.processAll();
+      eventQueue.processAll(1000);
+
+      Assertions.assertEquals(currentCloseCommandCount + 1,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.closeContainerCommand));
+
+      // The highest BCSID replica should have been closed.
+      Optional<CommandForDatanode> cmd =
+          datanodeCommandHandler.getReceivedCommands().stream().findFirst();
+      Assertions.assertTrue(cmd.isPresent());
+      Assertions.assertEquals(replicaOne.getDatanodeDetails().getUuid(),
+          cmd.get().getDatanodeId());
+
+      ReplicationManagerReport report = 
replicationManager.getContainerReport();
+      // Container will register as closed after the closed replica is
+      // reported in the next iteration.
+      Assertions.assertEquals(0,
+          report.getStat(LifeCycleState.CLOSED));
+      // Legacy replication manager does not count over/under replicated
+      // status until the container is eligible to be replicated.
+      Assertions.assertEquals(0, report.getStat(
+          ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+      Assertions.assertEquals(1,
+          report.getStat(LifeCycleState.QUASI_CLOSED));
+      Assertions.assertEquals(0, report.getStat(
+          ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
+
+      // Move the higher BCSID replica to closed state.
+      final ContainerReplica closedReplicaOne = getReplicas(
+          id, CLOSED, 1000L, replicaOne.getOriginDatanodeId(),
+          replicaOne.getDatanodeDetails());
+      containerStateManager.updateContainerReplica(id, closedReplicaOne);
+
+      // On the next iteration, the container should be replicated.
+      currentCloseCommandCount = datanodeCommandHandler
+          .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+      int currentReplicateCommandCount = datanodeCommandHandler
+          .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+
+      replicationManager.processAll();
+      eventQueue.processAll(1000);
+      // No more close commands should be sent.
+      Assertions.assertEquals(currentCloseCommandCount,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.closeContainerCommand));
+      // Two replicate commands should be triggered for the now closed
+      // container.
+      Assertions.assertEquals(currentReplicateCommandCount + 2,
+          datanodeCommandHandler.getInvocationCount(
+              SCMCommandProto.Type.replicateContainerCommand));
+      Assertions.assertEquals(currentReplicateCommandCount + 2,
+          replicationManager.getMetrics().getReplicationCmdsSentTotal());
+      Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
+      Assertions.assertEquals(1, replicationManager.getMetrics()
+          .getInflightReplication());
+
+      // Once the replicas are closed, moving the container to CLOSED state
+      // in SCM is done on the container report, not in the replication 
manager.
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to