[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r661724331 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ## @@ -156,55 +156,81 @@ public int readInteger() { } /** - * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This reader - * reads the definition levels and then will read from `data` for the non-null values. - * If the value is null, c will be populated with `nullValue`. Note that `nullValue` is only - * necessary for readIntegers because we also use it to decode dictionaryIds and want to make - * sure it always has a value in range. - * - * This is a batched version of this logic: - * if (this.readInt() == level) { - *c[rowId] = data.readInteger(); - * } else { - *c[rowId] = null; - * } + * Reads a batch of values into vector `values`, using `valueReader`. The related states such + * as row index, offset, number of values left in the batch and page, etc, are tracked by + * `state`. The type-specific `updater` is used to update or skip values. + * + * This reader reads the definition levels and then will read from `valueReader` for the + * non-null values. If the value is null, `values` will be populated with null value. */ public void readBatch( ParquetReadState state, WritableColumnVector values, VectorizedValuesReader valueReader, ParquetVectorUpdater updater) throws IOException { int offset = state.offset; -int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage); +long rowId = state.rowId; +int leftInBatch = state.valuesToReadInBatch; +int leftInPage = state.valuesToReadInPage; -while (left > 0) { +while (leftInBatch > 0 && leftInPage > 0) { if (this.currentCount == 0) this.readNextGroup(); - int n = Math.min(left, this.currentCount); - - switch (mode) { -case RLE: - if (currentValue == state.maxDefinitionLevel) { -updater.updateBatch(n, offset, values, valueReader); - } else { -values.putNulls(offset, n); - } - break; -case PACKED: - for (int i = 0; i < n; ++i) { -if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.update(offset + i, values, valueReader); + int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount)); + + long rangeStart = state.currentRangeStart(); + long rangeEnd = state.currentRangeEnd(); + + if (rowId + n < rangeStart) { +updater.skipValues(n, valueReader); +advance(n); +rowId += n; +leftInPage -= n; + } else if (rowId > rangeEnd) { +state.nextRange(); + } else { +// the range [rowId, rowId + n) overlaps with the current row range in state +long start = Math.max(rangeStart, rowId); +long end = Math.min(rangeEnd, rowId + n - 1); + +// skip the part [rowId, start) +int toSkip = (int) (start - rowId); +if (toSkip > 0) { Review comment: `start` must >= `rowId` because it is defined as `long start = Math.max(rangeStart, rowId)`. Therefore, the case 1 `start < rowId` will never happen. The second case, `(start - rowId) > Int.MaxValue`, can only occur if `start` is equal to `rangeStart`. In this case we also know that `rangeStart <= rowId + n` (from line 183) and `n` is `Math.min(leftInBatch, Math.min(leftInPage, this.currentCount))` which is guaranteed to be within integer range. Therefore, the cast is safe. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
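For the cast-safety argument above, here is a minimal standalone sketch of the overlap arithmetic (class and method names are illustrative, not the actual Spark code): the number of values to skip or read out of a group of `n` values starting at `rowId`, checked against an inclusive row range, is always bounded by `n` and therefore fits in an `int`.

```java
// A standalone sketch of the overlap arithmetic discussed above (not the Spark class).
// Ranges are inclusive [rangeStart, rangeEnd]; a "group" is the next n values starting at rowId.
final class RowRangeOverlap {
  // Number of leading values in the group that fall before the range and must be skipped.
  static int toSkip(long rowId, int n, long rangeStart, long rangeEnd) {
    long start = Math.max(rangeStart, rowId);   // start >= rowId, and start - rowId <= n,
    return (int) Math.min(start - rowId, n);    // so the cast to int cannot overflow
  }

  // Number of values in the group that fall inside the range and must be read.
  static int toRead(long rowId, int n, long rangeStart, long rangeEnd) {
    long start = Math.max(rangeStart, rowId);
    long end = Math.min(rangeEnd, rowId + n - 1);
    return (int) Math.max(0, end - start + 1);
  }

  public static void main(String[] args) {
    // A group of 100 values starting at row 950, against the row range [1000, 1200]:
    System.out.println(toSkip(950, 100, 1000, 1200)); // 50 values skipped
    System.out.println(toRead(950, 100, 1000, 1200)); // 50 values read
  }
}
```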
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r661709891 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdater.java ## @@ -30,20 +30,28 @@ * @param values destination values vector * @param valuesReader reader to read values from */ - void updateBatch( + void readValues( int total, int offset, WritableColumnVector values, VectorizedValuesReader valuesReader); + /** + * Skip a batch of `total` values from `valuesReader`. + * + * @param total total number of values to skip + * @param valuesReader reader to skip values from + */ + void skipValues(int total, VectorizedValuesReader valuesReader); Review comment: Updated the PR description. Regarding the comment, IMO even though the method name is changed, the comment is still accurate in expressing what the method does. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
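A trimmed-down sketch of the updater shape being discussed; the `readValues`/`readValue`/`skipValues` names follow the PR, but the reader and vector types here are simplified stand-ins rather than Spark's `VectorizedValuesReader` and `WritableColumnVector`. The point it illustrates is that `skipValues` consumes values from the reader without ever writing to the vector.

```java
// Simplified stand-in types; not the actual Spark interfaces.
interface ValuesReaderStub {
  int readInteger();
  void skip();               // consume a single value without materializing it
}

interface VectorStub {
  void putInt(int rowId, int value);
}

interface UpdaterSketch {
  void readValues(int total, int offset, VectorStub values, ValuesReaderStub reader);
  void readValue(int offset, VectorStub values, ValuesReaderStub reader);
  void skipValues(int total, ValuesReaderStub reader);
}

final class IntegerUpdaterSketch implements UpdaterSketch {
  @Override
  public void readValues(int total, int offset, VectorStub values, ValuesReaderStub reader) {
    for (int i = 0; i < total; i++) {
      values.putInt(offset + i, reader.readInteger());
    }
  }

  @Override
  public void readValue(int offset, VectorStub values, ValuesReaderStub reader) {
    values.putInt(offset, reader.readInteger());
  }

  @Override
  public void skipValues(int total, ValuesReaderStub reader) {
    // Values are consumed from the reader but never written to the output vector.
    for (int i = 0; i < total; i++) {
      reader.skip();
    }
  }
}
```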
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r661706809 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -33,31 +58,104 @@ /** The remaining number of values to read in the current batch */ int valuesToReadInBatch; - ParquetReadState(int maxDefinitionLevel) { + ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) { this.maxDefinitionLevel = maxDefinitionLevel; +this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes); Review comment: Yes that's a fair point. Will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r660286736 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ## @@ -156,55 +156,81 @@ public int readInteger() { } /** - * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This reader - * reads the definition levels and then will read from `data` for the non-null values. - * If the value is null, c will be populated with `nullValue`. Note that `nullValue` is only - * necessary for readIntegers because we also use it to decode dictionaryIds and want to make - * sure it always has a value in range. - * - * This is a batched version of this logic: - * if (this.readInt() == level) { - *c[rowId] = data.readInteger(); - * } else { - *c[rowId] = null; - * } + * Reads a batch of values into vector `values`, using `valueReader`. The related states such + * as row index, offset, number of values left in the batch and page, etc, are tracked by + * `state`. The type-specific `updater` is used to update or skip values. + * + * This reader reads the definition levels and then will read from `valueReader` for the + * non-null values. If the value is null, `values` will be populated with null value. */ public void readBatch( ParquetReadState state, WritableColumnVector values, VectorizedValuesReader valueReader, ParquetVectorUpdater updater) throws IOException { int offset = state.offset; -int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage); +long rowId = state.rowId; +int leftInBatch = state.valuesToReadInBatch; +int leftInPage = state.valuesToReadInPage; -while (left > 0) { +while (leftInBatch > 0 && leftInPage > 0) { if (this.currentCount == 0) this.readNextGroup(); - int n = Math.min(left, this.currentCount); - - switch (mode) { -case RLE: - if (currentValue == state.maxDefinitionLevel) { -updater.updateBatch(n, offset, values, valueReader); - } else { -values.putNulls(offset, n); - } - break; -case PACKED: - for (int i = 0; i < n; ++i) { -if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.update(offset + i, values, valueReader); + int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount)); + + long rangeStart = state.currentRangeStart(); + long rangeEnd = state.currentRangeEnd(); + + if (rowId + n < rangeStart) { +updater.skipBatch(n, valueReader); +advance(n); +rowId += n; +leftInPage -= n; Review comment: > I see, so there are 2 actions here: ... If you mean `readBatch`, then yes it does the two bullets above. For `skipBatch` it just skip the next `n` values from `valueReader`. @viirya @cloud-fan Yeah I can change the method name to `skipValues` or `skipFromReader` if that's more clearer :) my preference, though, is to keep this close to the `readBatch` above. So how about I update the 3 methods in `ParquetVectorUpdater` to be: `readValues`, `readValue` and `skipValues`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r660234205 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java ## @@ -295,6 +307,8 @@ private int readPageV1(DataPageV1 page) throws IOException { } private int readPageV2(DataPageV2 page) throws IOException { +this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L); Review comment: Good point. Let me do that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r660177273 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java ## @@ -151,26 +159,28 @@ void readBatch(int total, WritableColumnVector column) throws IOException { // page. dictionaryIds = column.reserveDictionaryIds(total); } -readState.resetForBatch(total); +readState.resetForNewBatch(total); while (readState.valuesToReadInBatch > 0) { - // Compute the number of values we want to read in this page. if (readState.valuesToReadInPage == 0) { int pageValueCount = readPage(); -readState.resetForPage(pageValueCount); +readState.resetForNewPage(pageValueCount, pageFirstRowIndex); } PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName(); if (isCurrentPageDictionaryEncoded) { // Save starting offset in case we need to decode dictionary IDs. int startOffset = readState.offset; +// Save starting row index so we can check if we need to eagerly decode dict ids later +long startRowId = readState.rowId; // Read and decode dictionary ids. defColumn.readIntegers(readState, dictionaryIds, column, (VectorizedValuesReader) dataColumn); // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process // the values to add microseconds precision. -if (column.hasDictionary() || (startOffset == 0 && isLazyDecodingSupported(typeName))) { +if (column.hasDictionary() || (startRowId == pageFirstRowIndex && Review comment: Oh yeah, I need to update the comment too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
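A self-contained sketch of the condition referred to above; the parameter names mirror the PR, but everything else is a stand-in rather than the actual `VectorizedColumnReader` logic.

```java
// Illustrative only, not Spark's VectorizedColumnReader.
final class LazyDecodeCheck {
  static boolean canLazilyDecode(
      boolean columnAlreadyHasDictionary,
      long startRowId,
      long pageFirstRowIndex,
      boolean lazyDecodingSupportedForType) {
    // Mirrors the check discussed above: keep lazy (dictionary-backed) decoding only when the
    // column vector already has a dictionary, or when this is the first read for the current
    // page (startRowId equals the page's first row index) and the type supports lazy decoding;
    // otherwise the dictionary ids read so far have to be decoded eagerly.
    return columnAlreadyHasDictionary
        || (startRowId == pageFirstRowIndex && lazyDecodingSupportedForType);
  }

  public static void main(String[] args) {
    System.out.println(canLazilyDecode(false, 1000L, 1000L, true)); // true: at page start
    System.out.println(canLazilyDecode(false, 1040L, 1000L, true)); // false: mid-page restart
  }
}
```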
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r660177036 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -33,31 +51,104 @@ /** The remaining number of values to read in the current batch */ int valuesToReadInBatch; - ParquetReadState(int maxDefinitionLevel) { + ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) { this.maxDefinitionLevel = maxDefinitionLevel; +this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes); +nextRange(); } /** - * Called at the beginning of reading a new batch. + * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the + * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges: + * `[0-2], [4-5], [7-9]`. */ - void resetForBatch(int batchSize) { + private Iterator constructRanges(PrimitiveIterator.OfLong rowIndexes) { +List rowRanges = new ArrayList<>(); +long currentStart = Long.MIN_VALUE; +long previous = Long.MIN_VALUE; + +while (rowIndexes.hasNext()) { + long idx = rowIndexes.nextLong(); + if (currentStart == Long.MIN_VALUE) { +currentStart = idx; + } else if (previous + 1 != idx) { +RowRange range = new RowRange(currentStart, previous); +rowRanges.add(range); +currentStart = idx; + } + previous = idx; +} + +if (previous != Long.MIN_VALUE) { + rowRanges.add(new RowRange(currentStart, previous)); +} + +return rowRanges.iterator(); + } + + /** + * Must be called at the beginning of reading a new batch. + */ + void resetForNewBatch(int batchSize) { this.offset = 0; this.valuesToReadInBatch = batchSize; } /** - * Called at the beginning of reading a new page. + * Must be called at the beginning of reading a new page. */ - void resetForPage(int totalValuesInPage) { + void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) { this.valuesToReadInPage = totalValuesInPage; +this.rowId = pageFirstRowIndex; } /** - * Advance the current offset to the new values. + * Returns the start index of the current row range. */ - void advanceOffset(int newOffset) { + long currentRangeStart() { +return currentRange.start; + } + + /** + * Returns the end index of the current row range. + */ + long currentRangeEnd() { +return currentRange.end; + } + + /** + * Advance the current offset and rowId to the new values. + */ + void advanceOffsetAndRowId(int newOffset, long newRowId) { valuesToReadInBatch -= (newOffset - offset); -valuesToReadInPage -= (newOffset - offset); +valuesToReadInPage -= (newRowId - rowId); Review comment: I think this is not necessarily true: `rowId` tracks all the values that could be either read or skipped, while `offset` only tracks value that are read into the result column vector. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
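A small illustrative class (not Spark's `ParquetReadState`) showing the bookkeeping described in this comment: the batch counter shrinks only by the values actually written (the `offset` delta), while the page counter shrinks by every value consumed from the page, whether read or skipped (the `rowId` delta).

```java
// Illustrative bookkeeping sketch; field names follow the PR, the class is not Spark's.
final class ReadStateSketch {
  int offset;                 // next position to write in the output vector
  long rowId;                 // index of the next row within the column chunk
  int valuesToReadInBatch;    // values still to be written into the current batch
  int valuesToReadInPage;     // values still to be consumed from the current page

  void advanceOffsetAndRowId(int newOffset, long newRowId) {
    valuesToReadInBatch -= (newOffset - offset);  // only rows read into the vector
    valuesToReadInPage -= (newRowId - rowId);     // rows read plus rows skipped
    offset = newOffset;
    rowId = newRowId;
  }
}
```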
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r659989472 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -33,31 +51,104 @@ /** The remaining number of values to read in the current batch */ int valuesToReadInBatch; - ParquetReadState(int maxDefinitionLevel) { + ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) { this.maxDefinitionLevel = maxDefinitionLevel; +this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes); +nextRange(); } /** - * Called at the beginning of reading a new batch. + * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the + * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges: Review comment: https://issues.apache.org/jira/browse/PARQUET-2061 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r659984532 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -33,31 +51,104 @@ /** The remaining number of values to read in the current batch */ int valuesToReadInBatch; - ParquetReadState(int maxDefinitionLevel) { + ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) { this.maxDefinitionLevel = maxDefinitionLevel; +this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes); +nextRange(); } /** - * Called at the beginning of reading a new batch. + * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the + * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges: Review comment: It gives you an iterator so yeah generating them on the fly: https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java#L253. The indexes are generated from `Range` which is very similar to what we defined here. I'm planning to file a JIRA in parquet-mr to just return the original `Range`s so we don't have to do this step in Spark. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r659977651 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ## @@ -156,55 +156,81 @@ public int readInteger() { } /** - * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This reader - * reads the definition levels and then will read from `data` for the non-null values. - * If the value is null, c will be populated with `nullValue`. Note that `nullValue` is only - * necessary for readIntegers because we also use it to decode dictionaryIds and want to make - * sure it always has a value in range. - * - * This is a batched version of this logic: - * if (this.readInt() == level) { - *c[rowId] = data.readInteger(); - * } else { - *c[rowId] = null; - * } + * Reads a batch of values into vector `values`, using `valueReader`. The related states such + * as row index, offset, number of values left in the batch and page, etc, are tracked by + * `state`. The type-specific `updater` is used to update or skip values. + * + * This reader reads the definition levels and then will read from `valueReader` for the + * non-null values. If the value is null, `values` will be populated with null value. */ public void readBatch( ParquetReadState state, WritableColumnVector values, VectorizedValuesReader valueReader, ParquetVectorUpdater updater) throws IOException { int offset = state.offset; -int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage); +long rowId = state.rowId; +int leftInBatch = state.valuesToReadInBatch; +int leftInPage = state.valuesToReadInPage; -while (left > 0) { +while (leftInBatch > 0 && leftInPage > 0) { if (this.currentCount == 0) this.readNextGroup(); - int n = Math.min(left, this.currentCount); - - switch (mode) { -case RLE: - if (currentValue == state.maxDefinitionLevel) { -updater.updateBatch(n, offset, values, valueReader); - } else { -values.putNulls(offset, n); - } - break; -case PACKED: - for (int i = 0; i < n; ++i) { -if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) { - updater.update(offset + i, values, valueReader); + int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount)); + + long rangeStart = state.currentRangeStart(); + long rangeEnd = state.currentRangeEnd(); + + if (rowId + n < rangeStart) { +updater.skipBatch(n, valueReader); +advance(n); +rowId += n; +leftInPage -= n; Review comment: No, because we are skipping the batch here, and not adding the values into the batch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r659191689 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -33,31 +51,107 @@ /** The remaining number of values to read in the current batch */ int valuesToReadInBatch; - ParquetReadState(int maxDefinitionLevel) { + ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) { this.maxDefinitionLevel = maxDefinitionLevel; +this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes); +nextRange(); } /** - * Called at the beginning of reading a new batch. + * Construct a list of row ranges from the given `rowIndexes`. For example, suppose the + * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into 3 row ranges: + * `[0-2], [4-5], [7-9]`. */ - void resetForBatch(int batchSize) { + private Iterator constructRanges(PrimitiveIterator.OfLong rowIndexes) { +List rowRanges = new ArrayList<>(); +long currentStart = Long.MIN_VALUE; +long previous = Long.MIN_VALUE; + +while (rowIndexes.hasNext()) { + long idx = rowIndexes.nextLong(); + if (previous == Long.MIN_VALUE) { +currentStart = previous = idx; + } else if (previous + 1 != idx) { +RowRange range = new RowRange(currentStart, previous); +rowRanges.add(range); +currentStart = previous = idx; + } else { +previous = idx; + } Review comment: Yeah good point. Will change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
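A standalone, runnable version of the range-construction idea quoted above (the class name and the `RowRange` record are illustrative): consecutive row indexes collapse into inclusive ranges, so `[0, 1, 2, 4, 5, 7, 8, 9]` becomes `[0-2], [4-5], [7-9]`.

```java
// Illustrative sketch, not Spark's ParquetReadState.constructRanges.
import java.util.ArrayList;
import java.util.List;
import java.util.PrimitiveIterator;
import java.util.stream.LongStream;

final class RowRangesSketch {
  record RowRange(long start, long end) {}

  static List<RowRange> constructRanges(PrimitiveIterator.OfLong rowIndexes) {
    List<RowRange> ranges = new ArrayList<>();
    long currentStart = Long.MIN_VALUE;
    long previous = Long.MIN_VALUE;
    while (rowIndexes.hasNext()) {
      long idx = rowIndexes.nextLong();
      if (currentStart == Long.MIN_VALUE) {
        currentStart = idx;                                  // first index opens the first range
      } else if (previous + 1 != idx) {
        ranges.add(new RowRange(currentStart, previous));    // gap found: close the current range
        currentStart = idx;
      }
      previous = idx;
    }
    if (previous != Long.MIN_VALUE) {
      ranges.add(new RowRange(currentStart, previous));      // close the last open range
    }
    return ranges;
  }

  public static void main(String[] args) {
    PrimitiveIterator.OfLong it = LongStream.of(0, 1, 2, 4, 5, 7, 8, 9).iterator();
    System.out.println(constructRanges(it));
    // [RowRange[start=0, end=2], RowRange[start=4, end=5], RowRange[start=7, end=9]]
  }
}
```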
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r658990194 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -17,13 +17,31 @@ package org.apache.spark.sql.execution.datasources.parquet; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.PrimitiveIterator; + /** * Helper class to store intermediate state while reading a Parquet column chunk. */ final class ParquetReadState { - /** Maximum definition level */ + private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, Long.MAX_VALUE); + private static final RowRange MIN_ROW_RANGE = new RowRange(Long.MAX_VALUE, Long.MIN_VALUE); + + /** Iterator over all row ranges, only not-null if column index is present */ + private final Iterator rowRanges; Review comment: No. The list of row ranges is associated with a Parquet row group. For example, let's say you have two columns `c1:int` and `c2:bigint` in the row group, and the following pages: ``` row index 0500 1000 1500 --- c1 (int)| | | | --- c2 (bigint) ||||||| --- 0 250 500 750 1000 1250 1500 ``` Suppose the query is `SELECT * FROM tbl WHERE c1 = 750 AND c2 = 1100` This, when applied on `c1`, will produce row range `[500, 1000)`. When applied on `c2`, will produce row range `[1000, 1250)`. These two will be unioned into `[500, 1250)` and that is the row range for the whole row group. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
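A toy illustration of the union step in this example (this is not parquet-mr's `RowRanges` implementation): merging the per-column candidate ranges `[500, 1000)` and `[1000, 1250)` yields `[500, 1250)` for the whole row group.

```java
// Toy range-union sketch; half-open ranges [start, end), illustrative names only.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class RangeUnionSketch {
  record Range(long start, long end) {}

  static List<Range> union(List<Range> ranges) {
    List<Range> sorted = new ArrayList<>(ranges);
    sorted.sort(Comparator.comparingLong(Range::start));
    List<Range> merged = new ArrayList<>();
    for (Range r : sorted) {
      if (!merged.isEmpty() && merged.get(merged.size() - 1).end() >= r.start()) {
        // Overlapping or adjacent: extend the previous range instead of adding a new one.
        Range last = merged.remove(merged.size() - 1);
        merged.add(new Range(last.start(), Math.max(last.end(), r.end())));
      } else {
        merged.add(r);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // c1 = 750 selects pages covering rows [500, 1000); c2 = 1100 selects rows [1000, 1250).
    System.out.println(union(Arrays.asList(new Range(500, 1000), new Range(1000, 1250))));
    // [Range[start=500, end=1250]]
  }
}
```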
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r658990333 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -33,31 +51,102 @@ /** The remaining number of values to read in the current batch */ int valuesToReadInBatch; - ParquetReadState(int maxDefinitionLevel) { + ParquetReadState(int maxDefinitionLevel, PrimitiveIterator.OfLong rowIndexes) { this.maxDefinitionLevel = maxDefinitionLevel; +this.rowRanges = rowIndexes == null ? null : constructRanges(rowIndexes); +nextRange(); + } + + private Iterator constructRanges(PrimitiveIterator.OfLong rowIndexes) { Review comment: Yup will do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r658923673 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -17,13 +17,31 @@ package org.apache.spark.sql.execution.datasources.parquet; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.PrimitiveIterator; + /** * Helper class to store intermediate state while reading a Parquet column chunk. */ final class ParquetReadState { - /** Maximum definition level */ + private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, Long.MAX_VALUE); + private static final RowRange MIN_ROW_RANGE = new RowRange(Long.MAX_VALUE, Long.MIN_VALUE); Review comment: The `MIN_ROW_RANGE` here is only used as a place holder when we've iterated through all the ranges. It makes sure that we'll reject all row indexes that come after all the row ranges we've processed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
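An illustrative sketch (not the Spark class) of how the placeholder works: once the range iterator is exhausted, `currentRange` becomes the empty `MIN_ROW_RANGE`, so every remaining row index is rejected; with no column index at all, `MAX_ROW_RANGE` accepts everything.

```java
// Illustrative sketch of the placeholder trick; ranges are represented as {start, end} pairs.
import java.util.Iterator;

final class NextRangeSketch {
  static final long[] MAX_ROW_RANGE = {Long.MIN_VALUE, Long.MAX_VALUE}; // "accept every row"
  static final long[] MIN_ROW_RANGE = {Long.MAX_VALUE, Long.MIN_VALUE}; // empty: "accept nothing"

  private final Iterator<long[]> rowRanges;   // null when there is no column index
  long[] currentRange;

  NextRangeSketch(Iterator<long[]> rowRanges) {
    this.rowRanges = rowRanges;
    nextRange();
  }

  void nextRange() {
    if (rowRanges == null) {
      currentRange = MAX_ROW_RANGE;           // no filtering: every row index is in range
    } else if (rowRanges.hasNext()) {
      currentRange = rowRanges.next();
    } else {
      currentRange = MIN_ROW_RANGE;           // ranges exhausted: reject all remaining rows
    }
  }
}
```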
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r657471042 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -61,6 +61,14 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + @Override + public final void skipBooleans(int total) { +// TODO: properly vectorize this Review comment: Created https://issues.apache.org/jira/browse/SPARK-35867 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
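A stand-in sketch (not Spark's `VectorizedPlainValuesReader`) contrasting the two skip styles behind the TODO: fixed-width types can skip by jumping the buffer offset once, while plain-encoded booleans are bit-packed, so the naive skip walks one bit per value.

```java
// Stand-in reader used only to contrast the two skip styles; offsets are hypothetical fields.
final class SkipSketch {
  private int byteOffset;     // position into a plain-encoded buffer
  private int bitOffset;      // 0..7, position within the current byte (for booleans)

  void skipIntegers(int total) {
    byteOffset += 4 * total;  // one jump: every plain INT32 value is 4 bytes
  }

  void skipBooleans(int total) {
    // Naive, per-value version (the style the TODO wants to improve): advance one bit per value.
    for (int i = 0; i < total; i++) {
      bitOffset++;
      if (bitOffset == 8) {
        bitOffset = 0;
        byteOffset++;
      }
    }
    // A vectorized version could instead compute the new position directly from `total`, e.g.
    // byteOffset += (bitOffset + total) / 8; bitOffset = (bitOffset + total) % 8;
  }
}
```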
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r657290651 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ## @@ -170,77 +170,135 @@ public int readInteger() { * } */ public void readBatch( - int total, - int offset, + ParquetReadState state, WritableColumnVector values, - int maxDefinitionLevel, VectorizedValuesReader valueReader, ParquetVectorUpdater updater) throws IOException { -int left = total; -while (left > 0) { +int offset = state.offset; +long rowId = state.rowId; + +while (state.hasMoreInPage(offset, rowId)) { if (this.currentCount == 0) this.readNextGroup(); - int n = Math.min(left, this.currentCount); - switch (mode) { -case RLE: - if (currentValue == maxDefinitionLevel) { -updater.updateBatch(n, offset, values, valueReader); - } else { -values.putNulls(offset, n); - } - break; -case PACKED: - for (int i = 0; i < n; ++i) { -if (currentBuffer[currentBufferIdx++] == maxDefinitionLevel) { - updater.update(offset + i, values, valueReader); + int n = Math.min(state.valuesToReadInBatch + state.offset - offset, this.currentCount); Review comment: because `state.valuesToReadInBatch` is the number of values left in the batch. Suppose the initial batch size is 1000, and `valuesToReadInBatch` is 400. This means we've read 600 values and so `state.offset` and `offset` are both 600 at the beginning. We'll need to use `offset - state.offset` to get how many values we've read so far. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
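A tiny runnable restatement of the numbers in this comment, using hypothetical snapshot values (batch size 1000, 600 values read before this call, 50 more read inside the loop so far).

```java
// Numeric restatement of the example above; all values are an assumed snapshot, not real state.
final class LeftInBatchSketch {
  public static void main(String[] args) {
    int valuesToReadInBatch = 400;  // state.valuesToReadInBatch at loop entry
    int stateOffset = 600;          // state.offset at loop entry
    int offset = 650;               // current write position after reading 50 values in the loop
    int stillNeeded = valuesToReadInBatch + stateOffset - offset;
    System.out.println(stillNeeded); // 350 values still needed to fill the batch
  }
}
```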
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r657286411 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java ## @@ -294,6 +292,7 @@ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) thr private void readPageV1(DataPageV1 page) throws IOException { this.pageValueCount = page.getValueCount(); +this.pageFirstRowIndex = page.getFirstRowIndex().orElse(0L); Review comment: yea it can return 0, in which case it is the same as without column index and the whole page has to be read. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r657285733 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java ## @@ -174,24 +171,29 @@ void readBatch(int total, WritableColumnVector column) throws IOException { // page. dictionaryIds = column.reserveDictionaryIds(total); } -while (total > 0) { +readState.resetForBatch(total); +while (readState.valuesToReadInBatch > 0) { // Compute the number of values we want to read in this page. - int leftInPage = (int) (endOfPageValueCount - valuesRead); - if (leftInPage == 0) { + if (readState.valuesToReadInPage == 0) { readPage(); -leftInPage = (int) (endOfPageValueCount - valuesRead); +readState.resetForPage(pageValueCount, pageFirstRowIndex); } - int num = Math.min(total, leftInPage); PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName(); if (isCurrentPageDictionaryEncoded) { +boolean supportLazyDecoding = readState.rowId == pageFirstRowIndex && Review comment: This is just to check whether we are at the first iteration of the loop. Basically it checks whether lazy dictionary decoding is supported and if not, eagerly decode all the dictionary IDs read so far. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r657284056 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java ## @@ -95,6 +80,18 @@ */ private final ParquetVectorUpdaterFactory updaterFactory; + /** + * Helper struct to track intermediate states while reading Parquet pages in the column chunk. + */ + private final ParquetReadState readState; + + /** + * The index for the first row in the current page, among all rows across all pages in the + * column chunk for this reader. The value for this is 0 if there is no column index for the Review comment: Yes it can happen. Perhaps I should rephrase it to: "If there is no column index, the value for this is 0". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r656479348 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.PrimitiveIterator; + +/** + * Helper class to store intermediate state while reading a Parquet column chunk. + */ +final class ParquetReadState { + private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE, Long.MAX_VALUE); + private static final RowRange MIN_ROW_RANGE = new RowRange(Long.MAX_VALUE, Long.MIN_VALUE); + + /** Iterator over all row ranges, only not-null if column index is present */ + private final Iterator rowRanges; + + /** The current row range */ + private RowRange currentRange; + + /** Maximum definition level */ + int maxDefinitionLevel; + + /** The current index overall all rows within the column chunk. This is used to check if the + * current row should be skipped by comparing the index against the row ranges. */ + long rowId; + + /** The offset to add the next value in the current batch */ Review comment: Thanks for the suggestion. I think that's more clear. Will change it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r655599949 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ## @@ -170,77 +170,135 @@ public int readInteger() { * } */ public void readBatch( Review comment: Sorry @cloud-fan, I should've added more context in the PR description. Let me try to add it here and copy it to there later. 1. The column index filtering is largely implemented in parquet-mr (via classes such as `ColumnIndex` and `ColumnIndexFilter`), and the filtered Parquet pages are returned to Spark through the `ParquetFileReader.readNextFilteredRowGroup` and `ParquetFileReader.getFilteredRecordCount` APIs. Please see #31393 for the related changes in the vectorized reader path. 2. Spark needs more work to handle mis-aligned Parquet pages returned from the parquet-mr side, when there are multiple columns and their type widths are different (e.g., int and bigint). For this issue, @lxian already gave a pretty good description in [SPARK-34859](https://issues.apache.org/jira/browse/SPARK-34859). To support the case, Spark needs to leverage the API [`PageReadStore.getRowIndexes`](https://javadoc.io/doc/org.apache.parquet/parquet-column/latest/org/apache/parquet/column/page/PageReadStore.html), which returns the indexes of all rows (note the difference between rows and values: for a flat schema there is no difference between the two, but for a complex schema they're different) after filtering within a Parquet row group. In addition, because there are gaps between pages, we'll need to know the index of the first row in a page, so we can keep consuming values from a page and skip them when they are not in the row indexes. This is provided by the `DataPage.getFirstRowIndex` method. ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.PrimitiveIterator; + +/** + * Helper class to store intermediate state while reading a Parquet column chunk. + */ +final class ParquetReadState { Review comment: Yes this occurred to me the other day as well :) I think it's a good idea. Let me move this refactoring part into a separate PR. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
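A sketch of how the parquet-mr APIs named in this comment fit together, assuming a `ParquetFileReader` that was opened with column-index filtering enabled; error handling and the per-column decoding loop are omitted.

```java
// Sketch of the filtered-read flow described above; the wiring around the reader is assumed.
import java.io.IOException;
import java.util.PrimitiveIterator;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;

final class FilteredReadSketch {
  static void readAllFilteredRowGroups(ParquetFileReader reader) throws IOException {
    long totalRowCount = reader.getFilteredRecordCount();   // rows surviving the page filter
    PageReadStore rowGroup;
    while ((rowGroup = reader.readNextFilteredRowGroup()) != null) {
      // Indexes (within the row group) of the rows that survived filtering, when column
      // indexes are present; when absent, every row in the group has to be read.
      PrimitiveIterator.OfLong rowIndexes = rowGroup.getRowIndexes().orElse(null);
      // ... hand `rowIndexes` to the per-column readers, which combine them with each page's
      // first row index (DataPage.getFirstRowIndex().orElse(0L)) to decide which values to
      // read into the batch and which to skip; `totalRowCount` bounds the overall output size.
    }
  }
}
```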
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r655747755 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.PrimitiveIterator; + +/** + * Helper class to store intermediate state while reading a Parquet column chunk. + */ +final class ParquetReadState { Review comment: Opened #33006 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r655600665 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetReadState.java ## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.PrimitiveIterator; + +/** + * Helper class to store intermediate state while reading a Parquet column chunk. + */ +final class ParquetReadState { Review comment: Yes this occurred to me the other day as well :) I think it's a good idea. Let me move this refactoring part into a separate PR. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sunchao commented on a change in pull request #32753: [SPARK-34859][SQL] Handle column index when using vectorized Parquet reader
sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r654010520 ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ## @@ -340,6 +398,71 @@ public Binary readBinary(int len) { throw new UnsupportedOperationException("only readInts is valid."); } + @Override + public void skipIntegers(int total) { +int left = total; +while (left > 0) { + if (this.currentCount == 0) this.readNextGroup(); + int n = Math.min(left, this.currentCount); + advance(n); + left -= n; +} + } + + @Override + public void skipBooleans(int total) { +throw new UnsupportedOperationException("only skipIntegers is supported"); Review comment: I can change it to `only skipIntegers is valid`. It's because Parquet RLE encoding only supports integers, which is why all the others are unimplemented. ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ## @@ -378,11 +380,77 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession .withWriterVersion(PARQUET_1_0) .withCompressionCodec(GZIP) .withRowGroupSize(1024 * 1024) - .withPageSize(1024) + .withPageSize(pageSize) + .withDictionaryPageSize(dictionaryPageSize) .withConf(hadoopConf) .build() } + test("test multiple pages with different sizes and nulls") { Review comment: Sure, will do. ## File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ## @@ -61,6 +61,14 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) { } } + @Override + public final void skipBooleans(int total) { +// TODO: properly vectorize this Review comment: This follows a few other TODOs in the file when handling booleans. Yes, I can file a JIRA too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
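For illustration, a minimal sketch of the skip logic discussed in the first comment above: definition levels are encoded with Parquet's RLE/bit-packed hybrid, so skipping only needs to walk runs rather than decode values into a vector. The `skipIntegers` body mirrors the loop shown in the diff; the `advance` and `readNextGroup` bodies are assumptions about one plausible shape of those helpers, not the actual Spark code.

```java
// Simplified, hypothetical sketch of skipping values in an RLE/bit-packed hybrid
// decoder. Only integer skipping is meaningful here because this encoding only
// carries small integers (e.g. definition levels), so the other skip* methods
// can stay unimplemented.
final class RleSkipSketch {
  enum Mode { RLE, PACKED }

  private Mode mode = Mode.RLE;
  private int currentCount;      // values remaining in the current run
  private int currentBufferIdx;  // cursor into the unpacked buffer of a PACKED run

  public void skipIntegers(int total) {
    int left = total;
    while (left > 0) {
      if (currentCount == 0) readNextGroup(); // load the next RLE or bit-packed run
      int n = Math.min(left, currentCount);   // skip at most the rest of this run
      advance(n);
      left -= n;
    }
  }

  // Assumed shape of the advance helper: an RLE run only needs its counter
  // decremented, while a bit-packed run also moves the unpacked-buffer cursor.
  private void advance(int n) {
    if (mode == Mode.PACKED) {
      currentBufferIdx += n;
    }
    currentCount -= n;
  }

  // Placeholder: a real reader would decode the next run header here and set
  // mode and currentCount (plus unpack the values for a PACKED run).
  private void readNextGroup() {
    throw new UnsupportedOperationException("sketch only");
  }
}
```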