Repository: hive Updated Branches: refs/heads/branch-1 fda7c5175 -> ca20049ff
HIVE-11925 : Hive file format checking breaks load from named pipes (Sergey Shelukhin, reviewed by Ashutosh Chauhan) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/ca20049f Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/ca20049f Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/ca20049f Branch: refs/heads/branch-1 Commit: ca20049ff83db9e111afb47b2e8d93ad6e366299 Parents: fda7c51 Author: Sergey Shelukhin <ser...@apache.org> Authored: Thu Oct 1 12:42:28 2015 -0700 Committer: Sergey Shelukhin <ser...@apache.org> Committed: Thu Oct 1 14:19:01 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/HiveFileFormatUtils.java | 95 ++++++++++++++------ .../hadoop/hive/ql/io/InputFormatChecker.java | 5 +- .../hadoop/hive/ql/io/RCFileInputFormat.java | 3 +- .../ql/io/SequenceFileInputFormatChecker.java | 3 +- .../hive/ql/io/VectorizedRCFileInputFormat.java | 3 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 4 +- .../ql/io/orc/VectorizedOrcInputFormat.java | 2 +- .../hive/ql/exec/TestFileSinkOperator.java | 2 +- .../hive/ql/txn/compactor/CompactorTest.java | 2 +- 9 files changed, 80 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java index 50ba740..06d3df7 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java @@ -19,8 +19,13 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; +import java.nio.file.FileSystemNotFoundException; +import java.nio.file.Files; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -28,10 +33,13 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; @@ -69,6 +77,7 @@ import org.apache.hive.common.util.ReflectionUtil; * */ public final class HiveFileFormatUtils { + private static final Log LOG = LogFactory.getLog(HiveFileFormatUtils.class); static { outputFormatSubstituteMap = @@ -177,44 +186,51 @@ public final class HiveFileFormatUtils { */ @SuppressWarnings("unchecked") public static boolean checkInputFormat(FileSystem fs, HiveConf conf, - Class<? extends InputFormat> inputFormatCls, ArrayList<FileStatus> files) + Class<? extends InputFormat> inputFormatCls, List<FileStatus> files) throws HiveException { - if (files.size() > 0) { - Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls); - if (checkerCls == null - && inputFormatCls.isAssignableFrom(TextInputFormat.class)) { - // we get a text input format here, we can not determine a file is text - // according to its content, so we can do is to test if other file - // format can accept it. If one other file format can accept this file, - // we treat this file as text file, although it maybe not. - return checkTextInputFormat(fs, conf, files); - } + if (files.isEmpty()) return false; + Class<? extends InputFormatChecker> checkerCls = getInputFormatChecker(inputFormatCls); + if (checkerCls == null + && inputFormatCls.isAssignableFrom(TextInputFormat.class)) { + // we get a text input format here, we can not determine a file is text + // according to its content, so we can do is to test if other file + // format can accept it. If one other file format can accept this file, + // we treat this file as text file, although it maybe not. + return checkTextInputFormat(fs, conf, files); + } - if (checkerCls != null) { - InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache - .get(checkerCls); - try { - if (checkerInstance == null) { - checkerInstance = checkerCls.newInstance(); - inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance); - } - return checkerInstance.validateInput(fs, conf, files); - } catch (Exception e) { - throw new HiveException(e); + if (checkerCls != null) { + InputFormatChecker checkerInstance = inputFormatCheckerInstanceCache.get(checkerCls); + try { + if (checkerInstance == null) { + checkerInstance = checkerCls.newInstance(); + inputFormatCheckerInstanceCache.put(checkerCls, checkerInstance); } + return checkerInstance.validateInput(fs, conf, files); + } catch (Exception e) { + throw new HiveException(e); } - return true; } - return false; + return true; } @SuppressWarnings("unchecked") private static boolean checkTextInputFormat(FileSystem fs, HiveConf conf, - ArrayList<FileStatus> files) throws HiveException { - Set<Class<? extends InputFormat>> inputFormatter = inputFormatCheckerMap - .keySet(); + List<FileStatus> files) throws HiveException { + List<FileStatus> files2 = new LinkedList<>(files); + Iterator<FileStatus> iter = files2.iterator(); + while (iter.hasNext()) { + FileStatus file = iter.next(); + if (file == null) continue; + if (isPipe(fs, file)) { + LOG.info("Skipping format check for " + file.getPath() + " as it is a pipe"); + iter.remove(); + } + } + if (files2.isEmpty()) return true; + Set<Class<? extends InputFormat>> inputFormatter = inputFormatCheckerMap.keySet(); for (Class<? extends InputFormat> reg : inputFormatter) { - boolean result = checkInputFormat(fs, conf, reg, files); + boolean result = checkInputFormat(fs, conf, reg, files2); if (result) { return false; } @@ -222,6 +238,29 @@ public final class HiveFileFormatUtils { return true; } + // See include/uapi/linux/stat.h + private static final int S_IFIFO = 0010000; + private static boolean isPipe(FileSystem fs, FileStatus file) { + if (fs instanceof DistributedFileSystem) { + return false; // Shortcut for HDFS. + } + int mode = 0; + Object pathToLog = file.getPath(); + try { + java.nio.file.Path realPath = Paths.get(file.getPath().toUri()); + pathToLog = realPath; + mode = (Integer)Files.getAttribute(realPath, "unix:mode"); + } catch (FileSystemNotFoundException t) { + return false; // Probably not a local filesystem; no need to check. + } catch (UnsupportedOperationException | IOException + | SecurityException | IllegalArgumentException t) { + LOG.info("Failed to check mode for " + pathToLog + ": " + + t.getMessage() + " (" + t.getClass() + ")"); + return false; + } + return (mode & S_IFIFO) != 0; + } + public static RecordWriter getHiveRecordWriter(JobConf jc, TableDesc tableInfo, Class<? extends Writable> outputClass, FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException { http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java index 3945411..129b834 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/InputFormatChecker.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; -import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -34,7 +34,6 @@ public interface InputFormatChecker { * This method is used to validate the input files. * */ - boolean validateInput(FileSystem fs, HiveConf conf, - ArrayList<FileStatus> files) throws IOException; + boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws IOException; } http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java index 88198ed..6004db8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileInputFormat.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -59,7 +60,7 @@ public class RCFileInputFormat<K extends LongWritable, V extends BytesRefArrayWr @Override public boolean validateInput(FileSystem fs, HiveConf conf, - ArrayList<FileStatus> files) throws IOException { + List<FileStatus> files) throws IOException { if (files.size() <= 0) { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java index e2666d7..6cb46c9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SequenceFileInputFormatChecker.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -35,7 +36,7 @@ public class SequenceFileInputFormatChecker implements InputFormatChecker { @Override public boolean validateInput(FileSystem fs, HiveConf conf, - ArrayList<FileStatus> files) throws IOException { + List<FileStatus> files) throws IOException { if (files.size() <= 0) { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java index faad5f2..e9e1d5a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -56,7 +57,7 @@ public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, V @Override public boolean validateInput(FileSystem fs, HiveConf conf, - ArrayList<FileStatus> files) throws IOException { + List<FileStatus> files) throws IOException { if (files.size() <= 0) { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index f078018..9a61ca0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -101,7 +101,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * that added this event. Insert and update events include the entire row, while * delete events have null for row. */ -public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, +public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, InputFormatChecker, VectorizedInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination { @@ -323,7 +323,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @Override public boolean validateInput(FileSystem fs, HiveConf conf, - ArrayList<FileStatus> files + List<FileStatus> files ) throws IOException { if (Utilities.isVectorMode(conf)) { http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java index 3992d8c..bf09001 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java @@ -158,7 +158,7 @@ public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, Vect @Override public boolean validateInput(FileSystem fs, HiveConf conf, - ArrayList<FileStatus> files + List<FileStatus> files ) throws IOException { if (files.size() <= 0) { return false; http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index c6ae030..5d140b4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -572,7 +572,7 @@ public class TestFileSinkOperator { } @Override - public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws + public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws IOException { return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/ca20049f/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index 21adc9d..5a8c932 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -361,7 +361,7 @@ public abstract class CompactorTest { } @Override - public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files) throws + public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws IOException { return false; }