http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index b000745,c8ad795..ed527b8
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@@ -392,12 -435,110 +390,12 @@@ public class MoveTask extends Task<Move
            // deal with dynamic partitions
            DynamicPartitionCtx dpCtx = tbd.getDPCtx();
            if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic 
partitions
 -
 -            List<LinkedHashMap<String, String>> dps = 
Utilities.getFullDPSpecs(conf, dpCtx);
 -
 -            console.printInfo(System.getProperty("line.separator"));
 -            long startTime = System.currentTimeMillis();
 -            // load the list of DP partitions and return the list of 
partition specs
 -            // TODO: In a follow-up to HIVE-1361, we should refactor 
loadDynamicPartitions
 -            // to use Utilities.getFullDPSpecs() to get the list of full 
partSpecs.
 -            // After that check the number of DPs created to not exceed the 
limit and
 -            // iterate over it and call loadPartition() here.
 -            // The reason we don't do inside HIVE-1361 is the latter is large 
and we
 -            // want to isolate any potential issue it may introduce.
 -            Map<Map<String, String>, Partition> dp =
 -              db.loadDynamicPartitions(
 -                tbd.getSourcePath(),
 -                tbd.getTable().getTableName(),
 -                tbd.getPartitionSpec(),
 -                tbd.getReplace(),
 -                dpCtx.getNumDPCols(),
 -                isSkewedStoredAsDirs(tbd),
 -                work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID,
 -                work.getLoadTableWork().getCurrentTransactionId(), 
hasFollowingStatsTask(),
 -                work.getLoadTableWork().getWriteType());
 -
 -            // publish DP columns to its subscribers
 -            if (dps != null && dps.size() > 0) {
 -              pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values());
 -            }
 -
 -            String loadTime = "\t Time taken to load dynamic partitions: "  +
 -                (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
 -            console.printInfo(loadTime);
 -            LOG.info(loadTime);
 -
 -            if (dp.size() == 0 && 
conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
 -              throw new HiveException("This query creates no partitions." +
 -                  " To turn off this error, set 
hive.error.on.empty.partition=false.");
 -            }
 -
 -            startTime = System.currentTimeMillis();
 -            // for each partition spec, get the partition
 -            // and put it to WriteEntity for post-exec hook
 -            for(Map.Entry<Map<String, String>, Partition> entry : 
dp.entrySet()) {
 -              Partition partn = entry.getValue();
 -
 -              if (bucketCols != null || sortCols != null) {
 -                updatePartitionBucketSortColumns(
 -                    db, table, partn, bucketCols, numBuckets, sortCols);
 -              }
 -
 -              WriteEntity enty = new WriteEntity(partn,
 -                getWriteType(tbd, work.getLoadTableWork().getWriteType()));
 -              if (work.getOutputs() != null) {
 -                DDLTask.addIfAbsentByName(enty, work.getOutputs());
 -              }
 -              // Need to update the queryPlan's output as well so that 
post-exec hook get executed.
 -              // This is only needed for dynamic partitioning since for SP 
the WriteEntity is
 -              // constructed at compile time and the queryPlan already 
contains that.
 -              // For DP, WriteEntity creation is deferred at this stage so we 
need to update
 -              // queryPlan here.
 -              if (queryPlan.getOutputs() == null) {
 -                queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
 -              }
 -              queryPlan.getOutputs().add(enty);
 -
 -              // update columnar lineage for each partition
 -              dc = new DataContainer(table.getTTable(), 
partn.getTPartition());
 -
 -              // Don't set lineage on delete as we don't have all the columns
 -              if (work.getLineagState() != null &&
 -                  work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.DELETE &&
 -                  work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.UPDATE) {
 -                work.getLineagState().setLineage(tbd.getSourcePath(), dc,
 -                    table.getCols());
 -              }
 -              LOG.info("\tLoading partition " + entry.getKey());
 -            }
 -            console.printInfo("\t Time taken for adding to write entity : " +
 -                (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 -            dc = null; // reset data container to prevent it being added 
again.
 +            dc = handleDynParts(db, table, tbd, ti, dpCtx);
            } else { // static partitions
 -            List<String> partVals = 
MetaStoreUtils.getPvals(table.getPartCols(),
 -                tbd.getPartitionSpec());
 -            db.validatePartitionNameCharacters(partVals);
 -            db.loadPartition(tbd.getSourcePath(), 
tbd.getTable().getTableName(),
 -                tbd.getPartitionSpec(), tbd.getReplace(),
 -                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), 
work.isSrcLocal(),
 -                work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask());
 -            Partition partn = db.getPartition(table, tbd.getPartitionSpec(), 
false);
 -
 -            if (bucketCols != null || sortCols != null) {
 -              updatePartitionBucketSortColumns(db, table, partn, bucketCols,
 -                  numBuckets, sortCols);
 -            }
 -
 -            dc = new DataContainer(table.getTTable(), partn.getTPartition());
 -            // add this partition to post-execution hook
 -            if (work.getOutputs() != null) {
 -              DDLTask.addIfAbsentByName(new WriteEntity(partn,
 -                getWriteType(tbd, work.getLoadTableWork().getWriteType())), 
work.getOutputs());
 -            }
 -         }
 +            dc = handleStaticParts(db, table, tbd, ti);
 +          }
          }
-         if (SessionState.get() != null && dc != null) {
+         if (work.getLineagState() != null && dc != null) {
            // If we are doing an update or a delete the number of columns in 
the table will not
            // match the number of columns in the file sink.  For update there 
will be one too many
            // (because of the ROW__ID), and in the case of the delete there 
will be just the
@@@ -445,231 -586,6 +443,231 @@@
        return (1);
      }
    }
 +
 +  private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc 
tbd,
 +      TaskInformation ti) throws HiveException, IOException, 
InvalidOperationException {
 +    List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),  
tbd.getPartitionSpec());
 +    db.validatePartitionNameCharacters(partVals);
 +    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath()
 +        + " into " + tbd.getTable().getTableName());
 +    db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(), tbd.getReplace(),
 +        tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), 
work.isSrcLocal(),
 +        work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID &&
 +            !tbd.isMmTable(),
 +        hasFollowingStatsTask(), tbd.getTxnId(), tbd.getStmtId());
 +    Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
 +
 +    // See the comment inside updatePartitionBucketSortColumns.
 +    if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) {
 +      updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
 +          ti.numBuckets, ti.sortCols);
 +    }
 +
 +    DataContainer dc = new DataContainer(table.getTTable(), 
partn.getTPartition());
 +    // add this partition to post-execution hook
 +    if (work.getOutputs() != null) {
 +      DDLTask.addIfAbsentByName(new WriteEntity(partn,
 +        getWriteType(tbd, work.getLoadTableWork().getWriteType())), 
work.getOutputs());
 +    }
 +    return dc;
 +  }
 +
 +  private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc 
tbd,
 +      TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
 +      IOException, InvalidOperationException {
 +    DataContainer dc;
 +    List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, 
dpCtx);
 +
 +    console.printInfo(System.getProperty("line.separator"));
 +    long startTime = System.currentTimeMillis();
 +    // load the list of DP partitions and return the list of partition specs
 +    // TODO: In a follow-up to HIVE-1361, we should refactor 
loadDynamicPartitions
 +    // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
 +    // After that check the number of DPs created to not exceed the limit and
 +    // iterate over it and call loadPartition() here.
 +    // The reason we don't do inside HIVE-1361 is the latter is large and we
 +    // want to isolate any potential issue it may introduce.
 +    if (tbd.isMmTable() && !tbd.isCommitMmWrite()) {
 +      throw new HiveException("Only single-partition LoadTableDesc can skip 
commiting write ID");
 +    }
 +    Map<Map<String, String>, Partition> dp =
 +      db.loadDynamicPartitions(
 +        tbd.getSourcePath(),
 +        tbd.getTable().getTableName(),
 +        tbd.getPartitionSpec(),
 +        tbd.getReplace(),
 +        dpCtx.getNumDPCols(),
 +        (tbd.getLbCtx() == null) ? 0 : 
tbd.getLbCtx().calculateListBucketingLevel(),
 +        work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID &&
 +            !tbd.isMmTable(),
-         SessionState.get().getTxnMgr().getCurrentTxnId(), tbd.getStmtId(), 
hasFollowingStatsTask(),
++        work.getLoadTableWork().getTxnId(), tbd.getStmtId(), 
hasFollowingStatsTask(),
 +        work.getLoadTableWork().getWriteType());
 +
 +    // publish DP columns to its subscribers
 +    if (dps != null && dps.size() > 0) {
 +      pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values());
 +    }
 +
 +    String loadTime = "\t Time taken to load dynamic partitions: "  +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds";
 +    console.printInfo(loadTime);
 +    LOG.info(loadTime);
 +
 +    if (dp.size() == 0 && 
conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
 +      throw new HiveException("This query creates no partitions." +
 +          " To turn off this error, set 
hive.error.on.empty.partition=false.");
 +    }
 +
 +    startTime = System.currentTimeMillis();
 +    // for each partition spec, get the partition
 +    // and put it to WriteEntity for post-exec hook
 +    for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
 +      Partition partn = entry.getValue();
 +
 +      // See the comment inside updatePartitionBucketSortColumns.
 +      if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) 
{
 +        updatePartitionBucketSortColumns(
 +            db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
 +      }
 +
 +      WriteEntity enty = new WriteEntity(partn,
 +        getWriteType(tbd, work.getLoadTableWork().getWriteType()));
 +      if (work.getOutputs() != null) {
 +        DDLTask.addIfAbsentByName(enty, work.getOutputs());
 +      }
 +      // Need to update the queryPlan's output as well so that post-exec hook 
get executed.
 +      // This is only needed for dynamic partitioning since for SP the 
WriteEntity is
 +      // constructed at compile time and the queryPlan already contains that.
 +      // For DP, WriteEntity creation is deferred at this stage so we need to 
update
 +      // queryPlan here.
 +      if (queryPlan.getOutputs() == null) {
 +        queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
 +      }
 +      queryPlan.getOutputs().add(enty);
 +
 +      // update columnar lineage for each partition
 +      dc = new DataContainer(table.getTTable(), partn.getTPartition());
 +
 +      // Don't set lineage on delete as we don't have all the columns
-       if (SessionState.get() != null &&
++      if (work.getLineagState() != null &&
 +          work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.DELETE &&
 +          work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.UPDATE) {
-         SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), 
dc,
++        work.getLineagState().setLineage(tbd.getSourcePath(), dc,
 +            table.getCols());
 +      }
 +      LOG.info("\tLoading partition " + entry.getKey());
 +    }
 +    console.printInfo("\t Time taken for adding to write entity : " +
 +        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
 +    dc = null; // reset data container to prevent it being added again.
 +    return dc;
 +  }
 +
 +  private void inferTaskInformation(TaskInformation ti) {
 +    // Find the first ancestor of this MoveTask which is some form of map 
reduce task
 +    // (Either standard, local, or a merge)
 +    while (ti.task.getParentTasks() != null && 
ti.task.getParentTasks().size() == 1) {
 +      ti.task = (Task)ti.task.getParentTasks().get(0);
 +      // If it was a merge task or a local map reduce task, nothing can be 
inferred
 +      if (ti.task instanceof MergeFileTask || ti.task instanceof 
MapredLocalTask) {
 +        break;
 +      }
 +
 +      // If it's a standard map reduce task, check what, if anything, it 
inferred about
 +      // the directory this move task is moving
 +      if (ti.task instanceof MapRedTask) {
 +        MapredWork work = (MapredWork)ti.task.getWork();
 +        MapWork mapWork = work.getMapWork();
 +        ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
 +        ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
 +        if (work.getReduceWork() != null) {
 +          ti.numBuckets = work.getReduceWork().getNumReduceTasks();
 +        }
 +
 +        if (ti.bucketCols != null || ti.sortCols != null) {
 +          // This must be a final map reduce task (the task containing the 
file sink
 +          // operator that writes the final output)
 +          assert work.isFinalMapRed();
 +        }
 +        break;
 +      }
 +
 +      // If it's a move task, get the path the files were moved from, this is 
what any
 +      // preceding map reduce task inferred information about, and moving 
does not invalidate
 +      // those assumptions
 +      // This can happen when a conditional merge is added before the final 
MoveTask, but the
 +      // condition for merging is not met, see GenMRFileSink1.
 +      if (ti.task instanceof MoveTask) {
 +        MoveTask mt = (MoveTask)ti.task;
 +        if (mt.getWork().getLoadFileWork() != null) {
 +          ti.path = 
mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
 +        }
 +      }
 +    }
 +  }
 +
 +  private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
 +      throws HiveException {
 +    if (work.getCheckFileFormat()) {
 +      // Get all files from the src directory
 +      FileStatus[] dirs;
 +      ArrayList<FileStatus> files;
 +      FileSystem srcFs; // source filesystem
 +      try {
 +        srcFs = tbd.getSourcePath().getFileSystem(conf);
 +        dirs = srcFs.globStatus(tbd.getSourcePath());
 +        files = new ArrayList<FileStatus>();
 +        for (int i = 0; (dirs != null && i < dirs.length); i++) {
 +          files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), 
FileUtils.HIDDEN_FILES_PATH_FILTER)));
 +          // We only check one file, so exit the loop when we have at least
 +          // one.
 +          if (files.size() > 0) {
 +            break;
 +          }
 +        }
 +      } catch (IOException e) {
 +        throw new HiveException(
 +            "addFiles: filesystem error in check phase", e);
 +      }
 +
 +      // handle file format check for table level
 +      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
 +        boolean flag = true;
 +        // work.checkFileFormat is set to true only for Load Task, so 
assumption here is
 +        // dynamic partition context is null
 +        if (tbd.getDPCtx() == null) {
 +          if (tbd.getPartitionSpec() == null || 
tbd.getPartitionSpec().isEmpty()) {
 +            // Check if the file format of the file matches that of the table.
 +            flag = HiveFileFormatUtils.checkInputFormat(
 +                srcFs, conf, tbd.getTable().getInputFileFormatClass(), files);
 +          } else {
 +            // Check if the file format of the file matches that of the 
partition
 +            Partition oldPart = db.getPartition(table, 
tbd.getPartitionSpec(), false);
 +            if (oldPart == null) {
 +              // this means we have just created a table and are specifying 
partition in the
 +              // load statement (without pre-creating the partition), in 
which case lets use
 +              // table input format class. inheritTableSpecs defaults to true 
so when a new
 +              // partition is created later it will automatically inherit 
input format
 +              // from table object
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, tbd.getTable().getInputFileFormatClass(), 
files);
 +            } else {
 +              flag = HiveFileFormatUtils.checkInputFormat(
 +                  srcFs, conf, oldPart.getInputFormatClass(), files);
 +            }
 +          }
 +          if (!flag) {
 +            throw new HiveException(ErrorMsg.WRONG_FILE_FORMAT);
 +          }
 +        } else {
 +          LOG.warn("Skipping file format check as dpCtx is not null");
 +        }
 +      }
 +    }
 +  }
 +
 +
    /**
    * so to make sure we create WriteEntity with the right WriteType.  This is 
(at this point) only
     * for consistency since LockManager (which is the only thing that pays 
attention to WriteType)

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --cc 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 05698d1,5c6ef9f..51c4090
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@@ -240,10 -238,10 +240,11 @@@ public class LoadPartitions 
        Path tmpPath) {
      LoadTableDesc loadTableWork = new LoadTableDesc(
          tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
 -        event.replicationSpec().isReplace());
 +        event.replicationSpec().isReplace(), 
SessionState.get().getTxnMgr().getCurrentTxnId()
 +    );
      loadTableWork.setInheritTableSpecs(false);
-     MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), 
loadTableWork, null, false);
+     MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), 
loadTableWork, null, false,
+         context.sessionStateLineageState);
      return TaskFactory.get(work, context.hiveConf);
    }
  

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --cc 
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index afc04a3,a9a9162..65a3a59
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@@ -225,11 -224,10 +225,12 @@@ public class LoadTable 
          ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, 
context.hiveConf);
  
      LoadTableDesc loadTableWork = new LoadTableDesc(
 -        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), 
replicationSpec.isReplace());
 +        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), 
replicationSpec.isReplace(),
 +        SessionState.get().getTxnMgr().getCurrentTxnId()
 +    );
      MoveWork moveWork =
-         new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, 
false);
+         new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, 
false,
+             context.sessionStateLineageState);
      Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
      copyTask.addDependentTask(loadTableTask);
      return copyTask;

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 6dfa607,25cca7b..6a56407
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@@ -1364,29 -1353,9 +1364,31 @@@ public final class GenMapRedUtils 
      
cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat");
      // NOTE: we should gather stats in MR1 rather than MR2 at merge job since 
we don't
      // know if merge MR2 will be triggered at execution time
 -    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOutput);
 +    MoveWork dummyMv = null;
 +    if (srcMmWriteId == null) {
 +      // Only create the movework for non-MM table. No action needed for a MM 
table.
 +      Utilities.LOG14535.info("creating dummy movetask for merge (with lfd)");
 +      dummyMv = new MoveWork(null, null, null,
-          new LoadFileDesc(inputDirName, finalName, true, null, null, false), 
false);
++          new LoadFileDesc(inputDirName, finalName, true, null, null, false), 
false,
++          SessionState.get().getLineageState());
 +    } else {
 +      // TODO# create the noop MoveWork to avoid q file changes for now. 
Should be removed w/the flag just before merge
 +      dummyMv = new MoveWork(null, null, null,
-           new LoadFileDesc(inputDirName, finalName, true, null, null, false), 
false);
++          new LoadFileDesc(inputDirName, finalName, true, null, null, false), 
false,
++          SessionState.get().getLineageState());
 +      dummyMv.setNoop(true);
 +    }
 +    // Use the original fsOp path here in case of MM - while the new FSOP 
merges files inside the
 +    // MM directory, the original MoveTask still commits based on the parent. 
Note that this path
 +    // can only be triggered for a merge that's part of insert for now; MM 
tables do not support
 +    // concatenate. Keeping the old logic for non-MM tables with temp 
directories and stuff.
 +    Path fsopPath = srcMmWriteId != null ? fsInputDesc.getFinalDirName() : 
finalName;
 +    Utilities.LOG14535.info("Looking for MoveTask to make it dependant on the 
conditional tasks");
 +
 +    Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput(
 +        mvTasks, fsopPath, fsInputDesc.isMmTable());
      ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, 
dummyMv, work,
 -        fsInputDesc.getFinalDirName(), finalName, mvTask, dependencyTask);
 +        fsInputDesc.getMergeInputDirName(), finalName, mvTask, 
dependencyTask);
  
      // keep the dynamic partition context in conditional task resolver context
      ConditionalResolverMergeFilesCtx mrCtx =

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index fdeeacf,e70a72c..36bb89f
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@@ -1090,10 -1088,11 +1095,11 @@@ public class DDLSemanticAnalyzer extend
          Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
          truncateTblDesc.setOutputDir(queryTmpdir);
          LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
-             partSpec == null ? new HashMap<String, String>() : partSpec, 
null);
 -            partSpec == null ? new HashMap<>() : partSpec);
++            partSpec == null ? new HashMap<>() : partSpec, null);
          ltd.setLbCtx(lbCtx);
 -        Task<MoveWork> moveTsk = TaskFactory
 -            .get(new MoveWork(null, null, ltd, null, false, 
SessionState.get().getLineageState()),
 -            conf);
 +        @SuppressWarnings("unchecked")
-         Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, 
ltd, null, false), conf);
++        Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(
++          null, null, ltd, null, false, 
SessionState.get().getLineageState()), conf);
          truncateTask.addDependentTask(moveTsk);
  
          // Recalculate the HDFS stats if auto gather stats is set
@@@ -1736,11 -1722,12 +1742,12 @@@
        TableDesc tblDesc = Utilities.getTableDesc(tblObj);
        Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
        mergeDesc.setOutputDir(queryTmpdir);
 +      // No need to handle MM tables - unsupported path.
        LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
 -          partSpec == null ? new HashMap<>() : partSpec);
 +          partSpec == null ? new HashMap<>() : partSpec, null);
        ltd.setLbCtx(lbCtx);
-       Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, 
null, false), conf);
 -      Task<MoveWork> moveTsk = TaskFactory
 -          .get(new MoveWork(null, null, ltd, null, false, 
SessionState.get().getLineageState()),
 -          conf);
++      Task<MoveWork> moveTsk = TaskFactory.get(
++        new MoveWork(null, null, ltd, null, false, 
SessionState.get().getLineageState()), conf);
        mergeTask.addDependentTask(moveTsk);
  
        if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc 
ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 36c0c98,751bda0..b9f28eb
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@@ -360,36 -343,17 +360,36 @@@ public class ImportSemanticAnalyzer ext
      return tblDesc;
    }
  
 +
    private static Task<?> loadTable(URI fromURI, Table table, boolean replace, 
Path tgtPath,
 -                            ReplicationSpec replicationSpec, 
EximUtil.SemanticAnalyzerWrapperContext x) {
 +      ReplicationSpec replicationSpec, 
EximUtil.SemanticAnalyzerWrapperContext x,
 +      Long txnId, int stmtId, boolean isSourceMm) {
      Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
 -    Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
 -    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, 
dataPath, tmpPath, x.getConf());
 -    LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
 -        Utilities.getTableDesc(table), new TreeMap<>(),
 -        replace);
 -    Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
 -            x.getOutputs(), loadTableWork, null, false, 
SessionState.get().getLineageState()),
 -        x.getConf());
 +    Path destPath = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) 
? x.getCtx().getExternalTmpPath(tgtPath)
 +        : new Path(tgtPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
 +    Utilities.LOG14535.info("adding import work for table with source 
location: "
 +        + dataPath + "; table: " + tgtPath + "; copy destination " + destPath 
+ "; mm "
 +        + txnId + " (src " + isSourceMm + ") for " + (table == null ? "a new 
table" : table.getTableName()));
 +
 +    Task<?> copyTask = null;
 +    if (replicationSpec.isInReplicationScope()) {
 +      if (isSourceMm || isAcid(txnId)) {
 +        // TODO: ReplCopyTask is completely screwed. Need to support when 
it's not as screwed.
 +        throw new RuntimeException("Replicating MM and ACID tables is not 
supported");
 +      }
 +      copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, 
destPath, x.getConf());
 +    } else {
 +      CopyWork cw = new CopyWork(dataPath, destPath, false);
 +      cw.setSkipSourceMmDirs(isSourceMm);
 +      copyTask = TaskFactory.get(cw, x.getConf());
 +    }
 +
 +    LoadTableDesc loadTableWork = new LoadTableDesc(destPath,
-         Utilities.getTableDesc(table), new TreeMap<String, String>(), 
replace, txnId);
++        Utilities.getTableDesc(table), new TreeMap<>(), replace, txnId);
 +    loadTableWork.setTxnId(txnId);
 +    loadTableWork.setStmtId(stmtId);
-     MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, 
null, false);
++    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, 
null, false, SessionState.get().getLineageState());
 +    Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
      copyTask.addDependentTask(loadTableTask);
      x.getTasks().add(copyTask);
      return loadTableTask;
@@@ -450,40 -413,19 +455,40 @@@
            + partSpecToString(partSpec.getPartSpec())
            + " with source location: " + srcLocation);
        Path tgtLocation = new Path(partSpec.getLocation());
 -      Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation);
 -      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
 -          replicationSpec, new Path(srcLocation), tmpPath, x.getConf());
 +      Path destPath = 
!MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? 
x.getCtx().getExternalTmpPath(tgtLocation)
 +          : new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, 
stmtId));
 +      Path moveTaskSrc =  
!MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? destPath : 
tgtLocation;
 +      Utilities.LOG14535.info("adding import work for partition with source 
location: "
 +          + srcLocation + "; target: " + tgtLocation + "; copy dest " + 
destPath + "; mm "
 +          + txnId + " (src " + isSourceMm + ") for " + 
partSpecToString(partSpec.getPartSpec()));
 +
 +
 +      Task<?> copyTask = null;
 +      if (replicationSpec.isInReplicationScope()) {
 +        if (isSourceMm || isAcid(txnId)) {
 +          // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed.
 +          throw new RuntimeException("Replicating MM and ACID tables is not 
supported");
 +        }
 +        copyTask = ReplCopyTask.getLoadCopyTask(
 +            replicationSpec, new Path(srcLocation), destPath, x.getConf());
 +      } else {
 +        CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false);
 +        cw.setSkipSourceMmDirs(isSourceMm);
 +        copyTask = TaskFactory.get(cw, x.getConf());
 +      }
 +
        Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
            x.getOutputs(), addPartitionDesc), x.getConf());
 -      LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
 -          Utilities.getTableDesc(table),
 -          partSpec.getPartSpec(), replicationSpec.isReplace());
 +      LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, 
Utilities.getTableDesc(table),
 +          partSpec.getPartSpec(), replicationSpec.isReplace(), txnId);
 +      loadTableWork.setTxnId(txnId);
 +      loadTableWork.setStmtId(stmtId);
        loadTableWork.setInheritTableSpecs(false);
 +      // Do not commit the write ID from each task; need to commit once.
 +      // TODO: we should just change the import to use a single MoveTask, like dynparts.
 +      loadTableWork.setIntermediateInMmWrite(isAcid(txnId));
        Task<?> loadPartTask = TaskFactory.get(new MoveWork(
-           x.getInputs(), x.getOutputs(), loadTableWork, null, false), 
x.getConf());
 -              x.getInputs(), x.getOutputs(), loadTableWork, null, false,
 -              SessionState.get().getLineageState()),
 -          x.getConf());
++          x.getInputs(), x.getOutputs(), loadTableWork, null, false, 
SessionState.get().getLineageState()), x.getConf());
        copyTask.addDependentTask(loadPartTask);
        addPartTask.addDependentTask(loadPartTask);
        x.getTasks().add(copyTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 3475c7c,4814fcd..df50aab
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@@ -6858,15 -6902,10 +6861,18 @@@ public class SemanticAnalyzer extends B
            acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
            checkAcidConstraints(qb, table_desc, dest_tab);
          }
 -        Long currentTransactionId = acidOp == Operation.NOT_ACID ? null :
 +        if (MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties())) {
 +          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
 +        }
 +        if (isMmTable) {
 +          txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
++        } else {
++          txnId = acidOp == Operation.NOT_ACID ? null :
+             SessionState.get().getTxnMgr().getCurrentTxnId();
 -        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp,
 -            currentTransactionId);
 +        }
 +        boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
 +            dest_tab.getDbName(), dest_tab.getTableName());
 +        ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, 
isReplace, txnId);
          // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
          // deltas and base and leave them up to the cleaner to clean up
          
ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
@@@ -6926,13 -7020,10 +6932,16 @@@
          acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
          checkAcidConstraints(qb, table_desc, dest_tab);
        }
 -      Long currentTransactionId = (acidOp == Operation.NOT_ACID) ? null :
 +      if 
(MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) {
 +        acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest);
 +      }
 +      if (isMmTable) {
 +        txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
++      } else {
++        txnId = (acidOp == Operation.NOT_ACID) ? null :
+           SessionState.get().getTxnMgr().getCurrentTxnId();
 -      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), 
acidOp,
 -          currentTransactionId);
 +      }
 +      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), 
acidOp, txnId);
        // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
        // deltas and base and leave them up to the cleaner to clean up
        
ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
@@@ -7015,9 -7150,8 +7024,9 @@@
        }
  
        boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE);
 +      // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats.
        loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, 
dest_path, isDfsDir, cols,
-         colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID, 
isMmCtas));
 -          colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID));
++          colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID, 
isMmCtas));
        if (tblDesc == null) {
          if (viewDesc != null) {
            table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes);

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 356ab6f,f0089fc..752a934
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@@ -18,17 -18,8 +18,19 @@@
  
  package org.apache.hadoop.hive.ql.parse;
  
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedHashSet;
 +import java.util.List;
 +import java.util.Set;
 +
 +import org.apache.hadoop.hive.ql.io.AcidUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
+ import com.google.common.collect.Interner;
+ import com.google.common.collect.Interners;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.HiveStatsUtils;
  import org.apache.hadoop.hive.conf.HiveConf;
@@@ -228,17 -227,47 +239,19 @@@ public abstract class TaskCompiler 
          }
        }
  
 -      boolean oneLoadFile = true;
 +      boolean oneLoadFileForCtas = true;
        for (LoadFileDesc lfd : loadFileWork) {
          if (pCtx.getQueryProperties().isCTAS() || 
pCtx.getQueryProperties().isMaterializedView()) {
 -          assert (oneLoadFile); // should not have more than 1 load file for
 -          // CTAS
 -          // make the movetask's destination directory the table's destination.
 -          Path location;
 -          String loc = pCtx.getQueryProperties().isCTAS() ?
 -                  pCtx.getCreateTable().getLocation() : 
pCtx.getCreateViewDesc().getLocation();
 -          if (loc == null) {
 -            // get the default location
 -            Path targetPath;
 -            try {
 -              String protoName = null;
 -              if (pCtx.getQueryProperties().isCTAS()) {
 -                protoName = pCtx.getCreateTable().getTableName();
 -              } else if (pCtx.getQueryProperties().isMaterializedView()) {
 -                protoName = pCtx.getCreateViewDesc().getViewName();
 -              }
 -              String[] names = Utilities.getDbTableName(protoName);
 -              if (!db.databaseExists(names[0])) {
 -                throw new SemanticException("ERROR: The database " + names[0]
 -                    + " does not exist.");
 -              }
 -              Warehouse wh = new Warehouse(conf);
 -              targetPath = wh.getDefaultTablePath(db.getDatabase(names[0]), 
names[1]);
 -            } catch (HiveException | MetaException e) {
 -              throw new SemanticException(e);
 -            }
 -
 -            location = targetPath;
 -          } else {
 -            location = new Path(loc);
 +          if (!oneLoadFileForCtas) { // should not have more than 1 load file 
for CTAS.
 +            throw new SemanticException(
 +                "One query is not expected to contain multiple CTAS loads 
statements");
            }
 -          lfd.setTargetDir(location);
 -
 -          oneLoadFile = false;
 +          setLoadFileLocation(pCtx, lfd);
 +          oneLoadFileForCtas = false;
          }
-         mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, 
false), conf));
+         mvTask.add(TaskFactory
+             .get(new MoveWork(null, null, null, lfd, false, 
SessionState.get().getLineageState()),
+                 conf));
        }
      }
  

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
index 45d4fb0,023d247..9477df6
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java
@@@ -35,8 -35,8 +35,9 @@@ public class LoadDesc implements Serial
     * Need to remember whether this is an acid compliant operation, and if so whether it is an
     * insert, update, or delete.
     */
-   private final AcidUtils.Operation writeType;
+   final AcidUtils.Operation writeType;
+ 
 +
    public LoadDesc(final Path sourcePath, AcidUtils.Operation writeType) {
      this.sourcePath = sourcePath;
      this.writeType = writeType;

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 0032648,0292af5..d90acce
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@@ -51,8 -48,8 +51,8 @@@ public class LoadFileDesc extends LoadD
  
    public LoadFileDesc(final CreateTableDesc createTableDesc, final 
CreateViewDesc  createViewDesc,
                        final Path sourcePath, final Path targetDir, final 
boolean isDfsDir,
 -                      final String columns, final String columnTypes, 
AcidUtils.Operation writeType) {
 -    this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType);
 +                      final String columns, final String columnTypes, 
AcidUtils.Operation writeType, boolean isMmCtas) {
-    this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, 
isMmCtas);
++    this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, 
isMmCtas);
      if (createTableDesc != null && createTableDesc.getDatabaseName() != null
          && createTableDesc.getTableName() != null) {
        destinationCreateTable = (createTableDesc.getTableName().contains(".") 
? "" : createTableDesc

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index e893ab5,90a970c..85d1324
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@@ -18,13 -18,7 +18,9 @@@
  
  package org.apache.hadoop.hive.ql.plan;
  
- import java.io.Serializable;
- import java.util.LinkedHashMap;
- import java.util.Map;
- 
  import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 +import org.apache.hadoop.hive.ql.exec.Utilities;
  import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.ql.plan.Explain.Level;
  
@@@ -39,8 -37,10 +39,8 @@@ public class LoadTableDesc extends Load
    private ListBucketingCtx lbCtx;
    private boolean inheritTableSpecs = true; //For partitions, flag 
controlling whether the current
                                              //table specs are to be used
-   private Long txnId;
 -  /*
 -  if the writeType above is NOT_ACID then the currentTransactionId will be null
 -   */
 -  private final Long currentTransactionId;
 +  private int stmtId;
++  private Long currentTransactionId;
  
    // TODO: the below seems like they should just be combined into partitionDesc
    private org.apache.hadoop.hive.ql.plan.TableDesc table;
@@@ -59,15 -59,13 +60,14 @@@
    }
  
    public LoadTableDesc(final Path sourcePath,
-       final org.apache.hadoop.hive.ql.plan.TableDesc table,
+       final TableDesc table,
        final Map<String, String> partitionSpec,
        final boolean replace,
-       final AcidUtils.Operation writeType,
-       Long txnId) {
+       final AcidUtils.Operation writeType, Long currentTransactionId) {
      super(sourcePath, writeType);
 -    this.currentTransactionId = currentTransactionId;
 -    init(table, partitionSpec, replace);
 +    Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to "
 +        + ((table.getProperties() == null) ? "null" : table.getTableName()));
-     init(table, partitionSpec, replace, txnId);
++    init(table, partitionSpec, replace, currentTransactionId);
    }
  
    /**
@@@ -105,17 -103,16 +105,17 @@@
    }
  
    public LoadTableDesc(final Path sourcePath,
-       final org.apache.hadoop.hive.ql.plan.TableDesc table,
+       final TableDesc table,
        final DynamicPartitionCtx dpCtx,
 -      final AcidUtils.Operation writeType, Long currentTransactionId) {
 +      final AcidUtils.Operation writeType,
 +      boolean isReplace, Long txnId) {
      super(sourcePath, writeType);
 +    Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + 
table.getTableName()/*, new Exception()*/);
      this.dpCtx = dpCtx;
 -    this.currentTransactionId = currentTransactionId;
      if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == 
null) {
 -      init(table, dpCtx.getPartSpec(), true);
 +      init(table, dpCtx.getPartSpec(), isReplace, txnId);
      } else {
-       init(table, new LinkedHashMap<String, String>(), isReplace, txnId);
 -      init(table, new LinkedHashMap<>(), true);
++      init(table, new LinkedHashMap<>(), isReplace, txnId);
      }
    }
  
@@@ -127,7 -123,6 +127,7 @@@
      this.table = table;
      this.partitionSpec = partitionSpec;
      this.replace = replace;
-     this.txnId = txnId;
++    this.currentTransactionId = txnId;
    }
  
    @Explain(displayName = "table", explainLevels = { Level.USER, 
Level.DEFAULT, Level.EXTENDED })
@@@ -196,27 -182,7 +196,27 @@@
      this.lbCtx = lbCtx;
    }
  
-   public Long getTxnId() {
-     return txnId;
 -  public long getCurrentTransactionId() {
 -    return writeType == AcidUtils.Operation.NOT_ACID ? 0L : 
currentTransactionId;
++  public long getTxnId() {
++    return currentTransactionId == null ? 0 : currentTransactionId;
 +  }
 +
 +  public void setTxnId(Long txnId) {
-     this.txnId = txnId;
++    this.currentTransactionId = txnId;
 +  }
 +
 +  public int getStmtId() {
 +    return stmtId;
 +  }
 +
 +  public void setStmtId(int stmtId) {
 +    this.stmtId = stmtId;
 +  }
 +
 +  public void setIntermediateInMmWrite(boolean b) {
 +    this.commitMmWriteId = !b;
 +  }
 +
 +  public boolean isCommitMmWrite() {
 +    return commitMmWriteId;
    }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 8594edf,00c0ce3..f6303ba
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@@ -56,22 -61,22 +62,26 @@@ public class MoveWork implements Serial
     * List of inserted partitions
     */
    protected List<Partition> movedParts;
 +  private boolean isNoop;
  
    public MoveWork() {
+     sessionStateLineageState = null;
    }
  
-   private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
++
+   private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+       LineageState lineageState) {
      this.inputs = inputs;
      this.outputs = outputs;
+     sessionStateLineageState = lineageState;
    }
  
    public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
        final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-       boolean checkFileFormat, boolean srcLocal) {
-     this(inputs, outputs);
+       boolean checkFileFormat, boolean srcLocal, LineageState lineageState) {
+     this(inputs, outputs, lineageState);
 +    Utilities.LOG14535.info("Creating MoveWork " + 
System.identityHashCode(this)
 +        + " with " + loadTableWork + "; " + loadFileWork);
      this.loadTableWork = loadTableWork;
      this.loadFileWork = loadFileWork;
      this.checkFileFormat = checkFileFormat;
@@@ -80,8 -85,11 +90,8 @@@
  
    public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
        final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork,
-       boolean checkFileFormat) {
-     this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, 
false);
+       boolean checkFileFormat, LineageState lineageState) {
 -    this(inputs, outputs, lineageState);
 -    this.loadTableWork = loadTableWork;
 -    this.loadFileWork = loadFileWork;
 -    this.checkFileFormat = checkFileFormat;
++    this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, 
false, lineageState);
    }
  
    public MoveWork(final MoveWork o) {
@@@ -153,11 -162,7 +164,15 @@@
      this.srcLocal = srcLocal;
    }
  
 +  public void setNoop(boolean b) {
 +    this.isNoop = true;
 +  }
 +
 +  public boolean isNoop() {
 +    return this.isNoop;
 +  }
++  
+   public LineageState getLineagState() {
+     return sessionStateLineageState;
+   }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
----------------------------------------------------------------------
diff --cc ql/src/test/results/clientpositive/tez/explainuser_3.q.out
index 9f764b8,a331bf7..7e89478
--- a/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
+++ b/ql/src/test/results/clientpositive/tez/explainuser_3.q.out
@@@ -509,13 -509,13 +509,13 @@@ Stage-
                    Conditional Operator
                      Stage-1
                        Map 1 vectorized
 -                      File Output Operator [FS_10]
 +                      File Output Operator [FS_8]
                          table:{"name:":"default.orc_merge5"}
-                         Select Operator [SEL_9] (rows=306 width=335)
+                         Select Operator [SEL_9] (rows=1 width=352)
                            Output:["_col0","_col1","_col2","_col3","_col4"]
-                           Filter Operator [FIL_8] (rows=306 width=335)
+                           Filter Operator [FIL_8] (rows=1 width=352)
                              predicate:(userid <= 13)
-                             TableScan [TS_0] (rows=919 width=335)
+                             TableScan [TS_0] (rows=1 width=352)
                                
default@orc_merge5,orc_merge5,Tbl:COMPLETE,Col:NONE,Output:["userid","string1","subtype","decimal1","ts"]
              Stage-4(CONDITIONAL)
                File Merge

Reply via email to