[ https://issues.apache.org/jira/browse/HDFS-17475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838832#comment-17838832 ]

ASF GitHub Bot commented on HDFS-17475:
---------------------------------------

ZanderXu commented on code in PR #6745:
URL: https://github.com/apache/hadoop/pull/6745#discussion_r1571614915


##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {

Review Comment:
   Can you add some comments to describe this command?
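   
   For example, a minimal Javadoc sketch (wording is just a suggestion, not part of the patch):
   
       /**
        * Debug command that verifies whether one or more paths are fully
        * readable, i.e. have no missing blocks, by checking block metadata
        * on the DataNodes rather than running a full read pipeline.
        */
       private class VerifyReadableCommand extends DebugCommand {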



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "
+              + "[-output <output>] "
+              + "[-concurrency <concurrency>] "
+              + "[-suppressed]",
+          "  Verify if one or multiple paths are fully readable and have no 
missing blocks.");
+    }
+
+    @Override
+    int run(List<String> args) throws IOException {
+      if (args.isEmpty()) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      dfs = AdminHelper.getDFS(getConf());
+      String pathStr = StringUtils.popOptionWithArgument("-path", args);
+      String inputStr = StringUtils.popOptionWithArgument("-input", args);
+      String outputStr = StringUtils.popOptionWithArgument("-output", args);
+      String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args);
+      suppressed = StringUtils.popOption("-suppressed", args);
+      if (pathStr == null && inputStr == null) {
+        System.out.println("Either -path or -input must be present.");
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      try {
+        return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+      } catch (Exception e) {
+        System.err.println(
+            "Got IOE: " + StringUtils.stringifyException(e) + " for command: " 
+ StringUtils.join(
+                ",", args));
+        return 1;
+      }
+    }
+
+    private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)
+        throws IOException, ExecutionException, InterruptedException {
+      BufferedWriter writer = null;
+      try {
+        if (outputStr != null) {
+          File output = new File(outputStr);
+          writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(output.toPath()),
+              StandardCharsets.UTF_8));
+        }
+
+        // -path takes priority over -input
+        if (pathStr != null) {
+          int result = handlePath(new Path(pathStr));

Review Comment:
   How about using the `handlePaths()` method here too?
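   
   A sketch of how the single -path case could reuse it (assuming `handlePaths()` keeps its current signature, and using `java.util.Collections`):
   
       // -path takes priority over -input; route it through the common handler.
       if (pathStr != null) {
         return handlePaths(Collections.singleton(new Path(pathStr)), writer, 1);
       }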



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "
+              + "[-output <output>] "
+              + "[-concurrency <concurrency>] "
+              + "[-suppressed]",
+          "  Verify if one or multiple paths are fully readable and have no 
missing blocks.");
+    }
+
+    @Override
+    int run(List<String> args) throws IOException {
+      if (args.isEmpty()) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      dfs = AdminHelper.getDFS(getConf());
+      String pathStr = StringUtils.popOptionWithArgument("-path", args);
+      String inputStr = StringUtils.popOptionWithArgument("-input", args);
+      String outputStr = StringUtils.popOptionWithArgument("-output", args);
+      String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args);
+      suppressed = StringUtils.popOption("-suppressed", args);
+      if (pathStr == null && inputStr == null) {
+        System.out.println("Either -path or -input must be present.");
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      try {
+        return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+      } catch (Exception e) {
+        System.err.println(
+            "Got IOE: " + StringUtils.stringifyException(e) + " for command: " 
+ StringUtils.join(
+                ",", args));
+        return 1;
+      }
+    }
+
+    private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)
+        throws IOException, ExecutionException, InterruptedException {
+      BufferedWriter writer = null;
+      try {
+        if (outputStr != null) {
+          File output = new File(outputStr);
+          writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(output.toPath()),
+              StandardCharsets.UTF_8));
+        }
+
+        // -path takes priority over -input
+        if (pathStr != null) {
+          int result = handlePath(new Path(pathStr));
+          writeToOutput(writer, pathStr, result);
+          return result;
+        }
+
+        // -input must be defined by this point
+        File input = new File(inputStr);
+        if (!input.exists()) {
+          return 1;
+        }
+        BufferedReader reader = new BufferedReader(
+            new InputStreamReader(Files.newInputStream(input.toPath()), StandardCharsets.UTF_8));
+        Set<Path> paths = new HashSet<>();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          paths.add(new Path(line.trim()));
+        }
+        reader.close();
+        int concurrency = concurrencyStr == null ? 1 : Integer.parseInt(concurrencyStr);
+        return handlePaths(paths, writer, concurrency);
+      } finally {
+        if (writer != null) {
+          writer.flush();
+          writer.close();
+        }
+      }
+    }
+
+    private void writeToOutput(BufferedWriter writer, String path, int result) throws IOException {
+      if (writer == null) {
+        return;
+      }
+      writer.write(path);
+      writer.write(" ");
+      writer.write(String.valueOf(result));
+      writer.write("\n");
+      writer.flush();
+    }
+
+    private int handlePaths(Set<Path> paths, BufferedWriter writer, int concurrency)
+        throws ExecutionException, InterruptedException, IOException {
+      int total = paths.size();
+      long start = Time.monotonicNow();
+      ExecutorService threadPool = Executors.newFixedThreadPool(concurrency);
+      List<Callable<Pair<Path, Integer>>> tasks = new ArrayList<>();
+      for (Path path : paths) {
+        tasks.add(() -> Pair.of(path, handlePath(path)));
+      }
+      List<Future<Pair<Path, Integer>>> futures =
+          tasks.stream().map(threadPool::submit).collect(Collectors.toList());
+
+      boolean failed = false;
+      int done = 0;
+      for (Future<Pair<Path, Integer>> future : futures) {
+        done++;
+        if (done % 1000 == 0) {
+          long elapsed = Time.monotonicNow() - start;
+          double rate = (double) done / elapsed * 1000;
+          String msg = "Progress: %d/%d, elapsed: %d ms, rate: %5.2f files/s%n";
+          System.out.printf(msg, done, total, elapsed, rate);
+        }
+        writeToOutput(writer, future.get().getLeft().toString(), future.get().getRight());
+        failed |= future.get().getRight() != 0;
+      }
+      return failed ? 1 : 0;
+    }
+
+    private int handlePath(Path path) {
+
+      HdfsBlockLocation[] locs;
+      try {
+        locs = (HdfsBlockLocation[]) dfs.getFileBlockLocations(path, 0,
+            dfs.getFileStatus(path).getLen());

Review Comment:
   Here you can simply use `Long.MAX_VALUE`.
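   
   For example, which also saves the extra `getFileStatus()` RPC:
   
       locs = (HdfsBlockLocation[]) dfs.getFileBlockLocations(path, 0, Long.MAX_VALUE);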



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "
+              + "[-output <output>] "
+              + "[-concurrency <concurrency>] "
+              + "[-suppressed]",
+          "  Verify if one or multiple paths are fully readable and have no 
missing blocks.");
+    }
+
+    @Override
+    int run(List<String> args) throws IOException {
+      if (args.isEmpty()) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      dfs = AdminHelper.getDFS(getConf());
+      String pathStr = StringUtils.popOptionWithArgument("-path", args);
+      String inputStr = StringUtils.popOptionWithArgument("-input", args);
+      String outputStr = StringUtils.popOptionWithArgument("-output", args);
+      String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args);
+      suppressed = StringUtils.popOption("-suppressed", args);
+      if (pathStr == null && inputStr == null) {
+        System.out.println("Either -path or -input must be present.");
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      try {
+        return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+      } catch (Exception e) {
+        System.err.println(
+            "Got IOE: " + StringUtils.stringifyException(e) + " for command: " 
+ StringUtils.join(
+                ",", args));
+        return 1;
+      }
+    }
+
+    private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)
+        throws IOException, ExecutionException, InterruptedException {
+      BufferedWriter writer = null;
+      try {
+        if (outputStr != null) {
+          File output = new File(outputStr);
+          writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(output.toPath()),
+              StandardCharsets.UTF_8));
+        }
+
+        // -path takes priority over -input
+        if (pathStr != null) {
+          int result = handlePath(new Path(pathStr));
+          writeToOutput(writer, pathStr, result);
+          return result;
+        }
+
+        // -input must be defined by this point
+        File input = new File(inputStr);
+        if (!input.exists()) {
+          return 1;
+        }
+        BufferedReader reader = new BufferedReader(
+            new InputStreamReader(Files.newInputStream(input.toPath()), StandardCharsets.UTF_8));
+        Set<Path> paths = new HashSet<>();
+        String line;
+        while ((line = reader.readLine()) != null) {
+          paths.add(new Path(line.trim()));
+        }
+        reader.close();
+        int concurrency = concurrencyStr == null ? 1 : Integer.parseInt(concurrencyStr);
+        return handlePaths(paths, writer, concurrency);
+      } finally {
+        if (writer != null) {
+          writer.flush();
+          writer.close();
+        }
+      }
+    }
+
+    private void writeToOutput(BufferedWriter writer, String path, int result) throws IOException {

Review Comment:
   How about writing more detailed data to this writer? For example:
   
   1. the path and its result
   2. the number of blocks that have been verified
   3. the detailed block list and the verification result for each block
   
   Or you could add a parameter to control the level of detail.
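   
   A rough sketch of what that could look like (the `verbose` flag and the `blockResults` parameter are hypothetical, not part of the patch):
   
       private void writeToOutput(BufferedWriter writer, String path, int result,
           List<Pair<ExtendedBlock, Boolean>> blockResults) throws IOException {
         if (writer == null) {
           return;
         }
         writer.write(path + " " + result);
         if (verbose) {
           // Hypothetical flag: also emit the verified block count and a
           // per-block verdict on indented lines below the path.
           writer.write(" blocks=" + blockResults.size());
           for (Pair<ExtendedBlock, Boolean> block : blockResults) {
             writer.write("\n  " + block.getLeft() + " readable=" + block.getRight());
           }
         }
         writer.write("\n");
         writer.flush();
       }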



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "

Review Comment:
   Can you add some descriptions for these parameters? See the sketch below.
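   
   For example (descriptions inferred from how the options are used in this patch; the `-suppressed` wording is a guess from the flag name):
   
       "  Verify if one or multiple paths are fully readable and have no missing blocks.\n"
           + "  -path <path>                a single path to verify.\n"
           + "  -input <input>              a local file containing one path per line.\n"
           + "  -output <output>            a local file to write per-path results to.\n"
           + "  -concurrency <concurrency>  number of verification threads (default 1).\n"
           + "  -suppressed                 suppress per-path output."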



##########
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java:
##########
@@ -641,6 +663,207 @@ private void closeBlockReaders() {
 
   }
 
+  private class VerifyReadableCommand extends DebugCommand {
+    private DistributedFileSystem dfs;
+    private boolean suppressed = false;
+
+    VerifyReadableCommand() {
+      super("verifyReadable",
+          "verifyReadable "
+              + "[-path <path> | -input <input>] "
+              + "[-output <output>] "
+              + "[-concurrency <concurrency>] "
+              + "[-suppressed]",
+          "  Verify if one or multiple paths are fully readable and have no 
missing blocks.");
+    }
+
+    @Override
+    int run(List<String> args) throws IOException {
+      if (args.isEmpty()) {
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      dfs = AdminHelper.getDFS(getConf());
+      String pathStr = StringUtils.popOptionWithArgument("-path", args);
+      String inputStr = StringUtils.popOptionWithArgument("-input", args);
+      String outputStr = StringUtils.popOptionWithArgument("-output", args);
+      String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args);
+      suppressed = StringUtils.popOption("-suppressed", args);
+      if (pathStr == null && inputStr == null) {
+        System.out.println("Either -path or -input must be present.");
+        System.out.println(usageText);
+        System.out.println(helpText + System.lineSeparator());
+        return 1;
+      }
+      try {
+        return handleArgs(pathStr, inputStr, outputStr, concurrencyStr);
+      } catch (Exception e) {
+        System.err.println(
+            "Got IOE: " + StringUtils.stringifyException(e) + " for command: " 
+ StringUtils.join(
+                ",", args));
+        return 1;
+      }
+    }
+
+    private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr)

Review Comment:
   Maybe you can split this method into the following steps to make it clearer:
   
   1. prepare the paths that need to be verified
   2. verify those paths
   
   Something like the sketch below.
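   
   (`preparePaths` and `verifyPaths` are hypothetical helpers, just to illustrate the shape.)
   
       private int handleArgs(String pathStr, String inputStr, String outputStr,
           String concurrencyStr) throws IOException, ExecutionException, InterruptedException {
         // Step 1: collect the paths to verify from -path or -input.
         Set<Path> paths = preparePaths(pathStr, inputStr);
         // Step 2: verify them, writing results to -output if requested.
         int concurrency = concurrencyStr == null ? 1 : Integer.parseInt(concurrencyStr);
         return verifyPaths(paths, outputStr, concurrency);
       }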





> Add a command to check if files are readable
> --------------------------------------------
>
>                 Key: HDFS-17475
>                 URL: https://issues.apache.org/jira/browse/HDFS-17475
>             Project: Hadoop HDFS
>          Issue Type: Improvement
>          Components: hdfs
>            Reporter: Felix N
>            Assignee: Felix N
>            Priority: Minor
>              Labels: pull-request-available
>
> Sometimes a job fails down the line because of a single unreadable file, 
> whether due to missing replicas, dead DataNodes, or other reasons. This 
> command should allow users to check whether files are readable by checking 
> block metadata on the DataNodes, without executing the full read pipeline 
> for each file.
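
A sample invocation using the options from the current patch (file names are placeholders):

    hdfs debug verifyReadable -input /tmp/paths.txt -output /tmp/results.txt -concurrency 16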


