pkumarsinha commented on a change in pull request #1648:
URL: https://github.com/apache/hive/pull/1648#discussion_r520063304



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
##########
@@ -82,27 +87,31 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
     }
     FileSystem sourceFs = srcFiles.get(0).getSrcFs();
     boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
+    ExecutorService executorService = null;
     try {
       if (useRegularCopy || readSrcAsFilesList) {
+        executorService = Executors.newFixedThreadPool(maxParallelCopyTask);

Review comment:
       When readSrcAsFilesList is true, should we not always use regular copy? Going the distcp way there currently has an adverse effect and no benefit.
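   Something along these lines, as a sketch only (the variable and method names come from this hunk, and this is not the actual patch):
   ```java
   // Short-circuit to regular copy whenever the source is read as a files
   // list, so the distcp branch is never taken for that path.
   boolean useRegularCopy = readSrcAsFilesList || regularCopy(sourceFs, srcFiles);
   ```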

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -647,6 +647,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
       "Provide the maximum number of partitions of a table that will be batched together during  \n"
         + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
         + "The data for these partitions will be copied before copying the metadata batch. "),
+    REPL_PARALLEL_COPY_TASKS("hive.repl.parallel.copy.tasks",1000,

Review comment:
       Do we also support a case where multiple parallel ReplCopyTasks are launched in the bootstrap case (in the existing code, I mean)?
    Another thing: how does 'hive.repl.parallel.copy.tasks' fare with multiple concurrent policies? Say a customer sets this config to 5k and we have 40 concurrent policies; that would mean 200K threads running concurrently (excluding other threads). Do you see any issue with that?
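    To make the concern concrete, a back-of-the-envelope using the hypothetical numbers above (nothing here is measured):
    ```java
    public class ReplCopyThreadMath {
      public static void main(String[] args) {
        // Hypothetical scenario: a per-policy pool sized by
        // hive.repl.parallel.copy.tasks grows multiplicatively with the
        // number of concurrently running repl policies.
        int threadsPerPolicy = 5_000;   // config set to 5k
        int concurrentPolicies = 40;    // concurrent repl policies
        long totalThreads = (long) threadsPerPolicy * concurrentPolicies;
        System.out.println(totalThreads + " copy threads in the worst case"); // 200000
      }
    }
    ```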

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
##########
@@ -82,27 +87,31 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
     }
     FileSystem sourceFs = srcFiles.get(0).getSrcFs();
     boolean useRegularCopy = regularCopy(sourceFs, srcFiles);
+    ExecutorService executorService = null;
     try {
       if (useRegularCopy || readSrcAsFilesList) {
+        executorService = Executors.newFixedThreadPool(maxParallelCopyTask);

Review comment:
       Add tests.
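    For reference, a self-contained sketch of where such a test could start. It only checks that a fixed pool of size N never runs more than N copy tasks at once; wiring it against the real copyAndVerify would need mocked FileSystems, which is left out:
    ```java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.junit.Assert;
    import org.junit.Test;

    public class TestParallelCopyPool {

      @Test
      public void fixedPoolBoundsConcurrentCopies() throws Exception {
        int maxParallelCopyTask = 2; // mirrors hive.repl.parallel.copy.tasks
        ExecutorService pool = Executors.newFixedThreadPool(maxParallelCopyTask);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
          List<Future<?>> futures = new ArrayList<>();
          for (int i = 0; i < 10; i++) {
            futures.add(pool.submit(() -> {
              int now = running.incrementAndGet();
              peak.accumulateAndGet(now, Math::max); // track peak concurrency
              try {
                Thread.sleep(50); // stand-in for an actual file copy
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
              running.decrementAndGet();
            }));
          }
          for (Future<?> f : futures) {
            f.get(); // propagate any task failure
          }
          Assert.assertTrue("pool exceeded its bound", peak.get() <= maxParallelCopyTask);
        } finally {
          pool.shutdown();
        }
      }
    }
    ```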

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
##########
@@ -112,11 +121,34 @@ public void copyAndVerify(Path destRoot, List<ReplChangeManager.FileInfo> srcFil
         srcFiles.add(new ReplChangeManager.FileInfo(sourceFs, origSrcPath, null));
         doCopyRetry(sourceFs, srcFiles, destRoot, proxyUser, useRegularCopy, overwrite);
       }
+    } catch (InterruptedException e) {
+      LOG.error("Failed to copy ", e);
+      throw new IOException(ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getMsg());
     } finally {
       if (proxyUser != null) {
         FileSystem.closeAllForUGI(proxyUser);
       }
+      if (executorService != null) {

Review comment:
       This is prone to a thread leak. Make sure the shutdown always executes.
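   One way to guarantee that, as a rough sketch rather than the final shape: run the shutdown before any other cleanup in the finally block, so a throwing closeAllForUGI cannot skip it (names are from the hunk above).
   ```java
   } finally {
     if (executorService != null) {
       executorService.shutdown(); // unconditional, runs before any cleanup that may throw
     }
     if (proxyUser != null) {
       FileSystem.closeAllForUGI(proxyUser);
     }
   }
   ```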



