[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-30 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r661724331



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -156,55 +156,81 @@ public int readInteger() {
   }
 
   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This 
reader
-   * reads the definition levels and then will read from `data` for the 
non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that 
`nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds 
and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   *  if (this.readInt() == level) {
-   *c[rowId] = data.readInteger();
-   *  } else {
-   *c[rowId] = null;
-   *  }
+   * Reads a batch of values into vector `values`, using `valueReader`. The 
related states such
+   * as row index, offset, number of values left in the batch and page, etc, 
are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * 
+   * This reader reads the definition levels and then will read from 
`valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with 
null value.
*/
   public void readBatch(
   ParquetReadState state,
   WritableColumnVector values,
   VectorizedValuesReader valueReader,
   ParquetVectorUpdater updater) throws IOException {
 int offset = state.offset;
-int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+long rowId = state.rowId;
+int leftInBatch = state.valuesToReadInBatch;
+int leftInPage = state.valuesToReadInPage;
 
-while (left > 0) {
+while (leftInBatch > 0 && leftInPage > 0) {
   if (this.currentCount == 0) this.readNextGroup();
-  int n = Math.min(left, this.currentCount);
-
-  switch (mode) {
-case RLE:
-  if (currentValue == state.maxDefinitionLevel) {
-updater.updateBatch(n, offset, values, valueReader);
-  } else {
-values.putNulls(offset, n);
-  }
-  break;
-case PACKED:
-  for (int i = 0; i < n; ++i) {
-if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) 
{
-  updater.update(offset + i, values, valueReader);
+  int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+  long rangeStart = state.currentRangeStart();
+  long rangeEnd = state.currentRangeEnd();
+
+  if (rowId + n < rangeStart) {
+updater.skipValues(n, valueReader);
+advance(n);
+rowId += n;
+leftInPage -= n;
+  } else if (rowId > rangeEnd) {
+state.nextRange();
+  } else {
+// the range [rowId, rowId + n) overlaps with the current row range in 
state
+long start = Math.max(rangeStart, rowId);
+long end = Math.min(rangeEnd, rowId + n - 1);
+
+// skip the part [rowId, start)
+int toSkip = (int) (start - rowId);
+if (toSkip > 0) {

Review comment:
   `start` must be >= `rowId` because it is defined as `long start = 
Math.max(rangeStart, rowId)`. Therefore, case 1 (`start < rowId`) can never 
happen.
   
   The second case, `(start - rowId) > Int.MaxValue`, can only occur if `start` 
is equal to `rangeStart`. In that case we also know that `rangeStart <= rowId + 
n` (from line 183), and `n` is `Math.min(leftInBatch, Math.min(leftInPage, 
this.currentCount))`, which is guaranteed to be within integer range. Therefore, 
the cast is safe.
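
   A small, self-contained illustration (hypothetical numbers, not the actual Spark code) of the skip/read split in the overlapping case and of why the narrowing cast cannot overflow:

```java
public class RangeOverlapSketch {
  public static void main(String[] args) {
    long rowId = 600L;      // first row index of the current run of values
    int n = 400;            // values in the run; always fits in an int by construction
    long rangeStart = 750L; // current row range (inclusive bounds), overlapping the run
    long rangeEnd = 1200L;

    // Only reached when [rowId, rowId + n) overlaps [rangeStart, rangeEnd].
    long start = Math.max(rangeStart, rowId);   // start >= rowId by construction
    long end = Math.min(rangeEnd, rowId + n - 1);

    // start - rowId <= n here, so the narrowing cast cannot overflow.
    int toSkip = (int) (start - rowId);         // values before the range: skipped
    int toRead = (int) (end - start + 1);       // values inside the range: read

    System.out.println("skip " + toSkip + ", read " + toRead);  // skip 150, read 250
  }
}
```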







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-30 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r661709891



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java
##
@@ -30,20 +30,28 @@
* @param values destination values vector
* @param valuesReader reader to read values from
*/
-  void updateBatch(
+  void readValues(
   int total,
   int offset,
   WritableColumnVector values,
   VectorizedValuesReader valuesReader);
 
+  /**
+   * Skip a batch of `total` values from `valuesReader`.
+   *
+   * @param total total number of values to skip
+   * @param valuesReader reader to skip values from
+   */
+  void skipValues(int total, VectorizedValuesReader valuesReader);

Review comment:
   Updated the PR description. Regarding the comment, IMO even though the 
method name is changed, the comment is still accurate in expressing what the 
method does.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-30 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r661706809



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -33,31 +58,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
 this.maxDefinitionLevel = maxDefinitionLevel;
+this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);

Review comment:
   Yes that's a fair point. Will do.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-28 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r660286736



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -156,55 +156,81 @@ public int readInteger() {
   }
 
   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This 
reader
-   * reads the definition levels and then will read from `data` for the 
non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that 
`nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds 
and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   *  if (this.readInt() == level) {
-   *c[rowId] = data.readInteger();
-   *  } else {
-   *c[rowId] = null;
-   *  }
+   * Reads a batch of values into vector `values`, using `valueReader`. The 
related states such
+   * as row index, offset, number of values left in the batch and page, etc, 
are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * 
+   * This reader reads the definition levels and then will read from 
`valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with 
null value.
*/
   public void readBatch(
   ParquetReadState state,
   WritableColumnVector values,
   VectorizedValuesReader valueReader,
   ParquetVectorUpdater updater) throws IOException {
 int offset = state.offset;
-int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+long rowId = state.rowId;
+int leftInBatch = state.valuesToReadInBatch;
+int leftInPage = state.valuesToReadInPage;
 
-while (left > 0) {
+while (leftInBatch > 0 && leftInPage > 0) {
   if (this.currentCount == 0) this.readNextGroup();
-  int n = Math.min(left, this.currentCount);
-
-  switch (mode) {
-case RLE:
-  if (currentValue == state.maxDefinitionLevel) {
-updater.updateBatch(n, offset, values, valueReader);
-  } else {
-values.putNulls(offset, n);
-  }
-  break;
-case PACKED:
-  for (int i = 0; i < n; ++i) {
-if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) 
{
-  updater.update(offset + i, values, valueReader);
+  int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+  long rangeStart = state.currentRangeStart();
+  long rangeEnd = state.currentRangeEnd();
+
+  if (rowId + n < rangeStart) {
+updater.skipBatch(n, valueReader);
+advance(n);
+rowId += n;
+leftInPage -= n;

Review comment:
   > I see, so there are 2 actions here: ...
   
   If you mean `readBatch`, then yes, it does the two bullets above. For 
`skipBatch`, it just skips the next `n` values from `valueReader`.
   
   @viirya @cloud-fan Yeah, I can change the method name to `skipValues` or 
`skipFromReader` if that's clearer :) my preference, though, is to keep 
this close to the `readBatch` above. So how about I update the 3 methods in 
`ParquetVectorUpdater` to be `readValues`, `readValue` and `skipValues`?
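
   For context, a rough sketch of what the interface could look like under that naming proposal; the parameter lists below simply mirror the diff quoted earlier and may not match the final merged signatures:

```java
// Sketch only: proposed names for the three ParquetVectorUpdater methods.
interface ParquetVectorUpdater {
  /** Read `total` values from `valuesReader` into `values`, starting at `offset`. */
  void readValues(int total, int offset, WritableColumnVector values,
      VectorizedValuesReader valuesReader);

  /** Read a single value from `valuesReader` into `values` at `offset`. */
  void readValue(int offset, WritableColumnVector values, VectorizedValuesReader valuesReader);

  /** Skip `total` values from `valuesReader` without writing anything to `values`. */
  void skipValues(int total, VectorizedValuesReader valuesReader);
}
```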







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-28 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r660234205



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##
@@ -295,6 +307,8 @@ private int readPageV1(DataPageV1 page) throws IOException {
   }
 
   private int readPageV2(DataPageV2 page) throws IOException {
+this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);

Review comment:
   Good point. Let me do that.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-28 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r660177273



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##
@@ -151,26 +159,28 @@ void readBatch(int total, WritableColumnVector column) 
throws IOException {
   // page.
   dictionaryIds = column.reserveDictionaryIds(total);
 }
-readState.resetForBatch(total);
+readState.resetForNewBatch(total);
 while (readState.valuesToReadInBatch > 0) {
-  // Compute the number of values we want to read in this page.
   if (readState.valuesToReadInPage == 0) {
 int pageValueCount = readPage();
-readState.resetForPage(pageValueCount);
+readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
   }
   PrimitiveType.PrimitiveTypeName typeName =
   descriptor.getPrimitiveType().getPrimitiveTypeName();
   if (isCurrentPageDictionaryEncoded) {
 // Save starting offset in case we need to decode dictionary IDs.
 int startOffset = readState.offset;
+// Save starting row index so we can check if we need to eagerly 
decode dict ids later
+long startRowId = readState.rowId;
 
 // Read and decode dictionary ids.
 defColumn.readIntegers(readState, dictionaryIds, column,
   (VectorizedValuesReader) dataColumn);
 
 // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we 
need to post process
 // the values to add microseconds precision.
-if (column.hasDictionary() || (startOffset == 0 && 
isLazyDecodingSupported(typeName))) {
+if (column.hasDictionary() || (startRowId == pageFirstRowIndex &&

Review comment:
   Oh yeah, I need to update the comment too







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-28 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r660177036



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
 this.maxDefinitionLevel = maxDefinitionLevel;
+this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, 
suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 
row ranges:
+   * `[0-2], [4-5], [7-9]`.
*/
-  void resetForBatch(int batchSize) {
+  private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
+List<RowRange> rowRanges = new ArrayList<>();
+long currentStart = Long.MIN_VALUE;
+long previous = Long.MIN_VALUE;
+
+while (rowIndexes.hasNext()) {
+  long idx = rowIndexes.nextLong();
+  if (currentStart == Long.MIN_VALUE) {
+currentStart = idx;
+  } else if (previous + 1 != idx) {
+RowRange range = new RowRange(currentStart, previous);
+rowRanges.add(range);
+currentStart = idx;
+  }
+  previous = idx;
+}
+
+if (previous != Long.MIN_VALUE) {
+  rowRanges.add(new RowRange(currentStart, previous));
+}
+
+return rowRanges.iterator();
+  }
+
+  /**
+   * Must be called at the beginning of reading a new batch.
+   */
+  void resetForNewBatch(int batchSize) {
 this.offset = 0;
 this.valuesToReadInBatch = batchSize;
   }
 
   /**
-   * Called at the beginning of reading a new page.
+   * Must be called at the beginning of reading a new page.
*/
-  void resetForPage(int totalValuesInPage) {
+  void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) {
 this.valuesToReadInPage = totalValuesInPage;
+this.rowId = pageFirstRowIndex;
   }
 
   /**
-   * Advance the current offset to the new values.
+   * Returns the start index of the current row range.
*/
-  void advanceOffset(int newOffset) {
+  long currentRangeStart() {
+return currentRange.start;
+  }
+
+  /**
+   * Returns the end index of the current row range.
+   */
+  long currentRangeEnd() {
+return currentRange.end;
+  }
+
+  /**
+   * Advance the current offset and rowId to the new values.
+   */
+  void advanceOffsetAndRowId(int newOffset, long newRowId) {
 valuesToReadInBatch -= (newOffset - offset);
-valuesToReadInPage -= (newOffset - offset);
+valuesToReadInPage -= (newRowId - rowId);

Review comment:
   I think this is not necessarily true: `rowId` tracks all the values that 
could be either read or skipped, while `offset` only tracks values that are read 
into the result column vector.
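
   A small illustration (hypothetical numbers) of why the two counters move by different amounts, matching the `advanceOffsetAndRowId(newOffset, newRowId)` signature in the diff above:

```java
public class AdvanceCountersExample {
  public static void main(String[] args) {
    // Hypothetical page: 300 values consumed, of which the first 100 rows fall outside
    // the current row range (skipped) and the next 200 fall inside it (read).
    long rowId = 0;   // position in the column chunk: counts read AND skipped values
    int offset = 0;   // position in the output batch: counts read values only

    long newRowId = rowId + 100 + 200;  // 300: valuesToReadInPage shrinks by (newRowId - rowId)
    int newOffset = offset + 200;       // 200: valuesToReadInBatch shrinks by (newOffset - offset)

    System.out.println("page consumed: " + (newRowId - rowId)
        + ", batch filled: " + (newOffset - offset));  // page consumed: 300, batch filled: 200
  }
}
```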







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-28 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r659989472



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
 this.maxDefinitionLevel = maxDefinitionLevel;
+this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, 
suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 
row ranges:

Review comment:
   https://issues.apache.org/jira/browse/PARQUET-2061







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-28 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r659984532



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -33,31 +51,104 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
 this.maxDefinitionLevel = maxDefinitionLevel;
+this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, 
suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 
row ranges:

Review comment:
   It gives you an iterator, so yes, it generates them on the fly: 
https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java#L253.
 The indexes are generated from `Range`, which is very similar to what we 
defined here. I'm planning to file a JIRA in parquet-mr to just return the 
original `Range`s so we don't have to do this step in Spark.
   
   







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-28 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r659977651



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -156,55 +156,81 @@ public int readInteger() {
   }
 
   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This 
reader
-   * reads the definition levels and then will read from `data` for the 
non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that 
`nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds 
and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   *  if (this.readInt() == level) {
-   *c[rowId] = data.readInteger();
-   *  } else {
-   *c[rowId] = null;
-   *  }
+   * Reads a batch of values into vector `values`, using `valueReader`. The 
related states such
+   * as row index, offset, number of values left in the batch and page, etc, 
are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * 
+   * This reader reads the definition levels and then will read from 
`valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with 
null value.
*/
   public void readBatch(
   ParquetReadState state,
   WritableColumnVector values,
   VectorizedValuesReader valueReader,
   ParquetVectorUpdater updater) throws IOException {
 int offset = state.offset;
-int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+long rowId = state.rowId;
+int leftInBatch = state.valuesToReadInBatch;
+int leftInPage = state.valuesToReadInPage;
 
-while (left > 0) {
+while (leftInBatch > 0 && leftInPage > 0) {
   if (this.currentCount == 0) this.readNextGroup();
-  int n = Math.min(left, this.currentCount);
-
-  switch (mode) {
-case RLE:
-  if (currentValue == state.maxDefinitionLevel) {
-updater.updateBatch(n, offset, values, valueReader);
-  } else {
-values.putNulls(offset, n);
-  }
-  break;
-case PACKED:
-  for (int i = 0; i < n; ++i) {
-if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) 
{
-  updater.update(offset + i, values, valueReader);
+  int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+  long rangeStart = state.currentRangeStart();
+  long rangeEnd = state.currentRangeEnd();
+
+  if (rowId + n < rangeStart) {
+updater.skipBatch(n, valueReader);
+advance(n);
+rowId += n;
+leftInPage -= n;

Review comment:
   No, because we are skipping the batch here, and not adding the values 
into the batch.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-26 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r659191689



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -33,31 +51,107 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
 this.maxDefinitionLevel = maxDefinitionLevel;
+this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+nextRange();
   }
 
   /**
-   * Called at the beginning of reading a new batch.
+   * Construct a list of row ranges from the given `rowIndexes`. For example, 
suppose the
+   * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 
row ranges:
+   * `[0-2], [4-5], [7-9]`.
*/
-  void resetForBatch(int batchSize) {
+  private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
+List<RowRange> rowRanges = new ArrayList<>();
+long currentStart = Long.MIN_VALUE;
+long previous = Long.MIN_VALUE;
+
+while (rowIndexes.hasNext()) {
+  long idx = rowIndexes.nextLong();
+  if (previous == Long.MIN_VALUE) {
+currentStart = previous = idx;
+  } else if (previous + 1 != idx) {
+RowRange range = new RowRange(currentStart, previous);
+rowRanges.add(range);
+currentStart = previous = idx;
+  } else {
+previous = idx;
+  }

Review comment:
   Yeah good point. Will change.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-25 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r658990194



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -17,13 +17,31 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
 /**
  * Helper class to store intermediate state while reading a Parquet column 
chunk.
  */
 final class ParquetReadState {
-  /** Maximum definition level */
+  private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, 
Long.MAX_VALUE);
+  private static final RowRange MIN_ROW_RANGE = new RowRange(Long.MAX_VALUE, 
Long.MIN_VALUE);
+
+  /** Iterator over all row ranges, only not-null if column index is present */
+  private final Iterator<RowRange> rowRanges;

Review comment:
   No. The list of row ranges is associated with a Parquet row group. For 
example, let's say you have two columns `c1:int` and `c2:bigint` in the row 
group, and the following pages:
   
   ```
 row index   0         500       1000      1500
             -------------------------------
 c1 (int)    |         |         |         |
             -------------------------------
 c2 (bigint) |    |    |    |    |    |    |
             -------------------------------
             0    250  500  750  1000 1250 1500
   ```
   
   Suppose the query is `SELECT * FROM tbl WHERE c1 = 750 AND c2 = 1100`.
   This, when applied on `c1`, will produce the row range `[500, 1000)`; when 
applied on `c2`, it will produce the row range `[1000, 1250)`. These two will be 
unioned into `[500, 1250)`, and that is the row range for the whole row group.
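
   A tiny, self-contained illustration (hypothetical helper, not Spark or parquet-mr code) of merging the two per-column page ranges from this example into the single row-group range:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeUnionExample {
  /** Half-open row range [start, end), as in the example above. */
  static final class Range {
    final long start, end;
    Range(long start, long end) { this.start = start; this.end = end; }
    @Override public String toString() { return "[" + start + ", " + end + ")"; }
  }

  /** Merge a start-sorted list of ranges, combining overlapping or adjacent ones. */
  static List<Range> union(List<Range> sorted) {
    List<Range> out = new ArrayList<>();
    for (Range r : sorted) {
      Range last = out.isEmpty() ? null : out.get(out.size() - 1);
      if (last != null && last.end >= r.start) {
        out.set(out.size() - 1, new Range(last.start, Math.max(last.end, r.end)));
      } else {
        out.add(r);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // c1 = 750 selects c1's page covering rows [500, 1000);
    // c2 = 1100 selects c2's page covering rows [1000, 1250).
    System.out.println(union(List.of(new Range(500, 1000), new Range(1000, 1250))));
    // prints: [[500, 1250)]
  }
}
```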
   
   







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-25 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r658990333



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -33,31 +51,102 @@
   /** The remaining number of values to read in the current batch */
   int valuesToReadInBatch;
 
-  ParquetReadState(int maxDefinitionLevel) {
+  ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong 
rowIndexes) {
 this.maxDefinitionLevel = maxDefinitionLevel;
+this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes);
+nextRange();
+  }
+
+  private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {

Review comment:
   Yup will do







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-25 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r658923673



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -17,13 +17,31 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
 /**
  * Helper class to store intermediate state while reading a Parquet column 
chunk.
  */
 final class ParquetReadState {
-  /** Maximum definition level */
+  private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, 
Long.MAX_VALUE);
+  private static final RowRange MIN_ROW_RANGE = new RowRange(Long.MAX_VALUE, 
Long.MIN_VALUE);

Review comment:
   The `MIN_ROW_RANGE` here is only used as a placeholder once we've 
iterated through all the ranges. It makes sure that we reject all row 
indexes that come after the row ranges we've already processed.
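
   A minimal sketch, assuming the fields shown in the diff above (`rowRanges`, `currentRange`, `MAX_ROW_RANGE`, `MIN_ROW_RANGE`), of how the sentinel could be applied when advancing to the next range; this is an illustration rather than the exact Spark implementation:

```java
// Sketch only. Once the iterator is exhausted, MIN_ROW_RANGE has start == Long.MAX_VALUE,
// so every remaining run of values satisfies `rowId + n < rangeStart` and is skipped.
void nextRange() {
  if (rowRanges == null) {
    currentRange = MAX_ROW_RANGE;      // no column index: every row is accepted
  } else if (rowRanges.hasNext()) {
    currentRange = rowRanges.next();
  } else {
    currentRange = MIN_ROW_RANGE;      // exhausted: every remaining row is skipped
  }
}
```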







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-23 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r657471042



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -61,6 +61,14 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  @Override
+  public final void skipBooleans(int total) {
+// TODO: properly vectorize this

Review comment:
   Created https://issues.apache.org/jira/browse/SPARK-35867







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-23 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r657290651



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -170,77 +170,135 @@ public int readInteger() {
*  }
*/
   public void readBatch(
-  int total,
-  int offset,
+  ParquetReadState state,
   WritableColumnVector values,
-  int maxDefinitionLevel,
   VectorizedValuesReader valueReader,
   ParquetVectorUpdater updater) throws IOException {
-int left = total;
-while (left > 0) {
+int offset = state.offset;
+long rowId = state.rowId;
+
+while (state.hasMoreInPage(offset, rowId)) {
   if (this.currentCount == 0) this.readNextGroup();
-  int n = Math.min(left, this.currentCount);
-  switch (mode) {
-case RLE:
-  if (currentValue == maxDefinitionLevel) {
-updater.updateBatch(n, offset, values, valueReader);
-  } else {
-values.putNulls(offset, n);
-  }
-  break;
-case PACKED:
-  for (int i = 0; i < n; ++i) {
-if (currentBuffer[currentBufferIdx++] == maxDefinitionLevel) {
-  updater.update(offset + i, values, valueReader);
+  int n = Math.min(state.valuesToReadInBatch + state.offset - offset, 
this.currentCount);

Review comment:
   because `state.valuesToReadInBatch` is the number of values left in the 
batch. Suppose the initial batch size is 1000 and `valuesToReadInBatch` is 
400. This means we've already read 600 values, so `state.offset` and `offset` 
are both 600 at the beginning. We need `offset - state.offset` to get how 
many values have been read so far in this call.
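
   A small worked example of that expression, continuing the numbers from the comment above (illustrative only):

```java
public class ValuesLeftExample {
  public static void main(String[] args) {
    // Batch size 1000; 600 values were read before this readBatch call, so 400 are missing.
    int stateOffset = 600;          // state.offset, fixed when readBatch is entered
    int valuesToReadInBatch = 400;  // values still needed for the batch at entry
    int offset = 600;               // local cursor; advances as this call reads values

    System.out.println(valuesToReadInBatch + stateOffset - offset);          // 400 left at entry
    System.out.println(valuesToReadInBatch + stateOffset - (offset + 150));  // 250 left after reading 150 more
  }
}
```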







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-23 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r657286411



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##
@@ -294,6 +292,7 @@ private void initDataReader(Encoding dataEncoding, 
ByteBufferInputStream in) thr
 
   private void readPageV1(DataPageV1 page) throws IOException {
 this.pageValueCount = page.getValueCount();
+this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);

Review comment:
   yea it can return 0, in which case it is the same as without column 
index and the whole page has to be read.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-23 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r657285733



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##
@@ -174,24 +171,29 @@ void readBatch(int total, WritableColumnVector column) 
throws IOException {
   // page.
   dictionaryIds = column.reserveDictionaryIds(total);
 }
-while (total > 0) {
+readState.resetForBatch(total);
+while (readState.valuesToReadInBatch > 0) {
   // Compute the number of values we want to read in this page.
-  int leftInPage = (int) (endOfPageValueCount - valuesRead);
-  if (leftInPage == 0) {
+  if (readState.valuesToReadInPage == 0) {
 readPage();
-leftInPage = (int) (endOfPageValueCount - valuesRead);
+readState.resetForPage(pageValueCount, pageFirstRowIndex);
   }
-  int num = Math.min(total, leftInPage);
   PrimitiveType.PrimitiveTypeName typeName =
   descriptor.getPrimitiveType().getPrimitiveTypeName();
   if (isCurrentPageDictionaryEncoded) {
+boolean supportLazyDecoding = readState.rowId == pageFirstRowIndex &&

Review comment:
   This is just to check whether we are at the first iteration of the loop. 
Basically it checks whether lazy dictionary decoding is supported and, if not, 
eagerly decodes all the dictionary IDs read so far.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-23 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r657284056



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
##
@@ -95,6 +80,18 @@
*/
   private final ParquetVectorUpdaterFactory updaterFactory;
 
+  /**
+   * Helper struct to track intermediate states while reading Parquet pages in 
the column chunk.
+   */
+  private final ParquetReadState readState;
+
+  /**
+   * The index for the first row in the current page, among all rows across 
all pages in the
+   * column chunk for this reader. The value for this is 0 if there is no 
column index for the

Review comment:
   Yes it can happen. Perhaps I should rephrase it to: "If there is no 
column index, the value for this is 0".







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-22 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r656479348



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/**
+ * Helper class to store intermediate state while reading a Parquet column 
chunk.
+ */
+final class ParquetReadState {
+  private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, 
Long.MAX_VALUE);
+  private static final RowRange MIN_ROW_RANGE = new RowRange(Long.MAX_VALUE, 
Long.MIN_VALUE);
+
+  /** Iterator over all row ranges, only not-null if column index is present */
+  private final Iterator<RowRange> rowRanges;
+
+  /** The current row range */
+  private RowRange currentRange;
+
+  /** Maximum definition level */
+  int maxDefinitionLevel;
+
+  /** The current index over all rows within the column chunk. This is used 
to check if the
+   * current row should be skipped by comparing the index against the row 
ranges. */
+  long rowId;
+
+  /** The offset to add the next value in the current batch */

Review comment:
   Thanks for the suggestion. I think that's more clear. Will change it.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-22 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r655599949



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -170,77 +170,135 @@ public int readInteger() {
*  }
*/
   public void readBatch(

Review comment:
   Sorry @cloud-fan, I should've added more context in the PR description. 
Let me try to add it here and copy it to the description later.
   
   1. The column index filtering is largely implemented in parquet-mr (via 
classes such as `ColumnIndex` and `ColumnIndexFilter`), and the filtered 
Parquet pages are returned to Spark through the 
`ParquetFileReader.readNextFilteredRowGroup` and 
`ParquetFileReader.getFilteredRecordCount` APIs. Please see #31393 for the 
related changes in the vectorized reader path.
   2. Spark needs more work to handle mis-aligned Parquet pages returned from 
the parquet-mr side when there are multiple columns and their type widths 
differ (e.g., int and bigint). For this issue, @lxian already gave a pretty 
good description in 
[SPARK-34859](https://issues.apache.org/jira/browse/SPARK-34859). To support 
this case, Spark needs to leverage the 
[`PageReadStore.getRowIndexes`](https://javadoc.io/doc/org.apache.parquet/parquet-column/latest/org/apache/parquet/column/page/PageReadStore.html)
 API, which returns the indexes of all rows (note the difference between rows 
and values: for a flat schema there is no difference between the two, but for 
a complex schema they differ) remaining after filtering within a Parquet row 
group. In addition, because there are gaps between pages, we also need to know 
the index of the first row in each page, so we can keep consuming values from 
the page and skip those whose row index is not in the filtered set. This is 
provided by the `DataPage.getFirstRowIndex` method. A rough sketch of how 
these APIs fit together follows below.
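
   For illustration, a hedged sketch of how those parquet-mr APIs fit together on the reading side (simplified relative to the actual `VectorizedParquetRecordReader`; only the API names cited above are assumed):

```java
import java.io.IOException;
import java.util.PrimitiveIterator;

import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;

// Illustrative sketch only: shows where readNextFilteredRowGroup, getFilteredRecordCount
// and getRowIndexes come into play; batching, schema handling and error handling omitted.
class FilteredReadSketch {
  static void readAllFilteredRowGroups(ParquetFileReader reader) throws IOException {
    long totalFilteredRows = reader.getFilteredRecordCount();  // rows surviving the page filter

    PageReadStore rowGroup;
    while ((rowGroup = reader.readNextFilteredRowGroup()) != null) {
      // Indexes of surviving rows within this row group (absent if there is no column index).
      PrimitiveIterator.OfLong rowIndexes = rowGroup.getRowIndexes().orElse(null);

      // A column reader would then walk each page, using DataPage#getFirstRowIndex()
      // (0 when absent) together with `rowIndexes` to decide which values to read
      // into the output batch and which to skip.
    }
  }
}
```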

##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/**
+ * Helper class to store intermediate state while reading a Parquet column 
chunk.
+ */
+final class ParquetReadState {

Review comment:
   Yes this occurred to me the other day as well :) I think it's a good 
idea. Let me move this refactoring part into a separate PR. Thanks.

##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -170,77 +170,135 @@ public int readInteger() {
*  }
*/
   public void readBatch(

Review comment:
   Sorry @cloud-fan, I should've added more context in the PR description. 
Let me try to add it here and copy it to the description later.
   
   1. The column index filtering is largely implemented in parquet-mr (via 
classes such as `ColumnIndex` and `ColumnIndexFilter`), and the filtered 
Parquet pages are returned to Spark through the 
`ParquetFileReader.readNextFilteredRowGroup` and 
`ParquetFileReader.getFilteredRecordCount` API. Please see #31393 for the 
related changes in the vectorized reader path.
   2. Spark needs more work to handle mis-aligned Parquet pages returned from 
the parquet-mr side, when there are multiple columns and their type widths are 
different (e.g., int and bigint). For this issue, @lxian already gave a pretty 
good description in 
[SPARK-34859](https://issues.apache.org/jira/browse/SPARK-34859). To support 
the case, Spark needs to leverage the API 
[`PageReadStore.getRowIndexes`](https://javadoc.io/doc/org.apache.parquet/parquet-column/latest/org/apache/parquet/column/page/PageReadStore.html),
 which returns the indexes of all rows (note the difference between rows and 
values: for flat schema there is no difference between the two, but for complex 
schema they're different) after filtering within a Parquet row group. In 
addition, because there are gaps between pages, we'll need to know the index 
of the first row in a page, so we can compare the indexes of values (rows) from 
a page with the row indexes. This is provided by the 
`DataPage.getFirstRowIndex` method.

[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-21 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r655747755



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/**
+ * Helper class to store intermediate state while reading a Parquet column 
chunk.
+ */
+final class ParquetReadState {

Review comment:
   Opened #33006







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-21 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r655599949



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -170,77 +170,135 @@ public int readInteger() {
*  }
*/
   public void readBatch(

Review comment:
   Sorry @cloud-fan, I should've added more context in the PR description. 
Let me try to add it here and copy it to the description later.
   
   1. The column index filtering is largely implemented in parquet-mr (via 
classes such as `ColumnIndex` and `ColumnIndexFilter`), and the filtered 
Parquet pages are returned to Spark through the 
`ParquetFileReader.readNextFilteredRowGroup` and 
`ParquetFileReader.getFilteredRecordCount` API. Please see #31393 for the 
related changes in the vectorized reader path.
   2. Spark needs more work to handle mis-aligned Parquet pages returned from 
the parquet-mr side, when there are multiple columns and their type widths are 
different (e.g., int and bigint). For this issue, @lxian already gave a pretty 
good description in 
[SPARK-34859](https://issues.apache.org/jira/browse/SPARK-34859). To support 
the case, Spark needs to leverage the API 
[`PageReadStore.getRowIndexes`](https://javadoc.io/doc/org.apache.parquet/parquet-column/latest/org/apache/parquet/column/page/PageReadStore.html),
 which returns the indexes of all rows (note the difference between rows and 
values: for flat schema there is no difference between the two, but for nested 
schema they're different) after filtering within a Parquet row group. In 
addition, because there are gaps between pages, we'll need to know the index 
of the first row in a page, so we can compare the indexes of values (rows) from 
a page with the row indexes mentioned above. This is provided by the 
`DataPage.getFirstRowIndex` method.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-21 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r655599949



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -170,77 +170,135 @@ public int readInteger() {
*  }
*/
   public void readBatch(

Review comment:
   Sorry @cloud-fan, I should've added more context in the PR description. 
Let me try to add it here and copy it to the description later.
   
   1. The column index filtering is largely implemented in parquet-mr (via 
classes such as `ColumnIndex` and `ColumnIndexFilter`), and the filtered 
Parquet pages are returned to Spark through the 
`ParquetFileReader.readNextFilteredRowGroup` and 
`ParquetFileReader.getFilteredRecordCount` API. Please see #31393 for the 
related changes in the vectorized reader path.
   2. Spark needs more work to handle mis-aligned Parquet pages returned from 
the parquet-mr side, when there are multiple columns and their type widths are 
different (e.g., int and bigint). For this issue, @lxian already gave a pretty 
good description in 
[SPARK-34859](https://issues.apache.org/jira/browse/SPARK-34859). To support 
the case, Spark needs to leverage the API 
[`PageReadStore.getRowIndexes`](https://javadoc.io/doc/org.apache.parquet/parquet-column/latest/org/apache/parquet/column/page/PageReadStore.html),
 which returns the indexes of all rows (note the difference between rows and 
values: for flat schema there is no difference between the two, but for complex 
schema they're different) after filtering within a Parquet row group. In 
addition, because there are gaps between pages, we'll need to know the index 
of the first row in a page, so we can compare the indexes of values (rows) from 
a page with the row indexes mentioned above. This is provided by the 
`DataPage.getFirstRowIndex` method.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-21 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r655599949



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -170,77 +170,135 @@ public int readInteger() {
*  }
*/
   public void readBatch(

Review comment:
   Sorry @cloud-fan, I should've added more context in the PR description. 
Let me try to add it here and copy it to the description later.
   
   1. The column index filtering is largely implemented in parquet-mr (via 
classes such as `ColumnIndex` and `ColumnIndexFilter`), and the filtered 
Parquet pages are returned to Spark through the 
`ParquetFileReader.readNextFilteredRowGroup` and 
`ParquetFileReader.getFilteredRecordCount` API. Please see #31393 for the 
related changes in the vectorized reader path.
   2. Spark needs more work to handle mis-aligned Parquet pages returned from 
the parquet-mr side, when there are multiple columns and their type widths are 
different (e.g., int and bigint). For this issue, @lxian already gave a pretty 
good description in 
[SPARK-34859](https://issues.apache.org/jira/browse/SPARK-34859). To support 
the case, Spark needs to leverage the API 
[`PageReadStore.getRowIndexes`](https://javadoc.io/doc/org.apache.parquet/parquet-column/latest/org/apache/parquet/column/page/PageReadStore.html),
 which returns the indexes of all rows (note the difference between rows and 
values: for flat schema there is no difference between the two, but for complex 
schema they're different) after filtering within a Parquet row group. In 
addition, because there are gaps between pages, we'll need to know the index 
of the first row in a page, so we can compare the indexes of values (rows) from 
a page with the row indexes. This is provided by the 
`DataPage.getFirstRowIndex` method.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-21 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r655600665



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java
##
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/**
+ * Helper class to store intermediate state while reading a Parquet column 
chunk.
+ */
+final class ParquetReadState {

Review comment:
   Yes this occurred to me the other day as well :) I think it's a good 
idea. Let me move this refactoring part into a separate PR. Thanks.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-21 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r655599949



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -170,77 +170,135 @@ public int readInteger() {
*  }
*/
   public void readBatch(

Review comment:
   Sorry @cloud-fan, I should've added more context in the PR description. 
Let me try to add it here and copy it to the description later.
   
   1. The column index filtering is largely implemented in parquet-mr (via 
classes such as `ColumnIndex` and `ColumnIndexFilter`), and the filtered 
Parquet pages are returned to Spark through the 
`ParquetFileReader.readNextFilteredRowGroup` and 
`ParquetFileReader.getFilteredRecordCount` API. Please see #31393 for the 
related changes in the vectorized reader path.
   2. Spark needs more work to handle mis-aligned Parquet pages returned from 
the parquet-mr side, when there are multiple columns and their type widths are 
different (e.g., int and bigint). For this issue, @lxian already gave a pretty 
good description in 
[SPARK-34859](https://issues.apache.org/jira/browse/SPARK-34859). To support 
the case, Spark needs to leverage the API 
[`PageReadStore.getRowIndexes`](https://javadoc.io/doc/org.apache.parquet/parquet-column/latest/org/apache/parquet/column/page/PageReadStore.html),
 which returns the indexes of all rows (note the difference between rows and 
values: for flat schema there is no difference between the two, but for complex 
schema they're different) after filtering within a Parquet row group. In 
addition, because there are gaps between pages, we'll need to know the index 
of the first row in a page, so we can keep consuming values from a page and 
skip them when they are not in the row indexes. This is provided by the 
`DataPage.getFirstRowIndex` method.







[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader

2021-06-18 Thread GitBox


sunchao commented on a change in pull request #32753:
URL: https://github.com/apache/spark/pull/32753#discussion_r654010520



##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##
@@ -340,6 +398,71 @@ public Binary readBinary(int len) {
 throw new UnsupportedOperationException("only readInts is valid.");
   }
 
+  @Override
+  public void skipIntegers(int total) {
+    int left = total;
+    while (left > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int n = Math.min(left, this.currentCount);
+      advance(n);
+      left -= n;
+    }
+  }
+
+  @Override
+  public void skipBooleans(int total) {
+    throw new UnsupportedOperationException("only skipIntegers is supported");

Review comment:
   I can change it to `only skipIntegers is valid`. It's because the Parquet 
RLE encoding only supports integers, which is why all the other methods are 
unimplemented.
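
As a rough illustration of the run-at-a-time skipping pattern in `skipIntegers` 
above, here is a small, self-contained sketch written against a hypothetical 
decoder interface (the names below are invented for the example and are not 
actual Spark or Parquet APIs):

```java
// Hypothetical decoder over an RLE/bit-packed stream; names are invented for this sketch.
interface RunDecoder {
  int valuesLeftInRun();      // values remaining in the current run
  void readNextRunHeader();   // decode the header of the next run
  void discardFromRun(int n); // drop n values from the current run without materializing them
}

final class SkipSketch {
  // Skip `total` values one run (or partial run) at a time, mirroring the loop
  // structure of skipIntegers above: consume whole runs where possible instead
  // of skipping value by value.
  static void skip(RunDecoder decoder, int total) {
    int left = total;
    while (left > 0) {
      if (decoder.valuesLeftInRun() == 0) {
        decoder.readNextRunHeader();
      }
      int n = Math.min(left, decoder.valuesLeftInRun());
      decoder.discardFromRun(n);
      left -= n;
    }
  }
}
```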

##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
##
@@ -378,11 +380,77 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
   .withWriterVersion(PARQUET_1_0)
   .withCompressionCodec(GZIP)
   .withRowGroupSize(1024 * 1024)
-  .withPageSize(1024)
+  .withPageSize(pageSize)
+  .withDictionaryPageSize(dictionaryPageSize)
   .withConf(hadoopConf)
   .build()
   }
 
+  test("test multiple pages with different sizes and nulls") {

Review comment:
   Sure, will do.

##
File path: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
##
@@ -61,6 +61,14 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  @Override
+  public final void skipBooleans(int total) {
+    // TODO: properly vectorize this

Review comment:
   This follows a few other TODOs in this file for handling booleans. Yes, I 
can file a JIRA too.



