This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 9df3916614 HDDS-7864. Add integration test for replication (#4238)
9df3916614 is described below
commit 9df39166146d8bff00120dad01b1841d3d6d4832
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Feb 6 11:38:41 2023 +0100
HDDS-7864. Add integration test for replication (#4238)
---
.../hdds/scm/storage/ContainerProtocolCalls.java | 6 +-
.../replication/DownloadAndImportReplicator.java | 10 +-
.../container/replication/PushReplicator.java | 9 +-
.../replication/TestContainerReplication.java | 173 +++++++++++++++++++++
.../ozone/container/replication/package-info.java | 22 +++
5 files changed, 211 insertions(+), 9 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index d0999268be..b921d4c897 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -440,7 +440,7 @@ public final class ContainerProtocolCalls {
public static void createRecoveringContainer(XceiverClientSpi client,
long containerID, String encodedToken, int replicaIndex)
throws IOException {
- createContainerInternal(client, containerID, encodedToken,
+ createContainer(client, containerID, encodedToken,
ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex);
}
@@ -453,7 +453,7 @@ public final class ContainerProtocolCalls {
*/
public static void createContainer(XceiverClientSpi client, long containerID,
String encodedToken) throws IOException {
- createContainerInternal(client, containerID, encodedToken, null, 0);
+ createContainer(client, containerID, encodedToken, null, 0);
}
/**
* createContainer call that creates a container on the datanode.
@@ -464,7 +464,7 @@ public final class ContainerProtocolCalls {
* @param replicaIndex - index position of the container replica
* @throws IOException
*/
- private static void createContainerInternal(XceiverClientSpi client,
+ public static void createContainer(XceiverClientSpi client,
long containerID, String encodedToken,
ContainerProtos.ContainerDataProto.State state, int replicaIndex)
throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 5b5f954afd..2589c0de5b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -42,19 +42,19 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
public static final Logger LOG =
LoggerFactory.getLogger(DownloadAndImportReplicator.class);
+ private final ConfigurationSource conf;
private final ContainerDownloader downloader;
private final ContainerImporter containerImporter;
private final ContainerSet containerSet;
- private final CopyContainerCompression compression;
public DownloadAndImportReplicator(
ConfigurationSource conf, ContainerSet containerSet,
ContainerImporter containerImporter,
ContainerDownloader downloader) {
+ this.conf = conf;
this.containerSet = containerSet;
this.downloader = downloader;
this.containerImporter = containerImporter;
- compression = CopyContainerCompression.getConf(conf);
}
@Override
@@ -67,9 +67,11 @@ public class DownloadAndImportReplicator implements ContainerReplicator {
}
List<DatanodeDetails> sourceDatanodes = task.getSources();
+ CopyContainerCompression compression =
+ CopyContainerCompression.getConf(conf);
- LOG.info("Starting replication of container {} from {}", containerID,
- sourceDatanodes);
+ LOG.info("Starting replication of container {} from {} using {}",
+ containerID, sourceDatanodes, compression);
try {
HddsVolume targetVolume = containerImporter.chooseNextVolume();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
index eb9aafff95..54675cbbf3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -36,15 +36,15 @@ public class PushReplicator implements ContainerReplicator {
private static final Logger LOG =
LoggerFactory.getLogger(PushReplicator.class);
+ private final ConfigurationSource conf;
private final ContainerReplicationSource source;
private final ContainerUploader uploader;
- private final CopyContainerCompression compression;
public PushReplicator(ConfigurationSource conf,
ContainerReplicationSource source, ContainerUploader uploader) {
+ this.conf = conf;
this.source = source;
this.uploader = uploader;
- compression = CopyContainerCompression.getConf(conf);
}
@Override
@@ -52,6 +52,11 @@ public class PushReplicator implements ContainerReplicator {
long containerID = task.getContainerId();
DatanodeDetails target = task.getTarget();
CompletableFuture<Void> fut = new CompletableFuture<>();
+ CopyContainerCompression compression =
+ CopyContainerCompression.getConf(conf);
+
+ LOG.info("Starting replication of container {} to {} using {}",
+ containerID, target, compression);
source.prepare(containerID);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java
new file mode 100644
index 0000000000..9a81f67351
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import static java.util.Collections.singleton;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.pipeline.MockPipeline.createPipeline;
+import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.createContainer;
+import static org.apache.ozone.test.GenericTestUtils.waitFor;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests ozone containers replication.
+ */
+@Timeout(300)
+class TestContainerReplication {
+
+ private static final AtomicLong CONTAINER_ID = new AtomicLong();
+
+ private static MiniOzoneCluster cluster;
+
+ private static XceiverClientFactory clientFactory;
+
+ @BeforeAll
+ static void setup() throws Exception {
+ OzoneConfiguration conf = createConfiguration();
+ CopyContainerCompression[] compressions = CopyContainerCompression.values();
+ final int count = compressions.length;
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(count)
+ .setStartDataNodes(false)
+ .build();
+ List<HddsDatanodeService> datanodes = cluster.getHddsDatanodes();
+ for (int i = 0; i < count; ++i) {
+ compressions[i].setOn(datanodes.get(i).getConf());
+ }
+ cluster.startHddsDatanodes();
+ cluster.waitForClusterToBeReady();
+
+ clientFactory = new XceiverClientManager(conf);
+ }
+
+ @AfterAll
+ static void tearDown() throws IOException {
+ if (clientFactory != null) {
+ clientFactory.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void testPush(CopyContainerCompression compression) throws Exception {
+ final int index = compression.ordinal();
+ DatanodeDetails source = cluster.getHddsDatanodes().get(index)
+ .getDatanodeDetails();
+ long containerID = createNewClosedContainer(source);
+ DatanodeDetails target = selectOtherNode(source);
+ ReplicateContainerCommand cmd =
+ ReplicateContainerCommand.toTarget(containerID, target);
+ queueAndWaitForSuccess(cmd, source);
+ }
+
+ @ParameterizedTest
+ @EnumSource
+ void testPull(CopyContainerCompression compression) throws Exception {
+ final int index = compression.ordinal();
+ DatanodeDetails target = cluster.getHddsDatanodes().get(index)
+ .getDatanodeDetails();
+ DatanodeDetails source = selectOtherNode(target);
+ long containerID = createNewClosedContainer(source);
+ ReplicateContainerCommand cmd =
+ ReplicateContainerCommand.fromSources(containerID,
+ ImmutableList.of(source));
+ queueAndWaitForSuccess(cmd, target);
+ }
+
+ private void queueAndWaitForSuccess(ReplicateContainerCommand cmd,
+ DatanodeDetails dn)
+ throws IOException, InterruptedException, TimeoutException {
+
+ DatanodeStateMachine datanodeStateMachine =
+ cluster.getHddsDatanode(dn).getDatanodeStateMachine();
+ final ReplicationSupervisor supervisor =
+ datanodeStateMachine.getSupervisor();
+ final long replicationCount = supervisor.getReplicationSuccessCount();
+ StateContext context = datanodeStateMachine.getContext();
+ context.getTermOfLeaderSCM().ifPresent(cmd::setTerm);
+ context.addCommand(cmd);
+ waitFor(
+ () -> supervisor.getReplicationSuccessCount() == replicationCount + 1,
+ 100, 30000);
+ }
+
+ private DatanodeDetails selectOtherNode(DatanodeDetails source)
+ throws IOException {
+ int sourceIndex = cluster.getHddsDatanodeIndex(source);
+ int targetIndex = IntStream.range(0, cluster.getHddsDatanodes().size())
+ .filter(index -> index != sourceIndex)
+ .findAny()
+ .orElseThrow(() -> new AssertionError("no target datanode found"));
+ return cluster.getHddsDatanodes().get(targetIndex).getDatanodeDetails();
+ }
+
+ private static OzoneConfiguration createConfiguration() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
+
+ ReplicationManagerConfiguration repConf =
+ conf.getObject(ReplicationManagerConfiguration.class);
+ repConf.setInterval(Duration.ofSeconds(1));
+ conf.setFromObject(repConf);
+ return conf;
+ }
+
+ private static long createNewClosedContainer(DatanodeDetails dn)
+ throws Exception {
+ long containerID = CONTAINER_ID.incrementAndGet();
+ try (XceiverClientSpi client = clientFactory.acquireClient(
+ createPipeline(singleton(dn)))) {
+ createContainer(client, containerID, null, CLOSED, 0);
+ return containerID;
+ }
+ }
+
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000000..b69c42c84e
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/**
+ * Test container replication.
+ */
+package org.apache.hadoop.ozone.container.replication;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]