[ https://issues.apache.org/jira/browse/HADOOP-17531?focusedWorklogId=563285&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-563285 ]

ASF GitHub Bot logged work on HADOOP-17531:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Mar/21 19:29
            Start Date: 09/Mar/21 19:29
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request #2732:
URL: https://github.com/apache/hadoop/pull/2732#discussion_r590624549



##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {

Review comment:
       You can switch to slf4j logging here; this is all commons-logging era code (distcp lagged behind).
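
For illustration, a minimal sketch of the slf4j form (the class and method names here are hypothetical, not from the patch); the `{}` placeholder defers formatting until debug logging is actually enabled, so the `isDebugEnabled()` guard can go:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ListingLogSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(ListingLogSketch.class);

  void logWorkerStart(int numListstatusThreads) {
    // No isDebugEnabled() guard needed: the argument is only formatted
    // into the message when debug logging is enabled.
    LOG.debug("Starting thread pool of {} listStatus workers.",
        numListstatusThreads);
  }
}
```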

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.tools;
 
+import org.apache.hadoop.fs.RemoteIterator;

Review comment:
       This needs to go into the "real hadoop imports" block; your IDE is getting confused. Putting it in the right place makes backporting way easier.
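
For illustration, the grouping being asked for might look like this (the neighbouring imports are inferred from classes used in the diff, not copied verbatim from the file):

```
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;  // new import, kept in the hadoop block
import org.apache.hadoop.io.SequenceFile;
```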

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),
+                      preserveXAttrs && child.isDirectory(),
+                      preserveRawXattrs && child.isDirectory(),
+                      context.getBlocksPerChunk());
+
+              for (CopyListingFileStatus fs : childCopyListingStatus) {
+                if (randomizeFileListing) {
+                  addToFileListing(fileStatuses,
+                      new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                } else {
+                  writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                }
+              }
+            }
+            if (retry < maxRetries) {
+              if (child.isDirectory()) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Traversing into source dir: " + child.getPath());
+                }
+                workers.put(new WorkRequest<FileStatus>(child, retry));
+              }
+            } else {
+              LOG.error("Giving up on " + child.getPath() + " after " + retry
+                  + " retries.");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Could not get item from childQueue. Retrying...");
+        }
+      }
+      workers.shutdown();

Review comment:
       Should this be in a finally clause?
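
A sketch of what that could look like; `drainQueue()` is a hypothetical stand-in for the existing take()/retry loop:

```
ProducerConsumer<FileStatus, FileStatus[]> workers =
    new ProducerConsumer<>(numListstatusThreads);
try {
  // addWorker()/put() setup and the take()/retry loop from the patch
  drainQueue(workers);
} finally {
  // runs on both the normal and exceptional paths, so worker threads
  // are not leaked if writing the listing throws
  workers.shutdown();
}
```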

##########
File path: hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
##########
@@ -217,4 +221,42 @@ public void testPreserveEC() throws Exception {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {

Review comment:
       Could we have this, or a variant of it, in AbstractContractDistCpTest so that the object stores will run it?

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
##########
@@ -239,7 +239,12 @@
    */
   DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE,
       new Option("direct", false, "Write files directly to the"
-          + " target location, avoiding temporary file rename."));
+          + " target location, avoiding temporary file rename.")),
+
+  USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR,
+      new Option("useIterator", false,

Review comment:
       Could we have a non-mixed-case arg? I always get confused here.

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),
+                      preserveXAttrs && child.isDirectory(),
+                      preserveRawXattrs && child.isDirectory(),
+                      context.getBlocksPerChunk());
+
+              for (CopyListingFileStatus fs : childCopyListingStatus) {
+                if (randomizeFileListing) {
+                  addToFileListing(fileStatuses,
+                      new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                } else {
+                  writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                }
+              }
+            }
+            if (retry < maxRetries) {
+              if (child.isDirectory()) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Traversing into source dir: " + child.getPath());
+                }
+                workers.put(new WorkRequest<FileStatus>(child, retry));
+              }
+            } else {
+              LOG.error("Giving up on " + child.getPath() + " after " + retry
+                  + " retries.");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Could not get item from childQueue. Retrying...");
+        }
+      }
+      workers.shutdown();
+    }
+
+    private void traverseDirectoryLegacy() throws IOException {
+      Stack<FileStatus> pathStack = new Stack<FileStatus>();
+      for (FileStatus fs : sourceDirs) {
+        if (excludeList == null || !excludeList
+            .contains(fs.getPath().toUri().getPath())) {
+          pathStack.add(fs);
+        }
+      }
+      while (!pathStack.isEmpty()) {
+        prepareListing(pathStack.pop().getPath());
+      }
+    }
+
+    private void prepareListing(Path path) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recording source-path: " + path + " for copy.");
+      }
+      RemoteIterator<FileStatus> listStatus = sourceFS.listStatusIterator(path);
+      while (listStatus.hasNext()) {
+        FileStatus child = listStatus.next();
+        if (excludeList != null && excludeList

Review comment:
       `RemoteIterators.filteringRemoteIterator()` would let you do this in an elegant functional style; it's just not backportable to branches without IOStatistics support.
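
A sketch of that style, assuming the `RemoteIterators` utility class from hadoop-common (3.3.1+); the predicate mirrors the exclusion check in the patch:

```
RemoteIterator<FileStatus> listStatus =
    RemoteIterators.filteringRemoteIterator(
        sourceFS.listStatusIterator(path),
        // yield a child only if it is not on the exclude list
        child -> excludeList == null
            || !excludeList.contains(child.getPath().toUri().getPath()));
while (listStatus.hasNext()) {
  FileStatus child = listStatus.next();
  // ... no per-child exclusion check needed in the loop body ...
}
```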

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {

Review comment:
       It'd be really good to have DurationInfo logging of the time taken to collect the listings; that will help in deciding which strategy to use and which option to set.
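
For example, wrapping each branch in a try-with-resources `DurationInfo`, which logs the elapsed time when closed (a sketch only; the log messages are made up):

```
public void traverseDirectory() throws IOException {
  if (context.shouldUseIterator()) {
    try (DurationInfo ignored = new DurationInfo(LOG,
        "Building listing (iterator) for %s", sourcePathRoot)) {
      traverseDirectoryLegacy();
    }
  } else {
    try (DurationInfo ignored = new DurationInfo(LOG,
        "Building listing (multi-threaded) for %s", sourcePathRoot)) {
      traverseDirectoryMultiThreaded();
    }
  }
}
```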

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),

Review comment:
       `child.isDirectory()` is called often enough that it could go into a variable.
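
That is, something like:

```
final boolean isDirectory = child.isDirectory();
LinkedList<CopyListingFileStatus> childCopyListingStatus =
    DistCpUtils.toCopyListingFileStatus(sourceFS, child,
        preserveAcls && isDirectory,
        preserveXAttrs && isDirectory,
        preserveRawXattrs && isDirectory,
        context.getBlocksPerChunk());
```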

##########
File path: hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
##########
@@ -217,4 +221,42 @@ public void testPreserveEC() throws Exception {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {
+    Path source = new Path("/src");
+    Path dest = new Path("/dest");
+    fs.delete(source, true);
+    fs.delete(dest, true);
+    // Create a source dir
+    fs.mkdirs(source);
+    // Create 10 dirs inside.
+    for (int i = 0; i < 10; i++) {
+      fs.mkdirs(new Path("/src/sub" + i));
+      // Create 10 directories inside each sub
+      for (int j = 0; j < 10; j++) {
+        fs.mkdirs(new Path("/src/sub" + i + "/subsub" + j));
+
+        // create 10 files under each leaf node.
+        for (int k = 0; k < 10; k++) {
+          Path parentPath = new Path("/src/sub" + i + "/subsub" + j);
+          Path filePath = new Path(parentPath, "file" + k);
+          DFSTestUtil.createFile(fs, filePath, 1024L, (short) 3, 1024L);
+        }
+      }
+    }
+
+    DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(),
+        dest.toString(), "-useIterator", conf);
+
+    // Check that all 1000 files got copied.
+    RemoteIterator<LocatedFileStatus> destFileItr = fs.listFiles(dest, true);
+    int numFiles = 0;

Review comment:
       ```
   Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true)))
       .describedAs("files").hasSize(...)
   ```
   
   That way, if the size check fails, the error message includes the list of all files which were found.
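
Filled in for this test, whose loops create 10 x 10 x 10 = 1000 files (the count is inferred from the test body above, not from the review):

```
Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true)))
    .describedAs("files copied to %s", dest)
    .hasSize(1000);
```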

##########
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##########
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context, HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),
+                      preserveXAttrs && child.isDirectory(),
+                      preserveRawXattrs && child.isDirectory(),
+                      context.getBlocksPerChunk());
+
+              for (CopyListingFileStatus fs : childCopyListingStatus) {
+                if (randomizeFileListing) {
+                  addToFileListing(fileStatuses,
+                      new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                } else {
+                  writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                }
+              }
+            }
+            if (retry < maxRetries) {
+              if (child.isDirectory()) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Traversing into source dir: " + child.getPath());
+                }
+                workers.put(new WorkRequest<FileStatus>(child, retry));
+              }
+            } else {
+              LOG.error("Giving up on " + child.getPath() + " after " + retry
+                  + " retries.");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Could not get item from childQueue. Retrying...");
+        }
+      }
+      workers.shutdown();
+    }
+
+    private void traverseDirectoryLegacy() throws IOException {
+      Stack<FileStatus> pathStack = new Stack<FileStatus>();
+      for (FileStatus fs : sourceDirs) {
+        if (excludeList == null || !excludeList
+            .contains(fs.getPath().toUri().getPath())) {
+          pathStack.add(fs);
+        }
+      }
+      while (!pathStack.isEmpty()) {
+        prepareListing(pathStack.pop().getPath());
+      }
+    }
+
+    private void prepareListing(Path path) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recording source-path: " + path + " for copy.");
+      }
+      RemoteIterator<FileStatus> listStatus = sourceFS.listStatusIterator(path);
+      while (listStatus.hasNext()) {
+        FileStatus child = listStatus.next();
+        if (excludeList != null && excludeList
+            .contains(child.getPath().toUri().getPath())) {
+          continue;
+        }
+        LinkedList<CopyListingFileStatus> childCopyListingStatus = DistCpUtils
+            .toCopyListingFileStatus(sourceFS, child,
+                preserveAcls && child.isDirectory(),
+                preserveXAttrs && child.isDirectory(),
+                preserveRawXattrs && child.isDirectory(),
+                context.getBlocksPerChunk());
+        for (CopyListingFileStatus fs : childCopyListingStatus) {
+          if (randomizeFileListing) {
+            addToFileListing(fileStatuses,
+                new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+          } else {
+            writeToFileListing(fileListWriter, fs, sourcePathRoot);
+          }
+        }
+        if (child.isDirectory()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Traversing into source dir: " + child.getPath());
+          }
+          prepareListing(child.getPath());
+        }
+      }
+    }

Review comment:
       Can you add an `IOStatisticsLogging.logIOStatisticsAtDebug(LOG, listStatus)` call here? That way, at debug level, you get a log from s3a (and soon abfs) of what IO took place for the listing, its performance, etc. Really interesting.
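
A sketch of where that could land, assuming the three-argument overload `logIOStatisticsAtDebug(Logger, String, Object)` and slf4j-style message formatting; the call is a no-op when the iterator does not expose IOStatistics:

```
RemoteIterator<FileStatus> listStatus = sourceFS.listStatusIterator(path);
while (listStatus.hasNext()) {
  // ... existing per-child handling ...
}
// after draining the iterator: log list-call counts and latencies
// at debug level if the store's iterator implements IOStatisticsSource
IOStatisticsLogging.logIOStatisticsAtDebug(LOG,
    "listing statistics: {}", listStatus);
```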

##########
File path: hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
##########
@@ -217,4 +221,42 @@ public void testPreserveEC() throws Exception {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {
+    Path source = new Path("/src");
+    Path dest = new Path("/dest");
+    fs.delete(source, true);
+    fs.delete(dest, true);
+    // Create a source dir
+    fs.mkdirs(source);
+    // Create 10 dirs inside.
+    for (int i = 0; i < 10; i++) {
+      fs.mkdirs(new Path("/src/sub" + i));
+      // Create 10 directories inside each sub
+      for (int j = 0; j < 10; j++) {
+        fs.mkdirs(new Path("/src/sub" + i + "/subsub" + j));
+
+        // create 10 files under each leaf node.
+        for (int k = 0; k < 10; k++) {
+          Path parentPath = new Path("/src/sub" + i + "/subsub" + j);
+          Path filePath = new Path(parentPath, "file" + k);
+          DFSTestUtil.createFile(fs, filePath, 1024L, (short) 3, 1024L);

Review comment:
       Actually, you can go straight to the createFile call without doing any mkdirs. It's still going to take 10^3 calls on an object store, though. If you do move something of this size there, then:
   1. the create-file calls should be done in an executor pool (see ITestPartialRenamesDeletes.createDirsAndFiles())
   2. the parameters should be configurable; just a subclass getWidth() hook would be enough
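
A sketch of point 1, with the `getWidth()` hook from point 2 as the fan-out (names are hypothetical; `createFile` creates missing parent directories, so the `mkdirs` calls can be dropped):

```
int width = getWidth();  // hypothetical subclass hook, default 10
ExecutorService pool = Executors.newFixedThreadPool(16);
try {
  List<Future<?>> futures = new ArrayList<>();
  for (int i = 0; i < width; i++) {
    for (int j = 0; j < width; j++) {
      for (int k = 0; k < width; k++) {
        Path file = new Path(source,
            "sub" + i + "/subsub" + j + "/file" + k);
        // parallel creation; create() makes the missing parent dirs
        futures.add(pool.submit(() -> {
          DFSTestUtil.createFile(fs, file, 1024L, (short) 3, 1024L);
          return null;
        }));
      }
    }
  }
  for (Future<?> f : futures) {
    f.get();  // surface any creation failure
  }
} finally {
  pool.shutdown();
}
```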

##########
File path: hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java
##########
@@ -217,4 +221,42 @@ public void testPreserveEC() throws Exception {
     assertTrue("/dest/dir1/subdir1 is not erasure coded!",
         destSubDir1Status.isErasureCoded());
   }
+
+  @Test
+  public void testUseIterator() throws Exception {
+    Path source = new Path("/src");
+    Path dest = new Path("/dest");
+    fs.delete(source, true);
+    fs.delete(dest, true);
+    // Create a source dir
+    fs.mkdirs(source);
+    // Create 10 dirs inside.
+    for (int i = 0; i < 10; i++) {
+      fs.mkdirs(new Path("/src/sub" + i));

Review comment:
       skip this and just delegate to the children; saves 10 calls




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 563285)
    Time Spent: 1.5h  (was: 1h 20m)

> DistCp: Reduce memory usage on copying huge directories
> -------------------------------------------------------
>
>                 Key: HADOOP-17531
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17531
>             Project: Hadoop Common
>          Issue Type: Improvement
>            Reporter: Ayush Saxena
>            Assignee: Ayush Saxena
>            Priority: Critical
>              Labels: pull-request-available
>         Attachments: MoveToStackIterator.patch, gc-NewD-512M-3.8ML.log
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Presently, DistCp uses a producer-consumer setup while building the listing;
> the input queue and output queue are both unbounded, so the list of statuses
> grows quite huge.
> Relevant code:
> https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java#L635
> This does a breadth-first traversal (using a queue instead of the earlier
> stack), so if there are files at a lower depth, it will open up the entire
> tree before it starts processing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
