LuciferYang opened a new pull request, #55751:
URL: https://github.com/apache/spark/pull/55751
### What changes were proposed in this pull request?
This PR adds a bulk read+widen path for the `INT32 -> Long` type-converting
Parquet vector updater (`IntegerToLongUpdater`). When the run length of values
produced by the RLE/PACKED def-level decoder is at or above a configurable
threshold (default 8), the Updater delegates to a new bulk method
`VectorizedValuesReader.readIntegersAsLongs(total, c, rowId)`, which performs a
single `getBuffer(total*4)` call and runs a tight in-method sign-extending
conversion loop. Below the threshold, the existing per-row `readInteger() +
putLong()` loop is unchanged.
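
As an illustration only, the gate and the two paths can be sketched in plain Java. The sketch assumes the values are little-endian INT32s in a `ByteBuffer` and uses a `long[]` in place of the column vector `c`. `BulkWidenSketch`, the simplified `getBuffer`, and `encode` are hypothetical stand-ins, not the classes touched by this PR.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical stand-in for the reader/updater pair; not Spark's actual classes.
final class BulkWidenSketch {

    // Mimics a getBuffer(length) call: carve a little-endian view of the next
    // `length` bytes out of `src` and advance `src` past them.
    static ByteBuffer getBuffer(ByteBuffer src, int length) {
        ByteBuffer view = src.duplicate();
        view.limit(src.position() + length);
        ByteBuffer slice = view.slice().order(ByteOrder.LITTLE_ENDIAN);
        src.position(src.position() + length);
        return slice;
    }

    // Per-row path: one 4-byte view is created for every value.
    static void readPerRow(ByteBuffer src, int total, long[] out, int rowId) {
        for (int i = 0; i < total; i++) {
            out[rowId + i] = getBuffer(src, 4).getInt();
        }
    }

    // Bulk path: one view for the whole run, then a tight widening loop.
    static void readIntegersAsLongs(ByteBuffer src, int total, long[] out, int rowId) {
        ByteBuffer buf = getBuffer(src, total * 4);
        for (int i = 0; i < total; i++) {
            // Assigning an int to a long sign-extends it.
            out[rowId + i] = buf.getInt(i * 4);
        }
    }

    // Threshold gate: runs at or above the threshold take the bulk path.
    static void readValues(ByteBuffer src, int total, long[] out, int rowId, int threshold) {
        if (total >= threshold) {
            readIntegersAsLongs(src, total, out, rowId);
        } else {
            readPerRow(src, total, out, rowId);
        }
    }

    // Test helper: encode ints as little-endian bytes, ready for reading.
    static ByteBuffer encode(int... values) {
        ByteBuffer b = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
        for (int v : values) {
            b.putInt(v);
        }
        b.flip();
        return b;
    }

    public static void main(String[] args) {
        int[] values = {Integer.MIN_VALUE, -1, 0, 1, Integer.MAX_VALUE, 7, 8, 9};
        long[] bulk = new long[values.length];
        long[] perRow = new long[values.length];
        readValues(encode(values), values.length, bulk, 0, 8);   // 8 >= 8: bulk path
        readValues(encode(values), values.length, perRow, 0, 9); // 8 < 9: per-row path
        System.out.println(Arrays.equals(bulk, perRow));
    }
}
```

Both paths fill the output with the same values; the difference is that the bulk path creates one buffer view per run instead of one per value.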
`VectorizedPlainValuesReader` is specialized; the interface default
implementation falls back to the per-row pattern so non-Plain readers preserve
correctness without a speedup. Specializing other readers (e.g.
`VectorizedDeltaBinaryPackedReader`) and other type-converting Updaters
(`IntegerToDouble`, `FloatToDouble`, `DateToTimestampNTZ`, `DowncastLong`) is
left to follow-up PRs.
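
The fallback arrangement described above can be pictured with a simplified interface. This is a sketch under assumptions: `IntReaderSketch`, `IteratorReaderSketch`, and `ArrayReaderSketch` are hypothetical names, and the output is a `long[]` rather than a column vector.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified reader interface; the real method takes a column vector.
interface IntReaderSketch {
    int readInteger();

    // Default: per-row fallback. Correct for any reader, but no speedup.
    default void readIntegersAsLongs(int total, long[] out, int rowId) {
        for (int i = 0; i < total; i++) {
            out[rowId + i] = readInteger();
        }
    }
}

// Stands in for a non-Plain reader: it only inherits the default.
final class IteratorReaderSketch implements IntReaderSketch {
    private final Iterator<Integer> it;

    IteratorReaderSketch(List<Integer> values) {
        this.it = values.iterator();
    }

    @Override
    public int readInteger() {
        return it.next();
    }
}

// Stands in for the specialized reader: it overrides the bulk method.
final class ArrayReaderSketch implements IntReaderSketch {
    private final int[] data;
    private int pos = 0;
    int bulkCalls = 0; // lets a caller observe that the override was used

    ArrayReaderSketch(int... data) {
        this.data = data;
    }

    @Override
    public int readInteger() {
        return data[pos++];
    }

    @Override
    public void readIntegersAsLongs(int total, long[] out, int rowId) {
        bulkCalls++;
        for (int i = 0; i < total; i++) {
            out[rowId + i] = data[pos + i]; // int -> long widening
        }
        pos += total;
    }
}

final class FallbackDemo {
    public static void main(String[] args) {
        long[] viaDefault = new long[3];
        long[] viaOverride = new long[3];
        new IteratorReaderSketch(Arrays.asList(-1, 2, Integer.MIN_VALUE))
            .readIntegersAsLongs(3, viaDefault, 0);
        new ArrayReaderSketch(-1, 2, Integer.MIN_VALUE)
            .readIntegersAsLongs(3, viaOverride, 0);
        System.out.println(Arrays.equals(viaDefault, viaOverride));
    }
}
```

With this shape, a caller can always invoke the bulk method; readers that have not been specialized yet simply run the per-row loop behind it.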
The threshold is exposed as a new internal SQL conf:
- Key: `spark.sql.parquet.vectorized.updater.bulkThreshold`
- Default: `8`
- Range: `>= 1`
### Why are the changes needed?
The legacy per-row path allocates a `ByteBuffer` slice and sets its byte order
on every `getBuffer(4)` call, and that per-call overhead dominates the cost of
`IntegerToLongUpdater.readValues`. INT32 -> Long widening is a common Parquet
schema-evolution path: a column written as a narrow integer is later read as long.
Local benchmark on the `ParquetVectorUpdaterBenchmark`
`IntegerToLongUpdater` case (Mac, OpenJDK 17):
| Run | Best Time | Rate | Per Row |
|---|---|---|---|
| Before | 1 ms | 834.9 M/s | 1.2 ns |
| After | 0 ms | 3190.4 M/s | 0.3 ns |
| Delta | | **3.82x** | -75% |
Other type-converting Updaters (not yet specialized) hold steady at ~835
M/s, confirming the change is local to the wired Updater.
### Does this PR introduce _any_ user-facing change?
No, except for the addition of a new internal SQL conf
`spark.sql.parquet.vectorized.updater.bulkThreshold`. Default behavior is
unchanged from a correctness standpoint; only performance characteristics on
the INT32 -> Long path change.
### How was this patch tested?
- New unit tests in `ParquetVectorUpdaterSuite`:
  - Bulk vs per-row equivalence at boundary lengths (0, 1, 7, 8, 9, 17,
    1024, 4097).
  - Threshold gate semantics via a spy reader that distinguishes `>=` from
    `>` (a pure correctness assertion cannot, because both paths produce the
    same values).
  - `readValue` (singular) is not gated by the threshold.
  - Sign extension on boundary INT32 values (`Int.MinValue`, `-1`,
    `Int.MaxValue`).
  - Long-decimal dispatch path (`INT32 + DECIMAL(9,0) -> DecimalType(15,0)`
    via `canReadAsLongDecimal`).
- New hygiene test pinning the SQLConf default to the legacy 7-arg ctor
  literal in `VectorizedParquetRecordReader`, so drift between `sql/core` and
  `catalyst` is caught at the unit level.
- New end-to-end test in `ParquetIOSuite` that round-trips an INT32 file read
  back as `LongType` under three threshold settings (1, default, `Int.MaxValue`)
  on both nullable and non-null columns.
- The existing `ParquetVectorUpdaterBenchmark` provides A/B coverage so that
  follow-up PRs can track perf deltas of additional Updater specializations.
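
To illustrate why the threshold gate needs a spy rather than a value comparison, here is a minimal sketch. `SpyGateSketch` and `SpyReader` are hypothetical and are not the test code in this PR. Because both paths write identical values, only the call counts reveal which path was taken when the run length equals the threshold.

```java
// Hypothetical sketch of a spy-based gate test; not the suite's actual code.
final class SpyGateSketch {

    // Spy reader: records which path the gate picked.
    static final class SpyReader {
        int bulkCalls = 0;
        int perRowCalls = 0;

        int readInteger() {
            perRowCalls++;
            return 42;
        }

        void readIntegersAsLongs(int total, long[] out, int rowId) {
            bulkCalls++;
            for (int i = 0; i < total; i++) {
                out[rowId + i] = 42L;
            }
        }
    }

    // Gate under test: `>=` means a run exactly at the threshold goes bulk.
    static void readValues(SpyReader r, int total, long[] out, int rowId, int threshold) {
        if (total >= threshold) {
            r.readIntegersAsLongs(total, out, rowId);
        } else {
            for (int i = 0; i < total; i++) {
                out[rowId + i] = r.readInteger();
            }
        }
    }

    public static void main(String[] args) {
        SpyReader atThreshold = new SpyReader();
        readValues(atThreshold, 8, new long[8], 0, 8);
        SpyReader belowThreshold = new SpyReader();
        readValues(belowThreshold, 7, new long[7], 0, 8);
        System.out.println("at threshold: bulk=" + atThreshold.bulkCalls
            + " perRow=" + atThreshold.perRowCalls);
        System.out.println("below threshold: bulk=" + belowThreshold.bulkCalls
            + " perRow=" + belowThreshold.perRowCalls);
    }
}
```

If the gate were written with `>` instead, the run of length 8 would show eight per-row calls and no bulk call, while the output array would look exactly the same.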
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code