http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java b/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java deleted file mode 100644 index 9433e54..0000000 --- a/orc/src/java/org/apache/orc/impl/RecordReaderImpl.java +++ /dev/null @@ -1,1230 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.IOException; -import java.math.BigDecimal; -import java.sql.Date; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.orc.BooleanColumnStatistics; -import org.apache.orc.Reader; -import org.apache.orc.RecordReader; -import org.apache.orc.TypeDescription; -import org.apache.orc.ColumnStatistics; -import org.apache.orc.CompressionCodec; -import org.apache.orc.DataReader; -import org.apache.orc.DateColumnStatistics; -import org.apache.orc.DecimalColumnStatistics; -import org.apache.orc.DoubleColumnStatistics; -import org.apache.orc.IntegerColumnStatistics; -import org.apache.orc.OrcConf; -import org.apache.orc.StringColumnStatistics; -import org.apache.orc.StripeInformation; -import org.apache.orc.TimestampColumnStatistics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.io.DiskRange; -import org.apache.hadoop.hive.common.io.DiskRangeList; -import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.BloomFilterIO; -import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; -import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; -import org.apache.hadoop.hive.serde2.io.DateWritable; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.ql.util.TimestampUtils; -import org.apache.hadoop.io.Text; -import org.apache.orc.OrcProto; - -public class RecordReaderImpl implements RecordReader { - static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class); - private static final boolean isLogDebugEnabled = LOG.isDebugEnabled(); - private static final Object UNKNOWN_VALUE = new Object(); - protected final Path path; - private final long firstRow; - private final List<StripeInformation> stripes = - new ArrayList<StripeInformation>(); - private OrcProto.StripeFooter stripeFooter; - private final long totalRowCount; - private final 
CompressionCodec codec; - protected final TypeDescription schema; - private final List<OrcProto.Type> types; - private final int bufferSize; - private final SchemaEvolution evolution; - // the file's included columns, indexed by the file's column ids. - private final boolean[] included; - private final long rowIndexStride; - private long rowInStripe = 0; - private int currentStripe = -1; - private long rowBaseInStripe = 0; - private long rowCountInStripe = 0; - private final Map<StreamName, InStream> streams = - new HashMap<StreamName, InStream>(); - DiskRangeList bufferChunks = null; - private final TreeReaderFactory.TreeReader reader; - private final OrcProto.RowIndex[] indexes; - private final OrcProto.BloomFilterIndex[] bloomFilterIndices; - private final SargApplier sargApp; - // an array indicating which row groups are not skipped - private boolean[] includedRowGroups = null; - private final DataReader dataReader; - - /** - * Given a column name, find it in the reader schema and return the file column id. - * - * @param evolution the mapping from reader to file schema - * @param columnName the column name to look for - * @return the file column id or -1 if the column wasn't found - */ - static int findColumns(SchemaEvolution evolution, - String columnName) { - TypeDescription readerSchema = evolution.getReaderBaseSchema(); - List<String> fieldNames = readerSchema.getFieldNames(); - List<TypeDescription> children = readerSchema.getChildren(); - for (int i = 0; i < fieldNames.size(); ++i) { - if (columnName.equals(fieldNames.get(i))) { - TypeDescription result = evolution.getFileType(children.get(i).getId()); - return result == null ? -1 : result.getId(); - } - } - return -1; - } - - /** - * Find the mapping from predicate leaves to columns. - * @param sargLeaves the search argument leaves that we need to map - * @param evolution the mapping from reader to file schema - * @return an array mapping the sarg leaves to file column ids - */ - public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves, - SchemaEvolution evolution) { - int[] result = new int[sargLeaves.size()]; - Arrays.fill(result, -1); - for(int i=0; i < result.length; ++i) { - String colName = sargLeaves.get(i).getColumnName(); - result[i] = findColumns(evolution, colName); - } - return result; - } - - protected RecordReaderImpl(ReaderImpl fileReader, - Reader.Options options) throws IOException { - boolean[] readerIncluded = options.getInclude(); - if (options.getSchema() == null) { - if (LOG.isInfoEnabled()) { - LOG.info("Reader schema not provided -- using file schema " + - fileReader.getSchema()); - } - evolution = new SchemaEvolution(fileReader.getSchema(), readerIncluded); - } else { - - // Now that we are creating a record reader for a file, validate that the schema to read - // is compatible with the file schema.
- // - evolution = new SchemaEvolution(fileReader.getSchema(), - options.getSchema(), readerIncluded); - if (LOG.isDebugEnabled() && evolution.hasConversion()) { - LOG.debug("ORC file " + fileReader.path.toString() + - " has data type conversion --\n" + - "reader schema: " + options.getSchema().toString() + "\n" + - "file schema: " + fileReader.getSchema()); - } - } - this.schema = evolution.getReaderSchema(); - this.path = fileReader.path; - this.codec = fileReader.codec; - this.types = fileReader.types; - this.bufferSize = fileReader.bufferSize; - this.rowIndexStride = fileReader.rowIndexStride; - SearchArgument sarg = options.getSearchArgument(); - if (sarg != null && rowIndexStride != 0) { - sargApp = new SargApplier(sarg, options.getColumnNames(), rowIndexStride, - evolution); - } else { - sargApp = null; - } - long rows = 0; - long skippedRows = 0; - long offset = options.getOffset(); - long maxOffset = options.getMaxOffset(); - for(StripeInformation stripe: fileReader.getStripes()) { - long stripeStart = stripe.getOffset(); - if (offset > stripeStart) { - skippedRows += stripe.getNumberOfRows(); - } else if (stripeStart < maxOffset) { - this.stripes.add(stripe); - rows += stripe.getNumberOfRows(); - } - } - - Boolean zeroCopy = options.getUseZeroCopy(); - if (zeroCopy == null) { - zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf); - } - if (options.getDataReader() != null) { - this.dataReader = options.getDataReader(); - } else { - this.dataReader = RecordReaderUtils.createDefaultDataReader( - DataReaderProperties.builder() - .withBufferSize(bufferSize) - .withCompression(fileReader.compressionKind) - .withFileSystem(fileReader.fileSystem) - .withPath(fileReader.path) - .withTypeCount(types.size()) - .withZeroCopy(zeroCopy) - .build()); - } - this.dataReader.open(); - - firstRow = skippedRows; - totalRowCount = rows; - Boolean skipCorrupt = options.getSkipCorruptRecords(); - if (skipCorrupt == null) { - skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf); - } - - reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(), - evolution, readerIncluded, skipCorrupt); - indexes = new OrcProto.RowIndex[types.size()]; - bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; - this.included = evolution.getFileIncluded(); - advanceToNextRow(reader, 0L, true); - } - - public static final class PositionProviderImpl implements PositionProvider { - private final OrcProto.RowIndexEntry entry; - private int index; - - public PositionProviderImpl(OrcProto.RowIndexEntry entry) { - this(entry, 0); - } - - public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) { - this.entry = entry; - this.index = startPos; - } - - @Override - public long getNext() { - return entry.getPositions(index++); - } - - @Override - public String toString() { - return "{" + entry.getPositionsList() + "; " + index + "}"; - } - } - - public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe - ) throws IOException { - return dataReader.readStripeFooter(stripe); - } - - enum Location { - BEFORE, MIN, MIDDLE, MAX, AFTER - } - - /** - * Given a point and min and max, determine if the point is before, at the - * min, in the middle, at the max, or after the range. 
- * @param point the point to test - * @param min the minimum point - * @param max the maximum point - * @param <T> the type of the comparison - * @return the location of the point - */ - static <T> Location compareToRange(Comparable<T> point, T min, T max) { - int minCompare = point.compareTo(min); - if (minCompare < 0) { - return Location.BEFORE; - } else if (minCompare == 0) { - return Location.MIN; - } - int maxCompare = point.compareTo(max); - if (maxCompare > 0) { - return Location.AFTER; - } else if (maxCompare == 0) { - return Location.MAX; - } - return Location.MIDDLE; - } - - /** - * Get the maximum value out of an index entry. - * @param index - * the index entry - * @return the object for the maximum value or null if there isn't one - */ - static Object getMax(ColumnStatistics index) { - if (index instanceof IntegerColumnStatistics) { - return ((IntegerColumnStatistics) index).getMaximum(); - } else if (index instanceof DoubleColumnStatistics) { - return ((DoubleColumnStatistics) index).getMaximum(); - } else if (index instanceof StringColumnStatistics) { - return ((StringColumnStatistics) index).getMaximum(); - } else if (index instanceof DateColumnStatistics) { - return ((DateColumnStatistics) index).getMaximum(); - } else if (index instanceof DecimalColumnStatistics) { - return ((DecimalColumnStatistics) index).getMaximum(); - } else if (index instanceof TimestampColumnStatistics) { - return ((TimestampColumnStatistics) index).getMaximum(); - } else if (index instanceof BooleanColumnStatistics) { - if (((BooleanColumnStatistics)index).getTrueCount()!=0) { - return Boolean.TRUE; - } else { - return Boolean.FALSE; - } - } else { - return null; - } - } - - /** - * Get the minimum value out of an index entry. - * @param index - * the index entry - * @return the object for the minimum value or null if there isn't one - */ - static Object getMin(ColumnStatistics index) { - if (index instanceof IntegerColumnStatistics) { - return ((IntegerColumnStatistics) index).getMinimum(); - } else if (index instanceof DoubleColumnStatistics) { - return ((DoubleColumnStatistics) index).getMinimum(); - } else if (index instanceof StringColumnStatistics) { - return ((StringColumnStatistics) index).getMinimum(); - } else if (index instanceof DateColumnStatistics) { - return ((DateColumnStatistics) index).getMinimum(); - } else if (index instanceof DecimalColumnStatistics) { - return ((DecimalColumnStatistics) index).getMinimum(); - } else if (index instanceof TimestampColumnStatistics) { - return ((TimestampColumnStatistics) index).getMinimum(); - } else if (index instanceof BooleanColumnStatistics) { - if (((BooleanColumnStatistics)index).getFalseCount()!=0) { - return Boolean.FALSE; - } else { - return Boolean.TRUE; - } - } else { - return UNKNOWN_VALUE; // null is not safe here - } - } - - /** - * Evaluate a predicate with respect to the statistics from the column - * that is referenced in the predicate. - * @param statsProto the statistics for the column mentioned in the predicate - * @param predicate the leaf predicate we need to evaluate - * @param bloomFilter the bloom filter for the column, if any - * @return the set of truth values that may be returned for the given - * predicate.
- */ - static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto, - PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) { - ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto); - Object minValue = getMin(cs); - Object maxValue = getMax(cs); - BloomFilterIO bf = null; - if (bloomFilter != null) { - bf = new BloomFilterIO(bloomFilter); - } - return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf); - } - - /** - * Evaluate a predicate with respect to the statistics from the column - * that is referenced in the predicate. - * @param stats the statistics for the column mentioned in the predicate - * @param predicate the leaf predicate we need to evaluate - * @return the set of truth values that may be returned for the given - * predicate. - */ - public static TruthValue evaluatePredicate(ColumnStatistics stats, - PredicateLeaf predicate, - BloomFilterIO bloomFilter) { - Object minValue = getMin(stats); - Object maxValue = getMax(stats); - return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter); - } - - static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min, - Object max, boolean hasNull, BloomFilterIO bloomFilter) { - // if we didn't have any values, everything must have been null - if (min == null) { - if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) { - return TruthValue.YES; - } else { - return TruthValue.NULL; - } - } else if (min == UNKNOWN_VALUE) { - return TruthValue.YES_NO_NULL; - } - - TruthValue result; - Object baseObj = predicate.getLiteral(); - try { - // Predicate object and stats objects are converted to the type of the predicate object. - Object minValue = getBaseObjectForComparison(predicate.getType(), min); - Object maxValue = getBaseObjectForComparison(predicate.getType(), max); - Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj); - - result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull); - if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) { - result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull); - } - // in case of a failed conversion, return the default YES_NO_NULL truth value - } catch (Exception e) { - if (LOG.isWarnEnabled()) { - final String statsType = min == null ? - (max == null ? "null" : max.getClass().getSimpleName()) : - min.getClass().getSimpleName(); - final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName(); - final String reason = e.getClass().getSimpleName() + " when evaluating predicate." + - " Skipping ORC PPD."
+ - " Exception: " + e.getMessage() + - " StatsType: " + statsType + - " PredicateType: " + predicateType; - LOG.warn(reason); - LOG.debug(reason, e); - } - if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) { - result = TruthValue.YES_NO; - } else { - result = TruthValue.YES_NO_NULL; - } - } - return result; - } - - private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate, - TruthValue result, BloomFilterIO bloomFilter) { - // evaluate bloom filter only when - // 1) Bloom filter is available - // 2) Min/Max evaluation yield YES or MAYBE - // 3) Predicate is EQUALS or IN list - if (bloomFilter != null - && result != TruthValue.NO_NULL && result != TruthValue.NO - && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS) - || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) - || predicate.getOperator().equals(PredicateLeaf.Operator.IN))) { - return true; - } - return false; - } - - private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj, - Object minValue, - Object maxValue, - boolean hasNull) { - Location loc; - - switch (predicate.getOperator()) { - case NULL_SAFE_EQUALS: - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.BEFORE || loc == Location.AFTER) { - return TruthValue.NO; - } else { - return TruthValue.YES_NO; - } - case EQUALS: - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (minValue.equals(maxValue) && loc == Location.MIN) { - return hasNull ? TruthValue.YES_NULL : TruthValue.YES; - } else if (loc == Location.BEFORE || loc == Location.AFTER) { - return hasNull ? TruthValue.NO_NULL : TruthValue.NO; - } else { - return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; - } - case LESS_THAN: - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.AFTER) { - return hasNull ? TruthValue.YES_NULL : TruthValue.YES; - } else if (loc == Location.BEFORE || loc == Location.MIN) { - return hasNull ? TruthValue.NO_NULL : TruthValue.NO; - } else { - return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; - } - case LESS_THAN_EQUALS: - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.AFTER || loc == Location.MAX) { - return hasNull ? TruthValue.YES_NULL : TruthValue.YES; - } else if (loc == Location.BEFORE) { - return hasNull ? TruthValue.NO_NULL : TruthValue.NO; - } else { - return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; - } - case IN: - if (minValue.equals(maxValue)) { - // for a single value, look through to see if that value is in the - // set - for (Object arg : predicate.getLiteralList()) { - predObj = getBaseObjectForComparison(predicate.getType(), arg); - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.MIN) { - return hasNull ? TruthValue.YES_NULL : TruthValue.YES; - } - } - return hasNull ? TruthValue.NO_NULL : TruthValue.NO; - } else { - // are all of the values outside of the range? - for (Object arg : predicate.getLiteralList()) { - predObj = getBaseObjectForComparison(predicate.getType(), arg); - loc = compareToRange((Comparable) predObj, minValue, maxValue); - if (loc == Location.MIN || loc == Location.MIDDLE || - loc == Location.MAX) { - return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; - } - } - return hasNull ? 
TruthValue.NO_NULL : TruthValue.NO; - } - case BETWEEN: - List<Object> args = predicate.getLiteralList(); - Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0)); - - loc = compareToRange((Comparable) predObj1, minValue, maxValue); - if (loc == Location.BEFORE || loc == Location.MIN) { - Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1)); - - Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue); - if (loc2 == Location.AFTER || loc2 == Location.MAX) { - return hasNull ? TruthValue.YES_NULL : TruthValue.YES; - } else if (loc2 == Location.BEFORE) { - return hasNull ? TruthValue.NO_NULL : TruthValue.NO; - } else { - return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; - } - } else if (loc == Location.AFTER) { - return hasNull ? TruthValue.NO_NULL : TruthValue.NO; - } else { - return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; - } - case IS_NULL: - // min = null condition above handles the all-nulls YES case - return hasNull ? TruthValue.YES_NO : TruthValue.NO; - default: - return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; - } - } - - private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate, - final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) { - switch (predicate.getOperator()) { - case NULL_SAFE_EQUALS: - // null safe equals does not return *_NULL variant. So set hasNull to false - return checkInBloomFilter(bloomFilter, predObj, false); - case EQUALS: - return checkInBloomFilter(bloomFilter, predObj, hasNull); - case IN: - for (Object arg : predicate.getLiteralList()) { - // if at least one value in the IN list exists in the bloom filter, qualify the row group/stripe - Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg); - TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull); - if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) { - return result; - } - } - return hasNull ? TruthValue.NO_NULL : TruthValue.NO; - default: - return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO; - } - } - - private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) { - TruthValue result = hasNull ?
TruthValue.NO_NULL : TruthValue.NO; - - if (predObj instanceof Long) { - if (bf.testLong(((Long) predObj).longValue())) { - result = TruthValue.YES_NO_NULL; - } - } else if (predObj instanceof Double) { - if (bf.testDouble(((Double) predObj).doubleValue())) { - result = TruthValue.YES_NO_NULL; - } - } else if (predObj instanceof String || predObj instanceof Text || - predObj instanceof HiveDecimalWritable || - predObj instanceof BigDecimal) { - if (bf.testString(predObj.toString())) { - result = TruthValue.YES_NO_NULL; - } - } else if (predObj instanceof Timestamp) { - if (bf.testLong(((Timestamp) predObj).getTime())) { - result = TruthValue.YES_NO_NULL; - } - } else if (predObj instanceof Date) { - if (bf.testLong(DateWritable.dateToDays((Date) predObj))) { - result = TruthValue.YES_NO_NULL; - } - } else { - // if the predicate object is null and if hasNull says there are no nulls then return NO - if (predObj == null && !hasNull) { - result = TruthValue.NO; - } else { - result = TruthValue.YES_NO_NULL; - } - } - - if (result == TruthValue.YES_NO_NULL && !hasNull) { - result = TruthValue.YES_NO; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Bloom filter evaluation: " + result.toString()); - } - - return result; - } - - private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) { - if (obj == null) { - return null; - } - switch (type) { - case BOOLEAN: - if (obj instanceof Boolean) { - return obj; - } else { - // will only be true if the string conversion yields "true", all other values are - // considered false - return Boolean.valueOf(obj.toString()); - } - case DATE: - if (obj instanceof Date) { - return obj; - } else if (obj instanceof String) { - return Date.valueOf((String) obj); - } else if (obj instanceof Timestamp) { - return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L); - } - // always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?) - break; - case DECIMAL: - if (obj instanceof Boolean) { - return new HiveDecimalWritable(((Boolean) obj).booleanValue() ? 
- HiveDecimal.ONE : HiveDecimal.ZERO); - } else if (obj instanceof Integer) { - return new HiveDecimalWritable(((Integer) obj).intValue()); - } else if (obj instanceof Long) { - return new HiveDecimalWritable(((Long) obj)); - } else if (obj instanceof Float || obj instanceof Double || - obj instanceof String) { - return new HiveDecimalWritable(obj.toString()); - } else if (obj instanceof BigDecimal) { - return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj)); - } else if (obj instanceof HiveDecimal) { - return new HiveDecimalWritable((HiveDecimal) obj); - } else if (obj instanceof HiveDecimalWritable) { - return obj; - } else if (obj instanceof Timestamp) { - return new HiveDecimalWritable(Double.toString( - TimestampUtils.getDouble((Timestamp) obj))); - } - break; - case FLOAT: - if (obj instanceof Number) { - // widening conversion - return ((Number) obj).doubleValue(); - } else if (obj instanceof HiveDecimal) { - return ((HiveDecimal) obj).doubleValue(); - } else if (obj instanceof String) { - return Double.valueOf(obj.toString()); - } else if (obj instanceof Timestamp) { - return TimestampUtils.getDouble((Timestamp) obj); - } else if (obj instanceof HiveDecimal) { - return ((HiveDecimal) obj).doubleValue(); - } else if (obj instanceof BigDecimal) { - return ((BigDecimal) obj).doubleValue(); - } - break; - case LONG: - if (obj instanceof Number) { - // widening conversion - return ((Number) obj).longValue(); - } else if (obj instanceof HiveDecimal) { - return ((HiveDecimal) obj).longValue(); // TODO: lossy conversion! - } else if (obj instanceof String) { - return Long.valueOf(obj.toString()); - } - break; - case STRING: - if (obj != null) { - return (obj.toString()); - } - break; - case TIMESTAMP: - if (obj instanceof Timestamp) { - return obj; - } else if (obj instanceof Integer) { - return new Timestamp(((Number) obj).longValue()); - } else if (obj instanceof Float) { - return TimestampUtils.doubleToTimestamp(((Float) obj).doubleValue()); - } else if (obj instanceof Double) { - return TimestampUtils.doubleToTimestamp(((Double) obj).doubleValue()); - } else if (obj instanceof HiveDecimal) { - return TimestampUtils.decimalToTimestamp((HiveDecimal) obj); - } else if (obj instanceof HiveDecimalWritable) { - return TimestampUtils.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal()); - } else if (obj instanceof Date) { - return new Timestamp(((Date) obj).getTime()); - } - // float/double conversion to timestamp is interpreted as seconds whereas integer conversion - // to timestamp is interpreted as milliseconds by default. The integer to timestamp casting - // is also config driven. The filter operator changes its promotion based on config: - // "int.timestamp.conversion.in.seconds". Disable PPD for integer cases. - break; - default: - break; - } - - throw new IllegalArgumentException(String.format( - "ORC SARGS could not convert from %s to %s", obj == null ? 
"(null)" : obj.getClass() - .getSimpleName(), type)); - } - - public static class SargApplier { - public final static boolean[] READ_ALL_RGS = null; - public final static boolean[] READ_NO_RGS = new boolean[0]; - - private final SearchArgument sarg; - private final List<PredicateLeaf> sargLeaves; - private final int[] filterColumns; - private final long rowIndexStride; - // same as the above array, but indices are set to true - private final boolean[] sargColumns; - private SchemaEvolution evolution; - - public SargApplier(SearchArgument sarg, String[] columnNames, - long rowIndexStride, - SchemaEvolution evolution) { - this.sarg = sarg; - sargLeaves = sarg.getLeaves(); - filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, evolution); - this.rowIndexStride = rowIndexStride; - // included will not be null, row options will fill the array with trues if null - sargColumns = new boolean[evolution.getFileIncluded().length]; - for (int i : filterColumns) { - // filter columns may have -1 as index which could be partition column in SARG. - if (i > 0) { - sargColumns[i] = true; - } - } - this.evolution = evolution; - } - - /** - * Pick the row groups that we need to load from the current stripe. - * - * @return an array with a boolean for each row group or null if all of the - * row groups must be read. - * @throws IOException - */ - public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes, - OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException { - long rowsInStripe = stripe.getNumberOfRows(); - int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); - boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc? - TruthValue[] leafValues = new TruthValue[sargLeaves.size()]; - boolean hasSelected = false, hasSkipped = false; - for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) { - for (int pred = 0; pred < leafValues.length; ++pred) { - int columnIx = filterColumns[pred]; - if (columnIx != -1) { - if (indexes[columnIx] == null) { - throw new AssertionError("Index is not populated for " + columnIx); - } - OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup); - if (entry == null) { - throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup); - } - OrcProto.ColumnStatistics stats = entry.getStatistics(); - OrcProto.BloomFilter bf = null; - if (bloomFilterIndices != null && bloomFilterIndices[columnIx] != null) { - bf = bloomFilterIndices[columnIx].getBloomFilter(rowGroup); - } - if (evolution != null && evolution.isPPDSafeConversion(columnIx)) { - leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf); - } else { - leafValues[pred] = TruthValue.YES_NO_NULL; - } - if (LOG.isTraceEnabled()) { - LOG.trace("Stats = " + stats); - LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]); - } - } else { - // the column is a virtual column - leafValues[pred] = TruthValue.YES_NO_NULL; - } - } - result[rowGroup] = sarg.evaluate(leafValues).isNeeded(); - hasSelected = hasSelected || result[rowGroup]; - hasSkipped = hasSkipped || (!result[rowGroup]); - if (LOG.isDebugEnabled()) { - LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " + - (rowIndexStride * (rowGroup + 1) - 1) + " is " + - (result[rowGroup] ? "" : "not ") + "included."); - } - } - - return hasSkipped ? ((hasSelected || !returnNone) ? 
result : READ_NO_RGS) : READ_ALL_RGS; - } - } - - /** - * Pick the row groups that we need to load from the current stripe. - * - * @return an array with a boolean for each row group or null if all of the - * row groups must be read. - * @throws IOException - */ - protected boolean[] pickRowGroups() throws IOException { - // if we don't have a sarg or indexes, we read everything - if (sargApp == null) { - return null; - } - readRowIndex(currentStripe, included, sargApp.sargColumns); - return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false); - } - - private void clearStreams() { - // explicit close of all streams to de-ref ByteBuffers - for (InStream is : streams.values()) { - is.close(); - } - if (bufferChunks != null) { - if (dataReader.isTrackingDiskRanges()) { - for (DiskRangeList range = bufferChunks; range != null; range = range.next) { - if (!(range instanceof BufferChunk)) { - continue; - } - dataReader.releaseBuffer(((BufferChunk) range).getChunk()); - } - } - } - bufferChunks = null; - streams.clear(); - } - - /** - * Read the current stripe into memory. - * - * @throws IOException - */ - private void readStripe() throws IOException { - StripeInformation stripe = beginReadStripe(); - includedRowGroups = pickRowGroups(); - - // move forward to the first unskipped row - if (includedRowGroups != null) { - while (rowInStripe < rowCountInStripe && - !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) { - rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride); - } - } - - // if we haven't skipped the whole stripe, read the data - if (rowInStripe < rowCountInStripe) { - // if we aren't projecting columns or filtering rows, just read it all - if (included == null && includedRowGroups == null) { - readAllDataStreams(stripe); - } else { - readPartialDataStreams(stripe); - } - reader.startStripe(streams, stripeFooter); - // if we skipped the first row group, move the pointers forward - if (rowInStripe != 0) { - seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride)); - } - } - } - - private StripeInformation beginReadStripe() throws IOException { - StripeInformation stripe = stripes.get(currentStripe); - stripeFooter = readStripeFooter(stripe); - clearStreams(); - // setup the position in the stripe - rowCountInStripe = stripe.getNumberOfRows(); - rowInStripe = 0; - rowBaseInStripe = 0; - for (int i = 0; i < currentStripe; ++i) { - rowBaseInStripe += stripes.get(i).getNumberOfRows(); - } - // reset all of the indexes - for (int i = 0; i < indexes.length; ++i) { - indexes[i] = null; - } - return stripe; - } - - private void readAllDataStreams(StripeInformation stripe) throws IOException { - long start = stripe.getIndexLength(); - long end = start + stripe.getDataLength(); - // explicitly trigger 1 big read - DiskRangeList toRead = new DiskRangeList(start, end); - bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false); - List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList(); - createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams); - } - - /** - * Plan the ranges of the file that we need to read given the list of - * columns and row groups. 
- * - * @param streamList the list of streams available - * @param indexes the indexes that have been loaded - * @param includedColumns which columns are needed - * @param includedRowGroups which row groups are needed - * @param isCompressed does the file have generic compression - * @param encodings the encodings for each column - * @param types the types of the columns - * @param compressionSize the compression block size - * @return the list of disk ranges that will be loaded - */ - static DiskRangeList planReadPartialDataStreams - (List<OrcProto.Stream> streamList, - OrcProto.RowIndex[] indexes, - boolean[] includedColumns, - boolean[] includedRowGroups, - boolean isCompressed, - List<OrcProto.ColumnEncoding> encodings, - List<OrcProto.Type> types, - int compressionSize, - boolean doMergeBuffers) { - long offset = 0; - // figure out which columns have a present stream - boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types); - CreateHelper list = new CreateHelper(); - for (OrcProto.Stream stream : streamList) { - long length = stream.getLength(); - int column = stream.getColumn(); - OrcProto.Stream.Kind streamKind = stream.getKind(); - // since stream kind is optional, first check if it exists - if (stream.hasKind() && - (StreamName.getArea(streamKind) == StreamName.Area.DATA) && - (column < includedColumns.length && includedColumns[column])) { - // if we aren't filtering or it is a dictionary, load it. - if (includedRowGroups == null - || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) { - RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers); - } else { - RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups, - isCompressed, indexes[column], encodings.get(column), types.get(column), - compressionSize, hasNull[column], offset, length, list, doMergeBuffers); - } - } - offset += length; - } - return list.extract(); - } - - void createStreams(List<OrcProto.Stream> streamDescriptions, - DiskRangeList ranges, - boolean[] includeColumn, - CompressionCodec codec, - int bufferSize, - Map<StreamName, InStream> streams) throws IOException { - long streamOffset = 0; - for (OrcProto.Stream streamDesc : streamDescriptions) { - int column = streamDesc.getColumn(); - if ((includeColumn != null && - (column < included.length && !includeColumn[column])) || - streamDesc.hasKind() && - (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) { - streamOffset += streamDesc.getLength(); - continue; - } - List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers( - ranges, streamOffset, streamDesc.getLength()); - StreamName name = new StreamName(column, streamDesc.getKind()); - streams.put(name, InStream.create(name.toString(), buffers, - streamDesc.getLength(), codec, bufferSize)); - streamOffset += streamDesc.getLength(); - } - } - - private void readPartialDataStreams(StripeInformation stripe) throws IOException { - List<OrcProto.Stream> streamList = stripeFooter.getStreamsList(); - DiskRangeList toRead = planReadPartialDataStreams(streamList, - indexes, included, includedRowGroups, codec != null, - stripeFooter.getColumnsList(), types, bufferSize, true); - if (LOG.isDebugEnabled()) { - LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead)); - } - bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false); - if (LOG.isDebugEnabled()) { - LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks)); - } - - createStreams(streamList, bufferChunks, 
included, codec, bufferSize, streams); - } - - /** - * Read the next stripe until we find a row that we don't skip. - * - * @throws IOException - */ - private void advanceStripe() throws IOException { - rowInStripe = rowCountInStripe; - while (rowInStripe >= rowCountInStripe && - currentStripe < stripes.size() - 1) { - currentStripe += 1; - readStripe(); - } - } - - /** - * Skip over rows that we aren't selecting, so that the next row is - * one that we will read. - * - * @param nextRow the row we want to go to - * @throws IOException - */ - private boolean advanceToNextRow( - TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe) - throws IOException { - long nextRowInStripe = nextRow - rowBaseInStripe; - // check for row skipping - if (rowIndexStride != 0 && - includedRowGroups != null && - nextRowInStripe < rowCountInStripe) { - int rowGroup = (int) (nextRowInStripe / rowIndexStride); - if (!includedRowGroups[rowGroup]) { - while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) { - rowGroup += 1; - } - if (rowGroup >= includedRowGroups.length) { - if (canAdvanceStripe) { - advanceStripe(); - } - return canAdvanceStripe; - } - nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride); - } - } - if (nextRowInStripe >= rowCountInStripe) { - if (canAdvanceStripe) { - advanceStripe(); - } - return canAdvanceStripe; - } - if (nextRowInStripe != rowInStripe) { - if (rowIndexStride != 0) { - int rowGroup = (int) (nextRowInStripe / rowIndexStride); - seekToRowEntry(reader, rowGroup); - reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride); - } else { - reader.skipRows(nextRowInStripe - rowInStripe); - } - rowInStripe = nextRowInStripe; - } - return true; - } - - @Override - public boolean nextBatch(VectorizedRowBatch batch) throws IOException { - try { - if (rowInStripe >= rowCountInStripe) { - currentStripe += 1; - if (currentStripe >= stripes.size()) { - batch.size = 0; - return false; - } - readStripe(); - } - - int batchSize = computeBatchSize(batch.getMaxSize()); - - rowInStripe += batchSize; - reader.setVectorColumnCount(batch.getDataColumnCount()); - reader.nextBatch(batch, batchSize); - batch.selectedInUse = false; - batch.size = batchSize; - advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true); - return batch.size != 0; - } catch (IOException e) { - // Rethrow the exception with the file name in the message - throw new IOException("Error reading file: " + path, e); - } - } - - private int computeBatchSize(long targetBatchSize) { - final int batchSize; - // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row - // groups are selected then the marker position is set to the end of the range (subset of row groups - // within the stripe).
The batch size computed from the marker position ensures that it is - // aware of row group boundaries and will not cause an overflow when reading rows. - // An illustration of this case is at https://issues.apache.org/jira/browse/HIVE-6287 - if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) { - int startRowGroup = (int) (rowInStripe / rowIndexStride); - if (!includedRowGroups[startRowGroup]) { - while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) { - startRowGroup += 1; - } - } - - int endRowGroup = startRowGroup; - while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) { - endRowGroup += 1; - } - - final long markerPosition = - (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride) - : rowCountInStripe; - batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe)); - - if (isLogDebugEnabled && batchSize < targetBatchSize) { - LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize); - } - } else { - batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe)); - } - return batchSize; - } - - @Override - public void close() throws IOException { - clearStreams(); - dataReader.close(); - } - - @Override - public long getRowNumber() { - return rowInStripe + rowBaseInStripe + firstRow; - } - - /** - * Return the fraction of rows that have been read from the selected - * section of the file. - * - * @return fraction between 0.0 and 1.0 of rows consumed - */ - @Override - public float getProgress() { - return ((float) rowBaseInStripe + rowInStripe) / totalRowCount; - } - - private int findStripe(long rowNumber) { - for (int i = 0; i < stripes.size(); i++) { - StripeInformation stripe = stripes.get(i); - if (stripe.getNumberOfRows() > rowNumber) { - return i; - } - rowNumber -= stripe.getNumberOfRows(); - } - throw new IllegalArgumentException("Seek after the end of reader range"); - } - - public OrcIndex readRowIndex(int stripeIndex, boolean[] included, - boolean[] sargColumns) throws IOException { - return readRowIndex(stripeIndex, included, null, null, sargColumns); - } - - public OrcIndex readRowIndex(int stripeIndex, boolean[] included, - OrcProto.RowIndex[] indexes, - OrcProto.BloomFilterIndex[] bloomFilterIndex, - boolean[] sargColumns) throws IOException { - StripeInformation stripe = stripes.get(stripeIndex); - OrcProto.StripeFooter stripeFooter = null; - // if this is the current stripe, use the cached objects. - if (stripeIndex == currentStripe) { - stripeFooter = this.stripeFooter; - indexes = indexes == null ? this.indexes : indexes; - bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex; - sargColumns = sargColumns == null ? - (sargApp == null ?
null : sargApp.sargColumns) : sargColumns; - } - return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns, - bloomFilterIndex); - } - - private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry) - throws IOException { - PositionProvider[] index = new PositionProvider[indexes.length]; - for (int i = 0; i < indexes.length; ++i) { - if (indexes[i] != null) { - index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry)); - } - } - reader.seek(index); - } - - @Override - public void seekToRow(long rowNumber) throws IOException { - if (rowNumber < 0) { - throw new IllegalArgumentException("Seek to a negative row number " + - rowNumber); - } else if (rowNumber < firstRow) { - throw new IllegalArgumentException("Seek before reader range " + - rowNumber); - } - // convert to our internal form (rows from the beginning of slice) - rowNumber -= firstRow; - - // move to the right stripe - int rightStripe = findStripe(rowNumber); - if (rightStripe != currentStripe) { - currentStripe = rightStripe; - readStripe(); - } - readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns); - - // if we aren't to the right row yet, advance in the stripe. - advanceToNextRow(reader, rowNumber, true); - } - - private static final String TRANSLATED_SARG_SEPARATOR = "_"; - public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) { - return rootColumn + TRANSLATED_SARG_SEPARATOR - + ((indexInSourceTable == null) ? -1 : indexInSourceTable); - } - - public static int[] mapTranslatedSargColumns( - List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) { - int[] result = new int[sargLeaves.size()]; - OrcProto.Type lastRoot = null; // Root will be the same for everyone as of now. - String lastRootStr = null; - for (int i = 0; i < result.length; ++i) { - String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR); - assert rootAndIndex.length == 2; - String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1]; - int index = Integer.parseInt(indexStr); - // First, check if the column even maps to anything. - if (index == -1) { - result[i] = -1; - continue; - } - assert index >= 0; - // Then, find the root type if needed. - if (!rootStr.equals(lastRootStr)) { - lastRoot = types.get(Integer.parseInt(rootStr)); - lastRootStr = rootStr; - } - // Subtypes of the root types correspond, in order, to the columns in the table schema - // (disregarding schema evolution that doesn't presently work). Get the index for the - // corresponding subtype. - result[i] = lastRoot.getSubtypes(index); - } - return result; - } -}
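The heart of the predicate pushdown being removed above is the three-way range check in compareToRange/evaluatePredicateMinMax: a row group whose [min, max] statistics place a predicate literal strictly before or after the range can be skipped without reading any of its rows. A minimal standalone sketch of that check follows (the class name RangeCheckSketch is hypothetical, not an ORC API):

// Standalone illustration of the min/max range test used by the deleted
// predicate pushdown code; not part of this commit or of the ORC API.
public class RangeCheckSketch {
  enum Location { BEFORE, MIN, MIDDLE, MAX, AFTER }

  static <T> Location compareToRange(Comparable<T> point, T min, T max) {
    int minCompare = point.compareTo(min);
    if (minCompare < 0) return Location.BEFORE;
    if (minCompare == 0) return Location.MIN;
    int maxCompare = point.compareTo(max);
    if (maxCompare > 0) return Location.AFTER;
    if (maxCompare == 0) return Location.MAX;
    return Location.MIDDLE;
  }

  public static void main(String[] args) {
    // A row group with stats min=10, max=100 can be skipped entirely for the
    // predicate "x = 150" (AFTER), but must be read for "x = 50" (MIDDLE).
    System.out.println(compareToRange(150L, 10L, 100L)); // AFTER  -> skip row group
    System.out.println(compareToRange(50L, 10L, 100L));  // MIDDLE -> read row group
  }
}

In the deleted code the Location result is then widened into a TruthValue (YES, NO, YES_NO, and their *_NULL variants) so that null handling and the bloom filter check can refine the answer before a row group is finally skipped.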
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java b/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java deleted file mode 100644 index 6100d50..0000000 --- a/orc/src/java/org/apache/orc/impl/RecordReaderUtils.java +++ /dev/null @@ -1,578 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import com.google.common.collect.Lists; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.io.DiskRange; -import org.apache.hadoop.hive.common.io.DiskRangeList; -import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper; -import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper; -import org.apache.orc.CompressionCodec; -import org.apache.orc.DataReader; -import org.apache.orc.OrcProto; - -import com.google.common.collect.ComparisonChain; -import org.apache.orc.StripeInformation; - -/** - * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl. 
- */ -public class RecordReaderUtils { - private static final HadoopShims SHIMS = HadoopShims.Factory.get(); - - private static class DefaultDataReader implements DataReader { - private FSDataInputStream file = null; - private final ByteBufferAllocatorPool pool; - private HadoopShims.ZeroCopyReaderShim zcr = null; - private final FileSystem fs; - private final Path path; - private final boolean useZeroCopy; - private final CompressionCodec codec; - private final int bufferSize; - private final int typeCount; - - private DefaultDataReader(DefaultDataReader other) { - this.pool = other.pool; - this.bufferSize = other.bufferSize; - this.typeCount = other.typeCount; - this.fs = other.fs; - this.path = other.path; - this.useZeroCopy = other.useZeroCopy; - this.codec = other.codec; - } - - private DefaultDataReader(DataReaderProperties properties) { - this.fs = properties.getFileSystem(); - this.path = properties.getPath(); - this.useZeroCopy = properties.getZeroCopy(); - this.codec = PhysicalFsWriter.createCodec(properties.getCompression()); - this.bufferSize = properties.getBufferSize(); - this.typeCount = properties.getTypeCount(); - if (useZeroCopy) { - this.pool = new ByteBufferAllocatorPool(); - } else { - this.pool = null; - } - } - - @Override - public void open() throws IOException { - this.file = fs.open(path); - if (useZeroCopy) { - zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); - } else { - zcr = null; - } - } - - @Override - public OrcIndex readRowIndex(StripeInformation stripe, - OrcProto.StripeFooter footer, - boolean[] included, - OrcProto.RowIndex[] indexes, - boolean[] sargColumns, - OrcProto.BloomFilterIndex[] bloomFilterIndices - ) throws IOException { - if (file == null) { - open(); - } - if (footer == null) { - footer = readStripeFooter(stripe); - } - if (indexes == null) { - indexes = new OrcProto.RowIndex[typeCount]; - } - if (bloomFilterIndices == null) { - bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount]; - } - long offset = stripe.getOffset(); - List<OrcProto.Stream> streams = footer.getStreamsList(); - for (int i = 0; i < streams.size(); i++) { - OrcProto.Stream stream = streams.get(i); - OrcProto.Stream nextStream = null; - if (i < streams.size() - 1) { - nextStream = streams.get(i+1); - } - int col = stream.getColumn(); - int len = (int) stream.getLength(); - // row index streams and bloom filters are interleaved; check if the sarg column has a bloom - // filter and combine the I/O to read the row index and bloom filter for that column together - if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) { - boolean readBloomFilter = false; - if (sargColumns != null && sargColumns[col] && - nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) { - len += nextStream.getLength(); - i += 1; - readBloomFilter = true; - } - if ((included == null || included[col]) && indexes[col] == null) { - byte[] buffer = new byte[len]; - file.readFully(offset, buffer, 0, buffer.length); - ByteBuffer bb = ByteBuffer.wrap(buffer); - indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index", - Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(), - codec, bufferSize)); - if (readBloomFilter) { - bb.position((int) stream.getLength()); - bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create( - "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), - nextStream.getLength(), codec, bufferSize)); - } - } - } - offset += len; - } - - OrcIndex index = new OrcIndex(indexes,
bloomFilterIndices); - return index; - } - - @Override - public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException { - if (file == null) { - open(); - } - long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength(); - int tailLength = (int) stripe.getFooterLength(); - - // read the footer - ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); - file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); - return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", - Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)), - tailLength, codec, bufferSize)); - } - - @Override - public DiskRangeList readFileData( - DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException { - return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect); - } - - @Override - public void close() throws IOException { - if (pool != null) { - pool.clear(); - } - // close both zcr and file - try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) { - if (file != null) { - file.close(); - } - } - } - - @Override - public boolean isTrackingDiskRanges() { - return zcr != null; - } - - @Override - public void releaseBuffer(ByteBuffer buffer) { - zcr.releaseBuffer(buffer); - } - - @Override - public DataReader clone() { - return new DefaultDataReader(this); - } - - } - - public static DataReader createDefaultDataReader(DataReaderProperties properties) { - return new DefaultDataReader(properties); - } - - public static boolean[] findPresentStreamsByColumn( - List<OrcProto.Stream> streamList, List<OrcProto.Type> types) { - boolean[] hasNull = new boolean[types.size()]; - for(OrcProto.Stream stream: streamList) { - if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) { - hasNull[stream.getColumn()] = true; - } - } - return hasNull; - } - - /** - * Does region A overlap region B? The end points are inclusive on both sides. - * @param leftA A's left point - * @param rightA A's right point - * @param leftB B's left point - * @param rightB B's right point - * @return Does region A overlap region B? - */ - static boolean overlap(long leftA, long rightA, long leftB, long rightB) { - if (leftA <= leftB) { - return rightA >= leftB; - } - return rightB >= leftA; - } - - public static void addEntireStreamToRanges( - long offset, long length, CreateHelper list, boolean doMergeBuffers) { - list.addOrMerge(offset, offset + length, doMergeBuffers, false); - } - - public static void addRgFilteredStreamToRanges(OrcProto.Stream stream, - boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index, - OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull, - long offset, long length, CreateHelper list, boolean doMergeBuffers) { - for (int group = 0; group < includedRowGroups.length; ++group) { - if (!includedRowGroups[group]) continue; - int posn = getIndexPosition( - encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull); - long start = index.getEntry(group).getPositions(posn); - final long nextGroupOffset; - boolean isLast = group == (includedRowGroups.length - 1); - nextGroupOffset = isLast ? 
length : index.getEntry(group + 1).getPositions(posn); - - start += offset; - long end = offset + estimateRgEndOffset( - isCompressed, isLast, nextGroupOffset, length, compressionSize); - list.addOrMerge(start, end, doMergeBuffers, true); - } - } - - public static long estimateRgEndOffset(boolean isCompressed, boolean isLast, - long nextGroupOffset, long streamLength, int bufferSize) { - // figure out the worst case last location - // if adjacent groups have the same compressed block offset then stretch the slop - // by factor of 2 to safely accommodate the next compression block. - // One for the current compression block and another for the next compression block. - long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP; - return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop); - } - - private static final int BYTE_STREAM_POSITIONS = 1; - private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1; - private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1; - private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1; - - /** - * Get the offset in the index positions for the column that the given - * stream starts. - * @param columnEncoding the encoding of the column - * @param columnType the type of the column - * @param streamType the kind of the stream - * @param isCompressed is the file compressed - * @param hasNulls does the column have a PRESENT stream? - * @return the number of positions that will be used for that stream - */ - public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding, - OrcProto.Type.Kind columnType, - OrcProto.Stream.Kind streamType, - boolean isCompressed, - boolean hasNulls) { - if (streamType == OrcProto.Stream.Kind.PRESENT) { - return 0; - } - int compressionValue = isCompressed ? 1 : 0; - int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0; - switch (columnType) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case FLOAT: - case DOUBLE: - case DATE: - case STRUCT: - case MAP: - case LIST: - case UNION: - return base; - case CHAR: - case VARCHAR: - case STRING: - if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY || - columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { - return base; - } else { - if (streamType == OrcProto.Stream.Kind.DATA) { - return base; - } else { - return base + BYTE_STREAM_POSITIONS + compressionValue; - } - } - case BINARY: - if (streamType == OrcProto.Stream.Kind.DATA) { - return base; - } - return base + BYTE_STREAM_POSITIONS + compressionValue; - case DECIMAL: - if (streamType == OrcProto.Stream.Kind.DATA) { - return base; - } - return base + BYTE_STREAM_POSITIONS + compressionValue; - case TIMESTAMP: - if (streamType == OrcProto.Stream.Kind.DATA) { - return base; - } - return base + RUN_LENGTH_INT_POSITIONS + compressionValue; - default: - throw new IllegalArgumentException("Unknown type " + columnType); - } - } - - // for uncompressed streams, what is the most overlap with the following set - // of rows (long vint literal group). - static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512; - - /** - * Is this stream part of a dictionary? - * @return is this part of a dictionary? 
- */ - public static boolean isDictionary(OrcProto.Stream.Kind kind, - OrcProto.ColumnEncoding encoding) { - assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT; - OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind(); - return kind == OrcProto.Stream.Kind.DICTIONARY_DATA || - (kind == OrcProto.Stream.Kind.LENGTH && - (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY || - encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)); - } - - /** - * Build a string representation of a list of disk ranges. - * @param range ranges to stringify - * @return the resulting string - */ - public static String stringifyDiskRanges(DiskRangeList range) { - StringBuilder buffer = new StringBuilder(); - buffer.append("["); - boolean isFirst = true; - while (range != null) { - if (!isFirst) { - buffer.append(", {"); - } else { - buffer.append("{"); - } - isFirst = false; - buffer.append(range.toString()); - buffer.append("}"); - range = range.next; - } - buffer.append("]"); - return buffer.toString(); - } - - /** - * Read the list of ranges from the file. - * @param file the file to read - * @param zcr the zero-copy reader shim, or null to fall back to regular reads - * @param base the base of the stripe - * @param range the disk ranges within the stripe to read - * @param doForceDirect should the data be copied into direct byte buffers? - * @return the bytes read for each disk range, which is the same length as - * ranges - * @throws IOException - */ - static DiskRangeList readDiskRanges(FSDataInputStream file, - HadoopShims.ZeroCopyReaderShim zcr, - long base, - DiskRangeList range, - boolean doForceDirect) throws IOException { - if (range == null) return null; - DiskRangeList prev = range.prev; - if (prev == null) { - prev = new MutateHelper(range); - } - while (range != null) { - if (range.hasData()) { - range = range.next; - continue; - } - int len = (int) (range.getEnd() - range.getOffset()); - long off = range.getOffset(); - if (zcr != null) { - file.seek(base + off); - boolean hasReplaced = false; - while (len > 0) { - ByteBuffer partial = zcr.readBuffer(len, false); - BufferChunk bc = new BufferChunk(partial, off); - if (!hasReplaced) { - range.replaceSelfWith(bc); - hasReplaced = true; - } else { - range.insertAfter(bc); - } - range = bc; - int read = partial.remaining(); - len -= read; - off += read; - } - } else { - // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless. - byte[] buffer = new byte[len]; - file.readFully((base + off), buffer, 0, buffer.length); - ByteBuffer bb = null; - if (doForceDirect) { - bb = ByteBuffer.allocateDirect(len); - bb.put(buffer); - bb.position(0); - bb.limit(len); - } else { - bb = ByteBuffer.wrap(buffer); - } - range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset())); - } - range = range.next; - } - return prev.next; - } - - - static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) { - // This assumes sorted ranges (as do many other parts of the ORC code). - ArrayList<DiskRange> buffers = new ArrayList<DiskRange>(); - if (length == 0) return buffers; - long streamEnd = offset + length; - boolean inRange = false; - while (range != null) { - if (!inRange) { - if (range.getEnd() <= offset) { - range = range.next; - continue; // Skip until we are in range. - } - inRange = true; - if (range.getOffset() < offset) { - // Partial first buffer, add a slice of it. - buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset)); - if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
- range = range.next; - continue; - } - } else if (range.getOffset() >= streamEnd) { - break; - } - if (range.getEnd() > streamEnd) { - // Partial last buffer (may also be the first buffer), add a slice of it. - buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset)); - break; - } - // Buffer that belongs entirely to one stream. - // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot - // because bufferChunks is also used by clearStreams for zcr. Create a useless dup. - buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset)); - if (range.getEnd() == streamEnd) break; - range = range.next; - } - return buffers; - } - - static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file, - CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException { - if ((codec == null || ((codec instanceof DirectDecompressionCodec) - && ((DirectDecompressionCodec) codec).isAvailable()))) { - /* codec is null or is available */ - return SHIMS.getZeroCopyReader(file, pool); - } - return null; - } - - // this is an implementation copied from ElasticByteBufferPool in hadoop-2, - // which lacks a clear()/clean() operation - public final static class ByteBufferAllocatorPool implements HadoopShims.ByteBufferPoolShim { - private static final class Key implements Comparable<Key> { - private final int capacity; - private final long insertionGeneration; - - Key(int capacity, long insertionGeneration) { - this.capacity = capacity; - this.insertionGeneration = insertionGeneration; - } - - @Override - public int compareTo(Key other) { - return ComparisonChain.start().compare(capacity, other.capacity) - .compare(insertionGeneration, other.insertionGeneration).result(); - } - - @Override - public boolean equals(Object rhs) { - if (rhs == null) { - return false; - } - try { - Key o = (Key) rhs; - return (compareTo(o) == 0); - } catch (ClassCastException e) { - return false; - } - } - - @Override - public int hashCode() { - return new HashCodeBuilder().append(capacity).append(insertionGeneration) - .toHashCode(); - } - } - - private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>(); - - private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>(); - - private long currentGeneration = 0; - - private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) { - return direct ? directBuffers : buffers; - } - - public void clear() { - buffers.clear(); - directBuffers.clear(); - } - - @Override - public ByteBuffer getBuffer(boolean direct, int length) { - TreeMap<Key, ByteBuffer> tree = getBufferTree(direct); - Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0)); - if (entry == null) { - return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer - .allocate(length); - } - tree.remove(entry.getKey()); - return entry.getValue(); - } - - @Override - public void putBuffer(ByteBuffer buffer) { - TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect()); - while (true) { - Key key = new Key(buffer.capacity(), currentGeneration++); - if (!tree.containsKey(key)) { - tree.put(key, buffer); - return; - } - // Buffers are indexed by (capacity, generation). 
- // If our key is not unique on the first try, we try again - } - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RedBlackTree.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RedBlackTree.java b/orc/src/java/org/apache/orc/impl/RedBlackTree.java deleted file mode 100644 index 41aa4b9..0000000 --- a/orc/src/java/org/apache/orc/impl/RedBlackTree.java +++ /dev/null @@ -1,311 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -import org.apache.orc.impl.DynamicIntArray; - -/** - * A memory efficient red-black tree that does not allocate any objects - * per element. This class is abstract and assumes that the child class - * handles the key and comparisons with the key. - */ -abstract class RedBlackTree { - public static final int NULL = -1; - - // Various values controlling the offset of the data within the array. - private static final int LEFT_OFFSET = 0; - private static final int RIGHT_OFFSET = 1; - private static final int ELEMENT_SIZE = 2; - - protected int size = 0; - private final DynamicIntArray data; - protected int root = NULL; - protected int lastAdd = 0; - private boolean wasAdd = false; - - /** - * Create a set with the given initial capacity. - */ - public RedBlackTree(int initialCapacity) { - data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE); - } - - /** - * Insert a new node into the data array, growing the array as necessary. - * - * @return Returns the position of the new node. - */ - private int insert(int left, int right, boolean isRed) { - int position = size; - size += 1; - setLeft(position, left, isRed); - setRight(position, right); - return position; - } - - /** - * Compare the value at the given position to the new value. - * @return 0 if the values are the same, -1 if the new value is smaller and - * 1 if the new value is larger. - */ - protected abstract int compareValue(int position); - - /** - * Is the given node red as opposed to black? To prevent having an extra word - * in the data array, we just use the low bit on the left child index. - */ - protected boolean isRed(int position) { - return position != NULL && - (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1; - } - - /** - * Set the red bit true or false. - */ - private void setRed(int position, boolean isRed) { - int offset = position * ELEMENT_SIZE + LEFT_OFFSET; - if (isRed) { - data.set(offset, data.get(offset) | 1); - } else { - data.set(offset, data.get(offset) & ~1); - } - } - - /** - * Get the left field of the given position.
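- * @param position the position of the node in the data array - * @return the position of the node's left child, with the color bit shifted out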
- */ - protected int getLeft(int position) { - return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1; - } - - /** - * Get the right field of the given position. - */ - protected int getRight(int position) { - return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET); - } - - /** - * Set the left field of the given position. - * Note that we are storing the node color in the low bit of the left pointer. - */ - private void setLeft(int position, int left) { - int offset = position * ELEMENT_SIZE + LEFT_OFFSET; - data.set(offset, (left << 1) | (data.get(offset) & 1)); - } - - /** - * Set the left field of the given position. - * Note that we are storing the node color in the low bit of the left pointer. - */ - private void setLeft(int position, int left, boolean isRed) { - int offset = position * ELEMENT_SIZE + LEFT_OFFSET; - data.set(offset, (left << 1) | (isRed ? 1 : 0)); - } - - /** - * Set the right field of the given position. - */ - private void setRight(int position, int right) { - data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right); - } - - /** - * Insert or find a given key in the tree and rebalance the tree correctly. - * Rebalancing restores the red-black aspect of the tree to maintain the - * invariants: - * 1. If a node is red, both of its children are black. - * 2. Each child of a node has the same black height (the number of black - * nodes between it and the leaves of the tree). - * - * Inserted nodes are at the leaves and are red, therefore there is at most a - * violation of rule 1 at the node we just put in. Instead of always keeping - * the parents, this routine passes the context down during the recursion. - * - * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3}), which are - * left-right mirror images of each other. See Algorithms by Cormen, - * Leiserson, and Rivest for the explanation of the subcases. - * - * @param node The node that we are fixing right now. - * @param fromLeft Did we come down from the left? - * @param parent The node's parent - * @param grandparent Parent's parent - * @param greatGrandparent Grandparent's parent - * @return Does parent also need to be checked and/or fixed? - */ - private boolean add(int node, boolean fromLeft, int parent, - int grandparent, int greatGrandparent) { - if (node == NULL) { - if (root == NULL) { - lastAdd = insert(NULL, NULL, false); - root = lastAdd; - wasAdd = true; - return false; - } else { - lastAdd = insert(NULL, NULL, true); - node = lastAdd; - wasAdd = true; - // connect the new node into the tree - if (fromLeft) { - setLeft(parent, node); - } else { - setRight(parent, node); - } - } - } else { - int compare = compareValue(node); - boolean keepGoing; - - // Recurse down to find where the node needs to be added - if (compare < 0) { - keepGoing = add(getLeft(node), true, node, parent, grandparent); - } else if (compare > 0) { - keepGoing = add(getRight(node), false, node, parent, grandparent); - } else { - lastAdd = node; - wasAdd = false; - return false; - } - - // we don't need to fix the root (because it is always set to black) - if (node == root || !keepGoing) { - return false; - } - } - - - // Do we need to fix this node? Only if there are two reds right under each - // other.
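- // A red node under a red parent violates rule 1. If the uncle is also red - // (cases 1.1 and 2.1), recoloring the parent and uncle black and the - // grandparent red fixes the violation here but may create a new one at the - // grandparent, so true is returned to keep checking. If the uncle is black, - // an inner child is first rotated to the outside (cases 1.2 and 2.2) and a - // rotation at the grandparent (cases 1.3 and 2.3) then finishes the repair.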
- if (isRed(node) && isRed(parent)) { - if (parent == getLeft(grandparent)) { - int uncle = getRight(grandparent); - if (isRed(uncle)) { - // case 1.1 - setRed(parent, false); - setRed(uncle, false); - setRed(grandparent, true); - return true; - } else { - if (node == getRight(parent)) { - // case 1.2 - // swap node and parent - int tmp = node; - node = parent; - parent = tmp; - // left-rotate on node - setLeft(grandparent, parent); - setRight(node, getLeft(parent)); - setLeft(parent, node); - } - - // case 1.2 and 1.3 - setRed(parent, false); - setRed(grandparent, true); - - // right-rotate on grandparent - if (greatGrandparent == NULL) { - root = parent; - } else if (getLeft(greatGrandparent) == grandparent) { - setLeft(greatGrandparent, parent); - } else { - setRight(greatGrandparent, parent); - } - setLeft(grandparent, getRight(parent)); - setRight(parent, grandparent); - return false; - } - } else { - int uncle = getLeft(grandparent); - if (isRed(uncle)) { - // case 2.1 - setRed(parent, false); - setRed(uncle, false); - setRed(grandparent, true); - return true; - } else { - if (node == getLeft(parent)) { - // case 2.2 - // swap node and parent - int tmp = node; - node = parent; - parent = tmp; - // right-rotate on node - setRight(grandparent, parent); - setLeft(node, getRight(parent)); - setRight(parent, node); - } - // case 2.2 and 2.3 - setRed(parent, false); - setRed(grandparent, true); - // left-rotate on grandparent - if (greatGrandparent == NULL) { - root = parent; - } else if (getRight(greatGrandparent) == grandparent) { - setRight(greatGrandparent, parent); - } else { - setLeft(greatGrandparent, parent); - } - setRight(grandparent, getLeft(parent)); - setLeft(parent, grandparent); - return false; - } - } - } else { - return true; - } - } - - /** - * Add the new key to the tree. - * @return true if the element is a new one. - */ - protected boolean add() { - add(root, false, NULL, NULL, NULL); - if (wasAdd) { - setRed(root, false); - return true; - } else { - return false; - } - } - - /** - * Get the number of elements in the set. - */ - public int size() { - return size; - } - - /** - * Reset the table to empty. - */ - public void clear() { - root = NULL; - size = 0; - data.clear(); - } - - /** - * Get the buffer size in bytes. - */ - public long getSizeInBytes() { - return data.getSizeInBytes(); - } -} - http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java b/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java deleted file mode 100644 index 24bd051..0000000 --- a/orc/src/java/org/apache/orc/impl/RunLengthByteReader.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.EOFException; -import java.io.IOException; - -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; - -/** - * A reader that reads a sequence of bytes. A control byte is read before - * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the - * byte is -1 to -128, 1 to 128 literal byte values follow. - */ -public class RunLengthByteReader { - private InStream input; - private final byte[] literals = - new byte[RunLengthByteWriter.MAX_LITERAL_SIZE]; - private int numLiterals = 0; - private int used = 0; - private boolean repeat = false; - - public RunLengthByteReader(InStream input) throws IOException { - this.input = input; - } - - public void setInStream(InStream input) { - this.input = input; - } - - private void readValues(boolean ignoreEof) throws IOException { - int control = input.read(); - used = 0; - if (control == -1) { - if (!ignoreEof) { - throw new EOFException("Read past end of buffer RLE byte from " + input); - } - used = numLiterals = 0; - return; - } else if (control < 0x80) { - repeat = true; - numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE; - int val = input.read(); - if (val == -1) { - throw new EOFException("Reading RLE byte got EOF"); - } - literals[0] = (byte) val; - } else { - repeat = false; - numLiterals = 0x100 - control; - int bytes = 0; - while (bytes < numLiterals) { - int result = input.read(literals, bytes, numLiterals - bytes); - if (result == -1) { - throw new EOFException("Reading RLE byte literal got EOF in " + this); - } - bytes += result; - } - } - } - - public boolean hasNext() throws IOException { - return used != numLiterals || input.available() > 0; - } - - public byte next() throws IOException { - byte result; - if (used == numLiterals) { - readValues(false); - } - if (repeat) { - result = literals[0]; - } else { - result = literals[used]; - } - ++used; - return result; - } - - public void nextVector(ColumnVector previous, long[] data, long size) - throws IOException { - previous.isRepeating = true; - for (int i = 0; i < size; i++) { - if (!previous.isNull[i]) { - data[i] = next(); - } else { - // The default value for nulls of int types in vectorized - // processing is 1, so set that if the value is null - data[i] = 1; - } - - // The default value for nulls in vectorization for int types is 1, - // and given that a non-null value can also be 1, we also need to check - // isNull when determining the isRepeating flag. - if (previous.isRepeating - && i > 0 - && ((data[0] != data[i]) || - (previous.isNull[0] != previous.isNull[i]))) { - previous.isRepeating = false; - } - } - } - - /** - * Read the next size bytes into the data array, skipping over any slots - * where isNull is true.
- * @param isNull if non-null, skip any rows where isNull[r] is true - * @param data the array to read into - * @param size the number of elements to read - * @throws IOException - */ - public void nextVector(boolean[] isNull, int[] data, - long size) throws IOException { - if (isNull == null) { - for(int i=0; i < size; ++i) { - data[i] = next(); - } - } else { - for(int i=0; i < size; ++i) { - if (!isNull[i]) { - data[i] = next(); - } - } - } - } - - public void seek(PositionProvider index) throws IOException { - input.seek(index); - int consumed = (int) index.getNext(); - if (consumed != 0) { - // a loop is required for cases where we break the run into two parts - while (consumed > 0) { - readValues(false); - used = consumed; - consumed -= numLiterals; - } - } else { - used = 0; - numLiterals = 0; - } - } - - public void skip(long items) throws IOException { - while (items > 0) { - if (used == numLiterals) { - readValues(false); - } - long consume = Math.min(items, numLiterals - used); - used += consume; - items -= consume; - } - } - - @Override - public String toString() { - return "byte rle " + (repeat ? "repeat" : "literal") + " used: " + - used + "/" + numLiterals + " from " + input; - } -}
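As a cross-check on the format that RunLengthByteReader decodes, the control-byte scheme from its class comment is small enough to exercise standalone. The sketch below is illustrative only: ByteRleSketch is a hypothetical class name, it reads from a plain java.io.InputStream rather than ORC's InStream, and it hard-codes the minimum repeat length of 3 implied by "positive values 0 to 127 meaning 3 to 130 repetitions"; it also assumes the caller knows the decoded size and that the input is well formed.

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

/** A minimal, hypothetical decoder for ORC's byte run-length encoding. */
public class ByteRleSketch {

  /** Decode exactly expected bytes from the RLE stream in. */
  public static byte[] decode(InputStream in, int expected) throws IOException {
    byte[] out = new byte[expected];
    int produced = 0;
    while (produced < expected) {
      int control = in.read();
      if (control == -1) {
        throw new EOFException("unexpected end of RLE stream");
      }
      if (control < 0x80) {
        // Repeat run: control of 0..127 means 3..130 copies of the next byte.
        int runLength = control + 3;
        int value = in.read();
        if (value == -1) {
          throw new EOFException("missing repeated value");
        }
        for (int i = 0; i < runLength; ++i) {
          out[produced++] = (byte) value;
        }
      } else {
        // Literal run: control of 0x80..0xff means 1..128 raw bytes follow.
        int literals = 0x100 - control;
        for (int i = 0; i < literals; ++i) {
          out[produced++] = (byte) in.read();
        }
      }
    }
    return out;
  }

  public static void main(String[] args) throws IOException {
    // Control 0x02 -> 5 repetitions of 0x41, then control 0xFE -> 2 literals.
    byte[] encoded = {0x02, 0x41, (byte) 0xFE, 0x10, 0x20};
    System.out.println(java.util.Arrays.toString(
        decode(new ByteArrayInputStream(encoded), 7)));
    // prints [65, 65, 65, 65, 65, 16, 32]
  }
}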