Repository: hive
Updated Branches:
  refs/heads/master 26cdbe044 -> 47b759f84
HIVE-13933 : Add an option to turn off parallel file moves (Ashutosh Chauhan via Hari Sankar Subramaniyan) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/47b759f8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/47b759f8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/47b759f8 Branch: refs/heads/master Commit: 47b759f84b876bc7b6dc92f0824546baadd6c69f Parents: 26cdbe0 Author: Ashutosh Chauhan <hashut...@apache.org> Authored: Thu Jun 2 16:07:53 2016 -0700 Committer: Ashutosh Chauhan <hashut...@apache.org> Committed: Mon Jun 6 08:12:36 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 215 +++++++++++-------- 2 files changed, 131 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/47b759f8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index db2382d..d06aa44 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2399,8 +2399,8 @@ public class HiveConf extends Configuration { HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile", "Comma separated list of non-SQL Hive commands users are authorized to execute"), - HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(1L, true, 1024L, true), "Number of threads" - + " used to move files in move task"), + HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(0L, true, 
1024L, true), "Number of threads" + + " used to move files in move task. Set it to 0 to disable multi-threaded file moves."), // If this is set all move tasks at the end of a multi-insert query will only begin once all // outputs are ready HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES( http://git-wip-us.apache.org/repos/asf/hive/blob/47b759f8/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 3a7d3bb..16d9b03 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -2647,10 +2647,9 @@ private void constructOneLBLocationMap(FileStatus fSta, final boolean inheritPerms = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>(); - final ExecutorService pool = Executors.newFixedThreadPool( - conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build()); - + final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? 
+ Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; for (FileStatus src : srcs) { FileStatus[] files; if (src.isDirectory()) { @@ -2666,29 +2665,27 @@ private void constructOneLBLocationMap(FileStatus fSta, final SessionState parentSession = SessionState.get(); for (final FileStatus srcFile : files) { + final Path srcP = srcFile.getPath(); + final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs); + // Strip off the file type, if any so we don't make: + // 000000_0.gz -> 000000_0.gz_copy_1 + final String name; + final String filetype; + String itemName = srcP.getName(); + int index = itemName.lastIndexOf('.'); + if (index >= 0) { + filetype = itemName.substring(index); + name = itemName.substring(0, index); + } else { + name = itemName; + filetype = ""; + } - futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() { - @Override - public ObjectPair<Path, Path> call() throws Exception { - SessionState.setCurrentSessionState(parentSession); - final Path srcP = srcFile.getPath(); - final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs); - // Strip off the file type, if any so we don't make: - // 000000_0.gz -> 000000_0.gz_copy_1 - final String name; - final String filetype; - String itemName = srcP.getName(); - int index = itemName.lastIndexOf('.'); - if (index >= 0) { - filetype = itemName.substring(index); - name = itemName.substring(0, index); - } else { - name = itemName; - filetype = ""; - } + final String srcGroup = srcFile.getGroup(); + if (null == pool) { + Path destPath = new Path(destf, srcP.getName()); + try { - Path destPath = new Path(destf, srcP.getName()); - String srcGroup = srcFile.getGroup(); if (!needToCopy && !isSrcLocal) { for (int counter = 1; !destFs.rename(srcP,destPath); counter++) { destPath = new Path(destf, name + ("_copy_" + counter) + filetype); @@ -2696,27 +2693,59 @@ private 
void constructOneLBLocationMap(FileStatus fSta, } else { destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype); } - - if (inheritPerms) { - HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false); - } if (null != newFiles) { newFiles.add(destPath); } - return ObjectPair.create(srcP, destPath); + } catch (IOException ioe) { + LOG.error("Failed to move: {}", ioe.getMessage()); + throw new HiveException(ioe.getCause()); } - })); + } else { + futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() { + @Override + public ObjectPair<Path, Path> call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + Path destPath = new Path(destf, srcP.getName()); + if (!needToCopy && !isSrcLocal) { + for (int counter = 1; !destFs.rename(srcP,destPath); counter++) { + destPath = new Path(destf, name + ("_copy_" + counter) + filetype); + } + } else { + destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype); + } + + if (inheritPerms) { + HdfsUtils.setFullFileStatus(conf, fullDestStatus, srcGroup, destFs, destPath, false); + } + if (null != newFiles) { + newFiles.add(destPath); + } + return ObjectPair.create(srcP, destPath); + } + })); + } } } - pool.shutdown(); - for (Future<ObjectPair<Path, Path>> future : futures) { - try { - ObjectPair<Path, Path> pair = future.get(); - LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: {}", pair.getSecond().toString()); - } catch (Exception e) { - LOG.error("Failed to move: {}", e.getMessage()); - pool.shutdownNow(); - throw new HiveException(e.getCause()); + if (null == pool) { + if (inheritPerms) { + try { + HdfsUtils.setFullFileStatus(conf, fullDestStatus, null, destFs, destf, true); + } catch (IOException e) { + LOG.error("Failed to move: {}", e.getMessage()); + throw new HiveException(e.getCause()); + } + } + } else { + pool.shutdown(); + for (Future<ObjectPair<Path, Path>> future : futures) { + try { + 
ObjectPair<Path, Path> pair = future.get(); + LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: {}", pair.getSecond().toString()); + } catch (Exception e) { + LOG.error("Failed to move: {}", e.getMessage()); + pool.shutdownNow(); + throw new HiveException(e.getCause()); + } } } } @@ -2862,42 +2891,51 @@ private void constructOneLBLocationMap(FileStatus fSta, FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER); List<Future<Void>> futures = new LinkedList<>(); - final ExecutorService pool = Executors.newFixedThreadPool( - conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build()); + final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? + Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; /* Move files one by one because source is a subdirectory of destination */ - for (final FileStatus status : srcs) { - futures.add(pool.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - SessionState.setCurrentSessionState(parentSession); - Path destPath = new Path(destf, status.getPath().getName()); - String group = status.getGroup(); - try { - if(destFs.rename(status.getPath(), destf)) { + for (final FileStatus srcStatus : srcs) { + + if (null == pool) { + if(!destFs.rename(srcStatus.getPath(), destf)) { + throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest:" + + destf + " returned false"); + } + } else { + futures.add(pool.submit(new Callable<Void>() { + @Override + public Void call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + final Path destPath = new Path(destf, srcStatus.getPath().getName()); + final String group = srcStatus.getGroup(); + if(destFs.rename(srcStatus.getPath(), 
destf)) { if (inheritPerms) { HdfsUtils.setFullFileStatus(conf, desiredStatus, group, destFs, destPath, false); } } else { - throw new IOException("rename for src path: " + status.getPath() + " to dest path:" + throw new IOException("rename for src path: " + srcStatus.getPath() + " to dest path:" + destPath + " returned false"); } - } catch (IOException ioe) { - LOG.error("Failed to rename/set permissions. Src path: {} Dest path: {}", status.getPath(), destPath); - throw ioe; + return null; } - return null; - } - })); + })); + } } - pool.shutdown(); - for (Future<Void> future : futures) { - try { - future.get(); - } catch (Exception e) { - LOG.debug(e.getMessage()); - pool.shutdownNow(); - throw new HiveException(e.getCause()); + if (null == pool) { + if (inheritPerms) { + HdfsUtils.setFullFileStatus(conf, desiredStatus, null, destFs, destf, true); + } + } else { + pool.shutdown(); + for (Future<Void> future : futures) { + try { + future.get(); + } catch (Exception e) { + LOG.debug(e.getMessage()); + pool.shutdownNow(); + throw new HiveException(e.getCause()); + } } } return true; @@ -3182,28 +3220,33 @@ private void constructOneLBLocationMap(FileStatus fSta, FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER); boolean result = true; final List<Future<Boolean>> futures = new LinkedList<>(); - final ExecutorService pool = Executors.newFixedThreadPool( - conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()); + final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? 
+ Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build()) : null; final SessionState parentSession = SessionState.get(); for (final FileStatus status : statuses) { - futures.add(pool.submit(new Callable<Boolean>() { - - @Override - public Boolean call() throws Exception { - SessionState.setCurrentSessionState(parentSession); - return FileUtils.moveToTrash(fs, status.getPath(), conf); - } - })); + if (null == pool) { + result &= FileUtils.moveToTrash(fs, status.getPath(), conf); + } else { + futures.add(pool.submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + SessionState.setCurrentSessionState(parentSession); + return FileUtils.moveToTrash(fs, status.getPath(), conf); + } + })); + } } - pool.shutdown(); - for (Future<Boolean> future : futures) { - try { - result &= future.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error("Failed to delete: ",e); - pool.shutdownNow(); - throw new IOException(e); + if (null != pool) { + pool.shutdown(); + for (Future<Boolean> future : futures) { + try { + result &= future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Failed to delete: ",e); + pool.shutdownNow(); + throw new IOException(e); + } } } return result;