Repository: spark
Updated Branches:
  refs/heads/branch-2.0 cd41e6a33 -> 4cb8ff73f


[SPARK-16334] Maintain single dictionary per row-batch in vectorized parquet 
reader

## What changes were proposed in this pull request?

As part of the bugfix in https://github.com/apache/spark/pull/12279, if a row 
batch consist of both dictionary encoded and non-dictionary encoded pages, we 
explicitly decode the dictionary for the values that are already dictionary 
encoded. Currently we reset the dictionary while reading every page that can 
potentially cause ` java.lang.ArrayIndexOutOfBoundsException` while decoding 
older pages. This patch fixes the problem by maintaining a single dictionary 
per row-batch in vectorized parquet reader.

## How was this patch tested?

Manual Tests against a number of hand-generated parquet files.

Author: Sameer Agarwal <samee...@cs.berkeley.edu>

Closes #14225 from sameeragarwal/vectorized.

(cherry picked from commit 46f80a307392bee6743e5847eb5243bf5fcd00a4)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cb8ff73
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cb8ff73
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cb8ff73

Branch: refs/heads/branch-2.0
Commit: 4cb8ff73fc91869716a579972166f41984fcdbf4
Parents: cd41e6a
Author: Sameer Agarwal <samee...@cs.berkeley.edu>
Authored: Thu Jul 21 15:34:32 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Jul 21 15:34:39 2016 -0700

----------------------------------------------------------------------
 .../parquet/VectorizedColumnReader.java         | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4cb8ff73/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index a18b881..6c47dc0 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -59,7 +59,7 @@ public class VectorizedColumnReader {
   /**
    * If true, the current page is dictionary encoded.
    */
-  private boolean useDictionary;
+  private boolean isCurrentPageDictionaryEncoded;
 
   /**
    * Maximum definition level for this column.
@@ -100,13 +100,13 @@ public class VectorizedColumnReader {
     if (dictionaryPage != null) {
       try {
         this.dictionary = 
dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
-        this.useDictionary = true;
+        this.isCurrentPageDictionaryEncoded = true;
       } catch (IOException e) {
         throw new IOException("could not decode the dictionary for " + 
descriptor, e);
       }
     } else {
       this.dictionary = null;
-      this.useDictionary = false;
+      this.isCurrentPageDictionaryEncoded = false;
     }
     this.totalValueCount = pageReader.getTotalValueCount();
     if (totalValueCount == 0) {
@@ -136,6 +136,13 @@ public class VectorizedColumnReader {
    */
   void readBatch(int total, ColumnVector column) throws IOException {
     int rowId = 0;
+    ColumnVector dictionaryIds = null;
+    if (dictionary != null) {
+      // SPARK-16334: We only maintain a single dictionary per row batch, so 
that it can be used to
+      // decode all previous dictionary encoded pages if we ever encounter a 
non-dictionary encoded
+      // page.
+      dictionaryIds = column.reserveDictionaryIds(total);
+    }
     while (total > 0) {
       // Compute the number of values we want to read in this page.
       int leftInPage = (int) (endOfPageValueCount - valuesRead);
@@ -144,12 +151,10 @@ public class VectorizedColumnReader {
         leftInPage = (int) (endOfPageValueCount - valuesRead);
       }
       int num = Math.min(total, leftInPage);
-      if (useDictionary) {
+      if (isCurrentPageDictionaryEncoded) {
         // Read and decode dictionary ids.
-        ColumnVector dictionaryIds = column.reserveDictionaryIds(total);
         defColumn.readIntegers(
             num, dictionaryIds, column, rowId, maxDefLevel, 
(VectorizedValuesReader) dataColumn);
-
         if (column.hasDictionary() || (rowId == 0 &&
             (descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT32 ||
             descriptor.getType() == PrimitiveType.PrimitiveTypeName.INT64 ||
@@ -461,13 +466,13 @@ public class VectorizedColumnReader {
         throw new UnsupportedOperationException("Unsupported encoding: " + 
dataEncoding);
       }
       this.dataColumn = new VectorizedRleValuesReader();
-      this.useDictionary = true;
+      this.isCurrentPageDictionaryEncoded = true;
     } else {
       if (dataEncoding != Encoding.PLAIN) {
         throw new UnsupportedOperationException("Unsupported encoding: " + 
dataEncoding);
       }
       this.dataColumn = new VectorizedPlainValuesReader();
-      this.useDictionary = false;
+      this.isCurrentPageDictionaryEncoded = false;
     }
 
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to