This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch branch-3.2.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c5c158df7319c55e47b9c88d424b52720af4aaa1
Author: jianghuazhu <740087...@qq.com>
AuthorDate: Fri Aug 27 11:41:44 2021 +0800

    HDFS-16173. Improve CopyCommands#Put#executor queue configurability. (#3302)
    
    Co-authored-by: zhujianghua <zhujianghua@zhujianghuadeMacBook-Pro.local>
    Reviewed-by: Hui Fei <fer...@apache.org>
    Reviewed-by: Viraj Jasani <vjas...@apache.org>
    (cherry picked from commit 4c94831364e9258247029c22a222a665771ab4c0)
    (cherry picked from commit 7c663043b2c1e207dd8c05e09e17811d68badfac)
    (cherry picked from commit dac74b0e115fa466467aa1de2a71fb8704a04639)
---
 .../org/apache/hadoop/fs/shell/CopyCommands.java   | 34 ++++++++++++++++++++--
 .../src/site/markdown/FileSystemShell.md           |  4 ++-
 .../hadoop/fs/shell/TestCopyPreserveFlag.java      | 16 ++++++++++
 .../hadoop-common/src/test/resources/testConf.xml  | 18 +++++++-----
 4 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
index f296712..ea79e4e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CopyCommands.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsDirectoryException;
 import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Various commands for copy files */
 @InterfaceAudience.Private
@@ -238,7 +240,11 @@ class CopyCommands {
    *  Copy local files to a remote filesystem
    */
   public static class Put extends CommandWithDestination {
+
+    public static final Logger LOG = LoggerFactory.getLogger(Put.class);
+
     private ThreadPoolExecutor executor = null;
+    private int threadPoolQueueSize = 1024;
     private int numThreads = 1;
 
     private static final int MAX_THREADS =
@@ -246,7 +252,8 @@ class CopyCommands {
 
     public static final String NAME = "put";
     public static final String USAGE =
-        "[-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>";
+        "[-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] " +
+        "<localsrc> ... <dst>";
     public static final String DESCRIPTION =
         "Copy files from the local file system " +
         "into fs. Copying fails if the file already " +
@@ -255,6 +262,8 @@ class CopyCommands {
         "  -p : Preserves timestamps, ownership and the mode.\n" +
         "  -f : Overwrites the destination if it already exists.\n" +
         "  -t <thread count> : Number of threads to be used, default is 1.\n" +
+        "  -q <threadPool size> : ThreadPool queue size to be used, " +
+        "default is 1024.\n" +
         "  -l : Allow DataNode to lazily persist the file to disk. Forces" +
         "  replication factor of 1. This flag will result in reduced" +
         "  durability. Use with care.\n" +
@@ -265,8 +274,10 @@ class CopyCommands {
       CommandFormat cf =
           new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
       cf.addOptionWithValue("t");
+      cf.addOptionWithValue("q");
       cf.parse(args);
       setNumberThreads(cf.getOptValue("t"));
+      setThreadPoolQueueSize(cf.getOptValue("q"));
       setOverwrite(cf.getOpt("f"));
       setPreserve(cf.getOpt("p"));
       setLazyPersist(cf.getOpt("l"));
@@ -298,7 +309,7 @@ class CopyCommands {
       }
 
       executor = new ThreadPoolExecutor(numThreads, numThreads, 1,
-          TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024),
+          TimeUnit.SECONDS, new ArrayBlockingQueue<>(threadPoolQueueSize),
           new ThreadPoolExecutor.CallerRunsPolicy());
       super.processArguments(args);
 
@@ -328,6 +339,25 @@ class CopyCommands {
       }
     }
 
+    private void setThreadPoolQueueSize(String numThreadPoolQueueSize) {
+      if (numThreadPoolQueueSize != null) {
+        int parsedValue = Integer.parseInt(numThreadPoolQueueSize);
+        if (parsedValue < 1) {
+          LOG.warn("The thread pool queue size cannot be less than 1 " +
+              "(got {}); the default size 1024 is used instead.",
+              parsedValue);
+          threadPoolQueueSize = 1024;
+        } else {
+          threadPoolQueueSize = parsedValue;
+        }
+      }
+    }
+
+    @VisibleForTesting
+    protected int getThreadPoolQueueSize() {
+      return threadPoolQueueSize; // capacity of the executor's work queue
+    }
+
     private void copyFile(PathData src, PathData target) throws IOException {
       if (isPathRecursable(src)) {
         throw new PathIsDirectoryException(src.toString());
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
index a2c07a9..e8dca94 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/FileSystemShell.md
@@ -509,7 +509,7 @@ Returns 0 on success and -1 on error.
 put
 ---
 
-Usage: `hadoop fs -put  [-f] [-p] [-l] [-d] [-t <thread count>] [ - | <localsrc1>  .. ]. <dst>`
+Usage: `hadoop fs -put  [-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] [ - | <localsrc1>  .. ]. <dst>`
 
 Copy single src, or multiple srcs from local file system to the destination file system.
 Also reads input from stdin and writes to destination file system if the source is set to "-"
@@ -526,6 +526,7 @@ Options:
 * `-l` : Allow DataNode to lazily persist the file to disk, Forces a replication
  factor of 1. This flag will result in reduced durability. Use with care.
 * `-d` : Skip creation of temporary file with the suffix `._COPYING_`.
+* `-q <threadPool queue size>` : ThreadPool queue size to be used, default is 1024.
 
 
 Examples:
@@ -534,6 +535,7 @@ Examples:
 * `hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir`
 * `hadoop fs -put -d localfile hdfs://nn.example.com/hadoop/hadoopfile`
 * `hadoop fs -put - hdfs://nn.example.com/hadoop/hadoopfile` Reads the input from stdin.
+* `hadoop fs -put -q 500 localfile3 hdfs://nn.example.com/hadoop/hadoopfile3`
 
 Exit Code:
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
index 8d2e160..0f0ddcc 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopyPreserveFlag.java
@@ -123,6 +123,22 @@ public class TestCopyPreserveFlag {
   }
 
   @Test(timeout = 10000)
+  public void testPutWithPQ() throws Exception {
+    Put put = new Put();
+    run(put, "-p", "-q", "100", FROM.toString(), TO.toString());
+    assertEquals(100, put.getThreadPoolQueueSize());
+    assertAttributesPreserved(TO);
+  }
+
+  @Test(timeout = 10000)
+  public void testPutWithQ() throws Exception {
+    Put put = new Put();
+    run(put, "-q", "100", FROM.toString(), TO.toString());
+    assertEquals(100, put.getThreadPoolQueueSize());
+    assertAttributesChanged(TO);
+  }
+
+  @Test(timeout = 10000)
   public void testPutWithSplCharacter() throws Exception {
     fs.mkdirs(DIR_FROM_SPL);
     fs.createNewFile(FROM_SPL);
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
index 6f18378..f9a80d7 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml
@@ -498,7 +498,7 @@
           <type>RegexpComparator</type>
           <comparator>
             <type>RegexpComparator</type>
-            <expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
+            <expected-output>^-put \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;threadPool queue size&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
           </comparator>
         </comparator>
         <comparator>
@@ -515,19 +515,23 @@
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*-p                 Preserves timestamps, ownership and the mode.( )*</expected-output>
+          <expected-output>^\s*-p                    Preserves timestamps, ownership and the mode.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*-f                 Overwrites the destination if it already exists.( )*</expected-output>
+          <expected-output>^\s*-f                    Overwrites the destination if it already exists.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*-t &lt;thread count&gt;  Number of threads to be used, default is 1.( )*</expected-output>
+          <expected-output>^\s*-t &lt;thread count&gt;     Number of threads to be used, default is 1.( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*-l                 Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
+          <expected-output>^\s*-q &lt;threadPool size&gt;  ThreadPool queue size to be used, default is 1024.( )*</expected-output>
+        </comparator>
+        <comparator>
+          <type>RegexpComparator</type>
+          <expected-output>^\s*-l                    Allow DataNode to lazily persist the file to disk. Forces( )*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
@@ -539,7 +543,7 @@
         </comparator>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^\s*-d                 Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
+          <expected-output>^\s*-d                    Skip creation of temporary file\(&lt;dst&gt;\._COPYING_\).( )*</expected-output>
         </comparator>
       </comparators>
     </test>
@@ -554,7 +558,7 @@
       <comparators>
         <comparator>
           <type>RegexpComparator</type>
-          <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
+          <expected-output>^-copyFromLocal \[-f\] \[-p\] \[-l\] \[-d\] \[-t &lt;thread count&gt;\] \[-q &lt;threadPool queue size&gt;\] &lt;localsrc&gt; \.\.\. &lt;dst&gt; :\s*</expected-output>
         </comparator>
         <comparator>
           <type>RegexpComparator</type>

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to