This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c20d5195c34 [SPARK-38891][SQL] Skipping allocating vector for 
repetition & definition levels when possible
c20d5195c34 is described below

commit c20d5195c343b0a64a588ca56baacb52000e1c90
Author: Chao Sun <sunc...@apple.com>
AuthorDate: Wed May 4 09:37:06 2022 -0700

    [SPARK-38891][SQL] Skipping allocating vector for repetition & definition 
levels when possible
    
    ### What changes were proposed in this pull request?
    
    This PR adds two optimization on the vectorized Parquet reader for complex 
types:
    - avoid allocating vectors for repetition and definition levels whenever 
can be applied
    - avoid reading definition levels whenever can be applied
    
    ### Why are the changes needed?
    
    At the moment, Spark will allocate vectors for repetition and definition 
levels, and also read definition levels even if it's not necessary, for 
instance, when reading primitive types. This may add extra memory footprint 
especially when reading wide tables. Therefore, we should avoid them if 
possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #36202 from sunchao/SPARK-38891.
    
    Authored-by: Chao Sun <sunc...@apple.com>
    Signed-off-by: Chao Sun <sunc...@apple.com>
---
 .../datasources/parquet/ParquetColumnVector.java   | 39 +++++++---
 .../parquet/VectorizedRleValuesReader.java         | 88 +++++++++++++++++++++-
 2 files changed, 115 insertions(+), 12 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
index 4b29520d30f..c8399d9137f 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetColumnVector.java
@@ -61,7 +61,16 @@ final class ParquetColumnVector {
       int capacity,
       MemoryMode memoryMode,
       Set<ParquetColumn> missingColumns) {
+    this(column, vector, capacity, memoryMode, missingColumns, true);
+  }
 
+  ParquetColumnVector(
+      ParquetColumn column,
+      WritableColumnVector vector,
+      int capacity,
+      MemoryMode memoryMode,
+      Set<ParquetColumn> missingColumns,
+      boolean isTopLevel) {
     DataType sparkType = column.sparkType();
     if (!sparkType.sameType(vector.dataType())) {
       throw new IllegalArgumentException("Spark type: " + sparkType +
@@ -79,20 +88,27 @@ final class ParquetColumnVector {
     }
 
     if (isPrimitive) {
-      // TODO: avoid allocating these if not necessary, for instance, the node 
is of top-level
-      //  and is not repeated, or the node is not top-level but its max 
repetition level is 0.
-      repetitionLevels = allocateLevelsVector(capacity, memoryMode);
-      definitionLevels = allocateLevelsVector(capacity, memoryMode);
+      if (column.repetitionLevel() > 0) {
+        repetitionLevels = allocateLevelsVector(capacity, memoryMode);
+      }
+      // We don't need to create and store definition levels if the column is 
top-level.
+      if (!isTopLevel) {
+        definitionLevels = allocateLevelsVector(capacity, memoryMode);
+      }
     } else {
       Preconditions.checkArgument(column.children().size() == 
vector.getNumChildren());
+      boolean allChildrenAreMissing = true;
+
       for (int i = 0; i < column.children().size(); i++) {
         ParquetColumnVector childCv = new 
ParquetColumnVector(column.children().apply(i),
-          vector.getChild(i), capacity, memoryMode, missingColumns);
+          vector.getChild(i), capacity, memoryMode, missingColumns, false);
         children.add(childCv);
 
+
         // Only use levels from non-missing child, this can happen if only 
some but not all
         // fields of a struct are missing.
         if (!childCv.vector.isAllNull()) {
+          allChildrenAreMissing = false;
           this.repetitionLevels = childCv.repetitionLevels;
           this.definitionLevels = childCv.definitionLevels;
         }
@@ -100,7 +116,7 @@ final class ParquetColumnVector {
 
       // This can happen if all the fields of a struct are missing, in which 
case we should mark
       // the struct itself as a missing column
-      if (repetitionLevels == null) {
+      if (allChildrenAreMissing) {
         vector.setAllNull();
       }
     }
@@ -163,8 +179,12 @@ final class ParquetColumnVector {
     if (vector.isAllNull()) return;
 
     vector.reset();
-    repetitionLevels.reset();
-    definitionLevels.reset();
+    if (repetitionLevels != null) {
+      repetitionLevels.reset();
+    }
+    if (definitionLevels != null) {
+      definitionLevels.reset();
+    }
     for (ParquetColumnVector child : children) {
       child.reset();
     }
@@ -289,7 +309,8 @@ final class ParquetColumnVector {
     vector.reserve(definitionLevels.getElementsAppended());
 
     int rowId = 0;
-    boolean hasRepetitionLevels = repetitionLevels.getElementsAppended() > 0;
+    boolean hasRepetitionLevels =
+      repetitionLevels != null && repetitionLevels.getElementsAppended() > 0;
     for (int i = 0; i < definitionLevels.getElementsAppended(); i++) {
       // If repetition level > maxRepetitionLevel, the value is a nested 
element (e.g., an array
       // element in struct<array<int>>), and we should skip the definition 
level since it doesn't
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 2cc763a5b72..12fe5697954 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -172,7 +172,11 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector defLevels,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) {
-    readBatchInternal(state, values, values, defLevels, valueReader, updater);
+    if (defLevels == null) {
+      readBatchInternal(state, values, values, valueReader, updater);
+    } else {
+      readBatchInternalWithDefLevels(state, values, values, defLevels, 
valueReader, updater);
+    }
   }
 
   /**
@@ -185,11 +189,89 @@ public final class VectorizedRleValuesReader extends 
ValuesReader
       WritableColumnVector nulls,
       WritableColumnVector defLevels,
       VectorizedValuesReader valueReader) {
-    readBatchInternal(state, values, nulls, defLevels, valueReader,
-      new ParquetVectorUpdaterFactory.IntegerUpdater());
+    if (defLevels == null) {
+      readBatchInternal(state, values, nulls, valueReader,
+        new ParquetVectorUpdaterFactory.IntegerUpdater());
+    } else {
+      readBatchInternalWithDefLevels(state, values, nulls, defLevels, 
valueReader,
+        new ParquetVectorUpdaterFactory.IntegerUpdater());
+    }
   }
 
   private void readBatchInternal(
+      ParquetReadState state,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      VectorizedValuesReader valueReader,
+      ParquetVectorUpdater updater) {
+
+    long rowId = state.rowId;
+    int leftInBatch = state.rowsToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
+
+    while (leftInBatch > 0 && leftInPage > 0) {
+      if (currentCount == 0 && !readNextGroup()) break;
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        skipValues(n, state, valueReader, updater);
+        rowId += n;
+        leftInPage -= n;
+      } else if (rowId > rangeEnd) {
+        state.nextRange();
+      } else {
+        // The range [rowId, rowId + n) overlaps with the current row range in 
state
+        long start = Math.max(rangeStart, rowId);
+        long end = Math.min(rangeEnd, rowId + n - 1);
+
+        // Skip the part [rowId, start)
+        int toSkip = (int) (start - rowId);
+        if (toSkip > 0) {
+          skipValues(toSkip, state, valueReader, updater);
+          rowId += toSkip;
+          leftInPage -= toSkip;
+        }
+
+        // Read the part [start, end]
+        n = (int) (end - start + 1);
+
+        switch (mode) {
+          case RLE:
+            if (currentValue == state.maxDefinitionLevel) {
+              updater.readValues(n, state.valueOffset, values, valueReader);
+            } else {
+              nulls.putNulls(state.valueOffset, n);
+            }
+            state.valueOffset += n;
+            break;
+          case PACKED:
+            for (int i = 0; i < n; ++i) {
+              int currentValue = currentBuffer[currentBufferIdx++];
+              if (currentValue == state.maxDefinitionLevel) {
+                updater.readValue(state.valueOffset++, values, valueReader);
+              } else {
+                nulls.putNull(state.valueOffset++);
+              }
+            }
+            break;
+        }
+        state.levelOffset += n;
+        leftInBatch -= n;
+        rowId += n;
+        leftInPage -= n;
+        currentCount -= n;
+      }
+    }
+
+    state.rowsToReadInBatch = leftInBatch;
+    state.valuesToReadInPage = leftInPage;
+    state.rowId = rowId;
+  }
+
+  private void readBatchInternalWithDefLevels(
       ParquetReadState state,
       WritableColumnVector values,
       WritableColumnVector nulls,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to