Repository: hive Updated Branches: refs/heads/master 04d54f61c -> bab3ee31e
HIVE-11210. Remove dependency on HiveConf from Orc reader and writer. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bab3ee31 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bab3ee31 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bab3ee31 Branch: refs/heads/master Commit: bab3ee31eec98ff28366949461e004d1386100f6 Parents: 04d54f6 Author: Owen O'Malley <omal...@apache.org> Authored: Wed Jul 8 15:02:49 2015 -0700 Committer: Owen O'Malley <omal...@apache.org> Committed: Tue Jul 21 14:07:39 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/orc/MemoryManager.java | 4 +- .../apache/hadoop/hive/ql/io/orc/OrcConf.java | 134 +++++++++++++++++++ .../apache/hadoop/hive/ql/io/orc/OrcFile.java | 40 +++--- .../apache/hadoop/hive/ql/io/orc/Reader.java | 34 ++++- .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 33 ++--- .../hadoop/hive/ql/io/orc/WriterImpl.java | 27 ++-- 6 files changed, 212 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/bab3ee31/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java index 6432d6e..0347a1c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/MemoryManager.java @@ -22,7 +22,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import com.google.common.base.Preconditions; @@ -91,8 +90,7 @@ class MemoryManager { * pool. */ MemoryManager(Configuration conf) { - HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL; - double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal); + double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf); totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). getHeapMemoryUsage().getMax() * maxLoad); ownerLock.lock(); http://git-wip-us.apache.org/repos/asf/hive/blob/bab3ee31/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java new file mode 100644 index 0000000..aeb0ec1 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcConf.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import org.apache.hadoop.conf.Configuration; + +/** + * Define the configuration properties that Orc understands. + */ +public enum OrcConf { + STRIPE_SIZE("hive.exec.orc.default.stripe.size", + 64L * 1024 * 1024, + "Define the default ORC stripe size, in bytes."), + BLOCK_SIZE("hive.exec.orc.default.block.size", 256L * 1024 * 1024, + "Define the default file system block size for ORC files."), + ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride", 10000, + "Define the default ORC index stride in number of rows. (Stride is the\n"+ + " number of rows n index entry represents.)"), + BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024, + "Define the default ORC buffer size, in bytes."), + BLOCK_PADDING("hive.exec.orc.default.block.padding", true, + "Define the default block padding, which pads stripes to the HDFS\n" + + " block boundaries."), + COMPRESS("hive.exec.orc.default.compress", "ZLIB", + "Define the default compression codec for ORC file"), + WRITE_FORMAT("hive.exec.orc.write.format", null, + "Define the version of the file to write. Possible values are 0.11 and\n"+ + " 0.12. If this parameter is not defined, ORC will use the run\n" + + " length encoding (RLE) introduced in Hive 0.12. Any value other\n" + + " than 0.11 results in the 0.12 encoding."), + ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "SPEED", + "Define the encoding strategy to use while writing data. Changing this\n"+ + "will only affect the light weight encoding for integers. This\n" + + "flag will not change the compression level of higher level\n" + + "compression codec (like ZLIB)."), + COMPRESSION_STRATEGY("hive.exec.orc.compression.strategy", "SPEED", + "Define the compression strategy to use while writing data.\n" + + "This changes the compression level of higher level compression\n" + + "codec (like ZLIB)."), + BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", + 0.05, + "Define the tolerance for block padding as a decimal fraction of\n" + + "stripe size (for example, the default value 0.05 is 5% of the\n" + + "stripe size). For the defaults of 64Mb ORC stripe and 256Mb HDFS\n" + + "blocks, the default block padding tolerance of 5% will\n" + + "reserve a maximum of 3.2Mb for padding within the 256Mb block.\n" + + "In that case, if the available size within the block is more than\n"+ + "3.2Mb, a new smaller stripe will be inserted to fit within that\n" + + "space. This will make sure that no stripe written will block\n" + + " boundaries and cause remote reads within a node local task."), + BLOOM_FILTER_FPP("orc.default.bloom.fpp", 0.05, + "Define the default false positive probability for bloom filters."), + USE_ZEROCOPY("hive.exec.orc.zerocopy", false, + "Use zerocopy reads with ORC. (This requires Hadoop 2.3 or later.)"), + SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false, + "If ORC reader encounters corrupt data, this value will be used to\n" + + "determine whether to skip the corrupt data or throw exception.\n" + + "The default behavior is to throw exception."), + MEMORY_POOL("hive.exec.orc.memory.pool", 0.5, + "Maximum fraction of heap that can be used by ORC file writers"), + DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", + 0.8, + "If the number of keys in a dictionary is greater than this fraction\n" + + "of the total number of non-null rows, turn off dictionary\n" + + "encoding. Use 1 to always use dictionary encoding."), + ROW_INDEX_STRIDE_DICTIONARY_CHECK("hive.orc.row.index.stride.dictionary.check", + true, + "If enabled dictionary check will happen after first row index stride\n" + + "(default 10000 rows) else dictionary check will happen before\n" + + "writing first stripe. In both cases, the decision to use\n" + + "dictionary or not will be retained thereafter."), + ; + + private final String attribute; + private final Object defaultValue; + private final String description; + + OrcConf(String attribute, Object defaultValue, String description) { + this.attribute = attribute; + this.defaultValue = defaultValue; + this.description = description; + } + + public String getAttribute() { + return attribute; + } + + public Object getDefaultValue() { + return defaultValue; + } + + public String getDescription() { + return description; + } + + public long getLong(Configuration conf) { + return conf.getLong(attribute, ((Number) defaultValue).longValue()); + } + + public String getString(Configuration conf) { + return conf.get(attribute, (String) defaultValue); + } + + public boolean getBoolean(Configuration conf) { + if (conf == null) { + return (Boolean) defaultValue; + } + return conf.getBoolean(attribute, (Boolean) defaultValue); + } + + public double getDouble(Configuration conf) { + String str = conf.get(attribute); + if (str == null) { + return ((Number) defaultValue).doubleValue(); + } + return Double.parseDouble(str); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/bab3ee31/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index 4e2bd6a..976a84b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@ -18,21 +18,11 @@ package org.apache.hadoop.hive.ql.io.orc; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_PADDING; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BLOCK_SIZE; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_BUFFER_SIZE; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_COMPRESS; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT; - import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; /** @@ -264,44 +254,44 @@ public final class OrcFile { private WriterCallback callback; private EncodingStrategy encodingStrategy; private CompressionStrategy compressionStrategy; - private float paddingTolerance; + private double paddingTolerance; private String bloomFilterColumns; private double bloomFilterFpp; WriterOptions(Configuration conf) { configuration = conf; memoryManagerValue = getMemoryManager(conf); - stripeSizeValue = HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_STRIPE_SIZE); - blockSizeValue = HiveConf.getLongVar(conf, HIVE_ORC_DEFAULT_BLOCK_SIZE); - rowIndexStrideValue = HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE); - bufferSizeValue = HiveConf.getIntVar(conf, HIVE_ORC_DEFAULT_BUFFER_SIZE); - blockPaddingValue = HiveConf.getBoolVar(conf, HIVE_ORC_DEFAULT_BLOCK_PADDING); - compressValue = CompressionKind.valueOf(HiveConf.getVar(conf, HIVE_ORC_DEFAULT_COMPRESS)); - String versionName = HiveConf.getVar(conf, HIVE_ORC_WRITE_FORMAT); + stripeSizeValue = OrcConf.STRIPE_SIZE.getLong(conf); + blockSizeValue = OrcConf.BLOCK_SIZE.getLong(conf); + rowIndexStrideValue = + (int) OrcConf.ROW_INDEX_STRIDE.getLong(conf); + bufferSizeValue = (int) OrcConf.BUFFER_SIZE.getLong(conf); + blockPaddingValue = OrcConf.BLOCK_PADDING.getBoolean(conf); + compressValue = + CompressionKind.valueOf(OrcConf.COMPRESS.getString(conf)); + String versionName = OrcConf.WRITE_FORMAT.getString(conf); if (versionName == null) { versionValue = Version.CURRENT; } else { versionValue = Version.byName(versionName); } - String enString = - conf.get(HiveConf.ConfVars.HIVE_ORC_ENCODING_STRATEGY.varname); + String enString = OrcConf.ENCODING_STRATEGY.getString(conf); if (enString == null) { encodingStrategy = EncodingStrategy.SPEED; } else { encodingStrategy = EncodingStrategy.valueOf(enString); } - String compString = conf - .get(HiveConf.ConfVars.HIVE_ORC_COMPRESSION_STRATEGY.varname); + String compString = OrcConf.COMPRESSION_STRATEGY.getString(conf); if (compString == null) { compressionStrategy = CompressionStrategy.SPEED; } else { compressionStrategy = CompressionStrategy.valueOf(compString); } - paddingTolerance = conf.getFloat(HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.varname, - HiveConf.ConfVars.HIVE_ORC_BLOCK_PADDING_TOLERANCE.defaultFloatVal); - bloomFilterFpp = BloomFilterIO.DEFAULT_FPP; + paddingTolerance = + OrcConf.BLOCK_PADDING_TOLERANCE.getDouble(conf); + bloomFilterFpp = OrcConf.BLOOM_FILTER_FPP.getDouble(conf); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/bab3ee31/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java index 1f29085..6f4f013 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java @@ -147,6 +147,8 @@ public interface Reader { private long length = Long.MAX_VALUE; private SearchArgument sarg = null; private String[] columnNames = null; + private Boolean useZeroCopy = null; + private Boolean skipCorruptRecords = null; /** * Set the list of columns to read. @@ -174,7 +176,7 @@ public interface Reader { * Set search argument for predicate push down. * @param sarg the search argument * @param columnNames the column names for - * @return + * @return this */ public Options searchArgument(SearchArgument sarg, String[] columnNames) { this.sarg = sarg; @@ -182,6 +184,26 @@ public interface Reader { return this; } + /** + * Set whether to use zero copy from HDFS. + * @param value the new zero copy flag + * @return this + */ + public Options useZeroCopy(boolean value) { + this.useZeroCopy = value; + return this; + } + + /** + * Set whether to skip corrupt records. + * @param value the new skip corrupt records flag + * @return this + */ + public Options skipCorruptRecords(boolean value) { + this.skipCorruptRecords = value; + return this; + } + public boolean[] getInclude() { return include; } @@ -210,6 +232,14 @@ public interface Reader { return result; } + public Boolean getUseZeroCopy() { + return useZeroCopy; + } + + public Boolean getSkipCorruptRecords() { + return skipCorruptRecords; + } + public Options clone() { Options result = new Options(); result.include = include; @@ -217,6 +247,8 @@ public interface Reader { result.length = length; result.sarg = sarg; result.columnNames = columnNames; + result.useZeroCopy = useZeroCopy; + result.skipCorruptRecords = skipCorruptRecords; return result; } http://git-wip-us.apache.org/repos/asf/hive/blob/bab3ee31/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index beaf231..4f79e37 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.hive.ql.io.orc; -import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ORC_ZEROCOPY; - import java.io.IOException; import java.math.BigDecimal; import java.nio.ByteBuffer; @@ -41,7 +39,6 @@ import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.DiskRangeList; import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper; import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; @@ -153,15 +150,15 @@ class RecordReaderImpl implements RecordReader { } protected RecordReaderImpl(List<StripeInformation> stripes, - FileSystem fileSystem, - Path path, - Reader.Options options, - List<OrcProto.Type> types, - CompressionCodec codec, - int bufferSize, - long strideRate, - Configuration conf - ) throws IOException { + FileSystem fileSystem, + Path path, + Reader.Options options, + List<OrcProto.Type> types, + CompressionCodec codec, + int bufferSize, + long strideRate, + Configuration conf + ) throws IOException { this.path = path; this.file = fileSystem.open(path); this.codec = codec; @@ -192,13 +189,19 @@ class RecordReaderImpl implements RecordReader { } } - final boolean zeroCopy = (conf != null) - && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY)); + Boolean zeroCopy = options.getUseZeroCopy(); + if (zeroCopy == null) { + zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf); + } zcr = zeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null; firstRow = skippedRows; totalRowCount = rows; - boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA); + Boolean skipCorrupt = options.getSkipCorruptRecords(); + if (skipCorrupt == null) { + skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); + } + reader = RecordReaderFactory.createTreeReader(0, conf, types, included, skipCorrupt); indexes = new OrcProto.RowIndex[types.size()]; bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; http://git-wip-us.apache.org/repos/asf/hive/blob/bab3ee31/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java index ee6110e..7aa8d65 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java @@ -40,7 +40,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.CompressionCodec.Modifier; @@ -127,7 +126,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final boolean addBlockPadding; private final int bufferSize; private final long blockSize; - private final float paddingTolerance; + private final double paddingTolerance; // the streams that make up the current stripe private final Map<StreamName, BufferedStream> streams = new TreeMap<StreamName, BufferedStream>(); @@ -176,7 +175,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { OrcFile.WriterCallback callback, EncodingStrategy encodingStrategy, CompressionStrategy compressionStrategy, - float paddingTolerance, + double paddingTolerance, long blockSizeValue, String bloomFilterColumnNames, double bloomFilterFpp) throws IOException { @@ -315,8 +314,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { // the assumption is only one ORC writer open at a time, which holds true for // most of the cases. HIVE-6455 forces single writer case. private long getMemoryAvailableForORC() { - HiveConf.ConfVars poolVar = HiveConf.ConfVars.HIVE_ORC_FILE_MEMORY_POOL; - double maxLoad = conf.getFloat(poolVar.varname, poolVar.defaultFloatVal); + double maxLoad = OrcConf.MEMORY_POOL.getDouble(conf); long totalMemoryPool = Math.round(ManagementFactory.getMemoryMXBean(). getHeapMemoryUsage().getMax() * maxLoad); return totalMemoryPool; @@ -1178,7 +1176,7 @@ public class WriterImpl implements Writer, MemoryManager.Callback { private final List<Long> rowIndexValueCount = new ArrayList<Long>(); // If the number of keys in a dictionary is greater than this fraction of //the total number of non-null rows, turn off dictionary encoding - private final float dictionaryKeySizeThreshold; + private final double dictionaryKeySizeThreshold; private boolean useDictionaryEncoding = true; private boolean isDirectV2 = true; private boolean doneDictionaryCheck; @@ -1202,14 +1200,11 @@ public class WriterImpl implements Writer, MemoryManager.Callback { directStreamOutput = writer.createStream(id, OrcProto.Stream.Kind.DATA); directLengthOutput = createIntegerWriter(writer.createStream(id, OrcProto.Stream.Kind.LENGTH), false, isDirectV2, writer); - dictionaryKeySizeThreshold = writer.getConfiguration().getFloat( - HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, - HiveConf.ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD. - defaultFloatVal); - strideDictionaryCheck = writer.getConfiguration().getBoolean( - HiveConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK.varname, - HiveConf.ConfVars.HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK. - defaultBoolVal); + Configuration conf = writer.getConfiguration(); + dictionaryKeySizeThreshold = + OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf); + strideDictionaryCheck = + OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf); doneDictionaryCheck = false; } @@ -2189,8 +2184,8 @@ public class WriterImpl implements Writer, MemoryManager.Callback { // and user specified padding tolerance. Since stripe size can overflow // the default stripe size we should apply this correction to avoid // writing portion of last stripe to next hdfs block. - float correction = overflow > 0 ? (float) overflow - / (float) adjustedStripeSize : 0.0f; + double correction = overflow > 0 ? (double) overflow + / (double) adjustedStripeSize : 0.0; // correction should not be greater than user specified padding // tolerance