This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 57b9770b42 HDDS-9661. Datanode should not need to download existing 
container (#5576)
57b9770b42 is described below

commit 57b9770b42d77a6402c540ecceebf16267d8145e
Author: Sumit Agrawal <[email protected]>
AuthorDate: Tue Nov 14 16:22:40 2023 +0530

    HDDS-9661. Datanode should not need to download existing container (#5576)
---
 .../container/replication/ContainerImporter.java   |  7 +-
 .../replication/SendContainerRequestHandler.java   | 11 +++
 .../TestSendContainerRequestHandler.java           | 88 ++++++++++++++++++++++
 3 files changed, 105 insertions(+), 1 deletion(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
index a90c5624cb..4b38430c59 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java
@@ -61,7 +61,7 @@ public class ContainerImporter {
   private final long containerSize;
 
   private final Set<Long> importContainerProgress
-      = Collections.synchronizedSet(new HashSet());
+      = Collections.synchronizedSet(new HashSet<>());
 
   public ContainerImporter(ConfigurationSource conf, ContainerSet containerSet,
       ContainerController controller,
@@ -79,6 +79,11 @@ public class ContainerImporter {
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
   }
 
+  public boolean isAllowedContainerImport(long containerID) {
+    return !(importContainerProgress.contains(containerID)
+        || containerSet.getContainer(containerID) != null);
+  }
+
   public void importContainer(long containerID, Path tarFilePath,
       HddsVolume hddsVolume, CopyContainerCompression compression)
       throws IOException {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
index 5711446af5..6bcd46ba0a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerRequestHandler.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.ozone.container.replication;
 
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -68,6 +70,15 @@ class SendContainerRequestHandler
 
       assertSame(nextOffset, req.getOffset(), "offset");
 
+      // Skip the download if the target already has this container or an
+      // import of it is in progress.
+      if (!importer.isAllowedContainerImport(req.getContainerID())) {
+        containerId = req.getContainerID();
+        throw new StorageContainerException("Container exists or " +
+            "import in progress with container Id " + req.getContainerID(),
+            ContainerProtos.Result.CONTAINER_EXISTS);
+      }
+
       if (containerId == -1) {
         containerId = req.getContainerID();
         volume = importer.chooseNextVolume();
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
new file mode 100644
index 0000000000..f6b1512da9
--- /dev/null
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestSendContainerRequestHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.replication;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.hadoop.ozone.container.replication.CopyContainerCompression.NO_COMPRESSION;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for {@link SendContainerRequestHandler}.
+ */
+class TestSendContainerRequestHandler {
+
+  private OzoneConfiguration conf;
+
+  @BeforeEach
+  void setup() {
+    conf = new OzoneConfiguration();
+  }
+
+  /** Data sent for an existing container is rejected (CONTAINER_EXISTS). */
+  @Test
+  void testReceiveDataForExistingContainer() throws Exception {
+    long containerId = 1;
+    // The importer must share this set, else it cannot see the container.
+    ContainerSet containerSet = new ContainerSet(0);
+    MutableVolumeSet volumeSet = new MutableVolumeSet("test", conf, null,
+        StorageVolume.VolumeType.DATA_VOLUME, null);
+    ContainerImporter containerImporter = new ContainerImporter(conf,
+        containerSet, mock(ContainerController.class), volumeSet);
+    KeyValueContainerData containerData = new KeyValueContainerData(containerId,
+        ContainerLayoutVersion.FILE_PER_BLOCK, 100, "test", "test");
+    containerSet.addContainer(new KeyValueContainer(containerData, conf));
+
+    StreamObserver observer = mock(StreamObserver.class);
+    doAnswer(invocation -> {
+      Object arg = invocation.getArgument(0);
+      Assert.assertTrue(arg instanceof StorageContainerException);
+      Assert.assertEquals(ContainerProtos.Result.CONTAINER_EXISTS,
+          ((StorageContainerException) arg).getResult());
+      return null;
+    }).when(observer).onError(any());
+    SendContainerRequestHandler sendContainerRequestHandler
+        = new SendContainerRequestHandler(containerImporter, observer);
+    ContainerProtos.SendContainerRequest request
+        = ContainerProtos.SendContainerRequest.newBuilder()
+        .setContainerID(containerId)
+        .setData(ByteString.copyFromUtf8("test"))
+        .setOffset(0)
+        .setCompression(NO_COMPRESSION.toProto())
+        .build();
+    sendContainerRequestHandler.onNext(request);
+    // Without this, the test passes vacuously when onError is never invoked.
+    org.mockito.Mockito.verify(observer).onError(any());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to