This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new dc03783161 HDDS-8145. ReadReplicas should close client (#4387)
dc03783161 is described below

commit dc037831619c06e99194fd83c5e11bd960270708
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Thu Mar 16 12:31:41 2023 +0100

    HDDS-8145. ReadReplicas should close client (#4387)
---
 .../apache/hadoop/ozone/debug/ReadReplicas.java    | 167 +++++++++++----------
 1 file changed, 89 insertions(+), 78 deletions(-)

diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ReadReplicas.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ReadReplicas.java
index 77798b4b22..3b2c2efede 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ReadReplicas.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/ReadReplicas.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.cli.SubcommandWithParent;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientException;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
@@ -35,20 +36,24 @@ import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.shell.OzoneAddress;
 import org.apache.hadoop.ozone.shell.keys.KeyHandler;
-import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.jetbrains.annotations.NotNull;
 import org.kohsuke.MetaInfServices;
 import picocli.CommandLine;
 
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Map;
 
+import static java.util.Collections.emptyMap;
+
 /**
  * Class that downloads every replica for all the blocks associated with a
  * given key. It also generates a manifest file with information about the
@@ -79,9 +84,6 @@ public class ReadReplicas extends KeyHandler implements SubcommandWithParent {
   private static final String JSON_PROPERTY_REPLICA_UUID = "uuid";
   private static final String JSON_PROPERTY_REPLICA_EXCEPTION = "exception";
 
-  private ClientProtocol clientProtocol;
-  private ClientProtocol clientProtocolWithoutChecksum;
-
   @Override
   public Class<?> getParentType() {
     return OzoneDebug.class;
@@ -91,56 +93,64 @@ public class ReadReplicas extends KeyHandler implements SubcommandWithParent {
   protected void execute(OzoneClient client, OzoneAddress address)
       throws IOException, OzoneClientException {
 
+    address.ensureKeyAddress();
+
     boolean isChecksumVerifyEnabled
         = getConf().getBoolean("ozone.client.verify.checksum", true);
     OzoneConfiguration configuration = new OzoneConfiguration(getConf());
     configuration.setBoolean("ozone.client.verify.checksum",
         !isChecksumVerifyEnabled);
 
-    if (isChecksumVerifyEnabled) {
-      clientProtocol = client.getObjectStore().getClientProxy();
-      clientProtocolWithoutChecksum = new RpcClient(configuration, null);
-    } else {
-      clientProtocol = new RpcClient(configuration, null);
-      clientProtocolWithoutChecksum = client.getObjectStore().getClientProxy();
-    }
+    RpcClient newClient = new RpcClient(configuration, null);
+    try {
+      ClientProtocol noChecksumClient;
+      ClientProtocol checksumClient;
+      if (isChecksumVerifyEnabled) {
+        checksumClient = client.getObjectStore().getClientProxy();
+        noChecksumClient = newClient;
+      } else {
+        checksumClient = newClient;
+        noChecksumClient = client.getObjectStore().getClientProxy();
+      }
 
-    address.ensureKeyAddress();
-    String volumeName = address.getVolumeName();
-    String bucketName = address.getBucketName();
-    String keyName = address.getKeyName();
+      String volumeName = address.getVolumeName();
+      String bucketName = address.getBucketName();
+      String keyName = address.getKeyName();
 
-    String directoryName = createDirectory(volumeName, bucketName, keyName);
+      File dir = createDirectory(volumeName, bucketName, keyName);
 
-    OzoneKeyDetails keyInfoDetails
-        = clientProtocol.getKeyDetails(volumeName, bucketName, keyName);
+      OzoneKeyDetails keyInfoDetails
+          = checksumClient.getKeyDetails(volumeName, bucketName, keyName);
 
-    Map<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>> replicas
-        = clientProtocol.getKeysEveryReplicas(volumeName, bucketName, keyName);
+      Map<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>> replicas =
+          checksumClient.getKeysEveryReplicas(volumeName, bucketName, keyName);
 
-    Map<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>>
-        replicasWithoutChecksum = clientProtocolWithoutChecksum
-        .getKeysEveryReplicas(volumeName, bucketName, keyName);
+      Map<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>>
+          replicasWithoutChecksum = noChecksumClient
+          .getKeysEveryReplicas(volumeName, bucketName, keyName);
 
-    JsonObject result = new JsonObject();
-    result.addProperty(JSON_PROPERTY_FILE_NAME,
-        volumeName + "/" + bucketName + "/" + keyName);
-    result.addProperty(JSON_PROPERTY_FILE_SIZE, keyInfoDetails.getDataSize());
+      JsonObject result = new JsonObject();
+      result.addProperty(JSON_PROPERTY_FILE_NAME,
+          volumeName + "/" + bucketName + "/" + keyName);
+      result.addProperty(JSON_PROPERTY_FILE_SIZE, keyInfoDetails.getDataSize());
 
-    JsonArray blocks = new JsonArray();
-    downloadReplicasAndCreateManifest(keyName, replicas,
-        replicasWithoutChecksum, directoryName, blocks);
-    result.add(JSON_PROPERTY_FILE_BLOCKS, blocks);
+      JsonArray blocks = new JsonArray();
+      downloadReplicasAndCreateManifest(keyName, replicas,
+          replicasWithoutChecksum, dir, blocks);
+      result.add(JSON_PROPERTY_FILE_BLOCKS, blocks);
 
-    Gson gson = new GsonBuilder().setPrettyPrinting().create();
-    String prettyJson = gson.toJson(result);
+      Gson gson = new GsonBuilder().setPrettyPrinting().create();
+      String prettyJson = gson.toJson(result);
 
-    String manifestFileName = keyName + "_manifest";
-    System.out.println("Writing manifest file : " + manifestFileName);
-    File manifestFile
-        = new File(outputDir + "/" + directoryName + "/" + manifestFileName);
-    Files.write(manifestFile.toPath(),
-        prettyJson.getBytes(StandardCharsets.UTF_8));
+      String manifestFileName = keyName + "_manifest";
+      System.out.println("Writing manifest file : " + manifestFileName);
+      File manifestFile
+          = new File(dir, manifestFileName);
+      Files.write(manifestFile.toPath(),
+          prettyJson.getBytes(StandardCharsets.UTF_8));
+    } finally {
+      newClient.close();
+    }
   }
 
   private void downloadReplicasAndCreateManifest(
@@ -148,7 +158,7 @@ public class ReadReplicas extends KeyHandler implements SubcommandWithParent {
       Map<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>> replicas,
       Map<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>>
           replicasWithoutChecksum,
-      String directoryName, JsonArray blocks) throws IOException {
+      File dir, JsonArray blocks) throws IOException {
     int blockIndex = 0;
 
     for (Map.Entry<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>>
@@ -158,85 +168,86 @@ public class ReadReplicas extends KeyHandler implements SubcommandWithParent {
 
       blockIndex += 1;
       blockJson.addProperty(JSON_PROPERTY_BLOCK_INDEX, blockIndex);
+      OmKeyLocationInfo locationInfo = block.getKey();
       blockJson.addProperty(JSON_PROPERTY_BLOCK_CONTAINERID,
-          block.getKey().getContainerID());
+          locationInfo.getContainerID());
       blockJson.addProperty(JSON_PROPERTY_BLOCK_LOCALID,
-          block.getKey().getLocalID());
+          locationInfo.getLocalID());
       blockJson.addProperty(JSON_PROPERTY_BLOCK_LENGTH,
-          block.getKey().getLength());
+          locationInfo.getLength());
       blockJson.addProperty(JSON_PROPERTY_BLOCK_OFFSET,
-          block.getKey().getOffset());
+          locationInfo.getOffset());
+
+      BlockID blockID = locationInfo.getBlockID();
+      Map<DatanodeDetails, OzoneInputStream> blockReplicasWithoutChecksum =
+          replicasOf(blockID, replicasWithoutChecksum);
 
       for (Map.Entry<DatanodeDetails, OzoneInputStream>
           replica : block.getValue().entrySet()) {
+        DatanodeDetails datanode = replica.getKey();
+
         JsonObject replicaJson = new JsonObject();
 
         replicaJson.addProperty(JSON_PROPERTY_REPLICA_HOSTNAME,
-            replica.getKey().getHostName());
+            datanode.getHostName());
         replicaJson.addProperty(JSON_PROPERTY_REPLICA_UUID,
-            replica.getKey().getUuidString());
+            datanode.getUuidString());
 
-        OzoneInputStream is = replica.getValue();
         String fileName = keyName + "_block" + blockIndex + "_" +
-            replica.getKey().getHostName();
+            datanode.getHostName();
         System.out.println("Writing : " + fileName);
-        File replicaFile
-            = new File(outputDir + "/" + directoryName + "/" + fileName);
+        Path path = new File(dir, fileName).toPath();
 
-        try {
-          Files.copy(is, replicaFile.toPath(),
-              StandardCopyOption.REPLACE_EXISTING);
+        try (InputStream is = replica.getValue()) {
+          Files.copy(is, path, StandardCopyOption.REPLACE_EXISTING);
         } catch (IOException e) {
           Throwable cause = e.getCause();
           replicaJson.addProperty(JSON_PROPERTY_REPLICA_EXCEPTION,
               e.getMessage());
           if (cause instanceof OzoneChecksumException) {
-            BlockID blockID = block.getKey().getBlockID();
-            String datanodeUUID = replica.getKey().getUuidString();
-            is = getInputStreamWithoutChecksum(replicasWithoutChecksum,
-                datanodeUUID, blockID);
-            Files.copy(is, replicaFile.toPath(),
-                StandardCopyOption.REPLACE_EXISTING);
-          } else if (cause instanceof StatusRuntimeException) {
-            break;
+            try (InputStream is = getReplica(
+                blockReplicasWithoutChecksum, datanode)) {
+              Files.copy(is, path, StandardCopyOption.REPLACE_EXISTING);
+            }
           }
-        } finally {
-          is.close();
         }
         replicasJson.add(replicaJson);
       }
       blockJson.add(JSON_PROPERTY_BLOCK_REPLICAS, replicasJson);
       blocks.add(blockJson);
+
+      blockReplicasWithoutChecksum.values()
+          .forEach(each -> IOUtils.close(LOG, each));
     }
   }
 
-  private OzoneInputStream getInputStreamWithoutChecksum(
-      Map<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>>
-          replicasWithoutChecksum, String datanodeUUID, BlockID blockID) {
-    OzoneInputStream is = new OzoneInputStream();
+  private Map<DatanodeDetails, OzoneInputStream> replicasOf(BlockID blockID,
+      Map<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>> replicas) {
     for (Map.Entry<OmKeyLocationInfo, Map<DatanodeDetails, OzoneInputStream>>
-        block : replicasWithoutChecksum.entrySet()) {
+        block : replicas.entrySet()) {
       if (block.getKey().getBlockID().equals(blockID)) {
-        for (Map.Entry<DatanodeDetails, OzoneInputStream>
-            replica : block.getValue().entrySet()) {
-          if (replica.getKey().getUuidString().equals(datanodeUUID)) {
-            is = replica.getValue();
-          }
-        }
+        return block.getValue();
       }
     }
-    return is;
+    return emptyMap();
+  }
+
+  private InputStream getReplica(
+      Map<DatanodeDetails, OzoneInputStream> replicas, DatanodeDetails datanode
+  ) {
+    InputStream input = replicas.remove(datanode);
+    return input != null ? input : new ByteArrayInputStream(new byte[0]);
   }
 
   @NotNull
-  private String createDirectory(String volumeName, String bucketName,
+  private File createDirectory(String volumeName, String bucketName,
                                  String keyName) throws IOException {
     String fileSuffix
         = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
     String directoryName = volumeName + "_" + bucketName + "_" + keyName +
         "_" + fileSuffix;
     System.out.println("Creating directory : " + directoryName);
-    File dir = new File(outputDir + "/" + directoryName);
+    File dir = new File(outputDir, directoryName);
     if (!dir.exists()) {
       if (dir.mkdir()) {
         System.out.println("Successfully created!");
@@ -245,6 +256,6 @@ public class ReadReplicas extends KeyHandler implements SubcommandWithParent {
             "Failed to create directory %s.", dir));
       }
     }
-    return directoryName;
+    return dir;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to