ChenSammi commented on a change in pull request #2360:
URL: https://github.com/apache/ozone/pull/2360#discussion_r685053664



##########
File path: 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
##########
@@ -49,86 +60,158 @@
 
   private final ContainerSet containerSet;
 
-  private final ContainerController controller;
-
-  private final ContainerDownloader downloader;
-
-  private final TarContainerPacker packer;
+  private final ConfigurationSource config;
+  private final Supplier<String> clusterId;
+  private VolumeChoosingPolicy volumeChoosingPolicy;
+  private VolumeSet volumeSet;
+  private SslContext sslContext;
 
   public DownloadAndImportReplicator(
+      ConfigurationSource config,
+      Supplier<String> clusterId,
       ContainerSet containerSet,
-      ContainerController controller,
-      ContainerDownloader downloader,
-      TarContainerPacker packer) {
+      VolumeSet volumeSet,
+      SslContext sslContext
+  ) {
     this.containerSet = containerSet;
-    this.controller = controller;
-    this.downloader = downloader;
-    this.packer = packer;
-  }
-
-  public void importContainer(long containerID, Path tarFilePath)
-      throws IOException {
+    this.config = config;
+    this.clusterId = clusterId;
+    this.volumeSet = volumeSet;
+    this.sslContext = sslContext;
+    Class<? extends VolumeChoosingPolicy> volumeChoosingPolicyType = null;
     try {
-      ContainerData originalContainerData;
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-        byte[] containerDescriptorYaml =
-            packer.unpackContainerDescriptor(tempContainerTarStream);
-        originalContainerData = ContainerDataYaml.readContainer(
-            containerDescriptorYaml);
-      }
-
-      try (FileInputStream tempContainerTarStream = new FileInputStream(
-          tarFilePath.toFile())) {
-
-        Container container = controller.importContainer(
-            originalContainerData, tempContainerTarStream, packer);
-
-        containerSet.addContainer(container);
-      }
-
-    } finally {
-      try {
-        Files.delete(tarFilePath);
-      } catch (Exception ex) {
-        LOG.error("Got exception while deleting downloaded container file: "
-            + tarFilePath.toAbsolutePath().toString(), ex);
-      }
+      volumeChoosingPolicyType =
+          config.getClass(
+              HDDS_DATANODE_VOLUME_CHOOSING_POLICY,
+              RoundRobinVolumeChoosingPolicy
+                  .class, VolumeChoosingPolicy.class);
+
+      this.volumeChoosingPolicy = volumeChoosingPolicyType.newInstance();
+
+    } catch (InstantiationException | IllegalAccessException ex) {
+      throw new IllegalArgumentException(
+          "Couldn't create volume choosing policy: " + 
volumeChoosingPolicyType,
+          ex);
     }
   }
 
   @Override
   public void replicate(ReplicationTask task) {
     long containerID = task.getContainerId();
-
+    if (clusterId.get() == null) {
+      LOG.error("Replication task is called before first SCM call");
+      task.setStatus(Status.FAILED);
+    }
     List<DatanodeDetails> sourceDatanodes = task.getSources();
 
     LOG.info("Starting replication of container {} from {}", containerID,
         sourceDatanodes);
 
-    CompletableFuture<Path> tempTarFile = downloader
-        .getContainerDataFromReplicas(containerID,
-            sourceDatanodes);
-    if (tempTarFile == null) {
-      task.setStatus(Status.FAILED);
-    } else {
-      try {
-        // Wait for the download. This thread pool is limiting the parallel
-        // downloads, so it's ok to block here and wait for the full download.
-        Path path = tempTarFile.get();
-        long bytes = Files.size(path);
-
-        LOG.info("Container {} is downloaded with size {}, starting to 
import.",
-                containerID, bytes);
-        task.setTransferredBytes(bytes);
-
-        importContainer(containerID, path);
+    try {
+
+      long maxContainerSize = (long) config.getStorageSize(
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+          ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+      KeyValueContainerData containerData =
+          new KeyValueContainerData(containerID,
+              ChunkLayOutVersion.FILE_PER_BLOCK, maxContainerSize, "", "");
+
+      //choose a volume
+      final HddsVolume volume = volumeChoosingPolicy
+          .chooseVolume(
+              StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList()),
+              maxContainerSize);
+
+      //fill the path fields
+      containerData.assignToVolume(clusterId.get(), volume);
+
+      Collections.shuffle(sourceDatanodes);
+
+      //download data
+      final DatanodeDetails datanode = sourceDatanodes.get(0);
+
+      try (StreamingClient client =
+               new StreamingClient(datanode.getIpAddress(),
+                   datanode.getPort(Name.REPLICATION).getValue(),
+                   new ContainerStreamingDestination(containerData),
+                   sslContext)
+      ) {
+        client.stream("" + containerData.getContainerID());
+
+        LOG.info("Container {} is downloaded successfully", containerID);
+        KeyValueContainerData loadedContainerData =
+            updateContainerData(containerData);
+        LOG.info("Container {} is downloaded, starting to import.",
+            containerID);
+        importContainer(loadedContainerData);
         LOG.info("Container {} is replicated successfully", containerID);
         task.setStatus(Status.DONE);
-      } catch (Exception e) {
-        LOG.error("Container {} replication was unsuccessful.", containerID, 
e);
-        task.setStatus(Status.FAILED);
+
       }
+    } catch (IOException | RuntimeException ex) {
+      LOG.error("Error on replicating container " + containerID, ex);
+      task.setStatus(Status.FAILED);

Review comment:
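      Should the partially downloaded container be deleted in case of failure? The catch block only logs the error and marks the task as FAILED, so any data already streamed into the chosen volume would be left behind.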




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to