http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index b000745,c8ad795..ed527b8 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@@ -392,12 -435,110 +390,12 @@@ public class MoveTask extends Task<Move // deal with dynamic partitions DynamicPartitionCtx dpCtx = tbd.getDPCtx(); if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions - - List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx); - - console.printInfo(System.getProperty("line.separator")); - long startTime = System.currentTimeMillis(); - // load the list of DP partitions and return the list of partition specs - // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions - // to use Utilities.getFullDPSpecs() to get the list of full partSpecs. - // After that check the number of DPs created to not exceed the limit and - // iterate over it and call loadPartition() here. - // The reason we don't do inside HIVE-1361 is the latter is large and we - // want to isolate any potential issue it may introduce. - Map<Map<String, String>, Partition> dp = - db.loadDynamicPartitions( - tbd.getSourcePath(), - tbd.getTable().getTableName(), - tbd.getPartitionSpec(), - tbd.getReplace(), - dpCtx.getNumDPCols(), - isSkewedStoredAsDirs(tbd), - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, - work.getLoadTableWork().getCurrentTransactionId(), hasFollowingStatsTask(), - work.getLoadTableWork().getWriteType()); - - // publish DP columns to its subscribers - if (dps != null && dps.size() > 0) { - pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values()); - } - - String loadTime = "\t Time taken to load dynamic partitions: " + - (System.currentTimeMillis() - startTime)/1000.0 + " seconds"; - console.printInfo(loadTime); - LOG.info(loadTime); - - if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) { - throw new HiveException("This query creates no partitions." + - " To turn off this error, set hive.error.on.empty.partition=false."); - } - - startTime = System.currentTimeMillis(); - // for each partition spec, get the partition - // and put it to WriteEntity for post-exec hook - for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) { - Partition partn = entry.getValue(); - - if (bucketCols != null || sortCols != null) { - updatePartitionBucketSortColumns( - db, table, partn, bucketCols, numBuckets, sortCols); - } - - WriteEntity enty = new WriteEntity(partn, - getWriteType(tbd, work.getLoadTableWork().getWriteType())); - if (work.getOutputs() != null) { - DDLTask.addIfAbsentByName(enty, work.getOutputs()); - } - // Need to update the queryPlan's output as well so that post-exec hook get executed. - // This is only needed for dynamic partitioning since for SP the the WriteEntity is - // constructed at compile time and the queryPlan already contains that. - // For DP, WriteEntity creation is deferred at this stage so we need to update - // queryPlan here. - if (queryPlan.getOutputs() == null) { - queryPlan.setOutputs(new LinkedHashSet<WriteEntity>()); - } - queryPlan.getOutputs().add(enty); - - // update columnar lineage for each partition - dc = new DataContainer(table.getTTable(), partn.getTPartition()); - - // Don't set lineage on delete as we don't have all the columns - if (work.getLineagState() != null && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - work.getLineagState().setLineage(tbd.getSourcePath(), dc, - table.getCols()); - } - LOG.info("\tLoading partition " + entry.getKey()); - } - console.printInfo("\t Time taken for adding to write entity : " + - (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); - dc = null; // reset data container to prevent it being added again. + dc = handleDynParts(db, table, tbd, ti, dpCtx); } else { // static partitions - List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), - tbd.getPartitionSpec()); - db.validatePartitionNameCharacters(partVals); - db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(), - tbd.getPartitionSpec(), tbd.getReplace(), - tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), - work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask()); - Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); - - if (bucketCols != null || sortCols != null) { - updatePartitionBucketSortColumns(db, table, partn, bucketCols, - numBuckets, sortCols); - } - - dc = new DataContainer(table.getTTable(), partn.getTPartition()); - // add this partition to post-execution hook - if (work.getOutputs() != null) { - DDLTask.addIfAbsentByName(new WriteEntity(partn, - getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); - } - } + dc = handleStaticParts(db, table, tbd, ti); + } } - if (SessionState.get() != null && dc != null) { + if (work.getLineagState() != null && dc != null) { // If we are doing an update or a delete the number of columns in the table will not // match the number of columns in the file sink. For update there will be one too many // (because of the ROW__ID), and in the case of the delete there will be just the @@@ -445,231 -586,6 +443,231 @@@ return (1); } } + + private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc tbd, + TaskInformation ti) throws HiveException, IOException, InvalidOperationException { + List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(), tbd.getPartitionSpec()); + db.validatePartitionNameCharacters(partVals); + Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() + + " into " + tbd.getTable().getTableName()); + db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(), + tbd.getPartitionSpec(), tbd.getReplace(), + tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && + !tbd.isMmTable(), + hasFollowingStatsTask(), tbd.getTxnId(), tbd.getStmtId()); + Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); + + // See the comment inside updatePartitionBucketSortColumns. + if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) { + updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols, + ti.numBuckets, ti.sortCols); + } + + DataContainer dc = new DataContainer(table.getTTable(), partn.getTPartition()); + // add this partition to post-execution hook + if (work.getOutputs() != null) { + DDLTask.addIfAbsentByName(new WriteEntity(partn, + getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); + } + return dc; + } + + private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd, + TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException, + IOException, InvalidOperationException { + DataContainer dc; + List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, dpCtx); + + console.printInfo(System.getProperty("line.separator")); + long startTime = System.currentTimeMillis(); + // load the list of DP partitions and return the list of partition specs + // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions + // to use Utilities.getFullDPSpecs() to get the list of full partSpecs. + // After that check the number of DPs created to not exceed the limit and + // iterate over it and call loadPartition() here. + // The reason we don't do inside HIVE-1361 is the latter is large and we + // want to isolate any potential issue it may introduce. + if (tbd.isMmTable() && !tbd.isCommitMmWrite()) { + throw new HiveException("Only single-partition LoadTableDesc can skip commiting write ID"); + } + Map<Map<String, String>, Partition> dp = + db.loadDynamicPartitions( + tbd.getSourcePath(), + tbd.getTable().getTableName(), + tbd.getPartitionSpec(), + tbd.getReplace(), + dpCtx.getNumDPCols(), + (tbd.getLbCtx() == null) ? 0 : tbd.getLbCtx().calculateListBucketingLevel(), + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && + !tbd.isMmTable(), - SessionState.get().getTxnMgr().getCurrentTxnId(), tbd.getStmtId(), hasFollowingStatsTask(), ++ work.getLoadTableWork().getTxnId(), tbd.getStmtId(), hasFollowingStatsTask(), + work.getLoadTableWork().getWriteType()); + + // publish DP columns to its subscribers + if (dps != null && dps.size() > 0) { + pushFeed(FeedType.DYNAMIC_PARTITIONS, dp.values()); + } + + String loadTime = "\t Time taken to load dynamic partitions: " + + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"; + console.printInfo(loadTime); + LOG.info(loadTime); + + if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) { + throw new HiveException("This query creates no partitions." + + " To turn off this error, set hive.error.on.empty.partition=false."); + } + + startTime = System.currentTimeMillis(); + // for each partition spec, get the partition + // and put it to WriteEntity for post-exec hook + for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) { + Partition partn = entry.getValue(); + + // See the comment inside updatePartitionBucketSortColumns. + if (!tbd.isMmTable() && (ti.bucketCols != null || ti.sortCols != null)) { + updatePartitionBucketSortColumns( + db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols); + } + + WriteEntity enty = new WriteEntity(partn, + getWriteType(tbd, work.getLoadTableWork().getWriteType())); + if (work.getOutputs() != null) { + DDLTask.addIfAbsentByName(enty, work.getOutputs()); + } + // Need to update the queryPlan's output as well so that post-exec hook get executed. + // This is only needed for dynamic partitioning since for SP the the WriteEntity is + // constructed at compile time and the queryPlan already contains that. + // For DP, WriteEntity creation is deferred at this stage so we need to update + // queryPlan here. + if (queryPlan.getOutputs() == null) { + queryPlan.setOutputs(new LinkedHashSet<WriteEntity>()); + } + queryPlan.getOutputs().add(enty); + + // update columnar lineage for each partition + dc = new DataContainer(table.getTTable(), partn.getTPartition()); + + // Don't set lineage on delete as we don't have all the columns - if (SessionState.get() != null && ++ if (work.getLineagState() != null && + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE && + work.getLoadTableWork().getWriteType() != AcidUtils.Operation.UPDATE) { - SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc, ++ work.getLineagState().setLineage(tbd.getSourcePath(), dc, + table.getCols()); + } + LOG.info("\tLoading partition " + entry.getKey()); + } + console.printInfo("\t Time taken for adding to write entity : " + + (System.currentTimeMillis() - startTime)/1000.0 + " seconds"); + dc = null; // reset data container to prevent it being added again. + return dc; + } + + private void inferTaskInformation(TaskInformation ti) { + // Find the first ancestor of this MoveTask which is some form of map reduce task + // (Either standard, local, or a merge) + while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() == 1) { + ti.task = (Task)ti.task.getParentTasks().get(0); + // If it was a merge task or a local map reduce task, nothing can be inferred + if (ti.task instanceof MergeFileTask || ti.task instanceof MapredLocalTask) { + break; + } + + // If it's a standard map reduce task, check what, if anything, it inferred about + // the directory this move task is moving + if (ti.task instanceof MapRedTask) { + MapredWork work = (MapredWork)ti.task.getWork(); + MapWork mapWork = work.getMapWork(); + ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path); + ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path); + if (work.getReduceWork() != null) { + ti.numBuckets = work.getReduceWork().getNumReduceTasks(); + } + + if (ti.bucketCols != null || ti.sortCols != null) { + // This must be a final map reduce task (the task containing the file sink + // operator that writes the final output) + assert work.isFinalMapRed(); + } + break; + } + + // If it's a move task, get the path the files were moved from, this is what any + // preceding map reduce task inferred information about, and moving does not invalidate + // those assumptions + // This can happen when a conditional merge is added before the final MoveTask, but the + // condition for merging is not met, see GenMRFileSink1. + if (ti.task instanceof MoveTask) { + MoveTask mt = (MoveTask)ti.task; + if (mt.getWork().getLoadFileWork() != null) { + ti.path = mt.getWork().getLoadFileWork().getSourcePath().toUri().toString(); + } + } + } + } + + private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table) + throws HiveException { + if (work.getCheckFileFormat()) { + // Get all files from the src directory + FileStatus[] dirs; + ArrayList<FileStatus> files; + FileSystem srcFs; // source filesystem + try { + srcFs = tbd.getSourcePath().getFileSystem(conf); + dirs = srcFs.globStatus(tbd.getSourcePath()); + files = new ArrayList<FileStatus>(); + for (int i = 0; (dirs != null && i < dirs.length); i++) { + files.addAll(Arrays.asList(srcFs.listStatus(dirs[i].getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER))); + // We only check one file, so exit the loop when we have at least + // one. + if (files.size() > 0) { + break; + } + } + } catch (IOException e) { + throw new HiveException( + "addFiles: filesystem error in check phase", e); + } + + // handle file format check for table level + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) { + boolean flag = true; + // work.checkFileFormat is set to true only for Load Task, so assumption here is + // dynamic partition context is null + if (tbd.getDPCtx() == null) { + if (tbd.getPartitionSpec() == null || tbd.getPartitionSpec().isEmpty()) { + // Check if the file format of the file matches that of the table. + flag = HiveFileFormatUtils.checkInputFormat( + srcFs, conf, tbd.getTable().getInputFileFormatClass(), files); + } else { + // Check if the file format of the file matches that of the partition + Partition oldPart = db.getPartition(table, tbd.getPartitionSpec(), false); + if (oldPart == null) { + // this means we have just created a table and are specifying partition in the + // load statement (without pre-creating the partition), in which case lets use + // table input format class. inheritTableSpecs defaults to true so when a new + // partition is created later it will automatically inherit input format + // from table object + flag = HiveFileFormatUtils.checkInputFormat( + srcFs, conf, tbd.getTable().getInputFileFormatClass(), files); + } else { + flag = HiveFileFormatUtils.checkInputFormat( + srcFs, conf, oldPart.getInputFormatClass(), files); + } + } + if (!flag) { + throw new HiveException(ErrorMsg.WRONG_FILE_FORMAT); + } + } else { + LOG.warn("Skipping file format check as dpCtx is not null"); + } + } + } + } + + /** * so to make sure we crate WriteEntity with the right WriteType. This is (at this point) only * for consistency since LockManager (which is the only thing that pays attention to WriteType)
http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 05698d1,5c6ef9f..51c4090 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@@ -240,10 -238,10 +240,11 @@@ public class LoadPartitions Path tmpPath) { LoadTableDesc loadTableWork = new LoadTableDesc( tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(), - event.replicationSpec().isReplace()); + event.replicationSpec().isReplace(), SessionState.get().getTxnMgr().getCurrentTxnId() + ); loadTableWork.setInheritTableSpecs(false); - MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, + context.sessionStateLineageState); return TaskFactory.get(work, context.hiveConf); } http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java index afc04a3,a9a9162..65a3a59 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java @@@ -225,11 -224,10 +225,12 @@@ public class LoadTable ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf); LoadTableDesc loadTableWork = new LoadTableDesc( - tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace()); + tmpPath, Utilities.getTableDesc(table), new TreeMap<>(), replicationSpec.isReplace(), + SessionState.get().getTxnMgr().getCurrentTxnId() + ); MoveWork moveWork = - new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false); + new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false, + context.sessionStateLineageState); Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf); copyTask.addDependentTask(loadTableTask); return copyTask; http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 6dfa607,25cca7b..6a56407 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@@ -1364,29 -1353,9 +1364,31 @@@ public final class GenMapRedUtils cplan.setInputformat("org.apache.hadoop.hive.ql.io.CombineHiveInputFormat"); // NOTE: we should gather stats in MR1 rather than MR2 at merge job since we don't // know if merge MR2 will be triggered at execution time - Task<MoveWork> mvTask = GenMapRedUtils.findMoveTask(mvTasks, fsOutput); + MoveWork dummyMv = null; + if (srcMmWriteId == null) { + // Only create the movework for non-MM table. No action needed for a MM table. + Utilities.LOG14535.info("creating dummy movetask for merge (with lfd)"); + dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(inputDirName, finalName, true, null, null, false), false); ++ new LoadFileDesc(inputDirName, finalName, true, null, null, false), false, ++ SessionState.get().getLineageState()); + } else { + // TODO# create the noop MoveWork to avoid q file changes for now. Should be removed w/the flag just before merge + dummyMv = new MoveWork(null, null, null, - new LoadFileDesc(inputDirName, finalName, true, null, null, false), false); ++ new LoadFileDesc(inputDirName, finalName, true, null, null, false), false, ++ SessionState.get().getLineageState()); + dummyMv.setNoop(true); + } + // Use the original fsOp path here in case of MM - while the new FSOP merges files inside the + // MM directory, the original MoveTask still commits based on the parent. Note that this path + // can only be triggered for a merge that's part of insert for now; MM tables do not support + // concatenate. Keeping the old logic for non-MM tables with temp directories and stuff. + Path fsopPath = srcMmWriteId != null ? fsInputDesc.getFinalDirName() : finalName; + Utilities.LOG14535.info("Looking for MoveTask to make it dependant on the conditional tasks"); + + Task<MoveWork> mvTask = GenMapRedUtils.findMoveTaskForFsopOutput( + mvTasks, fsopPath, fsInputDesc.isMmTable()); ConditionalTask cndTsk = GenMapRedUtils.createCondTask(conf, currTask, dummyMv, work, - fsInputDesc.getFinalDirName(), finalName, mvTask, dependencyTask); + fsInputDesc.getMergeInputDirName(), finalName, mvTask, dependencyTask); // keep the dynamic partition context in conditional task resolver context ConditionalResolverMergeFilesCtx mrCtx = http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index fdeeacf,e70a72c..36bb89f --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@@ -1090,10 -1088,11 +1095,11 @@@ public class DDLSemanticAnalyzer extend Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); truncateTblDesc.setOutputDir(queryTmpdir); LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, - partSpec == null ? new HashMap<String, String>() : partSpec, null); - partSpec == null ? new HashMap<>() : partSpec); ++ partSpec == null ? new HashMap<>() : partSpec, null); ltd.setLbCtx(lbCtx); - Task<MoveWork> moveTsk = TaskFactory - .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), - conf); + @SuppressWarnings("unchecked") - Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); ++ Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork( ++ null, null, ltd, null, false, SessionState.get().getLineageState()), conf); truncateTask.addDependentTask(moveTsk); // Recalculate the HDFS stats if auto gather stats is set @@@ -1736,11 -1722,12 +1742,12 @@@ TableDesc tblDesc = Utilities.getTableDesc(tblObj); Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc); mergeDesc.setOutputDir(queryTmpdir); + // No need to handle MM tables - unsupported path. LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, - partSpec == null ? new HashMap<>() : partSpec); + partSpec == null ? new HashMap<>() : partSpec, null); ltd.setLbCtx(lbCtx); - Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf); - Task<MoveWork> moveTsk = TaskFactory - .get(new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), - conf); ++ Task<MoveWork> moveTsk = TaskFactory.get( ++ new MoveWork(null, null, ltd, null, false, SessionState.get().getLineageState()), conf); mergeTask.addDependentTask(moveTsk); if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 36c0c98,751bda0..b9f28eb --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@@ -360,36 -343,17 +360,36 @@@ public class ImportSemanticAnalyzer ext return tblDesc; } + private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath, - ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) { + ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, + Long txnId, int stmtId, boolean isSourceMm) { Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME); - Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath); - Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf()); - LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, - Utilities.getTableDesc(table), new TreeMap<>(), - replace); - Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(), - x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), - x.getConf()); + Path destPath = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtPath) + : new Path(tgtPath, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + Utilities.LOG14535.info("adding import work for table with source location: " + + dataPath + "; table: " + tgtPath + "; copy destination " + destPath + "; mm " + + txnId + " (src " + isSourceMm + ") for " + (table == null ? "a new table" : table.getTableName())); + + Task<?> copyTask = null; + if (replicationSpec.isInReplicationScope()) { + if (isSourceMm || isAcid(txnId)) { + // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed. + throw new RuntimeException("Replicating MM and ACID tables is not supported"); + } + copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf()); + } else { + CopyWork cw = new CopyWork(dataPath, destPath, false); + cw.setSkipSourceMmDirs(isSourceMm); + copyTask = TaskFactory.get(cw, x.getConf()); + } + + LoadTableDesc loadTableWork = new LoadTableDesc(destPath, - Utilities.getTableDesc(table), new TreeMap<String, String>(), replace, txnId); ++ Utilities.getTableDesc(table), new TreeMap<>(), replace, txnId); + loadTableWork.setTxnId(txnId); + loadTableWork.setStmtId(stmtId); - MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false); ++ MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()); + Task<?> loadTableTask = TaskFactory.get(mv, x.getConf()); copyTask.addDependentTask(loadTableTask); x.getTasks().add(copyTask); return loadTableTask; @@@ -450,40 -413,19 +455,40 @@@ + partSpecToString(partSpec.getPartSpec()) + " with source location: " + srcLocation); Path tgtLocation = new Path(partSpec.getLocation()); - Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation); - Task<?> copyTask = ReplCopyTask.getLoadCopyTask( - replicationSpec, new Path(srcLocation), tmpPath, x.getConf()); + Path destPath = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? x.getCtx().getExternalTmpPath(tgtLocation) + : new Path(tgtLocation, AcidUtils.deltaSubdir(txnId, txnId, stmtId)); + Path moveTaskSrc = !MetaStoreUtils.isInsertOnlyTable(table.getParameters()) ? destPath : tgtLocation; + Utilities.LOG14535.info("adding import work for partition with source location: " + + srcLocation + "; target: " + tgtLocation + "; copy dest " + destPath + "; mm " + + txnId + " (src " + isSourceMm + ") for " + partSpecToString(partSpec.getPartSpec())); + + + Task<?> copyTask = null; + if (replicationSpec.isInReplicationScope()) { + if (isSourceMm || isAcid(txnId)) { + // TODO: ReplCopyTask is completely screwed. Need to support when it's not as screwed. + throw new RuntimeException("Replicating MM and ACID tables is not supported"); + } + copyTask = ReplCopyTask.getLoadCopyTask( + replicationSpec, new Path(srcLocation), destPath, x.getConf()); + } else { + CopyWork cw = new CopyWork(new Path(srcLocation), destPath, false); + cw.setSkipSourceMmDirs(isSourceMm); + copyTask = TaskFactory.get(cw, x.getConf()); + } + Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf()); - LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath, - Utilities.getTableDesc(table), - partSpec.getPartSpec(), replicationSpec.isReplace()); + LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table), + partSpec.getPartSpec(), replicationSpec.isReplace(), txnId); + loadTableWork.setTxnId(txnId); + loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(false); + // Do not commit the write ID from each task; need to commit once. + // TODO: we should just change the import to use a single MoveTask, like dynparts. + loadTableWork.setIntermediateInMmWrite(isAcid(txnId)); Task<?> loadPartTask = TaskFactory.get(new MoveWork( - x.getInputs(), x.getOutputs(), loadTableWork, null, false), x.getConf()); - x.getInputs(), x.getOutputs(), loadTableWork, null, false, - SessionState.get().getLineageState()), - x.getConf()); ++ x.getInputs(), x.getOutputs(), loadTableWork, null, false, SessionState.get().getLineageState()), x.getConf()); copyTask.addDependentTask(loadPartTask); addPartTask.addDependentTask(loadPartTask); x.getTasks().add(copyTask); http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 3475c7c,4814fcd..df50aab --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@@ -6858,15 -6902,10 +6861,18 @@@ public class SemanticAnalyzer extends B acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } - Long currentTransactionId = acidOp == Operation.NOT_ACID ? null : + if (MetaStoreUtils.isInsertOnlyTable(table_desc.getProperties())) { + acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); + } + if (isMmTable) { + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); ++ } else { ++ txnId = acidOp == Operation.NOT_ACID ? null : + SessionState.get().getTxnMgr().getCurrentTxnId(); - ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, - currentTransactionId); + } + boolean isReplace = !qb.getParseInfo().isInsertIntoTable( + dest_tab.getDbName(), dest_tab.getTableName()); + ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, txnId); // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), @@@ -6926,13 -7020,10 +6932,16 @@@ acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); checkAcidConstraints(qb, table_desc, dest_tab); } - Long currentTransactionId = (acidOp == Operation.NOT_ACID) ? null : + if (MetaStoreUtils.isInsertOnlyTable(dest_part.getTable().getParameters())) { + acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); + } + if (isMmTable) { + txnId = SessionState.get().getTxnMgr().getCurrentTxnId(); ++ } else { ++ txnId = (acidOp == Operation.NOT_ACID) ? null : + SessionState.get().getTxnMgr().getCurrentTxnId(); - ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, - currentTransactionId); + } + ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, txnId); // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(), @@@ -7015,9 -7150,8 +7024,9 @@@ } boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE); + // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats. loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols, - colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID, isMmCtas)); - colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID)); ++ colTypes, destTableIsAcid ? Operation.INSERT : Operation.NOT_ACID, isMmCtas)); if (tblDesc == null) { if (viewDesc != null) { table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes); http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 356ab6f,f0089fc..752a934 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@@ -18,17 -18,8 +18,19 @@@ package org.apache.hadoop.hive.ql.parse; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.collect.Interner; + import com.google.common.collect.Interners; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; @@@ -228,17 -227,47 +239,19 @@@ public abstract class TaskCompiler } } - boolean oneLoadFile = true; + boolean oneLoadFileForCtas = true; for (LoadFileDesc lfd : loadFileWork) { if (pCtx.getQueryProperties().isCTAS() || pCtx.getQueryProperties().isMaterializedView()) { - assert (oneLoadFile); // should not have more than 1 load file for - // CTAS - // make the movetask's destination directory the table's destination. - Path location; - String loc = pCtx.getQueryProperties().isCTAS() ? - pCtx.getCreateTable().getLocation() : pCtx.getCreateViewDesc().getLocation(); - if (loc == null) { - // get the default location - Path targetPath; - try { - String protoName = null; - if (pCtx.getQueryProperties().isCTAS()) { - protoName = pCtx.getCreateTable().getTableName(); - } else if (pCtx.getQueryProperties().isMaterializedView()) { - protoName = pCtx.getCreateViewDesc().getViewName(); - } - String[] names = Utilities.getDbTableName(protoName); - if (!db.databaseExists(names[0])) { - throw new SemanticException("ERROR: The database " + names[0] - + " does not exist."); - } - Warehouse wh = new Warehouse(conf); - targetPath = wh.getDefaultTablePath(db.getDatabase(names[0]), names[1]); - } catch (HiveException | MetaException e) { - throw new SemanticException(e); - } - - location = targetPath; - } else { - location = new Path(loc); + if (!oneLoadFileForCtas) { // should not have more than 1 load file for CTAS. + throw new SemanticException( + "One query is not expected to contain multiple CTAS loads statements"); } - lfd.setTargetDir(location); - - oneLoadFile = false; + setLoadFileLocation(pCtx, lfd); + oneLoadFileForCtas = false; } - mvTask.add(TaskFactory.get(new MoveWork(null, null, null, lfd, false), conf)); + mvTask.add(TaskFactory + .get(new MoveWork(null, null, null, lfd, false, SessionState.get().getLineageState()), + conf)); } } http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java index 45d4fb0,023d247..9477df6 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadDesc.java @@@ -35,8 -35,8 +35,9 @@@ public class LoadDesc implements Serial * Need to remember whether this is an acid compliant operation, and if so whether it is an * insert, update, or delete. */ - private final AcidUtils.Operation writeType; + final AcidUtils.Operation writeType; + + public LoadDesc(final Path sourcePath, AcidUtils.Operation writeType) { this.sourcePath = sourcePath; this.writeType = writeType; http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java index 0032648,0292af5..d90acce --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java @@@ -51,8 -48,8 +51,8 @@@ public class LoadFileDesc extends LoadD public LoadFileDesc(final CreateTableDesc createTableDesc, final CreateViewDesc createViewDesc, final Path sourcePath, final Path targetDir, final boolean isDfsDir, - final String columns, final String columnTypes, AcidUtils.Operation writeType) { - this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType); + final String columns, final String columnTypes, AcidUtils.Operation writeType, boolean isMmCtas) { - this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, isMmCtas); ++ this(sourcePath, targetDir, isDfsDir, columns, columnTypes, writeType, isMmCtas); if (createTableDesc != null && createTableDesc.getDatabaseName() != null && createTableDesc.getTableName() != null) { destinationCreateTable = (createTableDesc.getTableName().contains(".") ? "" : createTableDesc http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index e893ab5,90a970c..85d1324 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@@ -18,13 -18,7 +18,9 @@@ package org.apache.hadoop.hive.ql.plan; - import java.io.Serializable; - import java.util.LinkedHashMap; - import java.util.Map; - import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.plan.Explain.Level; @@@ -39,8 -37,10 +39,8 @@@ public class LoadTableDesc extends Load private ListBucketingCtx lbCtx; private boolean inheritTableSpecs = true; //For partitions, flag controlling whether the current //table specs are to be used - private Long txnId; - /* - if the writeType above is NOT_ACID then the currentTransactionId will be null - */ - private final Long currentTransactionId; + private int stmtId; ++ private Long currentTransactionId; // TODO: the below seems like they should just be combined into partitionDesc private org.apache.hadoop.hive.ql.plan.TableDesc table; @@@ -59,15 -59,13 +60,14 @@@ } public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, + final TableDesc table, final Map<String, String> partitionSpec, final boolean replace, - final AcidUtils.Operation writeType, - Long txnId) { + final AcidUtils.Operation writeType, Long currentTransactionId) { super(sourcePath, writeType); - this.currentTransactionId = currentTransactionId; - init(table, partitionSpec, replace); + Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + + ((table.getProperties() == null) ? "null" : table.getTableName())); - init(table, partitionSpec, replace, txnId); ++ init(table, partitionSpec, replace, currentTransactionId); } /** @@@ -105,17 -103,16 +105,17 @@@ } public LoadTableDesc(final Path sourcePath, - final org.apache.hadoop.hive.ql.plan.TableDesc table, + final TableDesc table, final DynamicPartitionCtx dpCtx, - final AcidUtils.Operation writeType, Long currentTransactionId) { + final AcidUtils.Operation writeType, + boolean isReplace, Long txnId) { super(sourcePath, writeType); + Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + table.getTableName()/*, new Exception()*/); this.dpCtx = dpCtx; - this.currentTransactionId = currentTransactionId; if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) { - init(table, dpCtx.getPartSpec(), true); + init(table, dpCtx.getPartSpec(), isReplace, txnId); } else { - init(table, new LinkedHashMap<String, String>(), isReplace, txnId); - init(table, new LinkedHashMap<>(), true); ++ init(table, new LinkedHashMap<>(), isReplace, txnId); } } @@@ -127,7 -123,6 +127,7 @@@ this.table = table; this.partitionSpec = partitionSpec; this.replace = replace; - this.txnId = txnId; ++ this.currentTransactionId = txnId; } @Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@@ -196,27 -182,7 +196,27 @@@ this.lbCtx = lbCtx; } - public Long getTxnId() { - return txnId; - public long getCurrentTransactionId() { - return writeType == AcidUtils.Operation.NOT_ACID ? 0L : currentTransactionId; ++ public long getTxnId() { ++ return currentTransactionId == null ? 0 : currentTransactionId; + } + + public void setTxnId(Long txnId) { - this.txnId = txnId; ++ this.currentTransactionId = txnId; + } + + public int getStmtId() { + return stmtId; + } + + public void setStmtId(int stmtId) { + this.stmtId = stmtId; + } + + public void setIntermediateInMmWrite(boolean b) { + this.commitMmWriteId = !b; + } + + public boolean isCommitMmWrite() { + return commitMmWriteId; } } http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java index 8594edf,00c0ce3..f6303ba --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java @@@ -56,22 -61,22 +62,26 @@@ public class MoveWork implements Serial * List of inserted partitions */ protected List<Partition> movedParts; + private boolean isNoop; public MoveWork() { + sessionStateLineageState = null; } - private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) { ++ + private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs, + LineageState lineageState) { this.inputs = inputs; this.outputs = outputs; + sessionStateLineageState = lineageState; } public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat, boolean srcLocal) { - this(inputs, outputs); + boolean checkFileFormat, boolean srcLocal, LineageState lineageState) { + this(inputs, outputs, lineageState); + Utilities.LOG14535.info("Creating MoveWork " + System.identityHashCode(this) + + " with " + loadTableWork + "; " + loadFileWork); this.loadTableWork = loadTableWork; this.loadFileWork = loadFileWork; this.checkFileFormat = checkFileFormat; @@@ -80,8 -85,11 +90,8 @@@ public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs, final LoadTableDesc loadTableWork, final LoadFileDesc loadFileWork, - boolean checkFileFormat) { - this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false); + boolean checkFileFormat, LineageState lineageState) { - this(inputs, outputs, lineageState); - this.loadTableWork = loadTableWork; - this.loadFileWork = loadFileWork; - this.checkFileFormat = checkFileFormat; ++ this(inputs, outputs, loadTableWork, loadFileWork, checkFileFormat, false, lineageState); } public MoveWork(final MoveWork o) { @@@ -153,11 -162,7 +164,15 @@@ this.srcLocal = srcLocal; } + public void setNoop(boolean b) { + this.isNoop = true; + } + + public boolean isNoop() { + return this.isNoop; + } ++ + public LineageState getLineagState() { + return sessionStateLineageState; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestGenMapRedUtilsCreateConditionalTask.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/379d9bab/ql/src/test/results/clientpositive/tez/explainuser_3.q.out ---------------------------------------------------------------------- diff --cc ql/src/test/results/clientpositive/tez/explainuser_3.q.out index 9f764b8,a331bf7..7e89478 --- a/ql/src/test/results/clientpositive/tez/explainuser_3.q.out +++ b/ql/src/test/results/clientpositive/tez/explainuser_3.q.out @@@ -509,13 -509,13 +509,13 @@@ Stage- Conditional Operator Stage-1 Map 1 vectorized - File Output Operator [FS_10] + File Output Operator [FS_8] table:{"name:":"default.orc_merge5"} - Select Operator [SEL_9] (rows=306 width=335) + Select Operator [SEL_9] (rows=1 width=352) Output:["_col0","_col1","_col2","_col3","_col4"] - Filter Operator [FIL_8] (rows=306 width=335) + Filter Operator [FIL_8] (rows=1 width=352) predicate:(userid <= 13) - TableScan [TS_0] (rows=919 width=335) + TableScan [TS_0] (rows=1 width=352) default@orc_merge5,orc_merge5,Tbl:COMPLETE,Col:NONE,Output:["userid","string1","subtype","decimal1","ts"] Stage-4(CONDITIONAL) File Merge
