This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 1ce118c635 HDDS-9254. Legacy replication manager uses mismatched
replicas as replication sources (#5257)
1ce118c635 is described below
commit 1ce118c635020a52fb174f596b65def19303f348
Author: Ethan Rose <[email protected]>
AuthorDate: Tue Sep 19 14:08:30 2023 -0700
HDDS-9254. Legacy replication manager uses mismatched replicas as
replication sources (#5257)
---
.../replication/LegacyReplicationManager.java | 10 +-
.../replication/TestLegacyReplicationManager.java | 184 ++++++++++++++++++++-
2 files changed, 192 insertions(+), 2 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
index 82b9d04b0d..a142dfff8d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/LegacyReplicationManager.java
@@ -1138,8 +1138,16 @@ public class LegacyReplicationManager {
container.containerID());
}
+ State matchingReplicaState = State.CLOSED;
+ if (container.getState() == LifeCycleState.QUASI_CLOSED) {
+ // If we are replicating quasi closed replicas, they should have the
+ // same origin node ID and therefore the same BCSID. If they have
+ // different origin node IDs, then we have 2/3 containers and it should
+ // have been closed before replicating.
+ matchingReplicaState = State.QUASI_CLOSED;
+ }
List<ContainerReplica> replicationSources =
getReplicationSources(container,
- replicaSet.getReplicas(), State.CLOSED, State.QUASI_CLOSED);
+ replicaSet.getReplicas(), matchingReplicaState);
// This method will handle topology even if replicasNeeded <= 0.
try {
replicateAnyWithTopology(container, replicationSources,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
index 4afe59dc0d..8dc816e13c 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestLegacyReplicationManager.java
@@ -66,8 +66,9 @@ import org.apache.hadoop.hdds.utils.db.LongCodec;
import
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
import org.junit.jupiter.api.Disabled;
@@ -838,6 +839,104 @@ public class TestLegacyReplicationManager {
SCMCommandProto.Type.deleteContainerCommand));
}
+ /**
+ * 2 closed replica
+ * 1 quasi-closed replicas
+ * SCM state is closed.
+ * Expectation: The replicate container command should only contain the
+ * closed replicas as sources.
+ */
+ @Test
+ public void testOnlyMatchingClosedReplicasReplicated()
+ throws IOException {
+ final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica quasiReplica = getReplicas(
+ id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica closedReplica1 = getReplicas(
+ id, State.CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+ final ContainerReplica closedReplica2 = getReplicas(
+ id, State.CLOSED, 1000L, datanodeDetails.getUuid(), datanodeDetails);
+
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id, quasiReplica);
+ containerStateManager.updateContainerReplica(id, closedReplica1);
+ containerStateManager.updateContainerReplica(id, closedReplica2);
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+ Assertions.assertEquals(1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+
+ Optional<CommandForDatanode> cmdOptional =
+ datanodeCommandHandler.getReceivedCommands().stream().findFirst();
+ Assertions.assertTrue(cmdOptional.isPresent());
+ SCMCommand<?> scmCmd = cmdOptional.get().getCommand();
+ Assertions.assertTrue(scmCmd instanceof ReplicateContainerCommand);
+ ReplicateContainerCommand repCmd = (ReplicateContainerCommand) scmCmd;
+
+ // Only the closed replicas should have been used as sources.
+ List<DatanodeDetails> repSources = repCmd.getSourceDatanodes();
+ Assertions.assertEquals(2, repSources.size());
+ Assertions.assertTrue(repSources.containsAll(
+ Arrays.asList(closedReplica1.getDatanodeDetails(),
+ closedReplica2.getDatanodeDetails())));
+ Assertions.assertFalse(
+ repSources.contains(quasiReplica.getDatanodeDetails()));
+ }
+
+ /**
+ * 2 quasi-closed replicas
+ * 1 unhealthy replica
+ * SCM state is quasi-closed.
+ * Expectation: The replicate container command should only contain the
+ * quasi-closed replicas as sources.
+ */
+ @Test
+ public void testOnlyMatchingQuasiClosedReplicasReplicated()
+ throws IOException {
+ final ContainerInfo container =
getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final UUID originNodeId = UUID.randomUUID();
+ final ContainerReplica quasiReplica1 = getReplicas(
+ id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final ContainerReplica quasiReplica2 = getReplicas(
+ id, QUASI_CLOSED, 1000L, originNodeId, randomDatanodeDetails());
+ final DatanodeDetails datanodeDetails = randomDatanodeDetails();
+ final ContainerReplica unhealthyReplica = getReplicas(
+ id, UNHEALTHY, 1000L, datanodeDetails.getUuid(), datanodeDetails);
+
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id, quasiReplica1);
+ containerStateManager.updateContainerReplica(id, quasiReplica2);
+ containerStateManager.updateContainerReplica(id, unhealthyReplica);
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+ Assertions.assertEquals(1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+
+ Optional<CommandForDatanode> cmdOptional =
+ datanodeCommandHandler.getReceivedCommands().stream().findFirst();
+ Assertions.assertTrue(cmdOptional.isPresent());
+ SCMCommand<?> scmCmd = cmdOptional.get().getCommand();
+ Assertions.assertTrue(scmCmd instanceof ReplicateContainerCommand);
+ ReplicateContainerCommand repCmd = (ReplicateContainerCommand) scmCmd;
+
+ // Only the quasi closed replicas should have been used as a sources.
+ List<DatanodeDetails> repSources = repCmd.getSourceDatanodes();
+ Assertions.assertEquals(2, repSources.size());
+ Assertions.assertTrue(repSources.containsAll(
+ Arrays.asList(quasiReplica1.getDatanodeDetails(),
+ quasiReplica2.getDatanodeDetails())));
+ Assertions.assertFalse(
+ repSources.contains(unhealthyReplica.getDatanodeDetails()));
+ }
+
/**
* Container is closed.
* 2 quasi-closed replicas.
@@ -1766,6 +1865,89 @@ public class TestLegacyReplicationManager {
Assertions.assertEquals(0, report.getStat(
ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
}
+
+ /**
+ * 2 quasi-closed replicas.
+ * 2 unique origin IDs with different BCSIDs.
+ * Expectation: Container is closed, then replicated on the next iteration.
+ */
+ @Test
+ public void testCloseableIsClosedBeforeReplication() throws IOException {
+ final ContainerInfo container =
getContainer(LifeCycleState.QUASI_CLOSED);
+ final ContainerID id = container.containerID();
+ final ContainerReplica replicaOne = getReplicas(
+ id, QUASI_CLOSED, 1000L, UUID.randomUUID(), randomDatanodeDetails());
+ final ContainerReplica replicaTwo = getReplicas(
+ id, QUASI_CLOSED, 900L, UUID.randomUUID(), randomDatanodeDetails());
+ containerStateManager.addContainer(container.getProtobuf());
+ containerStateManager.updateContainerReplica(id, replicaOne);
+ containerStateManager.updateContainerReplica(id, replicaTwo);
+
+ // Since we have 2/3 replicas, the container should be closed on this
+ // iteration. The replica with the higher BCSID can be closed.
+ int currentCloseCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ Assertions.assertEquals(currentCloseCommandCount + 1,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.closeContainerCommand));
+
+ // The highest BCSID replica should have been closed.
+ Optional<CommandForDatanode> cmd =
+ datanodeCommandHandler.getReceivedCommands().stream().findFirst();
+ Assertions.assertTrue(cmd.isPresent());
+ Assertions.assertEquals(replicaOne.getDatanodeDetails().getUuid(),
+ cmd.get().getDatanodeId());
+
+ ReplicationManagerReport report =
replicationManager.getContainerReport();
+ // Container will register as closed after the closed replica is
+ // reported in the next iteration.
+ Assertions.assertEquals(0,
+ report.getStat(LifeCycleState.CLOSED));
+ // Legacy replication manager does not count over/under replicated
+ // status until the container is eligible to be replicated.
+ Assertions.assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.UNDER_REPLICATED));
+ Assertions.assertEquals(1,
+ report.getStat(LifeCycleState.QUASI_CLOSED));
+ Assertions.assertEquals(0, report.getStat(
+ ReplicationManagerReport.HealthState.QUASI_CLOSED_STUCK));
+
+ // Move the higher BCSID replica to closed state.
+ final ContainerReplica closedReplicaOne = getReplicas(
+ id, CLOSED, 1000L, replicaOne.getOriginDatanodeId(),
+ replicaOne.getDatanodeDetails());
+ containerStateManager.updateContainerReplica(id, closedReplicaOne);
+
+ // On the next iteration, the container should be replicated.
+ currentCloseCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.closeContainerCommand);
+ int currentReplicateCommandCount = datanodeCommandHandler
+ .getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
+
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+ // No more close commands should be sent.
+ Assertions.assertEquals(currentCloseCommandCount,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.closeContainerCommand));
+ // Two replicate commands should be triggered for the now closed
+ // container.
+ Assertions.assertEquals(currentReplicateCommandCount + 2,
+ datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+ Assertions.assertEquals(currentReplicateCommandCount + 2,
+ replicationManager.getMetrics().getReplicationCmdsSentTotal());
+ Assertions.assertEquals(1, getInflightCount(InflightType.REPLICATION));
+ Assertions.assertEquals(1, replicationManager.getMetrics()
+ .getInflightReplication());
+
+ // Once the replicas are closed, moving the container to CLOSED state
+ // in SCM is done on the container report, not in the replication
manager.
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]