Repository: hive
Updated Branches:
  refs/heads/master f780eb39e -> d5bdb9bc6


HIVE-17289: EXPORT and IMPORT shouldn't perform distcp with doAs privileged user (Sankar Hariappan, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d5bdb9bc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d5bdb9bc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d5bdb9bc

Branch: refs/heads/master
Commit: d5bdb9bc6bbe6c8decccf29e169b80c30ede27f8
Parents: f780eb3
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Aug 15 12:12:24 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Aug 15 12:12:24 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |  10 --
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../hadoop/hive/ql/exec/ReplCopyTask.java       | 169 +++++--------------
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   3 +-
 .../hive/ql/parse/ExportSemanticAnalyzer.java   |   2 +-
 .../ql/parse/repl/dump/PartitionExport.java     |   9 +-
 .../hive/ql/parse/repl/dump/TableExport.java    |  18 +-
 .../hive/ql/parse/repl/dump/io/CopyUtils.java   |  23 ++-
 .../ql/parse/repl/dump/io/FileOperations.java   |   9 +-
 .../hadoop/hive/ql/plan/ReplCopyWork.java       |  82 ++-------
 10 files changed, 97 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 2880eb2..e784797 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -609,16 +609,6 @@ public final class FileUtils {
     return copy(srcFS, src, dstFS, dst, deleteSource, overwrite, conf, ShimLoader.getHadoopShims());
   }
 
-  /**
-   * Copies files between filesystems as a fs super user using distcp, and runs
-   * as a privileged user.
-   */
-  public static boolean privilegedCopy(FileSystem srcFS, List<Path> srcPaths, Path dst,
-      HiveConf conf) throws IOException {
-    String privilegedUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
-    return distCp(srcFS, srcPaths, dst, false, privilegedUser, conf, ShimLoader.getHadoopShims());
-  }
-
   @VisibleForTesting
   static boolean copy(FileSystem srcFS, Path src,
     FileSystem dstFS, Path dst,

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 580e725..b154544 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2595,10 +2595,9 @@ public class HiveConf extends Configuration {
     HIVE_SERVER2_ENABLE_DOAS("hive.server2.enable.doAs", true,
         "Setting this property to true will have HiveServer2 execute\n" +
         "Hive operations as the user making the calls to it."),
-    HIVE_DISTCP_DOAS_USER("hive.distcp.privileged.doAs","hdfs",
+    HIVE_DISTCP_DOAS_USER("hive.distcp.privileged.doAs","hive",
         "This property allows privileged distcp executions done by hive\n" +
-        "to run as this user. Typically, it should be the user you\n" +
-        "run the namenode as, such as the 'hdfs' user."),
+        "to run as this user."),
     HIVE_SERVER2_TABLE_TYPE_MAPPING("hive.server2.table.type.mapping", "CLASSIC", new StringSet("CLASSIC", "HIVE"),
        "This setting reflects how HiveServer2 will report the table types for JDBC and other\n" +
        "client implementations that retrieve the available tables and supported table types\n" +
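Note for anyone picking up this change: the hunk above lowers the default for hive.distcp.privileged.doAs from "hdfs" to "hive", so deployments that relied on the old "hdfs" default must now set the property explicitly. A minimal hive-site.xml override might look like the following (a sketch; the value shown is a site-specific choice, not something this patch mandates):

    <property>
      <name>hive.distcp.privileged.doAs</name>
      <value>hive</value>
      <description>User that privileged distcp executions run as.</description>
    </property>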

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 7330f56..07f9167 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -24,16 +24,13 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 
 import java.io.BufferedReader;
-import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -50,7 +47,6 @@ import org.apache.hadoop.util.StringUtils;
 
 public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
 
-
   private static final long serialVersionUID = 1L;
 
   private static transient final Logger LOG = LoggerFactory.getLogger(ReplCopyTask.class);
@@ -85,9 +81,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
             .getPath();
         if (FileUtils.copy(
             sourcePath.getFileSystem(conf), sourcePath,
-            dstFs, toPath
-            , false, false, conf
-        )) {
+            dstFs, toPath, false, false, conf)) {
           return 0;
         } else {
           console.printError("Failed to copy: '" + fromPath.toString() + "to: 
'" + toPath.toString()
@@ -96,107 +90,47 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
         }
       }
 
-      List<FileStatus> srcFiles = new ArrayList<>();
-      FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
-      LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length));
-      if (!rwork.getReadListFromInput()) {
-        if (srcs == null || srcs.length == 0) {
+      List<Path> srcPaths = new ArrayList<>();
+      if (rwork.readSrcAsFilesList()) {
+        // This flow is usually taken for REPL LOAD
+        // Our input is the result of a _files listing, we should expand out _files.
+        srcPaths = filesInFileListing(srcFs, fromPath);
+        LOG.debug("ReplCopyTask _files contains:" + (srcPaths == null ? "null" : srcPaths.size()));
+        if ((srcPaths == null) || (srcPaths.isEmpty())) {
           if (work.isErrorOnSrcEmpty()) {
-            console.printError("No files matching path: " + 
fromPath.toString());
-            return 3;
+            console.printError("No _files entry found on source: " + 
fromPath.toString());
+            return 5;
           } else {
             return 0;
           }
         }
       } else {
-        LOG.debug("ReplCopyTask making sense of _files");
-        // Our input is probably the result of a _files listing, we should expand out _files.
-        srcFiles = filesInFileListing(srcFs,fromPath);
-        LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size()));
-        if (srcFiles == null){
+        // This flow is usually taken for IMPORT command
+        FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
+        LOG.debug("ReplCopyTasks srcs= {}", (srcs == null ? "null" : srcs.length));
+        if (srcs == null || srcs.length == 0) {
           if (work.isErrorOnSrcEmpty()) {
-            console.printError("No _files entry found on source: " + 
fromPath.toString());
-            return 5;
+            console.printError("No files matching path: " + 
fromPath.toString());
+            return 3;
           } else {
             return 0;
           }
         }
+
+        for (FileStatus oneSrc : srcs) {
+          console.printInfo("Copying file: " + oneSrc.getPath().toString());
+          LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
+          srcPaths.add(oneSrc.getPath());
+        }
       }
-      // Add in all the lone filecopies expected as well - applies to
-      // both _files case stragglers and regular copies
-      srcFiles.addAll(Arrays.asList(srcs));
-      LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : 
srcFiles.size()));
 
+      LOG.debug("ReplCopyTask numFiles: {}", srcPaths.size());
       if (!FileUtils.mkdir(dstFs, toPath, conf)) {
         console.printError("Cannot make target directory: " + 
toPath.toString());
         return 2;
       }
-
-      BufferedWriter listBW = null;
-      if (rwork.getListFilesOnOutputBehaviour()) {
-        Path listPath = new Path(toPath,EximUtil.FILES_NAME);
-        LOG.debug("ReplCopyTask : generating _files at :" + 
listPath.toUri().toString());
-        if (dstFs.exists(listPath)){
-          console.printError("Cannot make target _files file:" + 
listPath.toString());
-          return 4;
-        }
-        listBW = new BufferedWriter(new 
OutputStreamWriter(dstFs.create(listPath)));
-        // TODO : verify that not specifying charset here does not bite us
-        // later(for cases where filenames have unicode chars)
-      }
-
-      HashMap<FileSystem, List<Path>> srcMap = new HashMap<>();
-      for (FileStatus oneSrc : srcFiles) {
-        console.printInfo("Copying file: " + oneSrc.getPath().toString());
-        LOG.debug("Copying file: " + oneSrc.getPath().toString());
-
-        FileSystem actualSrcFs = null;
-        if (rwork.getReadListFromInput()) {
-          // TODO : filesystemcache prevents this from being a perf nightmare, but we
-          // should still probably follow up to see if we need to do something better here.
-          actualSrcFs = oneSrc.getPath().getFileSystem(conf);
-        } else {
-          actualSrcFs = srcFs;
-        }
-
-        if (!rwork.getListFilesOnOutputBehaviour(oneSrc)) {
-          LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
-
-          // We just make the list of files to copied using distCp.
-          // If files come from different file system, then just make separate lists for each filesystem.
-          if (srcMap.containsKey(actualSrcFs)) {
-            srcMap.get(actualSrcFs).add(oneSrc.getPath());
-          } else {
-            List<Path> srcPaths = new ArrayList<>();
-            srcPaths.add(oneSrc.getPath());
-            srcMap.put(actualSrcFs, srcPaths);
-          }
-        } else {
-          LOG.debug("ReplCopyTask _files now tracks:" + 
oneSrc.getPath().toUri());
-          console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
-          String chksumString = ReplChangeManager.checksumFor(oneSrc.getPath(), actualSrcFs);
-          listBW.write(ReplChangeManager.encodeFileUri
-              (oneSrc.getPath().toUri().toString(), chksumString) + "\n");
-        }
-      }
-
-      if (listBW != null){
-        listBW.close();
-      }
-
-      // If the srcMap is not empty which means we made the list of files for distCp.
-      // If there are files from different filesystems, then the map will have multiple entries.
-      if (!srcMap.isEmpty()) {
-        for (final HashMap.Entry<FileSystem, List<Path>> entry : srcMap.entrySet()) {
-          FileSystem actualSrcFs = entry.getKey();
-          List<Path> srcPaths = entry.getValue();
-          if (!doCopy(toPath, dstFs, srcPaths, actualSrcFs, conf)) {
-            console.printError("Failed to copy: " + srcPaths.size()
-                    + " files to: '" + toPath.toString() + "'");
-            return 1;
-          }
-        }
-      }
+      // Copy the files from different source file systems to one destination directory
+      new CopyUtils(rwork.distCpDoAsUser(), conf).doCopy(toPath, srcPaths);
 
       return 0;
     } catch (Exception e) {
@@ -206,36 +140,9 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     }
   }
 
-  public static boolean doCopy(Path dst, FileSystem dstFs, List<Path> srcPaths, FileSystem srcFs,
-              HiveConf conf) throws IOException {
-    boolean result = true;
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)
-            || isLocalFileSystem(dstFs) || isLocalFileSystem(srcFs)) {
-      for (final Path src : srcPaths) {
-        // regular copy in test env, or when source or destination is a local file
-        // distcp runs inside a mapper task, and cannot handle file:///
-        LOG.debug("Using regular copy for {} -> {}", src.toUri(), dst.toUri());
-        if (!FileUtils.copy(srcFs, src, dstFs, dst, false, true, conf)) {
-          result = false;
-        }
-      }
-    } else {
-      // distcp in actual deployment with privilege escalation
-      result = FileUtils.privilegedCopy(srcFs, srcPaths, dst, conf);
-    }
-    return result;
-  }
-
-  private static boolean isLocalFileSystem(FileSystem fs) {
-    String scheme = fs.getScheme();
-    boolean isLocalFileSystem = scheme.equalsIgnoreCase("file");
-    LOG.debug("Scheme {} was a local file system? {}", scheme, 
isLocalFileSystem);
-    return isLocalFileSystem;
-  }
-
-  private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
+  private List<Path> filesInFileListing(FileSystem fs, Path dataPath)
       throws IOException {
-    Path fileListing = new Path(path, EximUtil.FILES_NAME);
+    Path fileListing = new Path(dataPath, EximUtil.FILES_NAME);
     LOG.debug("ReplCopyTask filesInFileListing() reading " + 
fileListing.toUri());
     if (! fs.exists(fileListing)){
       LOG.debug("ReplCopyTask : _files does not exist");
@@ -243,7 +150,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
       // On success, but with nothing to return, we can return an empty list.
     }
 
-    List<FileStatus> ret = new ArrayList<FileStatus>();
+    List<Path> filePaths = new ArrayList<>();
     BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing)));
     // TODO : verify if skipping charset here is okay
 
@@ -253,9 +160,10 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
 
       String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line);
       try {
-        FileStatus f = ReplChangeManager.getFileStatus(new Path(fileWithChksum[0]),
-            fileWithChksum[1], conf);
-        ret.add(f);
+        Path f = ReplChangeManager
+                .getFileStatus(new Path(fileWithChksum[0]), fileWithChksum[1], conf)
+                .getPath();
+        filePaths.add(f);
       } catch (MetaException e) {
         // skip and issue warning for missing file
         LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or 
cmroot");
@@ -269,7 +177,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
       // and if not so, optimize.
     }
 
-    return ret;
+    return filePaths;
   }
 
   @Override
@@ -290,9 +198,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
       ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
       LOG.debug("ReplCopyTask:\trcwork");
-      if (replicationSpec.isLazy()){
+      if (replicationSpec.isLazy()) {
         LOG.debug("ReplCopyTask:\tlazy");
-        rcwork.setReadListFromInput(true);
+        rcwork.setReadSrcAsFilesList(true);
+
+        // It is assumed isLazy flag is set only for REPL LOAD flow.
+        // IMPORT always do deep copy. So, distCpDoAsUser will be null by default in ReplCopyWork.
+        String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+        rcwork.setDistCpDoAsUser(distCpDoAsUser);
       }
       copyTask = TaskFactory.get(rcwork, conf);
     } else {
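With this change, only the lazy REPL LOAD path carries a distCp doAs user; IMPORT leaves it null, so the copy runs as the session user instead of a privileged one. For readers unfamiliar with how a doAs distcp is executed, the sketch below shows the general Hadoop proxy-user pattern; it is illustrative only, assumes distCpDoAsUser, srcPaths, toPath and conf are in scope, and runDistCp is a hypothetical stand-in for the shim call Hive actually routes through:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    // Sketch: run the copy as the configured privileged user via Hadoop's
    // proxy-user mechanism. runDistCp is hypothetical, not the Hive shim API.
    UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
        distCpDoAsUser, UserGroupInformation.getLoginUser());
    boolean copied = proxyUser.doAs(
        (PrivilegedExceptionAction<Boolean>) () -> runDistCp(srcPaths, toPath, conf));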

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 34b6737..05fc5e4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -236,7 +236,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
       TableExport.Paths exportPaths =
           new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
-      new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).write();
+      String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+      new TableExport(exportPaths, ts, getNewReplicationSpec(), db, distCpDoAsUser, conf).write();
       REPL_STATE_LOG.info(
           "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata 
and data to path {}",
           dbName, tblName, exportPaths.exportRootDir.toString());

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 86575e0..74144ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -74,7 +74,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
     TableExport.Paths exportPaths =
         new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
     TableExport.AuthEntities authEntities =
-        new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).write();
+        new TableExport(exportPaths, ts, replicationSpec, db, null, conf).write();
     inputs.addAll(authEntities.inputs);
     outputs.addAll(authEntities.outputs);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 87beffa..7e72f23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -46,6 +46,7 @@ import static org.apache.hadoop.hive.ql.parse.repl.dump.TableExport.Paths;
 class PartitionExport {
   private final Paths paths;
   private final PartitionIterable partitionIterable;
+  private final String distCpDoAsUser;
   private final HiveConf hiveConf;
   private final int nThreads;
   private final AuthEntities authEntities;
@@ -53,10 +54,11 @@ class PartitionExport {
   private static final Logger LOG = LoggerFactory.getLogger(PartitionExport.class);
   private BlockingQueue<Partition> queue;
 
-  PartitionExport(Paths paths, PartitionIterable partitionIterable, HiveConf hiveConf,
-      AuthEntities authEntities) {
+  PartitionExport(Paths paths, PartitionIterable partitionIterable, String distCpDoAsUser,
+                  HiveConf hiveConf, AuthEntities authEntities) {
     this.paths = paths;
     this.partitionIterable = partitionIterable;
+    this.distCpDoAsUser = distCpDoAsUser;
     this.hiveConf = hiveConf;
     this.authEntities = authEntities;
     this.nThreads = hiveConf.getIntVar(HiveConf.ConfVars.REPL_PARTITIONS_DUMP_PARALLELISM);
@@ -101,7 +103,8 @@ class PartitionExport {
         try {
           // this the data copy
           Path rootDataDumpDir = paths.partitionExportDir(partitionName);
-          new FileOperations(fromPath, rootDataDumpDir, hiveConf).export(forReplicationSpec);
+          new FileOperations(fromPath, rootDataDumpDir, distCpDoAsUser, hiveConf)
+                  .export(forReplicationSpec);
           authEntities.inputs.add(new ReadEntity(partition));
           LOG.debug("Thread: {}, finish partition dump {}", threadName, 
partitionName);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 5d7fd25..5eae35a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -45,16 +46,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.toWriteEntity;
 
 public class TableExport {
+  private static final Logger logger = LoggerFactory.getLogger(TableExport.class);
+
   private TableSpec tableSpec;
   private final ReplicationSpec replicationSpec;
   private final Hive db;
+  private final String distCpDoAsUser;
   private final HiveConf conf;
-  private final Logger logger;
   private final Paths paths;
   private final AuthEntities authEntities = new AuthEntities();
 
   public TableExport(Paths paths, TableSpec tableSpec,
-      ReplicationSpec replicationSpec, Hive db, HiveConf conf, Logger logger)
+      ReplicationSpec replicationSpec, Hive db, String distCpDoAsUser, HiveConf conf)
       throws SemanticException {
     this.tableSpec = (tableSpec != null
         && tableSpec.tableHandle.isTemporary()
@@ -66,8 +69,8 @@ public class TableExport {
       this.replicationSpec.setIsMetadataOnly(true);
     }
     this.db = db;
+    this.distCpDoAsUser = distCpDoAsUser;
     this.conf = conf;
-    this.logger = logger;
     this.paths = paths;
   }
 
@@ -115,8 +118,7 @@ public class TableExport {
     }
   }
 
-  private void writeMetaData(PartitionIterable partitions)
-      throws SemanticException {
+  private void writeMetaData(PartitionIterable partitions) throws SemanticException {
     try {
       EximUtil.createExportDump(
           paths.exportFileSystem,
@@ -140,11 +142,13 @@ public class TableExport {
           throw new IllegalStateException(
               "partitions cannot be null for partitionTable :" + 
tableSpec.tableName);
         }
-        new PartitionExport(paths, partitions, conf, authEntities).write(replicationSpec);
+        new PartitionExport(paths, partitions, distCpDoAsUser, conf, authEntities)
+                .write(replicationSpec);
       } else {
         Path fromPath = tableSpec.tableHandle.getDataLocation();
         //this is the data copy
-        new FileOperations(fromPath, paths.dataExportDir(), conf).export(replicationSpec);
+        new FileOperations(fromPath, paths.dataExportDir(), distCpDoAsUser, conf)
+                .export(replicationSpec);
         authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle));
       }
       authEntities.outputs.add(toWriteEntity(paths.exportRootDir, conf));

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
index 0cd3f17..db923e3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/CopyUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.hive.ql.parse.repl.dump.io;
+package org.apache.hadoop.hive.ql.parse.repl;
 
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-class CopyUtils {
+public class CopyUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CopyUtils.class);
 
@@ -44,15 +44,15 @@ class CopyUtils {
   private final boolean hiveInTest;
   private final String copyAsUser;
 
-  CopyUtils(HiveConf hiveConf) {
+  public CopyUtils(String distCpDoAsUser, HiveConf hiveConf) {
     this.hiveConf = hiveConf;
     maxNumberOfFiles = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES);
     maxCopyFileSize = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE);
     hiveInTest = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST);
-    this.copyAsUser = hiveConf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+    this.copyAsUser = distCpDoAsUser;
   }
 
-  void doCopy(Path destination, List<Path> srcPaths) throws IOException {
+  public void doCopy(Path destination, List<Path> srcPaths) throws IOException {
     Map<FileSystem, List<Path>> map = fsToFileMap(srcPaths);
     FileSystem destinationFs = destination.getFileSystem(hiveConf);
 
@@ -77,10 +77,9 @@ class CopyUtils {
   /*
       Check for conditions that will lead to local copy, checks are:
       1. we are testing hive.
-      2. both source and destination are same FileSystem
-      3. either source or destination is a "local" FileSystem("file")
-      4. aggregate fileSize of all source Paths(can be directory /  file) is less than configured size.
-      5. number of files of all source Paths(can be directory /  file) is less than configured size.
+      2. either source or destination is a "local" FileSystem("file")
+      3. aggregate fileSize of all source Paths(can be directory /  file) is less than configured size.
+      4. number of files of all source Paths(can be directory /  file) is less than configured size.
   */
   private boolean regularCopy(FileSystem destinationFs, Map.Entry<FileSystem, List<Path>> entry)
       throws IOException {
@@ -88,9 +87,7 @@ class CopyUtils {
       return true;
     }
     FileSystem sourceFs = entry.getKey();
-    boolean isLocalFs = isLocal(sourceFs) || isLocal(destinationFs);
-    boolean sameFs = sourceFs.equals(destinationFs);
-    if (isLocalFs || sameFs) {
+    if (isLocal(sourceFs) || isLocal(destinationFs)) {
       return true;
     }
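The numbered conditions in the comment above drive regularCopy(); condensed, the decision this hunk leaves in place looks roughly like the sketch below (getPathsSummary is a hypothetical stand-in for the ContentSummary aggregation the class performs, not the actual method in this file):

    // Sketch of the local-copy-vs-distcp decision, mirroring the comment above.
    private boolean regularCopy(FileSystem destinationFs,
        Map.Entry<FileSystem, List<Path>> entry) throws IOException {
      if (hiveInTest) {
        return true;                                    // 1. testing hive
      }
      FileSystem sourceFs = entry.getKey();
      if (isLocal(sourceFs) || isLocal(destinationFs)) {
        return true;                                    // 2. "file" scheme involved
      }
      ContentSummary summary = getPathsSummary(sourceFs, entry.getValue()); // hypothetical helper
      return summary.getLength() < maxCopyFileSize      // 3. aggregate size below limit
          && summary.getFileCount() < maxNumberOfFiles; // 4. file count below limit
    }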
 

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index e1e3ae1..3ae07f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,13 +40,15 @@ public class FileOperations {
   private static Logger logger = LoggerFactory.getLogger(FileOperations.class);
   private final Path dataFileListPath;
   private final Path exportRootDataDir;
+  private final String distCpDoAsUser;
   private HiveConf hiveConf;
   private final FileSystem dataFileSystem, exportFileSystem;
 
-  public FileOperations(Path dataFileListPath, Path exportRootDataDir, HiveConf hiveConf)
-      throws IOException {
+  public FileOperations(Path dataFileListPath, Path exportRootDataDir,
+                        String distCpDoAsUser, HiveConf hiveConf) throws IOException {
     this.dataFileListPath = dataFileListPath;
     this.exportRootDataDir = exportRootDataDir;
+    this.distCpDoAsUser = distCpDoAsUser;
     this.hiveConf = hiveConf;
     dataFileSystem = dataFileListPath.getFileSystem(hiveConf);
     exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
@@ -69,7 +72,7 @@ public class FileOperations {
     for (FileStatus fileStatus : fileStatuses) {
       srcPaths.add(fileStatus.getPath());
     }
-    new CopyUtils(hiveConf).doCopy(exportRootDataDir, srcPaths);
+    new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/d5bdb9bc/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
index 1932d60..cf6ec46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.plan;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
@@ -27,93 +26,48 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level;
  * which will have mechanics to list the files in source to write to the destination,
  * instead of copying them, if specified, falling back to copying if needed.
  */
-@Explain(displayName = "Copy for Replication", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+@Explain(displayName = "Repl Copy", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
 public class ReplCopyWork extends CopyWork {
-
-  protected boolean copyFiles = true; // governs copy-or-list-files behaviour.
-  // If set to true, behaves identically to a CopyWork
-  // If set to false, ReplCopyTask does a file-list of the things to be copied instead, and puts them in a file called _files.
-  // Default is set to mimic CopyTask, with the intent that any Replication code will explicitly flip this.
-
   /**
    * TODO : Refactor
    *
   * There is an upcoming patch that refactors this bit of code. Currently, the idea is the following:
    *
   * By default, ReplCopyWork will behave similarly to CopyWork, and simply copy
-   * along data from the source to destination. If, however, listFilesOnOutput is set,
-   * then, instead of copying the individual files to the destination, it simply creates
-   * a file called _files on destination that contains the list of the original files
-   * that were intended to be copied. Thus, we do not actually copy the files at CopyWork
-   * time.
-   *
-   * The flip side of this behaviour happens when, instead, readListFromInput is set. This
-   * flag, if set, changes the source behaviour of this CopyTask, and instead of copying
-   * explicit files, this will then fall back to a behaviour wherein an _files is read from
-   * the source, and the files specified by the _files are then copied to the destination.
+   * along data from the source to destination.
+   * If the flag readSrcAsFilesList is set, changes the source behaviour of this CopyTask, and
+   * instead of copying explicit files, this will then fall back to a behaviour wherein an _files is
+   * read from the source, and the files specified by the _files are then copied to the destination.
    *
   * This allows us a lazy-copy-on-source and a pull-from destination semantic that we want
    * to use from replication.
-   *
-   * ==
-   *
-   * The refactor intent, however, is to simplify this, so that we have only 1 flag that we set,
-   * called isLazy. If isLazy is set, then this is the equivalent of the current listFilesOnOutput,
-   * and will generate a _files file.
-   *
-   * As to the input, we simply decide on whether to use the lazy mode or not depending on the
-   * presence of a _files file on the input. If we see a _files on the input, we simply expand it
-   * to copy as needed. If we do not, we copy as normal.
-   *
    */
 
-  protected boolean listFilesOnOutput = false; // governs copy-or-list-files behaviour
-  // If set to true, it'll iterate over input files, and for each file in the input,
-  //   it'll write out an additional line in a _files file in the output.
-  // If set to false, it'll behave as a traditional CopyTask.
-
-  protected boolean readListFromInput = false; // governs remote-fetch-input behaviour
+  // Governs remote-fetch-input behaviour
   // If set to true, we'll assume that the input has a _files file present which lists
   //   the actual input files to copy, and we'll pull each of those on read.
   // If set to false, it'll behave as a traditional CopyTask.
+  protected boolean readSrcAsFilesList = false;
 
-  public ReplCopyWork() {
-  }
-
-  public ReplCopyWork(final Path fromPath, final Path toPath) {
-    super(fromPath, toPath, true);
-  }
-
-  public ReplCopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
-    super(fromPath, toPath, errorOnSrcEmpty);
-  }
+  private String distCpDoAsUser = null;
 
-  public void setListFilesOnOutputBehaviour(boolean listFilesOnOutput){
-    this.listFilesOnOutput = listFilesOnOutput;
+  public ReplCopyWork(final Path srcPath, final Path destPath, boolean errorOnSrcEmpty) {
+    super(srcPath, destPath, errorOnSrcEmpty);
   }
 
-  public boolean getListFilesOnOutputBehaviour(){
-    return this.listFilesOnOutput;
+  public void setReadSrcAsFilesList(boolean readSrcAsFilesList) {
+    this.readSrcAsFilesList = readSrcAsFilesList;
   }
 
-  public void setReadListFromInput(boolean readListFromInput){
-    this.readListFromInput = readListFromInput;
+  public boolean readSrcAsFilesList() {
+    return this.readSrcAsFilesList;
   }
 
-  public boolean getReadListFromInput(){
-    return this.readListFromInput;
+  public void setDistCpDoAsUser(String distCpDoAsUser) {
+    this.distCpDoAsUser = distCpDoAsUser;
   }
 
-  // specialization of getListFilesOnOutputBehaviour, with a filestatus arg
-  // we can default to the default getListFilesOnOutputBehaviour behaviour,
-  // or, we can do additional pattern matching to decide that certain files
-  // should not be listed, and copied instead, _metadata files, for instance.
-  // Currently, we use this to skip _metadata files, but we might decide that
-  // this is not the right place for it later on.
-  public boolean getListFilesOnOutputBehaviour(FileStatus f) {
-    if (f.getPath().toString().contains("_metadata")){
-      return false; // always copy _metadata files
-    }
-    return this.listFilesOnOutput;
+  public String distCpDoAsUser() {
+    return distCpDoAsUser;
   }
 }
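Putting the ReplCopyWork and ReplCopyTask pieces together, the two flows configure the work object differently. A rough usage sketch of the intent (srcPath, dstPath and conf are placeholders, not values from this patch):

    // REPL LOAD (lazy) flow: read the _files listing and carry the doAs user.
    ReplCopyWork loadWork = new ReplCopyWork(srcPath, dstPath, false);
    loadWork.setReadSrcAsFilesList(true);
    loadWork.setDistCpDoAsUser(conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER));

    // EXPORT/IMPORT flow: plain deep copy; distCpDoAsUser stays null, so no
    // privilege escalation happens for these commands (the point of HIVE-17289).
    ReplCopyWork importWork = new ReplCopyWork(srcPath, dstPath, false);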
