sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r659977651



##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -156,55 +156,81 @@ public int readInteger() {
   }
 
   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This reader
-   * reads the definition levels and then will read from `data` for the non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that `nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   *  if (this.readInt() == level) {
-   *    c[rowId] = data.readInteger();
-   *  } else {
-   *    c[rowId] = null;
-   *  }
+   * Reads a batch of values into vector `values`, using `valueReader`. The related states such
+   * as row index, offset, number of values left in the batch and page, etc, are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * <p>
+   * This reader reads the definition levels and then will read from `valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with null value.
    */
   public void readBatch(
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
     int offset = state.offset;
-    int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+    long rowId = state.rowId;
+    int leftInBatch = state.valuesToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
 
-    while (left > 0) {
+    while (leftInBatch > 0 && leftInPage > 0) {
       if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-
-      switch (mode) {
-        case RLE:
-          if (currentValue == state.maxDefinitionLevel) {
-            updater.updateBatch(n, offset, values, valueReader);
-          } else {
-            values.putNulls(offset, n);
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
-              updater.update(offset + i, values, valueReader);
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        updater.skipBatch(n, valueReader);
+        advance(n);
+        rowId += n;
+        leftInPage -= n;

Review comment:
       No, because here we are skipping the values rather than adding them 
to the batch.

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) {
     this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges:

Review comment:
       It gives you an iterator, so yes, the indexes are generated on the fly: 
https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java#L253
   The indexes are generated from `Range`, which is very similar to what we 
defined here. I'm planning to file a JIRA in parquet-mr to return the original 
`Range`s directly so we don't have to do this step in Spark.
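
   For illustration, a minimal stand-alone sketch of that conversion step. It 
assumes the indexes arrive in ascending order; the names `RowRangeSketch` and 
its nested `RowRange` are hypothetical and are not the classes in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

// Hypothetical stand-alone sketch; not the actual Spark or parquet-mr classes.
final class RowRangeSketch {
  /** Inclusive [start, end] range of row indexes. */
  static final class RowRange {
    final long start;
    final long end;

    RowRange(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + "-" + end + "]";
    }
  }

  /** Collapses ascending row indexes into contiguous inclusive ranges. */
  static List<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
    List<RowRange> ranges = new ArrayList<>();
    if (!rowIndexes.hasNext()) {
      return ranges;
    }
    long start = rowIndexes.nextLong();
    long previous = start;
    while (rowIndexes.hasNext()) {
      long idx = rowIndexes.nextLong();
      if (idx != previous + 1) {
        // A gap closes the current range and opens a new one.
        ranges.add(new RowRange(start, previous));
        start = idx;
      }
      previous = idx;
    }
    ranges.add(new RowRange(start, previous));
    return ranges;
  }

  public static void main(String[] args) {
    PrimitiveIterator.OfLong it = LongStream.of(0, 1, 2, 4, 5, 7, 8, 9).iterator();
    System.out.println(constructRanges(it)); // prints [[0-2], [4-5], [7-9]]
  }
}
```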
   
   

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) {
     this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges:

Review comment:
       https://issues.apache.org/jira/browse/PARQUET-2061

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##########
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) {
     this.maxDefinitionLevel = maxDefinitionLevel;
+    this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+    nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges:
+   * `[0-2], [4-5], [7-9]`.
    */
-  void resetForBatch(int batchSize) {
+  private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
+    List<RowRange> rowRanges = new ArrayList<>();
+    long currentStart = Long.MIN_VALUE;
+    long previous = Long.MIN_VALUE;
+
+    while (rowIndexes.hasNext()) {
+      long idx = rowIndexes.nextLong();
+      if (currentStart == Long.MIN_VALUE) {
+        currentStart = idx;
+      } else if (previous + 1 != idx) {
+        RowRange range = new RowRange(currentStart, previous);
+        rowRanges.add(range);
+        currentStart = idx;
+      }
+      previous = idx;
+    }
+
+    if (previous != Long.MIN_VALUE) {
+      rowRanges.add(new RowRange(currentStart, previous));
+    }
+
+    return rowRanges.iterator();
+  }
+
+  /**
+   * Must be called at the beginning of reading a new batch.
+   */
+  void resetForNewBatch(int batchSize) {
     this.offset = 0;
     this.valuesToReadInBatch = batchSize;
   }
 
   /**
-   * Called at the beginning of reading a new page.
+   * Must be called at the beginning of reading a new page.
    */
-  void resetForPage(int totalValuesInPage) {
+  void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) {
     this.valuesToReadInPage = totalValuesInPage;
+    this.rowId = pageFirstRowIndex;
   }
 
   /**
-   * Advance the current offset to the new values.
+   * Returns the start index of the current row range.
    */
-  void advanceOffset(int newOffset) {
+  long currentRangeStart() {
+    return currentRange.start;
+  }
+
+  /**
+   * Returns the end index of the current row range.
+   */
+  long currentRangeEnd() {
+    return currentRange.end;
+  }
+
+  /**
+   * Advance the current offset and rowId to the new values.
+   */
+  void advanceOffsetAndRowId(int newOffset, long newRowId) {
     valuesToReadInBatch -= (newOffset - offset);
-    valuesToReadInPage -= (newOffset - offset);
+    valuesToReadInPage -= (newRowId - rowId);

Review comment:
       I think this is not necessarily true: `rowId` tracks all the values that 
could be either read or skipped, while `offset` only tracks values that are read 
into the result column vector.
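
   A simplified model of the two counters, to show how they can diverge. 
`ReadStateSketch` and `consume` are hypothetical names for this sketch only, 
and it handles a single row range rather than the full iterator of ranges.

```java
// Hypothetical, simplified model of the counters; not the actual ParquetReadState.
final class ReadStateSketch {
  int offset;               // next write position in the result vector (read values only)
  long rowId;               // next row index in the page (read or skipped)
  int valuesToReadInBatch;
  int valuesToReadInPage;

  ReadStateSketch(int batchSize, int pageValues, long pageFirstRowIndex) {
    this.offset = 0;
    this.rowId = pageFirstRowIndex;
    this.valuesToReadInBatch = batchSize;
    this.valuesToReadInPage = pageValues;
  }

  /** Walks the page, keeping only rows inside the inclusive range [rangeStart, rangeEnd]. */
  void consume(long rangeStart, long rangeEnd) {
    int newOffset = offset;
    long newRowId = rowId;
    int leftInBatch = valuesToReadInBatch;
    int leftInPage = valuesToReadInPage;

    while (leftInBatch > 0 && leftInPage > 0) {
      if (newRowId >= rangeStart && newRowId <= rangeEnd) {
        newOffset++;    // the value is read into the result vector
        leftInBatch--;
      }
      newRowId++;       // every value, read or skipped, consumes a row of the page
      leftInPage--;
    }

    // The batch shrinks by values read; the page shrinks by values read or skipped.
    valuesToReadInBatch -= (newOffset - offset);
    valuesToReadInPage -= (newRowId - rowId);
    offset = newOffset;
    rowId = newRowId;
  }

  public static void main(String[] args) {
    ReadStateSketch state = new ReadStateSketch(4096, 10, 0L);
    state.consume(4, 6);
    // 3 values were read (rows 4, 5, 6) but 10 rows of the page were consumed.
    System.out.println("offset=" + state.offset + " rowId=" + state.rowId);
  }
}
```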

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -151,26 +159,28 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
       // page.
       dictionaryIds = column.reserveDictionaryIds(total);
     }
-    readState.resetForBatch(total);
+    readState.resetForNewBatch(total);
     while (readState.valuesToReadInBatch > 0) {
-      // Compute the number of values we want to read in this page.
       if (readState.valuesToReadInPage == 0) {
         int pageValueCount = readPage();
-        readState.resetForPage(pageValueCount);
+        readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
       }
       PrimitiveType.PrimitiveTypeName typeName =
           descriptor.getPrimitiveType().getPrimitiveTypeName();
       if (isCurrentPageDictionaryEncoded) {
         // Save starting offset in case we need to decode dictionary IDs.
         int startOffset = readState.offset;
+        // Save starting row index so we can check if we need to eagerly decode dict ids later
+        long startRowId = readState.rowId;
 
         // Read and decode dictionary ids.
         defColumn.readIntegers(readState, dictionaryIds, column,
           (VectorizedValuesReader) dataColumn);
 
         // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
         // the values to add microseconds precision.
-        if (column.hasDictionary() || (startOffset == 0 && isLazyDecodingSupported(typeName))) {
+        if (column.hasDictionary() || (startRowId == pageFirstRowIndex &&

Review comment:
       Oh yeah, I need to update the comment too

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##########
@@ -295,6 +307,8 @@ private int readPageV1(DataPageV1 page) throws IOException {
   }
 
   private int readPageV2(DataPageV2 page) throws IOException {
+    this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);

Review comment:
       Good point. Let me do that.

##########
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -156,55 +156,81 @@ public int readInteger() {
   }
 
   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This reader
-   * reads the definition levels and then will read from `data` for the non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that `nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   *  if (this.readInt() == level) {
-   *    c[rowId] = data.readInteger();
-   *  } else {
-   *    c[rowId] = null;
-   *  }
+   * Reads a batch of values into vector `values`, using `valueReader`. The related states such
+   * as row index, offset, number of values left in the batch and page, etc, are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * <p>
+   * This reader reads the definition levels and then will read from `valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with null value.
    */
   public void readBatch(
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
     int offset = state.offset;
-    int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+    long rowId = state.rowId;
+    int leftInBatch = state.valuesToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
 
-    while (left > 0) {
+    while (leftInBatch > 0 && leftInPage > 0) {
       if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-
-      switch (mode) {
-        case RLE:
-          if (currentValue == state.maxDefinitionLevel) {
-            updater.updateBatch(n, offset, values, valueReader);
-          } else {
-            values.putNulls(offset, n);
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
-              updater.update(offset + i, values, valueReader);
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        updater.skipBatch(n, valueReader);
+        advance(n);
+        rowId += n;
+        leftInPage -= n;

Review comment:
       > I see, so there are 2 actions here: ...
   
   If you mean `readBatch`, then yes, it does the two bullets above. 
`skipBatch` just skips the next `n` values from `valueReader`.
   
   @viirya @cloud-fan Yeah, I can change the method name to `skipValues` or 
`skipFromReader` if that's clearer :) My preference, though, is to keep 
this close to the `readBatch` above. So how about I rename the 3 methods in 
`ParquetVectorUpdater` to `readValues`, `readValue` and `skipValues`?
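
   A rough sketch of what the proposed naming could look like. This is an 
assumption for discussion, not the final interface: `VectorUpdaterSketch` and 
`IntUpdaterSketch` are hypothetical, `int[]` stands in for 
`WritableColumnVector`, and `PrimitiveIterator.OfInt` stands in for 
`VectorizedValuesReader`.

```java
import java.util.Arrays;
import java.util.PrimitiveIterator;
import java.util.stream.IntStream;

// Hypothetical simplification of the proposed method names.
interface VectorUpdaterSketch {
  /** Reads `total` values from `reader` into `values`, starting at `offset`. */
  void readValues(int total, int offset, int[] values, PrimitiveIterator.OfInt reader);

  /** Reads a single value from `reader` into `values[offset]`. */
  void readValue(int offset, int[] values, PrimitiveIterator.OfInt reader);

  /** Skips the next `total` values from `reader` without touching any vector. */
  void skipValues(int total, PrimitiveIterator.OfInt reader);
}

final class IntUpdaterSketch implements VectorUpdaterSketch {
  @Override
  public void readValues(int total, int offset, int[] values, PrimitiveIterator.OfInt reader) {
    for (int i = 0; i < total; i++) {
      values[offset + i] = reader.nextInt();
    }
  }

  @Override
  public void readValue(int offset, int[] values, PrimitiveIterator.OfInt reader) {
    values[offset] = reader.nextInt();
  }

  @Override
  public void skipValues(int total, PrimitiveIterator.OfInt reader) {
    for (int i = 0; i < total; i++) {
      reader.nextInt(); // consume and discard
    }
  }

  public static void main(String[] args) {
    PrimitiveIterator.OfInt reader = IntStream.range(10, 20).iterator();
    int[] out = new int[4];
    VectorUpdaterSketch updater = new IntUpdaterSketch();
    updater.skipValues(4, reader);        // drops 10..13
    updater.readValues(3, 0, out, reader); // reads 14, 15, 16
    updater.readValue(3, out, reader);     // reads 17
    System.out.println(Arrays.toString(out)); // prints [14, 15, 16, 17]
  }
}
```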




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


