[ 
https://issues.apache.org/jira/browse/HDFS-15987?focusedWorklogId=689380&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-689380
 ]

ASF GitHub Bot logged work on HDFS-15987:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Dec/21 15:07
            Start Date: 02/Dec/21 15:07
    Worklog Time Spent: 10m 
      Work Description: symious commented on a change in pull request #2918:
URL: https://github.com/apache/hadoop/pull/2918#discussion_r761133799



##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
##########
@@ -651,14 +683,123 @@ private void output(Configuration conf, FileSummary 
summary,
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        outputINodes(is);
+        INodeSection s = INodeSection.parseDelimitedFrom(is);
+        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+        int count = outputINodes(is, out);
+        LOG.info("Outputted {} INodes.", count);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
     LOG.debug("Time to output inodes: {}ms", timeTaken);
   }
 
+  /**
+   * STEP1: Multi-threaded process sub-sections.
+   * Given n (n>1) threads to process k (k>=n) sections,
+   * E.g. 10 sections and 4 threads, grouped as follows:
+   * |---------------------------------------------------------------|
+   * | (0    1    2)    (3    4    5)    (6    7)     (8    9)       |
+   * | thread[0]        thread[1]        thread[2]    thread[3]      |
+   * |---------------------------------------------------------------|
+   *
+   * STEP2: Merge files.
+   */
+  private void outputInParallel(Configuration conf, FileSummary summary,
+      ArrayList<FileSummary.Section> subSections)
+      throws IOException {
+    int nThreads = Integer.min(numThreads, subSections.size());
+    LOG.info("Outputting in parallel with {} sub-sections" +
+        " using {} threads", subSections.size(), nThreads);
+    final CopyOnWriteArrayList<IOException> exceptions =
+        new CopyOnWriteArrayList<>();
+    Thread[] threads = new Thread[nThreads];
+    String[] paths = new String[nThreads];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = parallelOut + ".tmp." + i;
+    }
+    AtomicLong expectedINodes = new AtomicLong(0);
+    AtomicLong totalParsed = new AtomicLong(0);
+    String codec = summary.getCodec();
+
+    int mark = 0;
+    for (int i = 0; i < nThreads; i++) {
+      // Each thread processes different ordered sub-sections
+      // and outputs to different paths
+      int step = subSections.size() / nThreads +
+          (i < subSections.size() % nThreads ? 1 : 0);
+      int start = mark;
+      int end = start + step;
+      ArrayList<FileSummary.Section> subList = new ArrayList<>(
+          subSections.subList(start, end));
+      mark = end;
+      String path = paths[i];
+
+      threads[i] = new Thread(() -> {
+        LOG.info("output iNodes of sub-sections: [{},{})", start, end);
+        InputStream is = null;
+        try (PrintStream theOut = new PrintStream(path, "UTF-8")) {
+          long startTime = Time.monotonicNow();
+          for (int j = 0; j < subList.size(); j++) {
+            is = getInputStreamForSection(subList.get(j), codec, conf);
+            if (start == 0 && j == 0) {
+              // The first iNode section has a header which must be
+              // processed first
+              INodeSection s = INodeSection.parseDelimitedFrom(is);
+              expectedINodes.set(s.getNumInodes());
+            }
+            totalParsed.addAndGet(outputINodes(is, theOut));
+          }
+          long timeTaken = Time.monotonicNow() - startTime;
+          LOG.info("Time to output iNodes of sub-sections: [{},{}) {} ms",
+              start, end, timeTaken);
+        } catch (Exception e) {
+          exceptions.add(new IOException(e));
+        } finally {
+          try {
+            is.close();
+          } catch (IOException ioe) {
+            LOG.warn("Failed to close the input stream, ignoring", ioe);
+          }
+        }
+      });
+    }
+    for (Thread t : threads) {
+      t.start();
+    }
+    for (Thread t : threads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        LOG.error("Interrupted when thread join.", e);
+        throw new IOException(e);
+      }
+    }
+
+    if (exceptions.size() != 0) {

Review comment:
       Is this check happening after all threads are finished? Can we return 
the exception promptly?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
##########
@@ -651,14 +683,123 @@ private void output(Configuration conf, FileSummary 
summary,
         is = FSImageUtil.wrapInputStreamForCompression(conf,
             summary.getCodec(), new BufferedInputStream(new LimitInputStream(
                 fin, section.getLength())));
-        outputINodes(is);
+        INodeSection s = INodeSection.parseDelimitedFrom(is);
+        LOG.info("Found {} INodes in the INode section", s.getNumInodes());
+        int count = outputINodes(is, out);
+        LOG.info("Outputted {} INodes.", count);
       }
     }
     afterOutput();
     long timeTaken = Time.monotonicNow() - startTime;
     LOG.debug("Time to output inodes: {}ms", timeTaken);
   }
 
+  /**
+   * STEP1: Multi-threaded process sub-sections.
+   * Given n (n>1) threads to process k (k>=n) sections,
+   * E.g. 10 sections and 4 threads, grouped as follows:
+   * |---------------------------------------------------------------|
+   * | (0    1    2)    (3    4    5)    (6    7)     (8    9)       |
+   * | thread[0]        thread[1]        thread[2]    thread[3]      |
+   * |---------------------------------------------------------------|
+   *
+   * STEP2: Merge files.
+   */
+  private void outputInParallel(Configuration conf, FileSummary summary,
+      ArrayList<FileSummary.Section> subSections)
+      throws IOException {
+    int nThreads = Integer.min(numThreads, subSections.size());
+    LOG.info("Outputting in parallel with {} sub-sections" +
+        " using {} threads", subSections.size(), nThreads);
+    final CopyOnWriteArrayList<IOException> exceptions =
+        new CopyOnWriteArrayList<>();
+    Thread[] threads = new Thread[nThreads];
+    String[] paths = new String[nThreads];
+    for (int i = 0; i < paths.length; i++) {
+      paths[i] = parallelOut + ".tmp." + i;
+    }
+    AtomicLong expectedINodes = new AtomicLong(0);
+    AtomicLong totalParsed = new AtomicLong(0);
+    String codec = summary.getCodec();
+
+    int mark = 0;
+    for (int i = 0; i < nThreads; i++) {
+      // Each thread processes different ordered sub-sections
+      // and outputs to different paths
+      int step = subSections.size() / nThreads +
+          (i < subSections.size() % nThreads ? 1 : 0);
+      int start = mark;
+      int end = start + step;
+      ArrayList<FileSummary.Section> subList = new ArrayList<>(
+          subSections.subList(start, end));
+      mark = end;
+      String path = paths[i];
+
+      threads[i] = new Thread(() -> {

Review comment:
       Maybe thread pool is better here?

##########
File path: 
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/PBImageTextWriter.java
##########
@@ -642,6 +661,19 @@ long getParentId(long id) throws IOException {
   private void output(Configuration conf, FileSummary summary,
       FileInputStream fin, ArrayList<FileSummary.Section> sections)
       throws IOException {
+    ArrayList<FileSummary.Section> allINodeSubSections =
+        getINodeSubSections(sections);
+    if (numThreads > 1 && !parallelOut.equals("-") &&
+        allINodeSubSections.size() > 1) {

Review comment:
       It would be good to add some logs showing it's using serial or parallel 
output. And if the user is using the tool with option "threads" but her fsimage 
doesn't support the feature, better add some logs notifying her it's falling 
back to the serial output.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 689380)
    Time Spent: 4h  (was: 3h 50m)

> Improve oiv tool to parse fsimage file in parallel with delimited format
> ------------------------------------------------------------------------
>
>                 Key: HDFS-15987
>                 URL: https://issues.apache.org/jira/browse/HDFS-15987
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>            Reporter: Hongbing Wang
>            Assignee: Hongbing Wang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: Improve_oiv_tool_001.pdf
>
>          Time Spent: 4h
>  Remaining Estimate: 0h
>
> The purpose of this Jira is to improve oiv tool to parse fsimage file with 
> sub-sections (see -HDFS-14617-) in parallel with delmited format. 
> 1.Serial parsing is time-consuming
> The time to serially parse a large fsimage with delimited format (e.g. `hdfs 
> oiv -p Delimited -t <tmp> ...`) is as follows: 
> {code:java}
> 1) Loading string table:                 -> Not time consuming.
> 2) Loading inode references:             -> Not time consuming
> 3) Loading directories in INode section: -> Slightly time consuming (3%)
> 4) Loading INode directory section:      -> A bit time consuming (11%)
> 5) Output:                               -> Very time consuming (86%){code}
> Therefore, output is the most parallelized stage.
> 2.How to output in parallel
> The sub-sections are grouped in order, and each thread processes a group and 
> outputs it to the file corresponding to each thread, and finally merges the 
> output files.
> 3. The result of a test
> {code:java}
>  input fsimage file info:
>  3.4G, 12 sub-sections, 55976500 INodes
>  -----------------------------------------
>  Threads TotalTime OutputTime MergeTime
>  1       18m37s     16m18s      –
>  4        8m7s      4m49s       41s{code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: hdfs-issues-h...@hadoop.apache.org

Reply via email to