Repository: hive Updated Branches: refs/heads/master f780eb39e -> d5bdb9bc6
HIVE-17289: EXPORT and IMPORT shouldn't perform distcp with doAs privileged user (Sankar Hariappan, reviewed by Daniel Dai) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5bdb9bc Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5bdb9bc Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5bdb9bc Branch: refs/heads/master Commit: d5bdb9bc6bbe6c8decccf29e169b80c30ede27f8 Parents: f780eb3 Author: Daniel Dai <da...@hortonworks.com> Authored: Tue Aug 15 12:12:24 2017 -0700 Committer: Daniel Dai <da...@hortonworks.com> Committed: Tue Aug 15 12:12:24 2017 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hive/common/FileUtils.java | 10 -- .../org/apache/hadoop/hive/conf/HiveConf.java | 5 +- .../hadoop/hive/ql/exec/ReplCopyTask.java | 169 +++++-------------- .../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 3 +- .../hive/ql/parse/ExportSemanticAnalyzer.java | 2 +- .../ql/parse/repl/dump/PartitionExport.java | 9 +- .../hive/ql/parse/repl/dump/TableExport.java | 18 +- .../hive/ql/parse/repl/dump/io/CopyUtils.java | 23 ++- .../ql/parse/repl/dump/io/FileOperations.java | 9 +- .../hadoop/hive/ql/plan/ReplCopyWork.java | 82 ++------- 10 files changed, 97 insertions(+), 233 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/common/src/java/org/apache/hadoop/hive/common/FileUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java index 2880eb2..e784797 100644 --- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java +++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java @@ -609,16 +609,6 @@ public final class FileUtils { return copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf, ShimLoader.getHadoopShims()); } - /** - * Copies files between filesystems as a fs super user using distcp, and runs - * as a privileged user. - */ - public static boolean privilegedCopy(FileSystem srcFS, List<Path> srcPaths, Path dst, - HiveConf conf) throws IOException { - String privilegedUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); - return distCp(srcFS, srcPaths, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims()); - } - @VisibleForTesting static boolean copy(FileSystem srcFS, Path src, FileSystem dstFS, Path dst, http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 580e725..b154544 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2595,10 +2595,9 @@ public class HiveConf extends Configuration { HIVE_SERVER2_ENABLE_DOAS("hive.server2.enable.doAs", true, "Setting this property to true will have HiveServer2 execute\n" + "Hive operations as the user making the calls to it."), - HIVE_DISTCP_DOAS_USER("hive.distcp.privileged.doAs","hdfs", + HIVE_DISTCP_DOAS_USER("hive.distcp.privileged.doAs","hive", "This property allows privileged distcp executions done by hive\n" + - "to run as this user. Typically, it should be the user you\n" + - "run the namenode as, such as the 'hdfs' user."), + "to run as this user."), HIVE_SERVER2_TABLE_TYPE_MAPPING("hive.server2.table.type.mapping", "CLASSIC", new StringSet("CLASSIC", "HIVE"), "This setting reflects how HiveServer2 will report the table types for JDBC and other\n" + "client implementations that retrieve the available tables and supported table types\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 7330f56..07f9167 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -24,16 +24,13 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.CopyWork; import org.apache.hadoop.hive.ql.plan.ReplCopyWork; +import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.IOException; import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.io.Serializable; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; import org.slf4j.Logger; @@ -50,7 +47,6 @@ import org.apache.hadoop.util.StringUtils; public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { - private static final long serialVersionUID = 1L; private static transient final Logger LOG = LoggerFactory.getLogger(ReplCopyTask.class); @@ -85,9 +81,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { .getPath(); if (FileUtils.copy( sourcePath.getFileSystem(conf), sourcePath, - dstFs, toPath - , false, false, conf - )) { + dstFs, toPath, false, false, conf)) { return 0; } else { console.printError("Failed to copy: '" + fromPath.toString() + "to: '" + toPath.toString() @@ -96,107 +90,47 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { } } - List<FileStatus> srcFiles = new ArrayList<>(); - FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath); - LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length)); - if (!rwork.getReadListFromInput()) { - if (srcs == null || srcs.length == 0) { + List<Path> srcPaths = new ArrayList<>(); + if (rwork.readSrcAsFilesList()) { + // This flow is usually taken for REPL LOAD + // Our input is the result of a _files listing, we should expand out _files. + srcPaths = filesInFileListing(srcFs, fromPath); + LOG.debug("ReplCopyTask _files contains:" + (srcPaths == null ? "null" : srcPaths.size())); + if ((srcPaths == null) || (srcPaths.isEmpty())) { if (work.isErrorOnSrcEmpty()) { - console.printError("No files matching path: " + fromPath.toString()); - return 3; + console.printError("No _files entry found on source: " + fromPath.toString()); + return 5; } else { return 0; } } } else { - LOG.debug("ReplCopyTask making sense of _files"); - // Our input is probably the result of a _files listing, we should expand out _files. - srcFiles = filesInFileListing(srcFs,fromPath); - LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size())); - if (srcFiles == null){ + // This flow is usually taken for IMPORT command + FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath); + LOG.debug("ReplCopyTasks srcs= {}", (srcs == null ? "null" : srcs.length)); + if (srcs == null || srcs.length == 0) { if (work.isErrorOnSrcEmpty()) { - console.printError("No _files entry found on source: " + fromPath.toString()); - return 5; + console.printError("No files matching path: " + fromPath.toString()); + return 3; } else { return 0; } } + + for (FileStatus oneSrc : srcs) { + console.printInfo("Copying file: " + oneSrc.getPath().toString()); + LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath); + srcPaths.add(oneSrc.getPath()); + } } - // Add in all the lone filecopies expected as well - applies to - // both _files case stragglers and regular copies - srcFiles.addAll(Arrays.asList(srcs)); - LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : srcFiles.size())); + LOG.debug("ReplCopyTask numFiles: {}", srcPaths.size()); if (!FileUtils.mkdir(dstFs, toPath, conf)) { console.printError("Cannot make target directory: " + toPath.toString()); return 2; } - - BufferedWriter listBW = null; - if (rwork.getListFilesOnOutputBehaviour()) { - Path listPath = new Path(toPath,EximUtil.FILES_NAME); - LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString()); - if (dstFs.exists(listPath)){ - console.printError("Cannot make target _files file:" + listPath.toString()); - return 4; - } - listBW = new BufferedWriter(new OutputStreamWriter(dstFs.create(listPath))); - // TODO : verify that not specifying charset here does not bite us - // later(for cases where filenames have unicode chars) - } - - HashMap<FileSystem, List<Path>> srcMap = new HashMap<>(); - for (FileStatus oneSrc : srcFiles) { - console.printInfo("Copying file: " + oneSrc.getPath().toString()); - LOG.debug("Copying file: " + oneSrc.getPath().toString()); - - FileSystem actualSrcFs = null; - if (rwork.getReadListFromInput()) { - // TODO : filesystemcache prevents this from being a perf nightmare, but we - // should still probably follow up to see if we need to do something better here. - actualSrcFs = oneSrc.getPath().getFileSystem(conf); - } else { - actualSrcFs = srcFs; - } - - if (!rwork.getListFilesOnOutputBehaviour(oneSrc)) { - LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath); - - // We just make the list of files to copied using distCp. - // If files come from different file system, then just make separate lists for each filesystem. - if (srcMap.containsKey(actualSrcFs)) { - srcMap.get(actualSrcFs).add(oneSrc.getPath()); - } else { - List<Path> srcPaths = new ArrayList<>(); - srcPaths.add(oneSrc.getPath()); - srcMap.put(actualSrcFs, srcPaths); - } - } else { - LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri()); - console.printInfo("Tracking file: " + oneSrc.getPath().toUri()); - String chksumString = ReplChangeManager.checksumFor(oneSrc.getPath(), actualSrcFs); - listBW.write(ReplChangeManager.encodeFileUri - (oneSrc.getPath().toUri().toString(), chksumString) + "\n"); - } - } - - if (listBW != null){ - listBW.close(); - } - - // If the srcMap is not empty which means we made the list of files for distCp. - // If there are files from different filesystems, then the map will have multiple entries. - if (!srcMap.isEmpty()) { - for (final HashMap.Entry<FileSystem, List<Path>> entry : srcMap.entrySet()) { - FileSystem actualSrcFs = entry.getKey(); - List<Path> srcPaths = entry.getValue(); - if (!doCopy(toPath, dstFs, srcPaths, actualSrcFs, conf)) { - console.printError("Failed to copy: " + srcPaths.size() - + " files to: '" + toPath.toString() + "'"); - return 1; - } - } - } + // Copy the files from different source file systems to one destination directory + new CopyUtils(rwork.distCpDoAsUser(), conf).doCopy(toPath, srcPaths); return 0; } catch (Exception e) { @@ -206,36 +140,9 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { } } - public static boolean doCopy(Path dst, FileSystem dstFs, List<Path> srcPaths, FileSystem srcFs, - HiveConf conf) throws IOException { - boolean result = true; - if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) - || isLocalFileSystem(dstFs) || isLocalFileSystem(srcFs)) { - for (final Path src : srcPaths) { - // regular copy in test env, or when source or destination is a local file - // distcp runs inside a mapper task, and cannot handle file:/// - LOG.debug("Using regular copy for {} -> {}", src.toUri(), dst.toUri()); - if (!FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf)) { - result = false; - } - } - } else { - // distcp in actual deployment with privilege escalation - result = FileUtils.privilegedCopy(srcFs, srcPaths, dst, conf); - } - return result; - } - - private static boolean isLocalFileSystem(FileSystem fs) { - String scheme = fs.getScheme(); - boolean isLocalFileSystem = scheme.equalsIgnoreCase("file"); - LOG.debug("Scheme {} was a local file system? {}", scheme, isLocalFileSystem); - return isLocalFileSystem; - } - - private List<FileStatus> filesInFileListing(FileSystem fs, Path path) + private List<Path> filesInFileListing(FileSystem fs, Path dataPath) throws IOException { - Path fileListing = new Path(path, EximUtil.FILES_NAME); + Path fileListing = new Path(dataPath, EximUtil.FILES_NAME); LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri()); if (! fs.exists(fileListing)){ LOG.debug("ReplCopyTask : _files does not exist"); @@ -243,7 +150,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { // On success, but with nothing to return, we can return an empty list. } - List<FileStatus> ret = new ArrayList<FileStatus>(); + List<Path> filePaths = new ArrayList<>(); BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing))); // TODO : verify if skipping charset here is okay @@ -253,9 +160,10 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line); try { - FileStatus f = ReplChangeManager.getFileStatus(new Path(fileWithChksum[0]), - fileWithChksum[1], conf); - ret.add(f); + Path f = ReplChangeManager + .getFileStatus(new Path(fileWithChksum[0]), fileWithChksum[1], conf) + .getPath(); + filePaths.add(f); } catch (MetaException e) { // skip and issue warning for missing file LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot"); @@ -269,7 +177,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { // and if not so, optimize. } - return ret; + return filePaths; } @Override @@ -290,9 +198,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false); LOG.debug("ReplCopyTask:\trcwork"); - if (replicationSpec.isLazy()){ + if (replicationSpec.isLazy()) { LOG.debug("ReplCopyTask:\tlazy"); - rcwork.setReadListFromInput(true); + rcwork.setReadSrcAsFilesList(true); + + // It is assumed isLazy flag is set only for REPL LOAD flow. + // IMPORT always do deep copy. So, distCpDoAsUser will be null by default in ReplCopyWork. + String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + rcwork.setDistCpDoAsUser(distCpDoAsUser); } copyTask = TaskFactory.get(rcwork, conf); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 34b6737..05fc5e4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -236,7 +236,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable { TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null); TableExport.Paths exportPaths = new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf); - new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).write(); + String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + new TableExport(exportPaths, ts, getNewReplicationSpec(), db, distCpDoAsUser, conf).write(); REPL_STATE_LOG.info( "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}", dbName, tblName, exportPaths.exportRootDir.toString()); http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java index 86575e0..74144ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java @@ -74,7 +74,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer { TableExport.Paths exportPaths = new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf); TableExport.AuthEntities authEntities = - new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).write(); + new TableExport(exportPaths, ts, replicationSpec, db, null, conf).write(); inputs.addAll(authEntities.inputs); outputs.addAll(authEntities.outputs); } http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java index 87beffa..7e72f23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java @@ -46,6 +46,7 @@ import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths; class PartitionExport { private final Paths paths; private final PartitionIterable partitionIterable; + private final String distCpDoAsUser; private final HiveConf hiveConf; private final int nThreads; private final AuthEntities authEntities; @@ -53,10 +54,11 @@ class PartitionExport { private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class); private BlockingQueue<Partition> queue; - PartitionExport(Paths paths, PartitionIterable partitionIterable, HiveConf hiveConf, - AuthEntities authEntities) { + PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser, + HiveConf hiveConf, AuthEntities authEntities) { this.paths = paths; this.partitionIterable = partitionIterable; + this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; this.authEntities = authEntities; this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM); @@ -101,7 +103,8 @@ class PartitionExport { try { // this the data copy Path rootDataDumpDir = paths.partitionExportDir(partitionName); - new FileOperations(fromPath, rootDataDumpDir, hiveConf).export(forReplicationSpec); + new FileOperations(fromPath, rootDataDumpDir, distCpDoAsUser, hiveConf) + .export(forReplicationSpec); authEntities.inputs.add(new ReadEntity(partition)); LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java index 5d7fd25..5eae35a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.FileNotFoundException; import java.io.IOException; @@ -45,16 +46,18 @@ import java.util.concurrent.ConcurrentHashMap; import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity; public class TableExport { + private static final Logger logger = LoggerFactory.getLogger(TableExport.class); + private TableSpec tableSpec; private final ReplicationSpec replicationSpec; private final Hive db; + private final String distCpDoAsUser; private final HiveConf conf; - private final Logger logger; private final Paths paths; private final AuthEntities authEntities = new AuthEntities(); public TableExport(Paths paths, TableSpec tableSpec, - ReplicationSpec replicationSpec, Hive db, HiveConf conf, Logger logger) + ReplicationSpec replicationSpec, Hive db, String distCpDoAsUser, HiveConf conf) throws SemanticException { this.tableSpec = (tableSpec != null && tableSpec.tableHandle.isTemporary() @@ -66,8 +69,8 @@ public class TableExport { this.replicationSpec.setIsMetadataOnly(true); } this.db = db; + this.distCpDoAsUser = distCpDoAsUser; this.conf = conf; - this.logger = logger; this.paths = paths; } @@ -115,8 +118,7 @@ public class TableExport { } } - private void writeMetaData(PartitionIterable partitions) - throws SemanticException { + private void writeMetaData(PartitionIterable partitions) throws SemanticException { try { EximUtil.createExportDump( paths.exportFileSystem, @@ -140,11 +142,13 @@ public class TableExport { throw new IllegalStateException( "partitions cannot be null for partitionTable :" + tableSpec.tableName); } - new PartitionExport(paths, partitions, conf, authEntities).write(replicationSpec); + new PartitionExport(paths, partitions, distCpDoAsUser, conf, authEntities) + .write(replicationSpec); } else { Path fromPath = tableSpec.tableHandle.getDataLocation(); //this is the data copy - new FileOperations(fromPath, paths.dataExportDir(), conf).export(replicationSpec); + new FileOperations(fromPath, paths.dataExportDir(), distCpDoAsUser, conf) + .export(replicationSpec); authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle)); } authEntities.outputs.add(toWriteEntity(paths.exportRootDir, conf)); http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java index 0cd3f17..db923e3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hive.ql.parse.repl.dump.io; +package org.apache.hadoop.hive.ql.parse.repl; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileSystem; @@ -34,7 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -class CopyUtils { +public class CopyUtils { private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class); @@ -44,15 +44,15 @@ class CopyUtils { private final boolean hiveInTest; private final String copyAsUser; - CopyUtils(HiveConf hiveConf) { + public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) { this.hiveConf = hiveConf; maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES); maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE); hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST); - this.copyAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER); + this.copyAsUser = distCpDoAsUser; } - void doCopy(Path destination, List<Path> srcPaths) throws IOException { + public void doCopy(Path destination, List<Path> srcPaths) throws IOException { Map<FileSystem, List<Path>> map = fsToFileMap(srcPaths); FileSystem destinationFs = destination.getFileSystem(hiveConf); @@ -77,10 +77,9 @@ class CopyUtils { /* Check for conditions that will lead to local copy, checks are: 1. we are testing hive. - 2. both source and destination are same FileSystem - 3. either source or destination is a "local" FileSystem("file") - 4. aggregate fileSize of all source Paths(can be directory / file) is less than configured size. - 5. number of files of all source Paths(can be directory / file) is less than configured size. + 2. either source or destination is a "local" FileSystem("file") + 3. aggregate fileSize of all source Paths(can be directory / file) is less than configured size. + 4. number of files of all source Paths(can be directory / file) is less than configured size. */ private boolean regularCopy(FileSystem destinationFs, Map.Entry<FileSystem, List<Path>> entry) throws IOException { @@ -88,9 +87,7 @@ class CopyUtils { return true; } FileSystem sourceFs = entry.getKey(); - boolean isLocalFs = isLocal(sourceFs) || isLocal(destinationFs); - boolean sameFs = sourceFs.equals(destinationFs); - if (isLocalFs || sameFs) { + if (isLocal(sourceFs) || isLocal(destinationFs)) { return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index e1e3ae1..3ae07f1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.parse.SemanticException; +import org.apache.hadoop.hive.ql.parse.repl.CopyUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,13 +40,15 @@ public class FileOperations { private static Logger logger = LoggerFactory.getLogger(FileOperations.class); private final Path dataFileListPath; private final Path exportRootDataDir; + private final String distCpDoAsUser; private HiveConf hiveConf; private final FileSystem dataFileSystem, exportFileSystem; - public FileOperations(Path dataFileListPath, Path exportRootDataDir, HiveConf hiveConf) - throws IOException { + public FileOperations(Path dataFileListPath, Path exportRootDataDir, + String distCpDoAsUser, HiveConf hiveConf) throws IOException { this.dataFileListPath = dataFileListPath; this.exportRootDataDir = exportRootDataDir; + this.distCpDoAsUser = distCpDoAsUser; this.hiveConf = hiveConf; dataFileSystem = dataFileListPath.getFileSystem(hiveConf); exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); @@ -69,7 +72,7 @@ public class FileOperations { for (FileStatus fileStatus : fileStatuses) { srcPaths.add(fileStatus.getPath()); } - new CopyUtils(hiveConf).doCopy(exportRootDataDir, srcPaths); + new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java index 1932d60..cf6ec46 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hive.ql.plan; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@ -27,93 +26,48 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; * which will have mechanics to list the files in source to write to the destination, * instead of copying them, if specified, falling back to copying if needed. */ -@Explain(displayName = "Copy for Replication", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) +@Explain(displayName = "Repl Copy", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public class ReplCopyWork extends CopyWork { - - protected boolean copyFiles = true; // governs copy-or-list-files behaviour. - // If set to true, behaves identically to a CopyWork - // If set to false, ReplCopyTask does a file-list of the things to be copied instead, and puts them in a file called _files. - // Default is set to mimic CopyTask, with the intent that any Replication code will explicitly flip this. - /** * TODO : Refactor * * There is an upcoming patch that refactors this bit of code. Currently, the idea is the following: * * By default, ReplCopyWork will behave similarly to CopyWork, and simply copy - * along data from the source to destination. If, however, listFilesOnOutput is set, - * then, instead of copying the individual files to the destination, it simply creates - * a file called _files on destination that contains the list of the original files - * that were intended to be copied. Thus, we do not actually copy the files at CopyWork - * time. - * - * The flip side of this behaviour happens when, instead, readListFromInput is set. This - * flag, if set, changes the source behaviour of this CopyTask, and instead of copying - * explicit files, this will then fall back to a behaviour wherein an _files is read from - * the source, and the files specified by the _files are then copied to the destination. + * along data from the source to destination. + * If the flag readSrcAsFilesList is set, changes the source behaviour of this CopyTask, and + * instead of copying explicit files, this will then fall back to a behaviour wherein an _files is + * read from the source, and the files specified by the _files are then copied to the destination. * * This allows us a lazy-copy-on-source and a pull-from destination semantic that we want * to use from replication. - * - * == - * - * The refactor intent, however, is to simplify this, so that we have only 1 flag that we set, - * called isLazy. If isLazy is set, then this is the equivalent of the current listFilesOnOutput, - * and will generate a _files file. - * - * As to the input, we simply decide on whether to use the lazy mode or not depending on the - * presence of a _files file on the input. If we see a _files on the input, we simply expand it - * to copy as needed. If we do not, we copy as normal. - * */ - protected boolean listFilesOnOutput = false; // governs copy-or-list-files behaviour - // If set to true, it'll iterate over input files, and for each file in the input, - // it'll write out an additional line in a _files file in the output. - // If set to false, it'll behave as a traditional CopyTask. - - protected boolean readListFromInput = false; // governs remote-fetch-input behaviour + // Governs remote-fetch-input behaviour // If set to true, we'll assume that the input has a _files file present which lists // the actual input files to copy, and we'll pull each of those on read. // If set to false, it'll behave as a traditional CopyTask. + protected boolean readSrcAsFilesList = false; - public ReplCopyWork() { - } - - public ReplCopyWork(final Path fromPath, final Path toPath) { - super(fromPath, toPath, true); - } - - public ReplCopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) { - super(fromPath, toPath, errorOnSrcEmpty); - } + private String distCpDoAsUser = null; - public void setListFilesOnOutputBehaviour(boolean listFilesOnOutput){ - this.listFilesOnOutput = listFilesOnOutput; + public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) { + super(srcPath, destPath, errorOnSrcEmpty); } - public boolean getListFilesOnOutputBehaviour(){ - return this.listFilesOnOutput; + public void setReadSrcAsFilesList(boolean readSrcAsFilesList) { + this.readSrcAsFilesList = readSrcAsFilesList; } - public void setReadListFromInput(boolean readListFromInput){ - this.readListFromInput = readListFromInput; + public boolean readSrcAsFilesList() { + return this.readSrcAsFilesList; } - public boolean getReadListFromInput(){ - return this.readListFromInput; + public void setDistCpDoAsUser(String distCpDoAsUser) { + this.distCpDoAsUser = distCpDoAsUser; } - // specialization of getListFilesOnOutputBehaviour, with a filestatus arg - // we can default to the default getListFilesOnOutputBehaviour behaviour, - // or, we can do additional pattern matching to decide that certain files - // should not be listed, and copied instead, _metadata files, for instance. - // Currently, we use this to skip _metadata files, but we might decide that - // this is not the right place for it later on. - public boolean getListFilesOnOutputBehaviour(FileStatus f) { - if (f.getPath().toString().contains("_metadata")){ - return false; // always copy _metadata files - } - return this.listFilesOnOutput; + public String distCpDoAsUser() { + return distCpDoAsUser; } }