PARQUET-743: Fix DictionaryFilter when compressed dictionaries are reused.

BytesInput is not meant to be held and reused, but the decompressed dictionary pages cached by DictionaryPageReader do exactly that. Reusing such a dictionary fails because the underlying stream has already been consumed, so the cleanest fix is to copy the bytes once the stream has been read and cache the copy.
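To make the failure mode concrete, here is a minimal sketch (not part of this patch) contrasting a stream-backed BytesInput, which can be consumed only once, with one materialized from a byte[], which can be consumed repeatedly. The class and variable names are illustrative; only BytesInput.from(...) and toByteArray() are the parquet-mr APIs the fix relies on.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.parquet.bytes.BytesInput;

    public class BytesInputReuseSketch {
      public static void main(String[] args) throws IOException {
        byte[] data = {1, 2, 3, 4};

        // stream-backed: the first toByteArray() drains the stream, so a
        // second consumer of the same BytesInput would see no data
        BytesInput streamBacked =
            BytesInput.from(new ByteArrayInputStream(data), data.length);
        byte[] first = streamBacked.toByteArray(); // ok: 4 bytes, stream now spent

        // materialized: backed by the byte[] itself, safe to read repeatedly;
        // this is the same pattern reusableCopy() below applies to dictionary pages
        BytesInput copied = BytesInput.from(first);
        System.out.println(copied.toByteArray().length); // 4
        System.out.println(copied.toByteArray().length); // still 4
      }
    }

The copy trades one extra byte[] allocation per dictionary page for the guarantee that a cached page can be handed out any number of times.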
Author: Ryan Blue <b...@apache.org>

Closes #376 from rdblue/PARQUET-743-fix-reused-dictionaries and squashes the following commits:

28c0903 [Ryan Blue] PARQUET-743: Fix DictionaryFilter when dictionaries are reused.

Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/cec25700
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/cec25700
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/cec25700

Branch: refs/heads/parquet-1.8.x
Commit: cec25700d2437ac34f69b43ac3097a98cf293b04
Parents: 2034669
Author: Ryan Blue <b...@apache.org>
Authored: Wed Oct 12 18:05:21 2016 -0700
Committer: Ryan Blue <b...@apache.org>
Committed: Mon Jan 9 16:54:54 2017 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/DictionaryPageReader.java    | 19 ++++++++++++++++++-
 .../dictionarylevel/DictionaryFilterTest.java   |  4 ++--
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cec25700/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 9a99358..2be7ffe 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -19,6 +19,7 @@ package org.apache.parquet.hadoop;
 
 import org.apache.parquet.Strings;
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
@@ -93,7 +94,10 @@ class DictionaryPageReader implements DictionaryPageReadStore {
       // check the cache again in case this thread waited on another reading the same page
       if (!cache.containsKey(dotPath)) {
         DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null;
-        cache.put(dotPath, dict);
+        // copy the dictionary to ensure it can be reused if it is returned
+        // more than once. this can happen when a DictionaryFilter has two or
+        // more predicates for the same column.
+        cache.put(dotPath, reusableCopy(dict));
       }
     }
 
@@ -104,6 +108,19 @@ class DictionaryPageReader implements DictionaryPageReadStore {
     }
   }
 
+  private static DictionaryPage reusableCopy(DictionaryPage dict) {
+    if (dict == null) {
+      return null;
+    }
+    try {
+      return new DictionaryPage(
+          BytesInput.from(dict.getBytes().toByteArray()),
+          dict.getDictionarySize(), dict.getEncoding());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Cannot read dictionary", e);
+    }
+  }
+
   private boolean hasDictionaryPage(ColumnChunkMetaData column) {
     EncodingStats stats = column.getEncodingStats();
     if (stats != null) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cec25700/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index 7af0c40..eca6332 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -56,7 +56,7 @@ import java.util.UUID;
 import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
 import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
 import static org.apache.parquet.filter2.predicate.FilterApi.*;
-import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -118,7 +118,7 @@ public class DictionaryFilterTest {
     SimpleGroupFactory f = new SimpleGroupFactory(schema);
     ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
         .withWriterVersion(PARQUET_1_0)
-        .withCompressionCodec(UNCOMPRESSED)
+        .withCompressionCodec(GZIP)
         .withRowGroupSize(1024*1024)
         .withPageSize(1024)
         .enableDictionaryEncoding()
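The switch from UNCOMPRESSED to GZIP above makes the test exercise decompressed, stream-backed dictionary pages, the case this fix targets. For illustration, here is a hypothetical filter (not part of the patch) that triggers the reuse path the commit message describes: two predicates on the same column make DictionaryFilter fetch that column's cached dictionary page twice. The column name "value" is assumed.

    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.filter2.predicate.Operators.IntColumn;
    import static org.apache.parquet.filter2.predicate.FilterApi.*;

    public class SameColumnPredicateExample {
      public static FilterPredicate twoPredicatesOnOneColumn() {
        IntColumn value = intColumn("value"); // assumed column name
        // both predicates consult the dictionary for the same column, so the
        // cached DictionaryPage is returned twice; before this fix the second
        // return saw an already-consumed BytesInput and failed
        return and(gt(value, 10), lt(value, 100));
      }
    }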