Repository: orc
Updated Branches:
  refs/heads/branch-1.5 55db9ada2 -> eaee12d09
ORC-397. Allow selective disabling of dictionary encoding. Original
patch was by Mithun Radhakrishnan.

Fixes #304

Signed-off-by: Owen O'Malley <omal...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/e8f01a5c
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/e8f01a5c
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/e8f01a5c

Branch: refs/heads/branch-1.5
Commit: e8f01a5c6ff4e170126220fc0fabd2d6ec1d602c
Parents: 55db9ad
Author: Owen O'Malley <omal...@apache.org>
Authored: Tue Aug 28 16:12:05 2018 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Mon Sep 17 13:19:39 2018 -0700

----------------------------------------------------------------------
 java/core/src/java/org/apache/orc/OrcConf.java  |   4 +-
 java/core/src/java/org/apache/orc/OrcFile.java  |  17 +
 .../java/org/apache/orc/impl/WriterImpl.java    |  11 +
 .../orc/impl/writer/StringBaseTreeWriter.java   |   3 +-
 .../apache/orc/impl/writer/WriterContext.java   |   2 +
 .../apache/orc/impl/writer/WriterImplV2.java    | 548 +------------------
 .../org/apache/orc/TestStringDictionary.java    |  83 ++-
 7 files changed, 117 insertions(+), 551 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/OrcConf.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index d92f776..bdf8c0d 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -155,7 +155,9 @@ public enum OrcConf {
       "A boolean flag to determine if the comparision of field names in schema evolution is case sensitive .\n"),
   WRITE_VARIABLE_LENGTH_BLOCKS("orc.write.variable.length.blocks", null, false,
       "A boolean flag as to whether the ORC writer should write variable length\n"
-      + "HDFS blocks.")
+      + "HDFS blocks."),
+  DIRECT_ENCODING_COLUMNS("orc.column.encoding.direct", "orc.column.encoding.direct", "",
+      "Comma-separated list of columns for which dictionary encoding is to be skipped."),
   ;

   private final String attribute;

http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/OrcFile.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/OrcFile.java b/java/core/src/java/org/apache/orc/OrcFile.java
index b07355a..33aa431 100644
--- a/java/core/src/java/org/apache/orc/OrcFile.java
+++ b/java/core/src/java/org/apache/orc/OrcFile.java
@@ -407,6 +407,7 @@ public class OrcFile {
     private boolean overwrite;
     private boolean writeVariableLengthBlocks;
     private HadoopShims shims;
+    private String directEncodingColumns;

     protected WriterOptions(Properties tableProperties, Configuration conf) {
       configuration = conf;
@@ -449,6 +450,8 @@ public class OrcFile {
       shims = HadoopShimsFactory.get();
       writeVariableLengthBlocks =
           OrcConf.WRITE_VARIABLE_LENGTH_BLOCKS.getBoolean(tableProperties,conf);
+      directEncodingColumns = OrcConf.DIRECT_ENCODING_COLUMNS.getString(
+          tableProperties, conf);
     }

     /**
@@ -687,6 +690,16 @@ public class OrcFile {
       return this;
     }

+    /**
+     * Set the comma-separated list of columns that should be direct encoded.
+     * @param value the value to set
+     * @return this
+     */
+    public WriterOptions directEncodingColumns(String value) {
+      directEncodingColumns = value;
+      return this;
+    }
+
     public boolean getBlockPadding() {
       return blockPaddingValue;
     }
@@ -786,6 +799,10 @@ public class OrcFile {
     public boolean getUseUTCTimestamp() {
       return useUTCTimestamp;
     }
+
+    public String getDirectEncodingColumns() {
+      return directEncodingColumns;
+    }
   }

   /**
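For anyone who wants to kick the tires on the new API: the option can be
supplied either through the new configuration key or programmatically. A
minimal sketch (the schema, output path, and column name below are invented
for illustration and are not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    Configuration conf = new Configuration();
    // Route 1: set the new key on the configuration before building options.
    conf.set("orc.column.encoding.direct", "comment");

    TypeDescription schema =
        TypeDescription.fromString("struct<name:string,comment:string>");

    // Route 2: set it explicitly on the writer options; this replaces the
    // value picked up from the configuration in the WriterOptions
    // constructor shown in the hunk above.
    Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
        OrcFile.writerOptions(conf)
            .setSchema(schema)
            .directEncodingColumns("comment"));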
http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/impl/WriterImpl.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
index d6239f2..827747e 100644
--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
@@ -35,6 +35,7 @@ import org.apache.orc.ColumnStatistics;
 import org.apache.orc.CompressionCodec;
 import org.apache.orc.CompressionKind;
 import org.apache.orc.MemoryManager;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcProto;
 import org.apache.orc.OrcUtils;
@@ -113,6 +114,8 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
   private final OrcFile.BloomFilterVersion bloomFilterVersion;
   private final boolean writeTimeZone;
   private final boolean useUTCTimeZone;
+  private final double dictionaryKeySizeThreshold;
+  private final boolean[] directEncodingColumns;

   public WriterImpl(FileSystem fs,
                     Path path,
@@ -123,6 +126,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
     this.schema = opts.getSchema();
     this.writerVersion = opts.getWriterVersion();
     bloomFilterVersion = opts.getBloomFilterVersion();
+    this.directEncodingColumns = OrcUtils.includeColumns(
+        opts.getDirectEncodingColumns(), opts.getSchema());
+    dictionaryKeySizeThreshold =
+        OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
     if (callback != null) {
       callbackContext = new OrcFile.WriterContext(){

@@ -410,6 +417,10 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
     public boolean getUseUTCTimestamp() {
       return useUTCTimeZone;
     }
+
+    public double getDictionaryKeySizeThreshold(int columnId) {
+      return directEncodingColumns[columnId] ? 0.0 : dictionaryKeySizeThreshold;
+    }
   }
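If I am reading the OrcUtils.includeColumns call above right, the
comma-separated list is resolved against the schema into one boolean per
column id, using the same helper that resolves bloom filter columns. A
sketch of the assumed result for a two-column struct (expected values are
my reading, not verified output):

    TypeDescription schema =
        TypeDescription.fromString("struct<shortString:string,longString:string>");
    // Column ids: 0 = the root struct, 1 = shortString, 2 = longString.
    boolean[] direct = OrcUtils.includeColumns("longString", schema);
    // Expected: {false, false, true}; only column id 2 then reports a
    // dictionary key size threshold of 0.0 via the getter above.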
http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
index 742c1ed..e7d3259 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/StringBaseTreeWriter.java
@@ -76,8 +76,7 @@ public abstract class StringBaseTreeWriter extends TreeWriterBase {
     rowIndexValueCount.add(0L);
     buildIndex = writer.buildIndex();
     Configuration conf = writer.getConfiguration();
-    dictionaryKeySizeThreshold =
-        OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+    dictionaryKeySizeThreshold = writer.getDictionaryKeySizeThreshold(columnId);
     strideDictionaryCheck =
         OrcConf.ROW_INDEX_STRIDE_DICTIONARY_CHECK.getBoolean(conf);
     if (dictionaryKeySizeThreshold <= 0.0) {

http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
index 1c8ca1a..9ef3dda 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterContext.java
@@ -103,4 +103,6 @@ public interface WriterContext {
   ) throws IOException;

   boolean getUseUTCTimestamp();
+
+  double getDictionaryKeySizeThreshold(int column);
 }
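The interface method above is the only thing the tree writers see. The net
effect, sketched from the StringBaseTreeWriter hunk (the field name
useDictionaryEncoding is recalled from the surrounding code, which this
diff does not show):

    // In StringBaseTreeWriter's constructor (simplified sketch):
    dictionaryKeySizeThreshold = writer.getDictionaryKeySizeThreshold(columnId);
    if (dictionaryKeySizeThreshold <= 0.0) {
      // Columns listed in orc.column.encoding.direct report 0.0, so the
      // writer starts out in direct mode and never builds a dictionary.
      useDictionaryEncoding = false;
    }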
http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
index e1f410c..6d93a34 100644
--- a/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
+++ b/java/core/src/java/org/apache/orc/impl/writer/WriterImplV2.java
@@ -71,557 +71,17 @@ import java.util.TreeMap;
  * to be confined to a single thread as well.
  *
  */
-public class WriterImplV2 implements WriterInternal, MemoryManager.Callback {
+public class WriterImplV2 extends WriterImpl {
   private static final Logger LOG = LoggerFactory.getLogger(WriterImplV2.class);
-  private static final int MIN_ROW_INDEX_STRIDE = 1000;
-
-  private final Path path;
-  private long adjustedStripeSize;
-  private final int rowIndexStride;
-  private final CompressionKind compress;
-  private int bufferSize;
-  private final TypeDescription schema;
-  private final PhysicalWriter physicalWriter;
-  private final OrcFile.WriterVersion writerVersion;
-
-  private long rowCount = 0;
-  private long rowsInStripe = 0;
-  private long rawDataSize = 0;
-  private int rowsInIndex = 0;
-  private long lastFlushOffset = 0;
-  private int stripesAtLastFlush = -1;
-  private final List<OrcProto.StripeInformation> stripes =
-      new ArrayList<>();
-  private final OrcProto.Metadata.Builder fileMetadata =
-      OrcProto.Metadata.newBuilder();
-  private final Map<String, ByteString> userMetadata =
-      new TreeMap<>();
-  private final TreeWriter treeWriter;
-  private final boolean buildIndex;
-  private final MemoryManager memoryManager;
-  private final OrcFile.Version version;
-  private final Configuration conf;
-  private final OrcFile.WriterCallback callback;
-  private final OrcFile.WriterContext callbackContext;
-  private final OrcFile.EncodingStrategy encodingStrategy;
-  private final OrcFile.CompressionStrategy compressionStrategy;
-  private final boolean[] bloomFilterColumns;
-  private final double bloomFilterFpp;
-  private final OrcFile.BloomFilterVersion bloomFilterVersion;
-  private final boolean writeTimeZone;
-  private final boolean useUTCTimeZone;
-
   public WriterImplV2(FileSystem fs,
                       Path path,
                       OrcFile.WriterOptions opts) throws IOException {
-    this.path = path;
-    this.conf = opts.getConfiguration();
-    this.callback = opts.getCallback();
-    this.schema = opts.getSchema();
-    this.writerVersion = opts.getWriterVersion();
-    bloomFilterVersion = opts.getBloomFilterVersion();
-    if (callback != null) {
-      callbackContext = new OrcFile.WriterContext(){
-
-        @Override
-        public Writer getWriter() {
-          return WriterImplV2.this;
-        }
-      };
-    } else {
-      callbackContext = null;
-    }
-    this.writeTimeZone = hasTimestamp(schema);
-    this.useUTCTimeZone = opts.getUseUTCTimestamp();
-    this.adjustedStripeSize = opts.getStripeSize();
-    this.version = opts.getVersion();
-    this.encodingStrategy = opts.getEncodingStrategy();
-    this.compressionStrategy = opts.getCompressionStrategy();
-    this.compress = opts.getCompress();
-    this.rowIndexStride = opts.getRowIndexStride();
-    this.memoryManager = opts.getMemoryManager();
-    buildIndex = rowIndexStride > 0;
-    int numColumns = schema.getMaximumId() + 1;
-    if (opts.isEnforceBufferSize()) {
-      OutStream.assertBufferSizeValid(opts.getBufferSize());
-      this.bufferSize = opts.getBufferSize();
-    } else {
-      this.bufferSize = WriterImpl.getEstimatedBufferSize(adjustedStripeSize,
-          numColumns, opts.getBufferSize());
-    }
-    if (version == OrcFile.Version.FUTURE) {
-      throw new IllegalArgumentException("Can not write in a unknown version.");
-    } else if (version == OrcFile.Version.UNSTABLE_PRE_2_0) {
-      LOG.warn("ORC files written in " + version.getName() + " will not be" +
+    super(fs, path, opts);
+    LOG.warn("ORC files written in " +
+        OrcFile.Version.UNSTABLE_PRE_2_0.getName() + " will not be" +
         " readable by other versions of the software. It is only for" +
         " developer testing.");
-    }
-    if (version == OrcFile.Version.V_0_11) {
-      /* do not write bloom filters for ORC v11 */
-      this.bloomFilterColumns = new boolean[schema.getMaximumId() + 1];
-    } else {
-      this.bloomFilterColumns =
-          OrcUtils.includeColumns(opts.getBloomFilterColumns(), schema);
-    }
-    this.bloomFilterFpp = opts.getBloomFilterFpp();
-    this.physicalWriter = opts.getPhysicalWriter() == null ?
-        new PhysicalFsWriter(fs, path, opts) : opts.getPhysicalWriter();
-    physicalWriter.writeHeader();
-    treeWriter = TreeWriter.Factory.create(schema, new StreamFactory(), false);
-    if (buildIndex && rowIndexStride < MIN_ROW_INDEX_STRIDE) {
-      throw new IllegalArgumentException("Row stride must be at least " +
-          MIN_ROW_INDEX_STRIDE);
-    }
-
-    // ensure that we are able to handle callbacks before we register ourselves
-    memoryManager.addWriter(path, opts.getStripeSize(), this);
-    LOG.info("ORC writer created for path: {} with stripeSize: {} blockSize: {}" +
-        " compression: {} bufferSize: {}", path, adjustedStripeSize, opts.getBlockSize(),
-        compress, bufferSize);
-  }
-
-  @Override
-  public boolean checkMemory(double newScale) throws IOException {
-    long limit = Math.round(adjustedStripeSize * newScale);
-    long size = treeWriter.estimateMemory();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("ORC writer " + physicalWriter + " size = " + size +
-          " limit = " + limit);
-    }
-    if (size > limit) {
-      flushStripe();
-      return true;
-    }
-    return false;
-  }
-
-
-  CompressionCodec getCustomizedCodec(OrcProto.Stream.Kind kind) {
-    // TODO: modify may create a new codec here. We want to end() it when the stream is closed,
-    // but at this point there's no close() for the stream.
-    CompressionCodec result = physicalWriter.getCompressionCodec();
-    if (result != null) {
-      switch (kind) {
-        case BLOOM_FILTER:
-        case DATA:
-        case DICTIONARY_DATA:
-        case BLOOM_FILTER_UTF8:
-          if (compressionStrategy == OrcFile.CompressionStrategy.SPEED) {
-            result = result.modify(EnumSet.of(CompressionCodec.Modifier.FAST,
-                CompressionCodec.Modifier.TEXT));
-          } else {
-            result = result.modify(EnumSet.of(CompressionCodec.Modifier.DEFAULT,
-                CompressionCodec.Modifier.TEXT));
-          }
-          break;
-        case LENGTH:
-        case DICTIONARY_COUNT:
-        case PRESENT:
-        case ROW_INDEX:
-        case SECONDARY:
-          // easily compressed using the fastest modes
-          result = result.modify(EnumSet.of(CompressionCodec.Modifier.FASTEST,
-              CompressionCodec.Modifier.BINARY));
-          break;
-        default:
-          LOG.info("Missing ORC compression modifiers for " + kind);
-          break;
-      }
-    }
-    return result;
-  }
-
-  @Override
-  public void increaseCompressionSize(int newSize) {
-    if (newSize > bufferSize) {
-      bufferSize = newSize;
-    }
-  }
-
-  /**
-   * Interface from the Writer to the TreeWriters. This limits the visibility
-   * that the TreeWriters have into the Writer.
-   */
-  private class StreamFactory implements WriterContext {
-    /**
-     * Create a stream to store part of a column.
-     * @param column the column id for the stream
-     * @param kind the kind of stream
-     * @return The output outStream that the section needs to be written to.
-     */
-    public OutStream createStream(int column,
-                                  OrcProto.Stream.Kind kind
-                                  ) throws IOException {
-      final StreamName name = new StreamName(column, kind);
-      CompressionCodec codec = getCustomizedCodec(kind);
-
-      return new OutStream(physicalWriter.toString(), bufferSize, codec,
-          physicalWriter.createDataStream(name));
-    }
-
-    /**
-     * Get the stride rate of the row index.
-     */
-    public int getRowIndexStride() {
-      return rowIndexStride;
-    }
-
-    /**
-     * Should be building the row index.
-     * @return true if we are building the index
-     */
-    public boolean buildIndex() {
-      return buildIndex;
-    }
-
-    /**
-     * Is the ORC file compressed?
-     * @return are the streams compressed
-     */
-    public boolean isCompressed() {
-      return physicalWriter.getCompressionCodec() != null;
-    }
-
-    /**
-     * Get the encoding strategy to use.
-     * @return encoding strategy
-     */
-    public OrcFile.EncodingStrategy getEncodingStrategy() {
-      return encodingStrategy;
-    }
-
-    /**
-     * Get the bloom filter columns
-     * @return bloom filter columns
-     */
-    public boolean[] getBloomFilterColumns() {
-      return bloomFilterColumns;
-    }
-
-    /**
-     * Get bloom filter false positive percentage.
-     * @return fpp
-     */
-    public double getBloomFilterFPP() {
-      return bloomFilterFpp;
-    }
-
-    /**
-     * Get the writer's configuration.
-     * @return configuration
-     */
-    public Configuration getConfiguration() {
-      return conf;
-    }
-
-    /**
-     * Get the version of the file to write.
-     */
-    public OrcFile.Version getVersion() {
-      return version;
-    }
-
-    /**
-     * Get the PhysicalWriter.
-     *
-     * @return the file's physical writer.
-     */
-    @Override
-    public PhysicalWriter getPhysicalWriter() {
-      return physicalWriter;
-    }
-
-    public OrcFile.BloomFilterVersion getBloomFilterVersion() {
-      return bloomFilterVersion;
-    }
-
-    public void writeIndex(StreamName name,
-                           OrcProto.RowIndex.Builder index) throws IOException {
-      physicalWriter.writeIndex(name, index, getCustomizedCodec(name.getKind()));
-    }
-
-    public void writeBloomFilter(StreamName name,
-                                 OrcProto.BloomFilterIndex.Builder bloom
-                                 ) throws IOException {
-      physicalWriter.writeBloomFilter(name, bloom,
-          getCustomizedCodec(name.getKind()));
-    }
-
-    public boolean getUseUTCTimestamp() {
-      return useUTCTimeZone;
-    }
-  }
-
-
-  private static void writeTypes(OrcProto.Footer.Builder builder,
-                                 TypeDescription schema) {
-    builder.addAllTypes(OrcUtils.getOrcTypes(schema));
-  }
-
-  private void createRowIndexEntry() throws IOException {
-    treeWriter.createRowIndexEntry();
-    rowsInIndex = 0;
-  }
-
-  private void flushStripe() throws IOException {
-    if (buildIndex && rowsInIndex != 0) {
-      createRowIndexEntry();
-    }
-    if (rowsInStripe != 0) {
-      if (callback != null) {
-        callback.preStripeWrite(callbackContext);
-      }
-      // finalize the data for the stripe
-      int requiredIndexEntries = rowIndexStride == 0 ? 0 :
-          (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
-      OrcProto.StripeFooter.Builder builder =
-          OrcProto.StripeFooter.newBuilder();
-      if (writeTimeZone) {
-        if (useUTCTimeZone) {
-          builder.setWriterTimezone("UTC");
-        } else {
-          builder.setWriterTimezone(TimeZone.getDefault().getID());
-        }
-      }
-      OrcProto.StripeStatistics.Builder stats =
-          OrcProto.StripeStatistics.newBuilder();
-
-      treeWriter.flushStreams();
-      treeWriter.writeStripe(builder, stats, requiredIndexEntries);
-
-      OrcProto.StripeInformation.Builder dirEntry =
-          OrcProto.StripeInformation.newBuilder()
-              .setNumberOfRows(rowsInStripe);
-      physicalWriter.finalizeStripe(builder, dirEntry);
-
-      fileMetadata.addStripeStats(stats.build());
-      stripes.add(dirEntry.build());
-      rowCount += rowsInStripe;
-      rowsInStripe = 0;
-    }
-  }
-
-  private long computeRawDataSize() {
-    return treeWriter.getRawDataSize();
-  }
-
-  private OrcProto.CompressionKind writeCompressionKind(CompressionKind kind) {
-    switch (kind) {
-      case NONE: return OrcProto.CompressionKind.NONE;
-      case ZLIB: return OrcProto.CompressionKind.ZLIB;
-      case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
-      case LZO: return OrcProto.CompressionKind.LZO;
-      case LZ4: return OrcProto.CompressionKind.LZ4;
-      default:
-        throw new IllegalArgumentException("Unknown compression " + kind);
-    }
-  }
-
-  private void writeFileStatistics(OrcProto.Footer.Builder builder,
-                                   TreeWriter writer) throws IOException {
-    writer.writeFileStatistics(builder);
-  }
-
-  private void writeMetadata() throws IOException {
-    physicalWriter.writeFileMetadata(fileMetadata);
-  }
-
-  private long writePostScript() throws IOException {
-    OrcProto.PostScript.Builder builder =
-        OrcProto.PostScript.newBuilder()
-            .setCompression(writeCompressionKind(compress))
-            .setMagic(OrcFile.MAGIC)
-            .addVersion(version.getMajor())
-            .addVersion(version.getMinor())
-            .setWriterVersion(writerVersion.getId());
-    if (compress != CompressionKind.NONE) {
-      builder.setCompressionBlockSize(bufferSize);
-    }
-    return physicalWriter.writePostScript(builder);
-  }
-
-  private long writeFooter() throws IOException {
-    writeMetadata();
-    OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
-    builder.setNumberOfRows(rowCount);
-    builder.setRowIndexStride(rowIndexStride);
-    rawDataSize = computeRawDataSize();
-    // serialize the types
-    writeTypes(builder, schema);
-    // add the stripe information
-    for(OrcProto.StripeInformation stripe: stripes) {
-      builder.addStripes(stripe);
-    }
-    // add the column statistics
-    writeFileStatistics(builder, treeWriter);
-    // add all of the user metadata
-    for(Map.Entry<String, ByteString> entry: userMetadata.entrySet()) {
-      builder.addMetadata(OrcProto.UserMetadataItem.newBuilder()
-          .setName(entry.getKey()).setValue(entry.getValue()));
-    }
-    builder.setWriter(OrcFile.WriterImplementation.ORC_JAVA.getId());
-    physicalWriter.writeFileFooter(builder);
-    return writePostScript();
-  }
-
-  @Override
-  public TypeDescription getSchema() {
-    return schema;
-  }
-
-  @Override
-  public void addUserMetadata(String name, ByteBuffer value) {
-    userMetadata.put(name, ByteString.copyFrom(value));
-  }
-
-  @Override
-  public void addRowBatch(VectorizedRowBatch batch) throws IOException {
-    if (buildIndex) {
-      // Batch the writes up to the rowIndexStride so that we can get the
-      // right size indexes.
-      int posn = 0;
-      while (posn < batch.size) {
-        int chunkSize = Math.min(batch.size - posn,
-            rowIndexStride - rowsInIndex);
-        treeWriter.writeRootBatch(batch, posn, chunkSize);
-        posn += chunkSize;
-        rowsInIndex += chunkSize;
-        rowsInStripe += chunkSize;
-        if (rowsInIndex >= rowIndexStride) {
-          createRowIndexEntry();
-        }
-      }
-    } else {
-      rowsInStripe += batch.size;
-      treeWriter.writeRootBatch(batch, 0, batch.size);
-    }
-    memoryManager.addedRow(batch.size);
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (callback != null) {
-      callback.preFooterWrite(callbackContext);
-    }
-    // remove us from the memory manager so that we don't get any callbacks
-    memoryManager.removeWriter(path);
-    // actually close the file
-    flushStripe();
-    lastFlushOffset = writeFooter();
-    physicalWriter.close();
-  }
-
-  /**
-   * Raw data size will be compute when writing the file footer. Hence raw data
-   * size value will be available only after closing the writer.
-   */
-  @Override
-  public long getRawDataSize() {
-    return rawDataSize;
-  }
-
-  /**
-   * Row count gets updated when flushing the stripes. To get accurate row
-   * count call this method after writer is closed.
-   */
-  @Override
-  public long getNumberOfRows() {
-    return rowCount;
-  }
-
-  @Override
-  public long writeIntermediateFooter() throws IOException {
-    // flush any buffered rows
-    flushStripe();
-    // write a footer
-    if (stripesAtLastFlush != stripes.size()) {
-      if (callback != null) {
-        callback.preFooterWrite(callbackContext);
-      }
-      lastFlushOffset = writeFooter();
-      stripesAtLastFlush = stripes.size();
-      physicalWriter.flush();
-    }
-    return lastFlushOffset;
-  }
-
-  static void checkArgument(boolean expression, String message) {
-    if (!expression) {
-      throw new IllegalArgumentException(message);
-    }
-  }
-
-  @Override
-  public void appendStripe(byte[] stripe, int offset, int length,
-                           StripeInformation stripeInfo,
-                           OrcProto.StripeStatistics stripeStatistics) throws IOException {
-    checkArgument(stripe != null, "Stripe must not be null");
-    checkArgument(length <= stripe.length,
-        "Specified length must not be greater specified array length");
-    checkArgument(stripeInfo != null, "Stripe information must not be null");
-    checkArgument(stripeStatistics != null,
-        "Stripe statistics must not be null");
-
-    rowsInStripe = stripeInfo.getNumberOfRows();
-    // update stripe information
-    OrcProto.StripeInformation.Builder dirEntry = OrcProto.StripeInformation
-        .newBuilder()
-        .setNumberOfRows(rowsInStripe)
-        .setIndexLength(stripeInfo.getIndexLength())
-        .setDataLength(stripeInfo.getDataLength())
-        .setFooterLength(stripeInfo.getFooterLength());
-    physicalWriter.appendRawStripe(ByteBuffer.wrap(stripe, offset, length),
-        dirEntry);
-
-    // since we have already written the stripe, just update stripe statistics
-    treeWriter.updateFileStatistics(stripeStatistics);
-    fileMetadata.addStripeStats(stripeStatistics);
-
-    stripes.add(dirEntry.build());
-
-    // reset it after writing the stripe
-    rowCount += rowsInStripe;
-    rowsInStripe = 0;
-  }
-
-  @Override
-  public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata) {
-    if (userMetadata != null) {
-      for (OrcProto.UserMetadataItem item : userMetadata) {
-        this.userMetadata.put(item.getName(), item.getValue());
-      }
-    }
-  }
-
-  @Override
-  public ColumnStatistics[] getStatistics()
-      throws IOException {
-    // Generate the stats
-    OrcProto.Footer.Builder builder = OrcProto.Footer.newBuilder();
-
-    // add the column statistics
-    writeFileStatistics(builder, treeWriter);
-    return ReaderImpl.deserializeStats(schema, builder.getStatisticsList());
-  }
-
-  public CompressionCodec getCompressionCodec() {
-    return physicalWriter.getCompressionCodec();
-  }
-
-  private static boolean hasTimestamp(TypeDescription schema) {
-    if (schema.getCategory() == TypeDescription.Category.TIMESTAMP) {
-      return true;
-    }
-    List<TypeDescription> children = schema.getChildren();
-    if (children != null) {
-      for (TypeDescription child : children) {
-        if (hasTimestamp(child)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/e8f01a5c/java/core/src/test/org/apache/orc/TestStringDictionary.java
----------------------------------------------------------------------
diff --git a/java/core/src/test/org/apache/orc/TestStringDictionary.java b/java/core/src/test/org/apache/orc/TestStringDictionary.java
index dbd615a..cc4f8d8 100644
--- a/java/core/src/test/org/apache/orc/TestStringDictionary.java
+++ b/java/core/src/test/org/apache/orc/TestStringDictionary.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;

 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
@@ -35,14 +34,12 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

 import org.apache.orc.impl.OutStream;
 import org.apache.orc.impl.RecordReaderImpl;
-import org.apache.orc.impl.RunLengthIntegerWriter;
 import org.apache.orc.impl.StreamName;
 import org.apache.orc.impl.TestInStream;
-import org.apache.orc.impl.WriterImpl;
 import org.apache.orc.impl.writer.StringTreeWriter;
 import org.apache.orc.impl.writer.TreeWriter;
 import org.apache.orc.impl.writer.WriterContext;
-import org.apache.orc.impl.writer.WriterImplV2;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -245,6 +242,11 @@ public class TestStringDictionary {
     public boolean getUseUTCTimestamp() {
       return true;
     }
+
+    @Override
+    public double getDictionaryKeySizeThreshold(int column) {
+      return OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getDouble(conf);
+    }
   }

   @Test
@@ -409,4 +411,77 @@ public class TestStringDictionary {

   }

+  /**
+   * Test that dictionaries can be disabled, per column. In this test, we want to disable DICTIONARY_V2 for the
+   * `longString` column (presumably for a low hit-ratio), while preserving DICTIONARY_V2 for `shortString`.
+   * @throws Exception on unexpected failure
+   */
+  @Test
+  public void testDisableDictionaryForSpecificColumn() throws Exception {
+    final String SHORT_STRING_VALUE = "foo";
+    final String LONG_STRING_VALUE = "BAAAAAAAAR!!";
+
+    TypeDescription schema =
+        TypeDescription.fromString("struct<shortString:string,longString:string>");
+
+    Writer writer = OrcFile.createWriter(
+        testFilePath,
+        OrcFile.writerOptions(conf).setSchema(schema)
+            .compress(CompressionKind.NONE)
+            .bufferSize(10000)
+            .directEncodingColumns("longString"));
+
+    VectorizedRowBatch batch = schema.createRowBatch();
+    BytesColumnVector shortStringColumnVector = (BytesColumnVector) batch.cols[0];
+    BytesColumnVector longStringColumnVector = (BytesColumnVector) batch.cols[1];
+
+    for (int i = 0; i < 20000; i++) {
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+      shortStringColumnVector.setVal(batch.size, SHORT_STRING_VALUE.getBytes());
+      longStringColumnVector.setVal( batch.size, LONG_STRING_VALUE.getBytes());
+      ++batch.size;
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf).filesystem(fs));
+    RecordReader recordReader = reader.rows();
+    batch = reader.getSchema().createRowBatch();
+    shortStringColumnVector = (BytesColumnVector) batch.cols[0];
+    longStringColumnVector = (BytesColumnVector) batch.cols[1];
+    while (recordReader.nextBatch(batch)) {
+      for(int r=0; r < batch.size; ++r) {
+        assertEquals(SHORT_STRING_VALUE, shortStringColumnVector.toString(r));
+        assertEquals(LONG_STRING_VALUE, longStringColumnVector.toString(r));
+      }
+    }
+
+    // make sure the encoding type is correct
+    for (StripeInformation stripe : reader.getStripes()) {
+      // hacky but does the job, this casting will work as long this test resides
+      // within the same package as ORC reader
+      OrcProto.StripeFooter footer = ((RecordReaderImpl) recordReader).readStripeFooter(stripe);
+      for (int i = 0; i < footer.getColumnsCount(); ++i) {
+        Assert.assertEquals(
+            "Expected 3 columns in the footer: One for the Orc Struct, and two for its members.",
+            3, footer.getColumnsCount());
+        Assert.assertEquals(
+            "The ORC schema struct should be DIRECT encoded.",
+            OrcProto.ColumnEncoding.Kind.DIRECT, footer.getColumns(0).getKind()
+        );
+        Assert.assertEquals(
+            "The shortString column must be DICTIONARY_V2 encoded",
+            OrcProto.ColumnEncoding.Kind.DICTIONARY_V2, footer.getColumns(1).getKind()
+        );
+        Assert.assertEquals(
+            "The longString column must be DIRECT_V2 encoded",
+            OrcProto.ColumnEncoding.Kind.DIRECT_V2, footer.getColumns(2).getKind()
+        );
+      }
+    }
+  }
+
 }
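One last note: because the WriterOptions constructor reads
OrcConf.DIRECT_ENCODING_COLUMNS from table properties as well as from the
Configuration (see the OrcFile.java hunk above), the option should also be
settable per table. A sketch, assuming the writerOptions(Properties,
Configuration) overload and reusing the schema, conf, and testFilePath
names from the test above:

    java.util.Properties tableProps = new java.util.Properties();
    // Same key as the new OrcConf entry; resolved per table here.
    tableProps.setProperty("orc.column.encoding.direct", "longString");
    Writer writer = OrcFile.createWriter(testFilePath,
        OrcFile.writerOptions(tableProps, conf).setSchema(schema));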