Repository: hive
Updated Branches:
  refs/heads/master 90149de71 -> addeab8d0


HIVE-17608: REPL LOAD should overwrite the data files if exists instead of 
duplicating it (Sankar Hariappan, reviewed by Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/addeab8d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/addeab8d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/addeab8d

Branch: refs/heads/master
Commit: addeab8d0810c62fc2246166e91d10c347abe58c
Parents: 90149de
Author: sankarh <sank...@apache.org>
Authored: Wed Oct 4 23:50:36 2017 +0530
Committer: sankarh <sank...@apache.org>
Committed: Wed Oct 4 23:50:36 2017 +0530

----------------------------------------------------------------------
 .../hadoop/hive/ql/history/TestHiveHistory.java |  3 +-
 .../hive/ql/parse/TestReplicationScenarios.java | 63 +++++++++++++++++-
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |  9 +--
 .../hadoop/hive/ql/exec/ReplCopyTask.java       | 18 ++++-
 .../apache/hadoop/hive/ql/exec/StatsTask.java   |  9 ++-
 .../bootstrap/load/table/LoadPartitions.java    |  3 +-
 .../repl/bootstrap/load/table/LoadTable.java    |  4 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 69 ++++++++++++--------
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  6 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java     |  4 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  | 17 +++--
 .../hadoop/hive/ql/parse/repl/CopyUtils.java    |  6 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      | 43 +++++++-----
 .../hadoop/hive/ql/exec/TestExecDriver.java     |  4 +-
 .../hive/ql/metadata/TestHiveCopyFiles.java     | 12 ++--
 15 files changed, 192 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
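
For readers skimming the diff below: the core of this patch replaces the
boolean "replace" flag on LoadTableDesc with a three-way LoadFileType, so
that REPL LOAD can overwrite a file that was already copied by an earlier
apply of the same event instead of leaving a "_copy_N" duplicate behind.
A minimal sketch of the semantics (the enum itself appears in the
LoadTableDesc.java hunk further down):

    // Sketch only; mirrors the enum this commit adds to LoadTableDesc.
    public enum LoadFileType {
      REPLACE_ALL,        // e.g. INSERT OVERWRITE: remove all existing data before the copy/move
      KEEP_EXISTING,      // e.g. INSERT INTO: a colliding file name gets a "_copy_N" suffix
      OVERWRITE_EXISTING  // e.g. REPL LOAD: a colliding file name is overwritten, keeping the load idempotent
    }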


http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
index bec715d..8a6fe3d 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/history/TestHiveHistory.java
@@ -41,6 +41,7 @@ import 
org.apache.hadoop.hive.ql.history.HiveHistory.QueryInfo;
 import org.apache.hadoop.hive.ql.history.HiveHistory.TaskInfo;
 import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.tools.LineageInfo;
 import org.apache.hadoop.mapred.TextInputFormat;
@@ -104,7 +105,7 @@ public class TestHiveHistory extends TestCase {
         db.dropTable(Warehouse.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             IgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, false, false, false, false);
+        db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING, false, false, false, false);
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 7cf1498..276c464 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -1406,7 +1406,7 @@ public class TestReplicationScenarios {
 
             // Skip all the events belong to other DBs/tables.
             if (event.getDbName().equalsIgnoreCase(dbName)) {
-              if (event.getEventType() == "INSERT") {
+              if (event.getEventType().equalsIgnoreCase("INSERT")) {
                // If an insert event is found, then return null hence no event is dumped.
                LOG.error("Encountered INSERT event when it was not expected to");
                 return null;
@@ -1425,7 +1425,66 @@ public class TestReplicationScenarios {
     eventTypeValidator.assertInjectionsPerformed(true,false);
     InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour

-    verifyRun("SELECT a from " + dbName + "_dupe.ptned where (b=1)", ptn_data, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1)", ptn_data, driverMirror);
+  }
+
+  @Test
+  public void testIdempotentMoveTaskForInsertFiles() throws IOException {
+    String name = testName.getMethodName();
+    final String dbName = createDB(name, driver);
+    String replDbName = dbName + "_dupe";
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", 
driver);
+    Tuple bootstrap = bootstrapLoadAndVerify(dbName, replDbName);
+
+    String[] unptn_data = new String[]{ "ten"};
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + 
"')", driver);
+
+    // Inject a behaviour where it repeats the INSERT event twice with 
different event IDs
+    BehaviourInjection<NotificationEventResponse,NotificationEventResponse> 
insertEventRepeater
+            = new 
BehaviourInjection<NotificationEventResponse,NotificationEventResponse>(){
+
+      @Nullable
+      @Override
+      public NotificationEventResponse apply(@Nullable NotificationEventResponse eventsList) {
+        if (null != eventsList) {
+          List<NotificationEvent> events = eventsList.getEvents();
+          List<NotificationEvent> outEvents = new ArrayList<>();
+          long insertEventId = -1;
+
+          for (int i = 0; i < events.size(); i++) {
+            NotificationEvent event = events.get(i);
+
+            // Skip all the events belong to other DBs/tables.
+            if (event.getDbName().equalsIgnoreCase(dbName)) {
+              if (event.getEventType().equalsIgnoreCase("INSERT")) {
+                // Add insert event twice with different event ID to allow apply of both events.
+                NotificationEvent newEvent = new NotificationEvent(event);
+                outEvents.add(newEvent);
+                insertEventId = newEvent.getEventId();
+              }
+            }
+
+            NotificationEvent newEvent = new NotificationEvent(event);
+            if (insertEventId != -1) {
+              insertEventId++;
+              newEvent.setEventId(insertEventId);
+            }
+            outEvents.add(newEvent);
+          }
+          eventsList.setEvents(outEvents);
+          injectionPathCalled = true;
+        }
+        return eventsList;
+      }
+    };
+    InjectableBehaviourObjectStore.setGetNextNotificationBehaviour(insertEventRepeater);
+
+    incrementalLoadAndVerify(dbName, bootstrap.lastReplId, replDbName);
+
+    insertEventRepeater.assertInjectionsPerformed(true,false);
+    InjectableBehaviourObjectStore.resetGetNextNotificationBehaviour(); // reset the behaviour
+
+    verifyRun("SELECT a from " + replDbName + ".unptned", unptn_data, driverMirror);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index c8ad795..0dadd51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -376,7 +377,7 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
         DataContainer dc = null;
         if (tbd.getPartitionSpec().size() == 0) {
           dc = new DataContainer(table.getTTable());
-          db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getReplace(),
+          db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(),
              work.isSrcLocal(), isSkewedStoredAsDirs(tbd),
              work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
               hasFollowingStatsTask());
@@ -452,7 +453,7 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
                 tbd.getSourcePath(),
                 tbd.getTable().getTableName(),
                 tbd.getPartitionSpec(),
-                tbd.getReplace(),
+                tbd.getLoadFileType(),
                 dpCtx.getNumDPCols(),
                 isSkewedStoredAsDirs(tbd),
                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
@@ -520,7 +521,7 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
                 tbd.getPartitionSpec());
             db.validatePartitionNameCharacters(partVals);
            db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
-                tbd.getPartitionSpec(), tbd.getReplace(),
+                tbd.getPartitionSpec(), tbd.getLoadFileType(),
                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(),
                work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask());
            Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
@@ -592,7 +593,7 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
    * has done it's job before the query ran.
    */
   WriteEntity.WriteType getWriteType(LoadTableDesc tbd, AcidUtils.Operation operation) {
-    if(tbd.getReplace()) {
+    if (tbd.getLoadFileType() == LoadFileType.REPLACE_ALL) {
       return WriteEntity.WriteType.INSERT_OVERWRITE;
     }
     switch (operation) {

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 6e722f7..39e5bf1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -130,7 +130,23 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
         return 2;
       }
      // Copy the files from different source file systems to one destination directory
-      new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(toPath, srcFiles);
+      new CopyUtils(rwork.distCpDoAsUser(), conf).copyAndVerify(dstFs, toPath, srcFiles);
+
+      // If a file is copied from CM path, then need to rename them using original source file name
+      // This is needed to avoid having duplicate files in target if same event is applied twice
+      // where the first event refers to source path and  second event refers to CM path
+      for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+        if (srcFile.isUseSourcePath()) {
+          continue;
+        }
+        String destFileName = srcFile.getCmPath().getName();
+        Path destFile = new Path(toPath, destFileName);
+        if (dstFs.exists(destFile)) {
+          String destFileWithSourceName = srcFile.getSourcePath().getName();
+          Path newDestFile = new Path(toPath, destFileWithSourceName);
+          dstFs.rename(destFile, newDestFile);
+        }
+      }
 
       return 0;
     } catch (Exception e) {
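
In plain terms, the ReplCopyTask change above does two things: copyAndVerify()
now receives the destination FileSystem, and any file that had to be fetched
from the change-manager (CM) path is renamed back to its original source file
name once the copy finishes. A rough sketch of that rename step using only the
Hadoop FileSystem API (the helper name is illustrative, not part of the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Give a CM-named copy its original file name so a re-applied event
    // overwrites the earlier copy instead of adding a duplicate.
    static void renameCmCopyToSourceName(FileSystem dstFs, Path toPath,
        Path cmPath, Path sourcePath) throws IOException {
      Path copiedAsCmName = new Path(toPath, cmPath.getName());
      if (dstFs.exists(copiedAsCmName)) {
        dstFs.rename(copiedAsCmName, new Path(toPath, sourcePath.getName()));
      }
    }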

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
index 4db6806..bdf3710 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
@@ -53,6 +53,7 @@ import 
org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.ExplainConfiguration.AnalyzeState;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
@@ -182,7 +183,8 @@ public class StatsTask extends Task<StatsWork> implements 
Serializable {
         if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
          StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
        } else if (work.getTableSpecs() != null
-            || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
+            || (work.getLoadTableDesc() != null
+                && (work.getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL))
             || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
                 .getDestinationCreateTable().isEmpty())) {
           StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
@@ -283,7 +285,8 @@ public class StatsTask extends Task<StatsWork> implements 
Serializable {
           if (work.getTableSpecs() == null && AcidUtils.isAcidTable(table)) {
            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.FALSE);
          } else if (work.getTableSpecs() != null
-              || (work.getLoadTableDesc() != null && work.getLoadTableDesc().getReplace())
+              || (work.getLoadTableDesc() != null
+                  && (work.getLoadTableDesc().getLoadFileType() == LoadFileType.REPLACE_ALL))
              || (work.getLoadFileDesc() != null && !work.getLoadFileDesc()
                  .getDestinationCreateTable().isEmpty())) {
            StatsSetupConst.setBasicStatsState(parameters, StatsSetupConst.TRUE);
@@ -409,7 +412,7 @@ public class StatsTask extends Task<StatsWork> implements 
Serializable {
         long longValue = Long.parseLong(value);
 
         if (work.getLoadTableDesc() != null &&
-            !work.getLoadTableDesc().getReplace()) {
+                (work.getLoadTableDesc().getLoadFileType() != LoadFileType.REPLACE_ALL)) {
          String originalValue = parameters.get(statType);
          if (originalValue != null) {
            longValue += Long.parseLong(originalValue); // todo: invalid + valid = invalid

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 5c6ef9f..821d7df 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
@@ -238,7 +239,7 @@ public class LoadPartitions {
       Path tmpPath) {
     LoadTableDesc loadTableWork = new LoadTableDesc(
         tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
-        event.replicationSpec().isReplace());
+        event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING);
     loadTableWork.setInheritTableSpecs(false);
     MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
         context.sessionStateLineageState);

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index a9a9162..25a2532 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -224,7 +225,8 @@ public class LoadTable {
        ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf);

     LoadTableDesc loadTableWork = new LoadTableDesc(
-        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace());
+        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
+        replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING);
     MoveWork moveWork =
         new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false,
             context.sessionStateLineageState);

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index b0e68b1..26003f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -144,6 +144,7 @@ import 
org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrun
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -1623,7 +1624,7 @@ public class Hive {
    * @param loadPath
    * @param tableName
    * @param partSpec
-   * @param replace
+   * @param loadFileType
    * @param inheritTableSpecs
    * @param isSkewedStoreAsSubdir
    * @param isSrcLocal
@@ -1633,11 +1634,11 @@ public class Hive {
    * @throws HiveException
    */
   public void loadPartition(Path loadPath, String tableName,
-      Map<String, String> partSpec, boolean replace,
+      Map<String, String> partSpec, LoadFileType loadFileType,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) 
throws HiveException {
     Table tbl = getTable(tableName);
-    loadPartition(loadPath, tbl, partSpec, replace, inheritTableSpecs,
+    loadPartition(loadPath, tbl, partSpec, loadFileType, inheritTableSpecs,
         isSkewedStoreAsSubdir, isSrcLocal, isAcid, hasFollowingStatsTask);
   }
 
@@ -1653,9 +1654,9 @@ public class Hive {
    *          name of table to be loaded.
    * @param partSpec
    *          defines which partition needs to be loaded
-   * @param replace
-   *          if true - replace files in the partition, otherwise add files to
-   *          the partition
+   * @param loadFileType
+   *          if REPLACE_ALL - replace files in the table,
+   *          otherwise add files to table (KEEP_EXISTING, OVERWRITE_EXISTING)
    * @param inheritTableSpecs if true, on [re]creating the partition, take the
    *          location/inputformat/outputformat/serde details from table spec
    * @param isSrcLocal
@@ -1667,7 +1668,7 @@ public class Hive {
    * @return Partition object being loaded with data
    */
   public Partition loadPartition(Path loadPath, Table tbl,
-      Map<String, String> partSpec, boolean replace,
+      Map<String, String> partSpec, LoadFileType loadFileType,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid, boolean hasFollowingStatsTask) 
throws HiveException {
 
@@ -1719,13 +1720,14 @@ public class Hive {
         newFiles = Collections.synchronizedList(new ArrayList<Path>());
       }
 
-      if (replace || (oldPart == null && !isAcid)) {
+      if ((loadFileType == LoadFileType.REPLACE_ALL) || (oldPart == null && !isAcid)) {
        boolean isAutoPurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
        replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
            isSrcLocal, isAutoPurge, newFiles);
      } else {
        FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
-        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid, newFiles);
+        Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid,
+                            (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles);
       }
       perfLogger.PerfLogEnd("MoveTask", "FileMoves");
       Partition newTPart = oldPart != null ? oldPart : new Partition(tbl, 
partSpec, newPartPath);
@@ -1735,7 +1737,7 @@ public class Hive {
       // Generate an insert event only if inserting into an existing partition
       // When inserting into a new partition, the add partition event takes 
care of insert event
       if ((null != oldPart) && (null != newFiles)) {
-        fireInsertEvent(tbl, partSpec, replace, newFiles);
+        fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
      } else {
        LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
                + "partition that does not exist yet. Skipping generating INSERT event.");
@@ -1950,7 +1952,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param loadPath
    * @param tableName
    * @param partSpec
-   * @param replace
+   * @param loadFileType
    * @param numDP number of dynamic partitions
    * @param listBucketingEnabled
    * @param isAcid true if this is an ACID operation
@@ -1959,7 +1961,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @throws HiveException
    */
   public Map<Map<String, String>, Partition> loadDynamicPartitions(final Path 
loadPath,
-      final String tableName, final Map<String, String> partSpec, final 
boolean replace,
+      final String tableName, final Map<String, String> partSpec, final 
LoadFileType loadFileType,
       final int numDP, final boolean listBucketingEnabled, final boolean 
isAcid, final long txnId,
       final boolean hasFollowingStatsTask, final AcidUtils.Operation operation)
       throws HiveException {
@@ -2005,7 +2007,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
               // load the partition
               Partition newPartition = loadPartition(partPath, tbl, 
fullPartSpec,
-                  replace, true, listBucketingEnabled,
+                  loadFileType, true, listBucketingEnabled,
                   false, isAcid, hasFollowingStatsTask);
               partitionsMap.put(fullPartSpec, newPartition);
 
@@ -2029,7 +2031,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
                   + " partPath=" + partPath + ", "
                   + " table=" + tbl.getTableName() + ", "
                   + " partSpec=" + fullPartSpec + ", "
-                  + " replace=" + replace + ", "
+                  + " loadFileType=" + loadFileType.toString() + ", "
                   + " listBucketingEnabled=" + listBucketingEnabled + ", "
                   + " isAcid=" + isAcid + ", "
                   + " hasFollowingStatsTask=" + hasFollowingStatsTask, t);
@@ -2086,8 +2088,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          Directory containing files to load into Table
    * @param tableName
    *          name of table to be loaded.
-   * @param replace
-   *          if true - replace files in the table, otherwise add files to 
table
+   * @param loadFileType
+   *          if REPLACE_ALL - replace files in the table,
+   *          otherwise add files to table (KEEP_EXISTING, OVERWRITE_EXISTING)
    * @param isSrcLocal
    *          If the source directory is LOCAL
    * @param isSkewedStoreAsSubdir
@@ -2096,8 +2099,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
    *          if there is any following stats task
    * @param isAcid true if this is an ACID based write
    */
-  public void loadTable(Path loadPath, String tableName, boolean replace, boolean isSrcLocal,
-      boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask)
+  public void loadTable(Path loadPath, String tableName, LoadFileType loadFileType, boolean isSrcLocal,
+                        boolean isSkewedStoreAsSubdir, boolean isAcid, boolean hasFollowingStatsTask)
       throws HiveException {
 
     List<Path> newFiles = null;
@@ -2106,7 +2109,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
-    if (replace) {
+    if (loadFileType == LoadFileType.REPLACE_ALL) {
       Path tableDest = tbl.getPath();
      boolean isAutopurge = "true".equalsIgnoreCase(tbl.getProperty("auto.purge"));
      replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal, isAutopurge, newFiles);
@@ -2114,7 +2117,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
       FileSystem fs;
       try {
         fs = tbl.getDataLocation().getFileSystem(sessionConf);
-        copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid, newFiles);
+        copyFiles(sessionConf, loadPath, tbl.getPath(), fs, isSrcLocal, isAcid,
+                          (loadFileType == LoadFileType.OVERWRITE_EXISTING), newFiles);
      } catch (IOException e) {
        throw new HiveException("addFiles: filesystem error in check phase", e);
       }
@@ -2151,7 +2155,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throw new HiveException(e);
     }
 
-    fireInsertEvent(tbl, null, replace, newFiles);
+    fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
   }
 
   /**
@@ -2972,8 +2976,8 @@ private void constructOneLBLocationMap(FileStatus fSta,
   }
 
   private static void copyFiles(final HiveConf conf, final FileSystem destFs,
-      FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List<Path> newFiles)
-          throws HiveException {
+            FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal,
+            boolean isOverwrite, final List<Path> newFiles) throws HiveException {
 
     final HdfsUtils.HadoopFileStatus fullDestStatus;
     try {
@@ -3016,7 +3020,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         // copy from source to destination, we will inherit the destination's 
parent group ownership.
         if (null == pool) {
           try {
-            Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
+            Path destPath = mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, isRenameAllowed);
 
             if (null != newFiles) {
               newFiles.add(destPath);
@@ -3032,7 +3036,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
               try {
                 Path destPath =
-                    mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isRenameAllowed);
+                    mvFile(conf, srcFs, srcP, destFs, destf, isSrcLocal, isOverwrite, isRenameAllowed);
 
                 if (null != newFiles) {
                   newFiles.add(destPath);
@@ -3121,6 +3125,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param destFs the {@link FileSystem} to move the file to
    * @param destDirPath the {@link Path} to move the file to
    * @param isSrcLocal if the source file is on the local filesystem
+   * @param isOverwrite if true, then overwrite destination file if exist else make a duplicate copy
    * @param isRenameAllowed true if the data should be renamed and not copied, false otherwise
    *
    * @return the {@link Path} the source file was moved to
@@ -3128,7 +3133,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @throws IOException if there was an issue moving the file
    */
   private static Path mvFile(HiveConf conf, FileSystem sourceFs, Path sourcePath, FileSystem destFs, Path destDirPath,
-                             boolean isSrcLocal, boolean isRenameAllowed) throws IOException {
+                             boolean isSrcLocal, boolean isOverwrite, boolean isRenameAllowed) throws IOException {
 
     // Strip off the file type, if any so we don't make:
     // 000000_0.gz -> 000000_0.gz_copy_1
@@ -3147,6 +3152,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
     * I'll leave the below loop for now until a better approach is found.
     */
     for (int counter = 1; destFs.exists(destFilePath); counter++) {
+      if (isOverwrite) {
+        destFs.delete(destFilePath, false);
+        break;
+      }
      destFilePath =  new Path(destDirPath, name + (Utilities.COPY_KEYWORD + counter) + (!type.isEmpty() ? "." + type : ""));
     }
 
@@ -3429,12 +3438,14 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param fs Filesystem
    * @param isSrcLocal true if source is on local file system
    * @param isAcid true if this is an ACID based write
+   * @param isOverwrite if true, then overwrite if destination file exist, else add a duplicate copy
    * @param newFiles if this is non-null, a list of files that were created as a result of this
    *                 move will be returned.
    * @throws HiveException
    */
-  static protected void copyFiles(HiveConf conf, Path srcf, Path destf,
-      FileSystem fs, boolean isSrcLocal, boolean isAcid, List<Path> newFiles) throws HiveException {
+  static protected void copyFiles(HiveConf conf, Path srcf, Path destf, FileSystem fs,
+                                  boolean isSrcLocal, boolean isAcid,
+                                  boolean isOverwrite, List<Path> newFiles) throws HiveException {
     try {
       // create the destination if it does not exist
       if (!fs.exists(destf)) {
@@ -3466,7 +3477,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (isAcid) {
       moveAcidFiles(srcFs, srcs, destf, newFiles);
     } else {
-      copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles);
+      copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, isOverwrite, newFiles);
     }
   }
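
The Hive.java changes above thread the new isOverwrite flag from
loadTable()/loadPartition() down into mvFile(). Its effect on name resolution
at the destination can be summarised by the simplified sketch below (the real
code also strips the file extension and builds the suffix from
Utilities.COPY_KEYWORD):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Simplified sketch of the destination-name loop in mvFile(): with
    // isOverwrite the existing file is deleted and its name reused, otherwise
    // a copy suffix is appended as before.
    static Path resolveDestFile(FileSystem destFs, Path destDirPath, String name,
        boolean isOverwrite) throws IOException {
      Path destFilePath = new Path(destDirPath, name);
      for (int counter = 1; destFs.exists(destFilePath); counter++) {
        if (isOverwrite) {
          destFs.delete(destFilePath, false);  // overwrite: drop the old file, keep its name
          break;
        }
        destFilePath = new Path(destDirPath, name + "_copy_" + counter);  // duplicate as before
      }
      return destFilePath;
    }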
 

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 751bda0..51b4f36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -350,7 +351,7 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, 
tmpPath, x.getConf());
     LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
         Utilities.getTableDesc(table), new TreeMap<>(),
-        replace);
+        replace ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING);
     Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
             x.getOutputs(), loadTableWork, null, false, 
SessionState.get().getLineageState()),
         x.getConf());
@@ -420,7 +421,8 @@ public class ImportSemanticAnalyzer extends 
BaseSemanticAnalyzer {
           x.getOutputs(), addPartitionDesc), x.getConf());
       LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
           Utilities.getTableDesc(table),
-          partSpec.getPartSpec(), replicationSpec.isReplace());
+          partSpec.getPartSpec(),
+          replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING);
       loadTableWork.setInheritTableSpecs(false);
       Task<?> loadPartTask = TaskFactory.get(new MoveWork(
               x.getInputs(), x.getOutputs(), loadTableWork, null, false,

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index 8879b80..033235b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.StatsWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -275,7 +276,8 @@ public class LoadSemanticAnalyzer extends 
BaseSemanticAnalyzer {
 
     LoadTableDesc loadTableWork;
     loadTableWork = new LoadTableDesc(new Path(fromURI),
-      Utilities.getTableDesc(ts.tableHandle), partSpec, isOverWrite);
+      Utilities.getTableDesc(ts.tableHandle), partSpec,
+      isOverWrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING);
     if (preservePartitionSpecs){
       // Note : preservePartitionSpecs=true implies inheritTableSpecs=false but
       // but preservePartitionSpecs=false(default) here is not sufficient 
enough

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 9c6c556..c8277f4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -190,6 +190,7 @@ import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
@@ -6917,8 +6918,10 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
             currentTransactionId);
         // For Acid table, Insert Overwrite shouldn't replace the table 
content. We keep the old
         // deltas and base and leave them up to the cleaner to clean up
-        ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-            dest_tab.getTableName()) && !destTableIsAcid);
+        LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
+                dest_tab.getTableName()) && !destTableIsAcid)
+                ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
+        ltd.setLoadFileType(loadType);
         ltd.setLbCtx(lbCtx);
         loadTableWork.add(ltd);
       } else {
@@ -7035,8 +7038,10 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
           currentTransactionId);
       // For Acid table, Insert Overwrite shouldn't replace the table content. 
We keep the old
       // deltas and base and leave them up to the cleaner to clean up
-      ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
-          dest_tab.getTableName()) && !destTableIsAcid);
+      LoadFileType loadType = (!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
+              dest_tab.getTableName()) && !destTableIsAcid)
+              ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING;
+      ltd.setLoadFileType(loadType);
       ltd.setLbCtx(lbCtx);
 
       loadTableWork.add(ltd);
@@ -13658,8 +13663,8 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
     // and don't have a rational way to guess, so assume the most
     // conservative case.
     if (isNonNativeTable) return WriteEntity.WriteType.INSERT_OVERWRITE;
-    else return (ltd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-      getWriteType(dest));
+    else return ((ltd.getLoadFileType() == LoadFileType.REPLACE_ALL)
+                         ? WriteEntity.WriteType.INSERT_OVERWRITE : getWriteType(dest));
   }
 
   private WriteEntity.WriteType getWriteType(String dest) {

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 71cdbde..f24d1b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -62,11 +62,9 @@ public class CopyUtils {
   // Used by replication, copy files from source to destination. It is 
possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
-  public void copyAndVerify(Path destination, List<ReplChangeManager.FileInfo> srcFiles)
-          throws IOException, LoginException {
+  public void copyAndVerify(FileSystem destinationFs, Path destination,
+                    List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException {
     Map<FileSystem, List<ReplChangeManager.FileInfo>> map = fsToFileMap(srcFiles);
-    FileSystem destinationFs = destination.getFileSystem(hiveConf);
-
     for (Map.Entry<FileSystem, List<ReplChangeManager.FileInfo>> entry : 
map.entrySet()) {
       FileSystem sourceFs = entry.getKey();
       List<ReplChangeManager.FileInfo> fileInfoList = entry.getValue();

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 90a970c..e15f59c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -32,7 +32,7 @@ import java.util.Map;
  */
 public class LoadTableDesc extends LoadDesc implements Serializable {
   private static final long serialVersionUID = 1L;
-  private boolean replace;
+  private LoadFileType loadFileType;
   private DynamicPartitionCtx dpCtx;
   private ListBucketingCtx lbCtx;
   private boolean inheritTableSpecs = true; //For partitions, flag controlling 
whether the current
@@ -46,10 +46,15 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
   private org.apache.hadoop.hive.ql.plan.TableDesc table;
   private Map<String, String> partitionSpec; // NOTE: this partitionSpec has 
to be ordered map
 
+  public enum LoadFileType {
+    REPLACE_ALL,        // Remove all existing data before copy/move
+    KEEP_EXISTING,      // If any file exist while copy, then just duplicate the file
+    OVERWRITE_EXISTING  // If any file exist while copy, then just overwrite the file
+  }
   public LoadTableDesc(final LoadTableDesc o) {
     super(o.getSourcePath(), o.getWriteType());
 
-    this.replace = o.replace;
+    this.loadFileType = o.loadFileType;
     this.dpCtx = o.dpCtx;
     this.lbCtx = o.lbCtx;
     this.inheritTableSpecs = o.inheritTableSpecs;
@@ -61,11 +66,11 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
   public LoadTableDesc(final Path sourcePath,
       final TableDesc table,
       final Map<String, String> partitionSpec,
-      final boolean replace,
+      final LoadFileType loadFileType,
       final AcidUtils.Operation writeType, Long currentTransactionId) {
     super(sourcePath, writeType);
     this.currentTransactionId = currentTransactionId;
-    init(table, partitionSpec, replace);
+    init(table, partitionSpec, loadFileType);
   }
 
   /**
@@ -73,13 +78,13 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
    * @param sourcePath
    * @param table
    * @param partitionSpec
-   * @param replace
+   * @param loadFileType
    */
   public LoadTableDesc(final Path sourcePath,
       final TableDesc table,
       final Map<String, String> partitionSpec,
-      final boolean replace) {
-    this(sourcePath, table, partitionSpec, replace, AcidUtils.Operation.NOT_ACID,
+      final LoadFileType loadFileType) {
+    this(sourcePath, table, partitionSpec, loadFileType, AcidUtils.Operation.NOT_ACID,
         null);
   }
 
@@ -87,7 +92,8 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
       final TableDesc table,
       final Map<String, String> partitionSpec,
       final AcidUtils.Operation writeType, Long currentTransactionId) {
-    this(sourcePath, table, partitionSpec, true, writeType, currentTransactionId);
+    this(sourcePath, table, partitionSpec, LoadFileType.REPLACE_ALL,
+            writeType, currentTransactionId);
   }
 
   /**
@@ -99,7 +105,8 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
   public LoadTableDesc(final Path sourcePath,
       final TableDesc table,
       final Map<String, String> partitionSpec) {
-    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, null);
+    this(sourcePath, table, partitionSpec, LoadFileType.REPLACE_ALL,
+            AcidUtils.Operation.NOT_ACID, null);
   }
 
   public LoadTableDesc(final Path sourcePath,
@@ -110,19 +117,19 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
     this.dpCtx = dpCtx;
     this.currentTransactionId = currentTransactionId;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) 
{
-      init(table, dpCtx.getPartSpec(), true);
+      init(table, dpCtx.getPartSpec(), LoadFileType.REPLACE_ALL);
     } else {
-      init(table, new LinkedHashMap<>(), true);
+      init(table, new LinkedHashMap<>(), LoadFileType.REPLACE_ALL);
     }
   }
 
   private void init(
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
-      final boolean replace) {
+      final LoadFileType loadFileType) {
     this.table = table;
     this.partitionSpec = partitionSpec;
-    this.replace = replace;
+    this.loadFileType = loadFileType;
   }
 
   @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, 
Level.EXTENDED })
@@ -145,11 +152,15 @@ public class LoadTableDesc extends LoadDesc implements 
Serializable {
 
   @Explain(displayName = "replace")
   public boolean getReplace() {
-    return replace;
+    return (loadFileType == LoadFileType.REPLACE_ALL);
+  }
+
+  public LoadFileType getLoadFileType() {
+    return loadFileType;
   }
 
-  public void setReplace(boolean replace) {
-    this.replace = replace;
+  public void setLoadFileType(LoadFileType loadFileType) {
+    this.loadFileType = loadFileType;
   }
 
   public DynamicPartitionCtx getDPCtx() {
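
Note that getReplace() is kept for existing callers and EXPLAIN output, but is
now derived from the enum (it returns true only for REPLACE_ALL). A typical
replication call site after this change picks the file type explicitly instead
of passing a boolean; this mirrors the LoadTable.java hunk earlier in the diff
(variable names are taken from that context):

    LoadTableDesc loadTableWork = new LoadTableDesc(
        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
        replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL
                                    : LoadFileType.OVERWRITE_EXISTING);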

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
index c0c496f..e523049 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestExecDriver.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -140,7 +141,8 @@ public class TestExecDriver extends TestCase {
         db.dropTable(Warehouse.DEFAULT_DATABASE_NAME, src, true, true);
         db.createTable(src, cols, null, TextInputFormat.class,
             HiveIgnoreKeyTextOutputFormat.class);
-        db.loadTable(hadoopDataFile[i], src, false, true, false, false, false);
+        db.loadTable(hadoopDataFile[i], src, LoadFileType.KEEP_EXISTING,
+                true, false, false, false);
         i++;
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/addeab8d/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java 
b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
index 0c1c230..cc1d857 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/metadata/TestHiveCopyFiles.java
@@ -83,7 +83,7 @@ public class TestHiveCopyFiles {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false,null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -107,7 +107,7 @@ public class TestHiveCopyFiles {
     FileSystem targetFs = targetPath.getFileSystem(hiveConf);
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -127,7 +127,7 @@ public class TestHiveCopyFiles {
     sourceFolder.newFile("000001_0.gz");
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, targetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -158,7 +158,7 @@ public class TestHiveCopyFiles {
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + 
targetPath.toUri().getPath()));
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -185,7 +185,7 @@ public class TestHiveCopyFiles {
     Mockito.when(spyTargetFs.getUri()).thenReturn(URI.create("hdfs://" + 
targetPath.toUri().getPath()));
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
@@ -205,7 +205,7 @@ public class TestHiveCopyFiles {
     sourceFolder.newFile("000001_0.gz");
 
     try {
-      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, null);
+      Hive.copyFiles(hiveConf, sourcePath, targetPath, spyTargetFs, isSourceLocal, NO_ACID, false, null);
     } catch (HiveException e) {
       e.printStackTrace();
       assertTrue("Hive.copyFiles() threw an unexpected exception.", false);
