sunchao commented on a change in pull request #32753: URL: https://github.com/apache/spark/pull/32753#discussion_r661724331
##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
##########
@@ -156,55 +156,81 @@ public int readInteger() {
   }

   /**
-   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This reader
-   * reads the definition levels and then will read from `data` for the non-null values.
-   * If the value is null, c will be populated with `nullValue`. Note that `nullValue` is only
-   * necessary for readIntegers because we also use it to decode dictionaryIds and want to make
-   * sure it always has a value in range.
-   *
-   * This is a batched version of this logic:
-   * if (this.readInt() == level) {
-   * c[rowId] = data.readInteger();
-   * } else {
-   * c[rowId] = null;
-   * }
+   * Reads a batch of values into vector `values`, using `valueReader`. The related states such
+   * as row index, offset, number of values left in the batch and page, etc, are tracked by
+   * `state`. The type-specific `updater` is used to update or skip values.
+   * <p>
+   * This reader reads the definition levels and then will read from `valueReader` for the
+   * non-null values. If the value is null, `values` will be populated with null value.
    */
   public void readBatch(
       ParquetReadState state,
       WritableColumnVector values,
       VectorizedValuesReader valueReader,
       ParquetVectorUpdater updater) throws IOException {
     int offset = state.offset;
-    int left = Math.min(state.valuesToReadInBatch, state.valuesToReadInPage);
+    long rowId = state.rowId;
+    int leftInBatch = state.valuesToReadInBatch;
+    int leftInPage = state.valuesToReadInPage;
-    while (left > 0) {
+    while (leftInBatch > 0 && leftInPage > 0) {
       if (this.currentCount == 0) this.readNextGroup();
-      int n = Math.min(left, this.currentCount);
-
-      switch (mode) {
-        case RLE:
-          if (currentValue == state.maxDefinitionLevel) {
-            updater.updateBatch(n, offset, values, valueReader);
-          } else {
-            values.putNulls(offset, n);
-          }
-          break;
-        case PACKED:
-          for (int i = 0; i < n; ++i) {
-            if (currentBuffer[currentBufferIdx++] == state.maxDefinitionLevel) {
-              updater.update(offset + i, values, valueReader);
+      int n = Math.min(leftInBatch, Math.min(leftInPage, this.currentCount));
+
+      long rangeStart = state.currentRangeStart();
+      long rangeEnd = state.currentRangeEnd();
+
+      if (rowId + n < rangeStart) {
+        updater.skipValues(n, valueReader);
+        advance(n);
+        rowId += n;
+        leftInPage -= n;
+      } else if (rowId > rangeEnd) {
+        state.nextRange();
+      } else {
+        // the range [rowId, rowId + n) overlaps with the current row range in state
+        long start = Math.max(rangeStart, rowId);
+        long end = Math.min(rangeEnd, rowId + n - 1);
+
+        // skip the part [rowId, start)
+        int toSkip = (int) (start - rowId);
+        if (toSkip > 0) {

Review comment:
`start` is always >= `rowId`, because it is defined as `long start = Math.max(rangeStart, rowId)`. Therefore case 1, `start < rowId`, can never happen. Case 2, `(start - rowId) > Int.MaxValue`, could only occur if `start` is equal to `rangeStart`. In that case we also know that `rangeStart <= rowId + n` (from line 183), so `start - rowId <= n`, and `n` is `Math.min(leftInBatch, Math.min(leftInPage, this.currentCount))`, which is guaranteed to be within integer range.
Therefore, the cast is safe.
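
The bound argued above can be checked with a small standalone sketch. This is not code from the PR: the class `CastSafetySketch` and the helper `toSkip` are hypothetical names that only mirror the overlap branch of `readBatch`, and `Math.toIntExact` is swapped in for the plain `(int)` cast so that any violation of the bound would raise an exception rather than truncate silently.

```java
// Hypothetical sketch, not part of VectorizedRleValuesReader.
final class CastSafetySketch {

  /**
   * Mirrors the overlap branch: returns how many leading rows of
   * [rowId, rowId + n) lie before rangeStart.
   */
  public static int toSkip(long rowId, int n, long rangeStart, long rangeEnd) {
    // The overlap branch is only reached when neither earlier branch was taken.
    if (n < 0 || rowId + n < rangeStart || rowId > rangeEnd) {
      throw new IllegalArgumentException("not in the overlap branch");
    }
    long start = Math.max(rangeStart, rowId);
    // start >= rowId, so diff >= 0; rangeStart <= rowId + n, so diff <= n.
    long diff = start - rowId;
    // toIntExact throws ArithmeticException if diff does not fit in an int.
    return Math.toIntExact(diff);
  }

  public static void main(String[] args) {
    // Range starts inside the run: two rows are skipped.
    System.out.println(toSkip(10L, 5, 12L, 100L)); // prints 2
    // Run starts inside the range: nothing is skipped.
    System.out.println(toSkip(50L, 5, 12L, 100L)); // prints 0
    // Extreme case: rangeStart == rowId + n with the largest possible n.
    long rowId = 5_000_000_000L;
    System.out.println(
        toSkip(rowId, Integer.MAX_VALUE, rowId + Integer.MAX_VALUE, Long.MAX_VALUE));
    // prints 2147483647
  }
}
```

Even in the extreme case the difference equals `n`, which is an `int` by construction, so the narrowing conversion cannot overflow.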