This is an automated email from the ASF dual-hosted git repository. daijy pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 43491dbd75b HIVE-27143: Optimize HCatStorer moveTask (#4177) 43491dbd75b is described below commit 43491dbd75b83daa755438eb6f43cf6e6b47b1c1 Author: yigress <104102129+yigr...@users.noreply.github.com> AuthorDate: Mon Apr 10 17:19:38 2023 -0700 HIVE-27143: Optimize HCatStorer moveTask (#4177) * HIVE-27143: Optimize HCatStorer moveTask * fix custom dynamic partition --- .../mapreduce/FileOutputCommitterContainer.java | 230 +++++++++++---------- 1 file changed, 123 insertions(+), 107 deletions(-) diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java index ef3c1afc457..476c60e53af 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java @@ -19,23 +19,34 @@ package org.apache.hive.hcatalog.mapreduce; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; - +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HdfsUtils; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.Warehouse; @@ -225,6 +236,15 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { } } + public static final PathFilter HIDDEN_FILES_PATH_FILTER = new PathFilter() { + @Override + public boolean accept(Path p) { + String name = p.getName(); + boolean filtered = name.equals(TEMP_DIR_NAME) || name.equals(LOGS_DIR_NAME) || name.equals(SUCCEEDED_FILE_NAME); + return !filtered; + } + }; + public static final String SUCCEEDED_FILE_NAME = "_SUCCESS"; static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"; @@ -367,10 +387,11 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { partPath = new Path(finalLocn); } else { partPath = new Path(partLocnRoot); + FileSystem partFs = partPath.getFileSystem(context.getConfiguration()); int i = 0; for (FieldSchema partKey : table.getPartitionKeys()) { if (i++ != 0) { - fs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check + partFs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check HdfsUtils.setFullFileStatus(conf, status, status.getFileStatus().getGroup(), fs, partPath, false); } @@ -380,7 +401,8 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { // Do not need to set the status on the partition directory. We will do it later recursively. // See: end of the registerPartitions method - fs.mkdirs(partPath); + FileSystem partFs = partPath.getFileSystem(context.getConfiguration()); + partFs.mkdirs(partPath); // Set the location in the StorageDescriptor if (dynamicPartitioningUsed) { @@ -467,131 +489,129 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { /** * Move all of the files from the temp directory to the final location - * @param fs the output file system - * @param file the file to move + * @param srcf the file to move * @param srcDir the source directory * @param destDir the target directory - * @param dryRun - a flag that simply tests if this move would succeed or not based - * on whether other files exist where we're trying to copy + * @param immutable - whether table is immutable. * @throws java.io.IOException */ - private void moveTaskOutputs(FileSystem fs, Path file, Path srcDir, - Path destDir, final boolean dryRun, boolean immutable - ) throws IOException { + private void moveTaskOutputs(final Configuration conf, Path srcf, Path srcDir, + Path destDir, boolean immutable) throws IOException { if(LOG.isDebugEnabled()) { LOG.debug("moveTaskOutputs " - + file + " from: " + srcDir + " to: " + destDir - + " dry: " + dryRun + " immutable: " + immutable); + + srcf + " from: " + srcDir + " to: " + destDir + " immutable: " + immutable); } if (dynamicPartitioningUsed) { immutable = true; // Making sure we treat dynamic partitioning jobs as if they were immutable. } - if (file.getName().equals(TEMP_DIR_NAME) || file.getName().equals(LOGS_DIR_NAME) || file.getName().equals(SUCCEEDED_FILE_NAME)) { - return; - } + final FileSystem srcFs = srcf.getFileSystem(conf); + final FileSystem destFs = destDir.getFileSystem(conf); + final boolean canRename = srcFs.getUri().equals(destFs.getUri()); - final Path finalOutputPath = getFinalPath(fs, file, srcDir, destDir, immutable); - FileStatus fileStatus = fs.getFileStatus(file); + if (destFs.exists(destDir) && !destFs.getFileStatus(destDir).isDirectory()) { + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Destination is not directory " + destDir); + } - if (!fileStatus.isDir()) { - if (dryRun){ - if (immutable){ - // Dryrun checks are meaningless for mutable table - we should always succeed - // unless there is a runtime IOException. - LOG.debug("Testing if moving file: [{}] to [{}] would cause a problem", file, finalOutputPath); - if (fs.exists(finalOutputPath)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " - + finalOutputPath + ", duplicate publish not possible."); - } - } - } else { - LOG.debug("Moving file: [{}] to [{}]", file, finalOutputPath); - // Make sure the parent directory exists. It is not an error - // to recreate an existing directory - fs.mkdirs(finalOutputPath.getParent()); - if (!fs.rename(file, finalOutputPath)) { - if (!fs.delete(finalOutputPath, true)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to delete existing path " + finalOutputPath); - } - if (!fs.rename(file, finalOutputPath)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Failed to move output to " + finalOutputPath); - } - } + LinkedList<Pair<Path, Path>> moves = new LinkedList<>(); + if (customDynamicLocationUsed) { + if (immutable && destFs.exists(destDir) && + !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, destDir)) { + throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, + "Data already exists in " + destDir + + ", duplicate publish not possible."); } + moves.add(Pair.of(srcf, destDir)); } else { - - FileStatus[] children = fs.listStatus(file); - FileStatus firstChild = null; - if (children != null) { - int index=0; - while (index < children.length) { - if ( !children[index].getPath().getName().equals(TEMP_DIR_NAME) - && !children[index].getPath().getName().equals(LOGS_DIR_NAME) - && !children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) { - firstChild = children[index]; - break; - } - index++; - } + Queue<FileStatus> srcQ = new LinkedList<>(); + FileStatus[] contents = srcFs.listStatus(srcf, HIDDEN_FILES_PATH_FILTER); + if (contents.length == 0) { + // nothing to move + return; } - if(firstChild!=null && firstChild.isDir()) { - // If the first child is directory, then rest would be directory too according to HCatalog dir structure - // recurse in that case - for (FileStatus child : children) { - moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable); + Collections.addAll(srcQ, contents); + + while (!srcQ.isEmpty()) { + FileStatus srcStatus = srcQ.remove(); + Path srcF = srcStatus.getPath(); + final Path finalOutputPath = getFinalPath(destFs, srcF, srcDir, destDir, immutable); + if (immutable && destFs.exists(finalOutputPath) && + !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(destFs, finalOutputPath)) { + throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, + "Data already exists in " + finalOutputPath + + ", duplicate publish not possible."); } - } else { - - if (!dryRun) { - if (dynamicPartitioningUsed) { - - // Optimization: if the first child is file, we have reached the leaf directory, move the parent directory itself - // instead of moving each file under the directory. See HCATALOG-538 - // Note for future Append implementation : This optimization is another reason dynamic - // partitioning is currently incompatible with append on mutable tables. - - final Path parentDir = finalOutputPath.getParent(); - // Create the directory - Path placeholder = new Path(parentDir, "_placeholder" + String.valueOf(Math.random())); - if (fs.mkdirs(parentDir)) { - // It is weird but we need a placeholder, - // otherwise rename cannot move file to the right place - fs.create(placeholder).close(); - } - LOG.debug("Moving directory: {} to {}", file, parentDir); - + if (srcStatus.isDirectory()) { + if (canRename && dynamicPartitioningUsed) { + // If it is partition, move the partition directory instead of each file. // If custom dynamic location provided, need to rename to final output path + final Path parentDir = finalOutputPath.getParent(); Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath; - if (!fs.rename(file, dstPath)) { - final String msg = "Failed to move file: " + file + " to " + dstPath; - LOG.error(msg); - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg); - } - fs.delete(placeholder, false); + moves.add(Pair.of(srcF, dstPath)); } else { - - // In case of no partition we have to move each file - for (FileStatus child : children) { - moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable); - } - + Collections.addAll(srcQ, srcFs.listStatus(srcF, HIDDEN_FILES_PATH_FILTER)); } - } else { - if(immutable && fs.exists(finalOutputPath) && - !org.apache.hadoop.hive.metastore.utils.FileUtils.isDirEmpty(fs, finalOutputPath)) { + moves.add(Pair.of(srcF, finalOutputPath)); + } + } + } - throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, "Data already exists in " + finalOutputPath - + ", duplicate publish not possible."); - } + if (moves.isEmpty()) { + return; + } + final List<Future<Pair<Path, Path>>> futures = new LinkedList<>(); + final ExecutorService pool = conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25) > 0 ? + Executors.newFixedThreadPool(conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Move-Thread-%d").build()) : null; + + for (final Pair<Path, Path> pair: moves){ + Path srcP = pair.getLeft(); + Path dstP = pair.getRight(); + final String msg = "Unable to move source " + srcP + " to destination " + dstP; + if (null==pool) { + moveFile(srcFs, srcP, destFs, dstP, conf, canRename); + } else { + futures.add(pool.submit(new Callable<Pair<Path, Path>>() { + @Override + public Pair<Path, Path> call() throws IOException { + if (moveFile(srcFs, srcP, destFs, dstP, conf, canRename)) { + return pair; + } else { + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg); + } + } + })); + } + } + if (null != pool) { + pool.shutdown(); + for (Future<Pair<Path, Path>> future : futures) { + try { + Pair<Path, Path> pair = future.get(); + LOG.debug("Moved src: {}, to dest: {}", pair.getLeft().toString(), pair.getRight().toString()); + } catch (Exception e) { + LOG.error("Failed to move {}", e.getMessage()); + pool.shutdownNow(); + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, e.getMessage()); } } } } + private boolean moveFile(FileSystem srcFs, Path srcf, FileSystem destFs, Path destf, Configuration conf, boolean canRename) throws IOException { + boolean moved; + if (canRename) { + destFs.mkdirs(destf.getParent()); + moved = srcFs.rename(srcf, destf); + } else { + moved = FileUtil.copy(srcFs, srcf, destFs, destf, true, false, conf); + } + return moved; + } + /** * Find the final name of a given output file, given the output directory * and the work directory. If immutable, attempt to create file of name @@ -750,7 +770,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { // Move data from temp directory the actual table directory // No metastore operation required. Path src = new Path(jobInfo.getLocation()); - moveTaskOutputs(fs, src, src, tblPath, false, table.isImmutable()); + moveTaskOutputs(conf, src, src, tblPath, table.isImmutable()); if (!src.equals(tblPath)) { fs.delete(src, true); } @@ -813,8 +833,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { // check here for each dir we're copying out, to see if it // already exists, error out if so. // Also, treat dyn-writes as writes to immutable tables. - moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true - moveTaskOutputs(fs, src, src, tblPath, false, true); + moveTaskOutputs(conf, src, src, tblPath, table.isImmutable()); if (!src.equals(tblPath)){ fs.delete(src, true); } @@ -854,8 +873,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { Partition p = partitionsToAdd.get(0); Path src = new Path(jobInfo.getLocation()); Path dest = new Path(p.getSd().getLocation()); - moveTaskOutputs(fs, src, src, dest, true, table.isImmutable()); - moveTaskOutputs(fs,src,src,dest,false,table.isImmutable()); + moveTaskOutputs(conf, src, src, dest, table.isImmutable()); if (!src.equals(dest)){ if (src.toString().matches(".*" + Path.SEPARATOR + SCRATCH_DIR_NAME + "\\d\\.?\\d+.*")){ // src is scratch directory, need to trim the part key value pairs from path @@ -903,8 +921,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { // Dynamic partitioning usecase if (!customDynamicLocationUsed) { Path src = new Path(ptnRootLocation); - moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true - moveTaskOutputs(fs, src, src, tblPath, false, true); + moveTaskOutputs(conf, src, src, tblPath, true); if (!src.equals(tblPath)){ fs.delete(src, true); } @@ -956,8 +973,7 @@ class FileOutputCommitterContainer extends OutputCommitterContainer { for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) { Path src = new Path(entry.getKey()); Path destPath = new Path(getFinalDynamicPartitionDestination(table, entry.getValue(), jobInfo)); - moveTaskOutputs(fs, src, src, destPath, true, true); // dryRun = true, immutable = true - moveTaskOutputs(fs, src, src, destPath, false, true); + moveTaskOutputs(conf, src, src, destPath, true); } // delete the parent temp directory of all custom dynamic partitions Path parentPath = new Path(getCustomPartitionRootLocation(jobInfo, conf));