[ https://issues.apache.org/jira/browse/HDFS-17475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838832#comment-17838832 ]
ASF GitHub Bot commented on HDFS-17475: --------------------------------------- ZanderXu commented on code in PR #6745: URL: https://github.com/apache/hadoop/pull/6745#discussion_r1571614915 ########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java: ########## @@ -641,6 +663,207 @@ private void closeBlockReaders() { } + private class VerifyReadableCommand extends DebugCommand { Review Comment: Can add some comments to describe this command. ########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java: ########## @@ -641,6 +663,207 @@ private void closeBlockReaders() { } + private class VerifyReadableCommand extends DebugCommand { + private DistributedFileSystem dfs; + private boolean suppressed = false; + + VerifyReadableCommand() { + super("verifyReadable", + "verifyReadable " + + "[-path <path> | -input <input>] " + + "[-output <output>] " + + "[-concurrency <concurrency>] " + + "[-suppressed]", + " Verify if one or multiple paths are fully readable and have no missing blocks."); + } + + @Override + int run(List<String> args) throws IOException { + if (args.isEmpty()) { + System.out.println(usageText); + System.out.println(helpText + System.lineSeparator()); + return 1; + } + dfs = AdminHelper.getDFS(getConf()); + String pathStr = StringUtils.popOptionWithArgument("-path", args); + String inputStr = StringUtils.popOptionWithArgument("-input", args); + String outputStr = StringUtils.popOptionWithArgument("-output", args); + String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args); + suppressed = StringUtils.popOption("-suppressed", args); + if (pathStr == null && inputStr == null) { + System.out.println("Either -path or -input must be present."); + System.out.println(usageText); + System.out.println(helpText + System.lineSeparator()); + return 1; + } + try { + return handleArgs(pathStr, inputStr, outputStr, concurrencyStr); + } catch (Exception e) { + 
System.err.println( + "Got IOE: " + StringUtils.stringifyException(e) + " for command: " + StringUtils.join( + ",", args)); + return 1; + } + } + + private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr) + throws IOException, ExecutionException, InterruptedException { + BufferedWriter writer = null; + try { + if (outputStr != null) { + File output = new File(outputStr); + writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(output.toPath()), + StandardCharsets.UTF_8)); + } + + // -path takes priority over -input + if (pathStr != null) { + int result = handlePath(new Path(pathStr)); Review Comment: how about using `handlePaths()` method too? ########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java: ########## @@ -641,6 +663,207 @@ private void closeBlockReaders() { } + private class VerifyReadableCommand extends DebugCommand { + private DistributedFileSystem dfs; + private boolean suppressed = false; + + VerifyReadableCommand() { + super("verifyReadable", + "verifyReadable " + + "[-path <path> | -input <input>] " + + "[-output <output>] " + + "[-concurrency <concurrency>] " + + "[-suppressed]", + " Verify if one or multiple paths are fully readable and have no missing blocks."); + } + + @Override + int run(List<String> args) throws IOException { + if (args.isEmpty()) { + System.out.println(usageText); + System.out.println(helpText + System.lineSeparator()); + return 1; + } + dfs = AdminHelper.getDFS(getConf()); + String pathStr = StringUtils.popOptionWithArgument("-path", args); + String inputStr = StringUtils.popOptionWithArgument("-input", args); + String outputStr = StringUtils.popOptionWithArgument("-output", args); + String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args); + suppressed = StringUtils.popOption("-suppressed", args); + if (pathStr == null && inputStr == null) { + System.out.println("Either -path or -input must be 
present."); + System.out.println(usageText); + System.out.println(helpText + System.lineSeparator()); + return 1; + } + try { + return handleArgs(pathStr, inputStr, outputStr, concurrencyStr); + } catch (Exception e) { + System.err.println( + "Got IOE: " + StringUtils.stringifyException(e) + " for command: " + StringUtils.join( + ",", args)); + return 1; + } + } + + private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr) + throws IOException, ExecutionException, InterruptedException { + BufferedWriter writer = null; + try { + if (outputStr != null) { + File output = new File(outputStr); + writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(output.toPath()), + StandardCharsets.UTF_8)); + } + + // -path takes priority over -input + if (pathStr != null) { + int result = handlePath(new Path(pathStr)); + writeToOutput(writer, pathStr, result); + return result; + } + + // -input must be defined by this point + File input = new File(inputStr); + if (!input.exists()) { + return 1; + } + BufferedReader reader = new BufferedReader( + new InputStreamReader(Files.newInputStream(input.toPath()), StandardCharsets.UTF_8)); + Set<Path> paths = new HashSet<>(); + String line; + while ((line = reader.readLine()) != null) { + paths.add(new Path(line.trim())); + } + reader.close(); + int concurrency = concurrencyStr == null ? 
1 : Integer.parseInt(concurrencyStr); + return handlePaths(paths, writer, concurrency); + } finally { + if (writer != null) { + writer.flush(); + writer.close(); + } + } + } + + private void writeToOutput(BufferedWriter writer, String path, int result) throws IOException { + if (writer == null) { + return; + } + writer.write(path); + writer.write(" "); + writer.write(String.valueOf(result)); + writer.write("\n"); + writer.flush(); + } + + private int handlePaths(Set<Path> paths, BufferedWriter writer, int concurrency) + throws ExecutionException, InterruptedException, IOException { + int total = paths.size(); + long start = Time.monotonicNow(); + ExecutorService threadPool = Executors.newFixedThreadPool(concurrency); + List<Callable<Pair<Path, Integer>>> tasks = new ArrayList<>(); + for (Path path : paths) { + tasks.add(() -> Pair.of(path, handlePath(path))); + } + List<Future<Pair<Path, Integer>>> futures = + tasks.stream().map(threadPool::submit).collect(Collectors.toList()); + + boolean failed = false; + int done = 0; + for (Future<Pair<Path, Integer>> future : futures) { + done++; + if (done % 1000 == 0) { + long elapsed = Time.monotonicNow() - start; + double rate = (double) done / elapsed * 1000; + String msg = "Progress: %d/%d, elapsed: %d ms, rate: %5.2f files/s%n"; + System.out.printf(msg, done, total, elapsed, rate); + } + writeToOutput(writer, future.get().getLeft().toString(), future.get().getRight()); + failed |= future.get().getRight() != 0; + } + return failed ? 1 : 0; + } + + private int handlePath(Path path) { + + HdfsBlockLocation[] locs; + try { + locs = (HdfsBlockLocation[]) dfs.getFileBlockLocations(path, 0, + dfs.getFileStatus(path).getLen()); Review Comment: here can use Long.MAX simply. 
########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java: ########## @@ -641,6 +663,207 @@ private void closeBlockReaders() { } + private class VerifyReadableCommand extends DebugCommand { + private DistributedFileSystem dfs; + private boolean suppressed = false; + + VerifyReadableCommand() { + super("verifyReadable", + "verifyReadable " + + "[-path <path> | -input <input>] " + + "[-output <output>] " + + "[-concurrency <concurrency>] " + + "[-suppressed]", + " Verify if one or multiple paths are fully readable and have no missing blocks."); + } + + @Override + int run(List<String> args) throws IOException { + if (args.isEmpty()) { + System.out.println(usageText); + System.out.println(helpText + System.lineSeparator()); + return 1; + } + dfs = AdminHelper.getDFS(getConf()); + String pathStr = StringUtils.popOptionWithArgument("-path", args); + String inputStr = StringUtils.popOptionWithArgument("-input", args); + String outputStr = StringUtils.popOptionWithArgument("-output", args); + String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args); + suppressed = StringUtils.popOption("-suppressed", args); + if (pathStr == null && inputStr == null) { + System.out.println("Either -path or -input must be present."); + System.out.println(usageText); + System.out.println(helpText + System.lineSeparator()); + return 1; + } + try { + return handleArgs(pathStr, inputStr, outputStr, concurrencyStr); + } catch (Exception e) { + System.err.println( + "Got IOE: " + StringUtils.stringifyException(e) + " for command: " + StringUtils.join( + ",", args)); + return 1; + } + } + + private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr) + throws IOException, ExecutionException, InterruptedException { + BufferedWriter writer = null; + try { + if (outputStr != null) { + File output = new File(outputStr); + writer = new BufferedWriter(new 
OutputStreamWriter(Files.newOutputStream(output.toPath()), + StandardCharsets.UTF_8)); + } + + // -path takes priority over -input + if (pathStr != null) { + int result = handlePath(new Path(pathStr)); + writeToOutput(writer, pathStr, result); + return result; + } + + // -input must be defined by this point + File input = new File(inputStr); + if (!input.exists()) { + return 1; + } + BufferedReader reader = new BufferedReader( + new InputStreamReader(Files.newInputStream(input.toPath()), StandardCharsets.UTF_8)); + Set<Path> paths = new HashSet<>(); + String line; + while ((line = reader.readLine()) != null) { + paths.add(new Path(line.trim())); + } + reader.close(); + int concurrency = concurrencyStr == null ? 1 : Integer.parseInt(concurrencyStr); + return handlePaths(paths, writer, concurrency); + } finally { + if (writer != null) { + writer.flush(); + writer.close(); + } + } + } + + private void writeToOutput(BufferedWriter writer, String path, int result) throws IOException { Review Comment: how about writing more detailed data to this writer? such as: 1. path result 2. number of blocks have been verified 3. detailed blocks lists and the verify result for each block or you can add a parameter to control it ########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java: ########## @@ -641,6 +663,207 @@ private void closeBlockReaders() { } + private class VerifyReadableCommand extends DebugCommand { + private DistributedFileSystem dfs; + private boolean suppressed = false; + + VerifyReadableCommand() { + super("verifyReadable", + "verifyReadable " + + "[-path <path> | -input <input>] " Review Comment: can add some descriptions for these parameters. 
########## hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DebugAdmin.java: ########## @@ -641,6 +663,207 @@ private void closeBlockReaders() { } + private class VerifyReadableCommand extends DebugCommand { + private DistributedFileSystem dfs; + private boolean suppressed = false; + + VerifyReadableCommand() { + super("verifyReadable", + "verifyReadable " + + "[-path <path> | -input <input>] " + + "[-output <output>] " + + "[-concurrency <concurrency>] " + + "[-suppressed]", + " Verify if one or multiple paths are fully readable and have no missing blocks."); + } + + @Override + int run(List<String> args) throws IOException { + if (args.isEmpty()) { + System.out.println(usageText); + System.out.println(helpText + System.lineSeparator()); + return 1; + } + dfs = AdminHelper.getDFS(getConf()); + String pathStr = StringUtils.popOptionWithArgument("-path", args); + String inputStr = StringUtils.popOptionWithArgument("-input", args); + String outputStr = StringUtils.popOptionWithArgument("-output", args); + String concurrencyStr = StringUtils.popOptionWithArgument("-concurrency", args); + suppressed = StringUtils.popOption("-suppressed", args); + if (pathStr == null && inputStr == null) { + System.out.println("Either -path or -input must be present."); + System.out.println(usageText); + System.out.println(helpText + System.lineSeparator()); + return 1; + } + try { + return handleArgs(pathStr, inputStr, outputStr, concurrencyStr); + } catch (Exception e) { + System.err.println( + "Got IOE: " + StringUtils.stringifyException(e) + " for command: " + StringUtils.join( + ",", args)); + return 1; + } + } + + private int handleArgs(String pathStr, String inputStr, String outputStr, String concurrencyStr) Review Comment: maybe you can abstract this method as the following steps to make it clear 1. prepare the paths need to be verified 2. 
verify these paths > Add a command to check if files are readable > -------------------------------------------- > > Key: HDFS-17475 > URL: https://issues.apache.org/jira/browse/HDFS-17475 > Project: Hadoop HDFS > Issue Type: Improvement > Components: hdfs > Reporter: Felix N > Assignee: Felix N > Priority: Minor > Labels: pull-request-available > > Sometimes a job can fail because of a single unreadable file down the line, > caused by missing replicas, dead DNs, or other reasons. This command should allow users > to check whether files are readable by checking for metadata on DNs without > executing full read pipelines of the files. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org