This is an automated email from the ASF dual-hosted git repository. jcamacho pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 61c9b2e HIVE-23354: Remove file size sanity checking from compareTempOrDuplicateFiles (John Sherman, reviewed by Jesus Camacho Rodriguez) 61c9b2e is described below commit 61c9b2eebd7411c982e8f33e1fe27636f98897a0 Author: John Sherman <j...@cloudera.com> AuthorDate: Mon May 18 13:19:21 2020 -0700 HIVE-23354: Remove file size sanity checking from compareTempOrDuplicateFiles (John Sherman, reviewed by Jesus Camacho Rodriguez) Close apache/hive#1022 --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +- .../org/apache/hadoop/hive/ql/exec/Utilities.java | 119 +++++++++++---------- .../apache/hadoop/hive/ql/exec/mr/ExecDriver.java | 7 +- .../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 9 +- .../hadoop/hive/ql/io/merge/MergeFileTask.java | 3 + .../ql/io/rcfile/truncate/ColumnTruncateTask.java | 3 + .../hadoop/hive/ql/txn/compactor/CompactorMR.java | 4 + .../apache/hadoop/hive/ql/exec/TestUtilities.java | 4 + 8 files changed, 87 insertions(+), 67 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f5ad3a8..5a39006 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -601,8 +601,9 @@ public class HiveConf extends Configuration { EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"), EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8, "How many jobs at most can be executed in parallel"), - HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true, - "Whether speculative execution for reducers should be turned on. "), + @Deprecated + HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", false, + "(Deprecated) Whether speculative execution for reducers should be turned on. "), HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L, "The interval with which to poll the JobTracker for the counters the running job. \n" + "The smaller it is the more load there will be on the jobtracker, the higher it is the less granular the caught will be."), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 0e4ce78..811fcc0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -1802,7 +1802,7 @@ public final class Utilities { } } - taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs); + taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs, hconf); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } @@ -1815,7 +1815,7 @@ public final class Utilities { } Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir); taskIDToFile = removeTempOrDuplicateFilesNonMm( - fs.listStatus(new Path(mmDir, unionSuffix)), fs); + fs.listStatus(new Path(mmDir, unionSuffix)), fs, hconf); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } @@ -1825,13 +1825,13 @@ public final class Utilities { return result; } if (!isMmTable) { - taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs); + taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs, hconf); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } } else { Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir); - taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs); + taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs, hconf); if (filesKept != null && taskIDToFile != null) { addFilesToPathSet(taskIDToFile.values(), filesKept); } @@ -1896,12 +1896,20 @@ public final class Utilities { } private static HashMap<String, FileStatus> removeTempOrDuplicateFilesNonMm( - FileStatus[] files, FileSystem fs) throws IOException { + FileStatus[] files, FileSystem fs, Configuration conf) throws IOException { if (files == null || fs == null) { return null; } HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>(); + // This method currently does not support speculative execution due to + // compareTempOrDuplicateFiles not being able to de-duplicate speculative + // execution created files + if (isSpeculativeExecution(conf)) { + String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); + throw new IOException("Speculative execution is not supported for engine " + engine); + } + for (FileStatus one : files) { if (isTempPath(one)) { Path onePath = one.getPath(); @@ -1912,31 +1920,62 @@ public final class Utilities { } } else { // This would be a single file. See if we need to remove it. - ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile); + ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile, conf); } } return taskIdToFile; } private static void ponderRemovingTempOrDuplicateFile(FileSystem fs, - FileStatus file, HashMap<String, FileStatus> taskIdToFile) throws IOException { + FileStatus file, HashMap<String, FileStatus> taskIdToFile, Configuration conf) + throws IOException { Path filePath = file.getPath(); String taskId = getPrefixedTaskIdFromFilename(filePath.getName()); Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles looking at {}" + ", taskId {}", filePath, taskId); FileStatus otherFile = taskIdToFile.get(taskId); taskIdToFile.put(taskId, (otherFile == null) ? file : - compareTempOrDuplicateFiles(fs, file, otherFile)); + compareTempOrDuplicateFiles(fs, file, otherFile, conf)); + } + + private static boolean warnIfSet(Configuration conf, String value) { + if (conf.getBoolean(value, false)) { + LOG.warn(value + " support is currently deprecated"); + return true; + } + return false; + } + + private static boolean isSpeculativeExecution(Configuration conf) { + String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); + boolean isSpeculative = false; + if ("mr".equalsIgnoreCase(engine)) { + isSpeculative = warnIfSet(conf, "mapreduce.map.speculative") || + warnIfSet(conf, "mapreduce.reduce.speculative") || + warnIfSet(conf, "mapred.map.tasks.speculative.execution") || + warnIfSet(conf, "mapred.reduce.tasks.speculative.execution"); + } else if ("tez".equalsIgnoreCase(engine)) { + isSpeculative = warnIfSet(conf, "tez.am.speculation.enabled"); + } // all other engines do not support speculative execution + + return isSpeculative; } private static FileStatus compareTempOrDuplicateFiles(FileSystem fs, - FileStatus file, FileStatus existingFile) throws IOException { - // Pick the one with mewest attempt ID. For sanity, check the file sizes too. - // If the file size of newest attempt is less than that for older one, - // Throw an exception as it maybe a correctness issue causing it. - // This breaks speculative execution if it ends prematurely. + FileStatus file, FileStatus existingFile, Configuration conf) throws IOException { + // Pick the one with newest attempt ID. Previously, this function threw an + // exception when the file size of the newer attempt was less than the + // older attempt. This was an incorrect assumption due to various + // techniques like file compression and no guarantee that the new task will + // write values in the same order. FileStatus toDelete = null, toRetain = null; + // This method currently does not support speculative execution + if (isSpeculativeExecution(conf)) { + String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE); + throw new IOException("Speculative execution is not supported for engine " + engine); + } + // "LOAD .. INTO" and "INSERT INTO" commands will generate files with // "_copy_x" suffix. These files are usually read by map tasks and the // task output gets written to some tmp path. The output file names will @@ -1950,68 +1989,38 @@ public final class Utilities { // elimination. Path filePath = file.getPath(); if (isCopyFile(filePath.getName())) { - LOG.info("{} file identified as duplicate. This file is" + - " not deleted as it has copySuffix.", filePath); + LOG.info("{} file identified as duplicate. This file is" + + " not deleted as it has copySuffix.", filePath); return existingFile; } int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName()); int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName()); - - long existingFileSz = getFileSizeRecursively(fs, existingFile); - long fileSz = getFileSizeRecursively(fs, file); // Files may come in any order irrespective of their attempt IDs - if (existingFileAttemptId > fileAttemptId && - existingFileSz >= fileSz) { + if (existingFileAttemptId > fileAttemptId) { // keep existing toRetain = existingFile; toDelete = file; - } else if (existingFileAttemptId < fileAttemptId && - existingFileSz <= fileSz) { + } else if (existingFileAttemptId < fileAttemptId) { // keep file toRetain = file; toDelete = existingFile; } else { - throw new IOException(" File " + filePath + - " with newer attempt ID " + fileAttemptId + " is smaller than the file " - + existingFile.getPath() + " with older attempt ID " + existingFileAttemptId); + throw new IOException(filePath + " has same attempt ID " + fileAttemptId + " as " + + existingFile.getPath()); } + if (!fs.delete(toDelete.getPath(), true)) { - throw new IOException( - "Unable to delete duplicate file: " + toDelete.getPath() - + ". Existing file: " + toRetain.getPath()); - } else { - LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length " - + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length " - + toRetain.getLen()); + throw new IOException("Unable to delete duplicate file: " + toDelete.getPath() + + ". Existing file: " + toRetain.getPath()); } + LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length " + + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length " + + toRetain.getLen()); return toRetain; } - // This function recurisvely fetches the size of all the files in given directory - private static long getFileSizeRecursively(FileSystem fs, FileStatus src) - throws IOException { - long size = 0; - if (src.isDirectory()) { - LOG.debug(" src " + src.getPath() + " is a directory"); - // This is a directory. - try { - FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER); - // Recursively fetch sizes of each file - for (FileStatus file : files) { - size += getFileSizeRecursively(fs, file); - } - } catch (IOException e) { - throw new IOException("Unable to fetch files in directory " + src.getPath(), e); - } - } else { - size = src.getLen(); - LOG.debug("src " + src.getPath() + " is a file of size " + size); - } - return size; - } - public static boolean isCopyFile(String filename) { String taskId = filename; String copyFileSuffix = null; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 8a8822d..2071de3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -278,10 +278,9 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop // set input format information if necessary setInputAttributes(job); - // Turn on speculative execution for reducers - boolean useSpeculativeExecReducers = HiveConf.getBoolVar(job, - HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS); - job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, useSpeculativeExecReducers); + // HIVE-23354 enforces that MR speculative execution is disabled + job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index 3e8ba08..97220c0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -869,12 +869,9 @@ public class DagUtils { // Is this required ? conf.set("mapred.reducer.class", ExecReducer.class.getName()); - - boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf, - HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS); - conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_SPECULATIVE, - useSpeculativeExecReducers); - + // HIVE-23354 enforces that MR speculative execution is disabled + conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_SPECULATIVE, false); + conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.MAP_SPECULATIVE, false); return conf; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java index 7fb3878..34519fb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java @@ -99,6 +99,9 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable, job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NullWritable.class); job.setNumReduceTasks(0); + // HIVE-23354 enforces that MR speculative execution is disabled + job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); // create the temp directories Path outputPath = work.getOutputDir(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java index 0458c94..752eea9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java @@ -106,6 +106,9 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri // zero reducers job.setNumReduceTasks(0); + // HIVE-23354 enforces that MR speculative execution is disabled + job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); if (work.getMinSplitSize() != null) { HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java index 9410a96..05ea38c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java @@ -76,6 +76,7 @@ import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapred.TaskAttemptContext; import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.Ref; @@ -328,6 +329,9 @@ public class CompactorMR { job.set(DIRS_TO_SEARCH, dirsToSearch.toString()); job.setLong(MIN_TXN, minTxn); job.setLong(MAX_TXN, maxTxn); + // HIVE-23354 enforces that MR speculative execution is disabled + job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); + job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); // Add tokens for all the file system in the input path. ArrayList<Path> dirs = new ArrayList<>(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java index 163d439..04cfd9e 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -199,6 +200,9 @@ public class TestUtilities { DynamicPartitionCtx dpCtx = getDynamicPartitionCtx(dPEnabled); Path tempDirPath = setupTempDirWithSingleOutputFile(hconf); FileSinkDesc conf = getFileSinkDesc(tempDirPath); + // HIVE-23354 enforces that MR speculative execution is disabled + hconf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); + hconf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);