This is an automated email from the ASF dual-hosted git repository. vgarg pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new b1a446b HIVE-22045 : HIVE-21711 introduced regression in data load (Vineet Garg, reviewed by Gopal V) b1a446b is described below commit b1a446bd7c9a8f18ca19d6e9f31f158beaf85308 Author: Vineet Garg <vg...@apache.org> AuthorDate: Thu Aug 1 11:04:18 2019 -0700 HIVE-22045 : HIVE-21711 introduced regression in data load (Vineet Garg, reviewed by Gopal V) --- .../org/apache/hadoop/hive/ql/exec/Utilities.java | 27 ++++++++++++++++++---- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 ++- .../apache/hadoop/hive/ql/plan/FileSinkDesc.java | 21 +++++++++++++++-- .../hadoop/hive/ql/exec/TestFileSinkOperator.java | 2 +- 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 1d32ba0..ac89dd9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1431,17 +1431,28 @@ public final class Utilities { // 3) Rename/move the temp directory to specPath FileSystem fs = specPath.getFileSystem(hconf); - boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs); Path tmpPath = Utilities.toTempPath(specPath); Path taskTmpPath = Utilities.toTaskTempPath(specPath); + PerfLogger perfLogger = SessionState.getPerfLogger(); + boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs); + boolean avoidRename = false; + boolean shouldAvoidRename = shouldAvoidRename(conf, hconf); + + if(isBlobStorage && (shouldAvoidRename|| ((conf != null) && conf.isCTASorCM())) + || (!isBlobStorage && shouldAvoidRename)) { + avoidRename = true; + } if (success) { - if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath) && !isBlobStorage) { + if (!avoidRename && fs.exists(tmpPath)) { // 1) Rename tmpPath to a new directory name to prevent additional files // from being added by runaway 
processes. + // this is done for all statements except SELECT, CTAS and Create MV Path tmpPathOriginal = tmpPath; tmpPath = new Path(tmpPath.getParent(), tmpPath.getName() + ".moved"); - LOG.debug("Moving/Renaming " + tmpPathOriginal + " to " + tmpPath); + LOG.debug("shouldAvoidRename is false therefore moving/renaming " + tmpPathOriginal + " to " + tmpPath); + perfLogger.PerfLogBegin("FileSinkOperator", "rename"); Utilities.rename(fs, tmpPathOriginal, tmpPath); + perfLogger.PerfLogEnd("FileSinkOperator", "rename"); } // Remove duplicates from tmpPath @@ -1449,7 +1460,6 @@ public final class Utilities { tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs); FileStatus[] statuses = statusList.toArray(new FileStatus[statusList.size()]); if(statuses != null && statuses.length > 0) { - PerfLogger perfLogger = SessionState.getPerfLogger(); Set<FileStatus> filesKept = new HashSet<>(); perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles"); // remove any tmp file or double-committed output files
Source: " + tmpPath + " .Destination: " + specPath); Utilities.renameOrMoveFiles(fs, tmpPath, specPath); perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 8ff00fb..826b23e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -8168,7 +8168,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc, conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(), canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx, - dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery()); + dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(), + qb.isCTAS() || qb.isMaterializedView()); boolean isHiveServerQuery = SessionState.get().isHiveServerQuery(); fileSinkDesc.setHiveServerQuery(isHiveServerQuery); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java index 61ea28a..72ecde4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java @@ -115,6 +115,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe private boolean isQuery = false; + private boolean isCTASorCM = false; + public FileSinkDesc() { } @@ -125,7 +127,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe final boolean compressed, final int destTableId, final boolean multiFileSpray, final boolean canBeMerged, final int numFiles, final int totalFiles, final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath, - Long mmWriteId, boolean 
isMmCtas, boolean isInsertOverwrite, boolean isQuery) { + Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM) { this.dirName = dirName; this.tableInfo = tableInfo; @@ -143,6 +145,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe this.isMmCtas = isMmCtas; this.isInsertOverwrite = isInsertOverwrite; this.isQuery = isQuery; + this.isCTASorCM = isCTASorCM; } public FileSinkDesc(final Path dirName, final TableDesc tableInfo, @@ -164,7 +167,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe public Object clone() throws CloneNotSupportedException { FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed, destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles, - partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery); + partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, isCTASorCM); ret.setCompressCodec(compressCodec); ret.setCompressType(compressType); ret.setGatherStats(gatherStats); @@ -181,6 +184,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe ret.setIsMerge(isMerge); ret.setFilesToFetch(filesToFetch); ret.setIsQuery(isQuery); + ret.setIsCTASorCM(isCTASorCM); return ret; } @@ -188,6 +192,10 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe this.filesToFetch = filesToFetch; } + public void setIsCTASorCM(boolean isCTASorCM) { + this.isCTASorCM = isCTASorCM; + } + public void setIsQuery(boolean isQuery) { this.isQuery = isQuery; } @@ -591,6 +599,15 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe return isMmCtas; } + /** + * Whether this is a CREATE TABLE AS SELECT or CREATE MATERIALIZED VIEW statement. + * Set by the semantic analyzer; this is required because CTAS/CM requires some special logic + * in mvFileToFinalPath + */ + public boolean isCTASorCM() { + return isCTASorCM; + } + public 
class FileSinkOperatorExplainVectorization extends OperatorExplainVectorization { public FileSinkOperatorExplainVectorization(VectorFileSinkDesc vectorFileSinkDesc) { diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index a75103d..2c4b69b 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -286,7 +286,7 @@ public class TestFileSinkOperator { DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100); //todo: does this need the finalDestination? desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, - false, 1, 1, partCols, dpCtx, null, null, false, false, false); + false, 1, 1, partCols, dpCtx, null, null, false, false, false, false); } else { desc = new FileSinkDesc(basePath, tableDesc, false); }