Repository: hive
Updated Branches:
  refs/heads/master 26cdbe044 -> 47b759f84


HIVE-13933 : Add an option to turn off parallel file moves (Ashutosh Chauhan via Hari Sankar Subramaniyan)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/47b759f8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/47b759f8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/47b759f8

Branch: refs/heads/master
Commit: 47b759f84b876bc7b6dc92f0824546baadd6c69f
Parents: 26cdbe0
Author: Ashutosh Chauhan <hashut...@apache.org>
Authored: Thu Jun 2 16:07:53 2016 -0700
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Mon Jun 6 08:12:36 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 215 +++++++++++--------
 2 files changed, 131 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/47b759f8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index db2382d..d06aa44 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2399,8 +2399,8 @@ public class HiveConf extends Configuration {
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", 
"set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to 
execute"),
 
-     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new  
SizeValidator(1L, true, 1024L, true), "Number of threads"
-         + " used to move files in move task"),
+     HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new  
SizeValidator(0L, true, 1024L, true), "Number of threads"
+         + " used to move files in move task. Set it to 0 to disable 
multi-threaded file moves."),
     // If this is set all move tasks at the end of a multi-insert query will 
only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(

http://git-wip-us.apache.org/repos/asf/hive/blob/47b759f8/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 3a7d3bb..16d9b03 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2647,10 +2647,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
     final boolean inheritPerms = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
     final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>();
-    final ExecutorService pool = Executors.newFixedThreadPool(
-        conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
-        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
-
+    final ExecutorService pool = 
conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+        
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname,
 25),
+        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) 
: null;
     for (FileStatus src : srcs) {
       FileStatus[] files;
       if (src.isDirectory()) {
@@ -2666,29 +2665,27 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
       final SessionState parentSession = SessionState.get();
       for (final FileStatus srcFile : files) {
+        final Path srcP = srcFile.getPath();
+        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+        // Strip off the file type, if any so we don't make:
+        // 000000_0.gz -> 000000_0.gz_copy_1
+        final String name;
+        final String filetype;
+        String itemName = srcP.getName();
+        int index = itemName.lastIndexOf('.');
+        if (index >= 0) {
+          filetype = itemName.substring(index);
+          name = itemName.substring(0, index);
+        } else {
+          name = itemName;
+          filetype = "";
+        }
 
-        futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
-          @Override
-          public ObjectPair<Path, Path> call() throws Exception {
-            SessionState.setCurrentSessionState(parentSession);
-            final Path srcP = srcFile.getPath();
-            final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
-            // Strip off the file type, if any so we don't make:
-            // 000000_0.gz -> 000000_0.gz_copy_1
-            final String name;
-            final String filetype;
-            String itemName = srcP.getName();
-            int index = itemName.lastIndexOf('.');
-            if (index >= 0) {
-              filetype = itemName.substring(index);
-              name = itemName.substring(0, index);
-            } else {
-              name = itemName;
-              filetype = "";
-            }
+        final String srcGroup = srcFile.getGroup();
+        if (null == pool) {
+          Path destPath = new Path(destf, srcP.getName());
+          try {
 
-            Path destPath = new Path(destf, srcP.getName());
-            String srcGroup = srcFile.getGroup();
             if (!needToCopy && !isSrcLocal) {
               for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
                 destPath = new Path(destf, name + ("_copy_" + counter) + 
filetype);
@@ -2696,27 +2693,59 @@ private void constructOneLBLocationMap(FileStatus fSta,
             } else {
               destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, 
destFs, name, filetype);
             }
-
-            if (inheritPerms) {
-              HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, 
destFs, destPath, false);
-            }
             if (null != newFiles) {
               newFiles.add(destPath);
             }
-            return ObjectPair.create(srcP, destPath);
+          } catch (IOException ioe) {
+            LOG.error("Failed to move: {}", ioe.getMessage());
+            throw new HiveException(ioe.getCause());
           }
-        }));
+        } else {
+          futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
+            @Override
+            public ObjectPair<Path, Path> call() throws Exception {
+              SessionState.setCurrentSessionState(parentSession);
+              Path destPath = new Path(destf, srcP.getName());
+              if (!needToCopy && !isSrcLocal) {
+                for (int counter = 1; !destFs.rename(srcP,destPath); 
counter++) {
+                  destPath = new Path(destf, name + ("_copy_" + counter) + 
filetype);
+                }
+              } else {
+                destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, 
destFs, name, filetype);
+              }
+
+              if (inheritPerms) {
+                HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, 
destFs, destPath, false);
+              }
+              if (null != newFiles) {
+                newFiles.add(destPath);
+              }
+              return ObjectPair.create(srcP, destPath);
+            }
+          }));
+        }
       }
     }
-    pool.shutdown();
-    for (Future<ObjectPair<Path, Path>> future : futures) {
-      try {
-        ObjectPair<Path, Path> pair = future.get();
-        LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: 
{}", pair.getSecond().toString());
-      } catch (Exception e) {
-        LOG.error("Failed to move: {}", e.getMessage());
-        pool.shutdownNow();
-        throw new HiveException(e.getCause());
+    if (null == pool) {
+      if (inheritPerms) {
+        try {
+          HdfsUtils.setFullFileStatus(conf, fullDestStatus, null, destFs, 
destf, true);
+        } catch (IOException e) {
+          LOG.error("Failed to move: {}", e.getMessage());
+          throw new HiveException(e.getCause());
+        }
+      }
+    } else {
+      pool.shutdown();
+      for (Future<ObjectPair<Path, Path>> future : futures) {
+        try {
+          ObjectPair<Path, Path> pair = future.get();
+          LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: 
{}", pair.getSecond().toString());
+        } catch (Exception e) {
+          LOG.error("Failed to move: {}", e.getMessage());
+          pool.shutdownNow();
+          throw new HiveException(e.getCause());
+        }
       }
     }
   }
@@ -2862,42 +2891,51 @@ private void constructOneLBLocationMap(FileStatus fSta,
             FileStatus[] srcs = destFs.listStatus(srcf, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
 
             List<Future<Void>> futures = new LinkedList<>();
-            final ExecutorService pool = Executors.newFixedThreadPool(
-                conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
-                new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+            final ExecutorService pool = 
conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+                
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname,
 25),
+                new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) 
: null;
             /* Move files one by one because source is a subdirectory of 
destination */
-            for (final FileStatus status : srcs) {
-              futures.add(pool.submit(new Callable<Void>() {
-                @Override
-                public Void call() throws Exception {
-                  SessionState.setCurrentSessionState(parentSession);
-                  Path destPath = new Path(destf, status.getPath().getName());
-                  String group = status.getGroup();
-                  try {
-                    if(destFs.rename(status.getPath(), destf)) {
+            for (final FileStatus srcStatus : srcs) {
+
+              if (null == pool) {
+                if(!destFs.rename(srcStatus.getPath(), destf)) {
+                  throw new IOException("rename for src path: " + 
srcStatus.getPath() + " to dest:"
+                      + destf + " returned false");
+                }
+              } else {
+                futures.add(pool.submit(new Callable<Void>() {
+                  @Override
+                  public Void call() throws Exception {
+                    SessionState.setCurrentSessionState(parentSession);
+                    final Path destPath = new Path(destf, 
srcStatus.getPath().getName());
+                    final String group = srcStatus.getGroup();
+                    if(destFs.rename(srcStatus.getPath(), destf)) {
                       if (inheritPerms) {
                         HdfsUtils.setFullFileStatus(conf, desiredStatus, 
group, destFs, destPath, false);
                       }
                     } else {
-                      throw new IOException("rename for src path: " + 
status.getPath() + " to dest path:"
+                      throw new IOException("rename for src path: " + 
srcStatus.getPath() + " to dest path:"
                           + destPath + " returned false");
                     }
-                  } catch (IOException ioe) {
-                    LOG.error("Failed to rename/set permissions. Src path: {} 
Dest path: {}", status.getPath(), destPath);
-                    throw ioe;
+                    return null;
                   }
-                  return null;
-                }
-              }));
+                }));
+              }
             }
-            pool.shutdown();
-            for (Future<Void> future : futures) {
-              try {
-                future.get();
-              } catch (Exception e) {
-                LOG.debug(e.getMessage());
-                pool.shutdownNow();
-                throw new HiveException(e.getCause());
+            if (null == pool) {
+              if (inheritPerms) {
+                HdfsUtils.setFullFileStatus(conf, desiredStatus, null, destFs, 
destf, true);
+              }
+            } else {
+              pool.shutdown();
+              for (Future<Void> future : futures) {
+                try {
+                  future.get();
+                } catch (Exception e) {
+                  LOG.debug(e.getMessage());
+                  pool.shutdownNow();
+                  throw new HiveException(e.getCause());
+                }
               }
             }
             return true;
@@ -3182,28 +3220,33 @@ private void constructOneLBLocationMap(FileStatus fSta,
     FileStatus[] statuses = fs.listStatus(f, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
     boolean result = true;
     final List<Future<Boolean>> futures = new LinkedList<>();
-    final ExecutorService pool = Executors.newFixedThreadPool(
-        conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
-        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build());
+    final ExecutorService pool = 
conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ?
+        
Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname,
 25),
+        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build())
 : null;
     final SessionState parentSession = SessionState.get();
     for (final FileStatus status : statuses) {
-      futures.add(pool.submit(new Callable<Boolean>() {
-
-        @Override
-        public Boolean call() throws Exception {
-          SessionState.setCurrentSessionState(parentSession);
-          return FileUtils.moveToTrash(fs, status.getPath(), conf);
-        }
-      }));
+      if (null == pool) {
+        result &= FileUtils.moveToTrash(fs, status.getPath(), conf);
+      } else {
+        futures.add(pool.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            SessionState.setCurrentSessionState(parentSession);
+            return FileUtils.moveToTrash(fs, status.getPath(), conf);
+          }
+        }));
+      }
     }
-    pool.shutdown();
-    for (Future<Boolean> future : futures) {
-      try {
-        result &= future.get();
-      } catch (InterruptedException | ExecutionException e) {
-        LOG.error("Failed to delete: ",e);
-        pool.shutdownNow();
-        throw new IOException(e);
+    if (null != pool) {
+      pool.shutdown();
+      for (Future<Boolean> future : futures) {
+        try {
+          result &= future.get();
+        } catch (InterruptedException | ExecutionException e) {
+          LOG.error("Failed to delete: ",e);
+          pool.shutdownNow();
+          throw new IOException(e);
+        }
       }
     }
     return result;

Reply via email to