PARQUET-743: Fix DictionaryFilter when compressed dictionaries are reused.

BytesInput is not supposed to be held and reused, but decompressed
dictionary pages do this. Reusing the dictionary will cause a failure,
so the cleanest option is to keep the bytes around once the underlying
stream has been read.
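
To illustrate the failure mode, here is a minimal sketch (the stream
contents are made up for the example): a stream-backed BytesInput can be
materialized only once, while a byte[]-backed one is safely reusable.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import org.apache.parquet.bytes.BytesInput;

    public class BytesInputReuse {
      public static void main(String[] args) throws IOException {
        // a stream-backed BytesInput drains its stream on the first read,
        // so a second toByteArray() sees no data (or fails outright)
        ByteArrayInputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3});
        BytesInput streamBacked = BytesInput.from(in, 3);
        byte[] first = streamBacked.toByteArray();   // {1, 2, 3}
        // streamBacked.toByteArray();               // would not work again

        // the fix applied below: materialize the bytes once and wrap the
        // resulting array, which can be read any number of times
        BytesInput reusable = BytesInput.from(first);
        reusable.toByteArray();  // ok
        reusable.toByteArray();  // still ok
      }
    }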

Author: Ryan Blue <b...@apache.org>

Closes #376 from rdblue/PARQUET-743-fix-reused-dictionaries and squashes the following commits:

28c0903 [Ryan Blue] PARQUET-743: Fix DictionaryFilter when dictionaries are reused.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/cec25700
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/cec25700
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/cec25700

Branch: refs/heads/parquet-1.8.x
Commit: cec25700d2437ac34f69b43ac3097a98cf293b04
Parents: 2034669
Author: Ryan Blue <b...@apache.org>
Authored: Wed Oct 12 18:05:21 2016 -0700
Committer: Ryan Blue <b...@apache.org>
Committed: Mon Jan 9 16:54:54 2017 -0800

----------------------------------------------------------------------
 .../parquet/hadoop/DictionaryPageReader.java     | 19 ++++++++++++++++++-
 .../dictionarylevel/DictionaryFilterTest.java    |  4 ++--
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cec25700/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 9a99358..2be7ffe 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.hadoop;
 
 import org.apache.parquet.Strings;
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.EncodingStats;
@@ -93,7 +94,10 @@ class DictionaryPageReader implements DictionaryPageReadStore {
         // check the cache again in case this thread waited on another reading the same page
         if (!cache.containsKey(dotPath)) {
           DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null;
-          cache.put(dotPath, dict);
+          // copy the dictionary to ensure it can be reused if it is returned
+          // more than once. this can happen when a DictionaryFilter has two or
+          // more predicates for the same column.
+          cache.put(dotPath, reusableCopy(dict));
         }
       }
 
@@ -104,6 +108,19 @@ class DictionaryPageReader implements DictionaryPageReadStore {
     }
   }
 
+  private static DictionaryPage reusableCopy(DictionaryPage dict) {
+    if (dict == null) {
+      return null;
+    }
+    try {
+      return new DictionaryPage(
+          BytesInput.from(dict.getBytes().toByteArray()),
+          dict.getDictionarySize(), dict.getEncoding());
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Cannot read dictionary", e);
+    }
+  }
+
   private boolean hasDictionaryPage(ColumnChunkMetaData column) {
     EncodingStats stats = column.getEncodingStats();
     if (stats != null) {
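
For context, the reuse happens whenever DictionaryFilter evaluates two or
more predicates against the same column, because each predicate asks the
read store for that column's dictionary page. A hedged sketch of such a
predicate (the column name and bounds are illustrative):

    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import static org.apache.parquet.filter2.predicate.FilterApi.*;

    public class SameColumnPredicates {
      public static void main(String[] args) {
        // both predicates reference column "x", so DictionaryFilter
        // requests that column's dictionary page twice; before this fix
        // the second request could return a page whose backing stream
        // was already consumed
        FilterPredicate pred = and(
            gt(intColumn("x"), 17),
            lt(intColumn("x"), 100));
        System.out.println(pred);
      }
    }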

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/cec25700/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index 7af0c40..eca6332 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -56,7 +56,7 @@ import java.util.UUID;
 import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
 import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
 import static org.apache.parquet.filter2.predicate.FilterApi.*;
-import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -118,7 +118,7 @@ public class DictionaryFilterTest {
     SimpleGroupFactory f = new SimpleGroupFactory(schema);
     ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
         .withWriterVersion(PARQUET_1_0)
-        .withCompressionCodec(UNCOMPRESSED)
+        .withCompressionCodec(GZIP)
         .withRowGroupSize(1024*1024)
         .withPageSize(1024)
         .enableDictionaryEncoding()
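
A note on the test change: switching the codec from UNCOMPRESSED to GZIP
is what makes the test exercise the failing path. A compressed dictionary
page's bytes come out of the decompressor as a single-use stream, while an
uncompressed page's bytes happened to be reusable, which masked the bug.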
