[GitHub] [hadoop] ayushtkn commented on a change in pull request #2732: HADOOP-17531. DistCp: Reduce memory usage on copying huge directories.

2021-03-10 Thread GitBox


ayushtkn commented on a change in pull request #2732:
URL: https://github.com/apache/hadoop/pull/2732#discussion_r591382388



##
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##
@@ -730,4 +660,155 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context,
+        HashSet<String> excludeList, List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        traverseDirectoryLegacy();
+      } else {
+        traverseDirectoryMultiThreaded();
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Starting thread pool of " + numListstatusThreads
+            + " listStatus workers.");
+      }
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
+      for (int i = 0; i < numListstatusThreads; i++) {
+        workers.addWorker(
+            new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                excludeList));
+      }
+
+      for (FileStatus status : sourceDirs) {
+        workers.put(new WorkRequest<FileStatus>(status, 0));
+      }
+
+      while (workers.hasWork()) {
+        try {
+          WorkReport<FileStatus[]> workResult = workers.take();
+          int retry = workResult.getRetry();
+          for (FileStatus child : workResult.getItem()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Recording source-path: " + child.getPath() + " for copy.");
+            }
+            if (workResult.getSuccess()) {
+              LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                  DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                      preserveAcls && child.isDirectory(),
+                      preserveXAttrs && child.isDirectory(),
+                      preserveRawXattrs && child.isDirectory(),
+                      context.getBlocksPerChunk());
+
+              for (CopyListingFileStatus fs : childCopyListingStatus) {
+                if (randomizeFileListing) {
+                  addToFileListing(fileStatuses,
+                      new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+                } else {
+                  writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                }
+              }
+            }
+            if (retry < maxRetries) {
+              if (child.isDirectory()) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Traversing into source dir: " + child.getPath());
+                }
+                workers.put(new WorkRequest<FileStatus>(child, retry));
+              }
+            } else {
+              LOG.error("Giving up on " + child.getPath() + " after " + retry
+                  + " retries.");
+            }
+          }
+        } catch (InterruptedException ie) {
+          LOG.error("Could not get item from childQueue. Retrying...");
+        }
+      }
+      workers.shutdown();
+    }
+
+    private void traverseDirectoryLegacy() throws IOException {
+      Stack<FileStatus> pathStack = new Stack<FileStatus>();
+      for (FileStatus fs : sourceDirs) {
+        if (excludeList == null || !excludeList
+            .contains(fs.getPath().toUri().getPath())) {
+          pathStack.add(fs);
+        }
+      }
+      while (!pathStack.isEmpty()) {
+        prepareListing(pathStack.pop().getPath());
+      }
+    }
+
+    private void prepareListing(Path path) throws IOException {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Recording source-path: " + path + " for copy.");
+      }
+      RemoteIterator<FileStatus> listStatus = sourceFS.listStatusIterator(path);
+      while (listStatus.hasNext()) {
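
(The archived hunk is cut off at this point. For readability, here is a sketch of how the iterator loop plausibly continues, reconstructed from the exclusion check in `traverseDirectoryLegacy()` and the conversion logic in the multi-threaded path above; it is an editor's reconstruction under those assumptions, not the verbatim PR code.)

    // Sketch only: how prepareListing plausibly consumes the RemoteIterator.
    FileStatus child = listStatus.next();
    // Honor the exclusion list, as the legacy stack setup does.
    if (excludeList != null
        && excludeList.contains(child.getPath().toUri().getPath())) {
      continue;
    }
    // Convert to one or more CopyListingFileStatus entries (a chunked file
    // may expand to several) and record them in the listing file.
    LinkedList<CopyListingFileStatus> childCopyListingStatus =
        DistCpUtils.toCopyListingFileStatus(sourceFS, child,
            preserveAcls && child.isDirectory(),
            preserveXAttrs && child.isDirectory(),
            preserveRawXattrs && child.isDirectory(),
            context.getBlocksPerChunk());
    for (CopyListingFileStatus fs : childCopyListingStatus) {
      if (randomizeFileListing) {
        addToFileListing(fileStatuses,
            new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
      } else {
        writeToFileListing(fileListWriter, fs, sourcePathRoot);
      }
    }
    // Descend into subdirectories one listing at a time, so only a single
    // page of iterator results is held rather than full FileStatus[] arrays.
    if (child.isDirectory()) {
      prepareListing(child.getPath());
    }

This shape is where the memory saving comes from: the single-threaded iterator traversal holds one `listStatusIterator` page at a time, instead of queueing whole `FileStatus[]` directory listings across a pool of listStatus workers.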

[GitHub] [hadoop] ayushtkn commented on a change in pull request #2732: HADOOP-17531. DistCp: Reduce memory usage on copying huge directories.

2021-03-08 Thread GitBox


ayushtkn commented on a change in pull request #2732:
URL: https://github.com/apache/hadoop/pull/2732#discussion_r589927863



##
File path: hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java
##
@@ -688,6 +696,71 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter,
     workers.shutdown();
   }
 
+  private void traverseDirectoryLegacy(SequenceFile.Writer fileListWriter,
+      FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+      Path sourcePathRoot, DistCpContext context,
+      HashSet<String> excludeList, List<FileStatusInfo> fileStatuses)
+      throws IOException {
+    final boolean preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+    final boolean preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+    final boolean preserveRawXattrs = context.shouldPreserveRawXattrs();
+
+    Stack<FileStatus> pathStack = new Stack<FileStatus>();
+    for (FileStatus fs : sourceDirs) {
+      if (excludeList == null || !excludeList
+          .contains(fs.getPath().toUri().getPath())) {
+        pathStack.add(fs);
+      }
+    }
+
+    while (!pathStack.isEmpty()) {
+      prepareListing(pathStack.pop().getPath(), fileListWriter, sourceFS,
+          sourcePathRoot, context, excludeList, fileStatuses, preserveAcls,
+          preserveXAttrs, preserveRawXattrs);
+    }
+  }
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  private void prepareListing(Path path, SequenceFile.Writer fileListWriter,
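
(This hunk is also truncated mid-signature. From the call in `traverseDirectoryLegacy` above, the full parameter list can be inferred; the sketch below is that inference, not a verbatim quote of the PR, and it makes clear why the `checkstyle:parameternumber` suppression was needed and why the reviewer pushed for the refactor into a class.)

    // Inferred from the call site above; not quoted verbatim from the PR.
    @SuppressWarnings("checkstyle:parameternumber")
    private void prepareListing(Path path, SequenceFile.Writer fileListWriter,
        FileSystem sourceFS, Path sourcePathRoot, DistCpContext context,
        HashSet<String> excludeList, List<FileStatusInfo> fileStatuses,
        boolean preserveAcls, boolean preserveXAttrs,
        boolean preserveRawXattrs) throws IOException {
      // Body: iterate sourceFS.listStatusIterator(path) and write entries,
      // as in the TraverseDirectory.prepareListing version quoted earlier.
    }

Moving these ten parameters into fields of a `TraverseDirectory` class, as the first hunk in this thread shows, removes the suppression entirely.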

Review comment:
   Done. Refactored `TraverseDirectory` into a class.








[GitHub] [hadoop] ayushtkn commented on a change in pull request #2732: HADOOP-17531. DistCp: Reduce memory usage on copying huge directories.

2021-03-08 Thread GitBox


ayushtkn commented on a change in pull request #2732:
URL: https://github.com/apache/hadoop/pull/2732#discussion_r589927746



##
File path: hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
##
@@ -362,6 +362,7 @@ Command Line Options
 | `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B | |
 | `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. |
 | `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
+| `-useIterator` | Uses single threaded listStatusIterator to build listing | Useful for saving memory at the client side. |

Review comment:
   I have updated the document. Let me know if anything more can be improved.
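
(For context, a typical invocation of the documented option would look like the line below; the flag comes from the table above, while the cluster names and paths are illustrative only.)

    hadoop distcp -useIterator hdfs://sourceNN/data hdfs://destNN/data

The flag simply switches `SimpleCopyListing` from the multi-threaded listStatus workers to the single-threaded iterator traversal shown in the earlier hunk.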








[GitHub] [hadoop] ayushtkn commented on a change in pull request #2732: HADOOP-17531. DistCp: Reduce memory usage on copying huge directories.

2021-03-02 Thread GitBox


ayushtkn commented on a change in pull request #2732:
URL: https://github.com/apache/hadoop/pull/2732#discussion_r585357165



##
File path: hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
##
@@ -362,6 +362,7 @@ Command Line Options
 | `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B | |
 | `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. |
 | `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
+| `-useIterator` | Uses single threaded listStatusIterator to build listing | Useful for saving memory at the client side. |

Review comment:
   Thanx @jojochuang for having a look.
   Yes, it indeed isn't meant for object stores. I am trying a multi-threaded approach for object stores too as part of HADOOP-17558; that won't be as memory efficient, but it should strike a balance between speed and memory. I have a WIP patch for that as well and will share it on the jira.

   This is basically for HDFS, or any FS where listing is not slow but there are memory constraints. My scenario is basically DR, where the copy is in general HDFS->HDFS or HDFS->S3.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org