davecromberge opened a new pull request, #18633:
URL: https://github.com/apache/pinot/pull/18633

   ## Summary
   
   Eliminates the per-row `byte[]` allocation + `memcpy` that today fronts 
every serialized-sketch read in Pinot. Theta sketches become true zero-copy via 
`Sketch.wrap`; CPC and Integer-Tuple sketches skip the upstream alloc but still 
heapify (no zero-copy wrap exists for those families).
   
   The change is additive and codec-safe — a stability flag on the 
forward-index reader gates every view path, so compressed columns transparently 
fall back to the existing `byte[]` path. No segment format change, no wire 
protocol change, all SPI additions are interface defaults.
   
   ## What's in each commit
   
   1. **`ForwardIndexReader.getBytesView(int, T)`** - SPI default wraps 
`getBytes()`; V3 (`VarByteChunkSVForwardIndexReader`) and V4 
(`VarByteChunkForwardIndexReaderV4`) override with true zero-copy slices into 
the underlying `PinotDataBuffer` (PASS_THROUGH) or the per-context 
decompression scratch (compressed). V5/V6 inherit V4. Huge compressed values 
fall back to byte[] (no slice path).
   2. **Query read path** - 
`ForwardIndexReader.isBufferViewStableAcrossReads()` capability flag (`true` 
only for PASS_THROUGH var-byte readers). `BlockValSet.getBytesValueViewsSV() / 
isBytesViewStableAcrossReads()` mirror it. 
`ProjectionBlockValSet`/`DataBlockCache`/`DataFetcher` plumb views through. 
Theta/CPC/Tuple aggregation functions branch on the flag and consume views via 
`Memory.wrap(buffer, LITTLE_ENDIAN)`. Theta's advanced multi-argument 
`FilterEvaluator` form stays on the byte[] path (out of scope, see below).
   3. **Star-tree build path** - `ValueAggregator.applyRawValueFromBuffer / 
applyAggregatedValueFromBuffer` SPI defaults (drain to byte[] + delegate); 
theta/CPC/Tuple aggregator overrides. 
`PinotSegmentColumnReader.getValueAsBuffer`. `BaseSingleTreeBuilder` dispatches 
on a per-metric `_metricUsesBufferPath` flag.
   4. **Broker reduce path** - `ObjectSerDeUtils` switches six sketch serdes 
(theta, tuple, CPC + their three accumulator variants) from `new 
byte[remaining()] + memcpy + Memory.wrap(bytes)` to `Memory.wrap(buffer, 
LITTLE_ENDIAN)`. Source is the heap byte[] in 
`DataTableImplV4._variableSizeData` - immutable after wire-decode.
   
   ## Risk surface
   
     - **Lifetime contract on `getBytesView`**: returned buffer is valid only 
until the next call on the same context (the compressed reader slices a scratch 
buffer that gets overwritten). Stability flag gates every batched caller; 
non-batched code paths consume in-place. Documented at the SPI and at every 
introduced call site.
     - **Theta `Sketch.wrap` retains the wrapped Memory** - the only zero-copy 
retain in the branch. Audited at all three call sites: query and build wrappers 
are method-local; broker wrapper sits in `ThetaSketchAccumulator._accumulator` 
until the next threshold union (default 2). Heap byte[], GC-managed, bounded by 
reduce duration.
     - **`--add-opens=java.base/sun.nio.ch=ALL-UNNAMED` requirement** - 
`Memory.wrap(ByteBuffer)` reflects into closed internals. Pinot's standard 
launcher (`appAssemblerScriptTemplate`) already sets this; surefire is 
configured for tests. Without the flag, PASS_THROUGH reads throw 
`IllegalAccessError` - flagged here in case any deployment uses a custom 
launcher.
     - **Byte order**: every `Memory.wrap(ByteBuffer, …)` call passes 
`ByteOrder.LITTLE_ENDIAN` explicitly to match the on-disk sketch format and the 
implicit native order of `Memory.wrap(byte[])`.
   
     ## Benchmarks
   
     Inner-loop pinpoint bench (50K rows × ~8 KB compact theta sketches, JDK 
21, PASS_THROUGH):
   
     | sketch | byte[] (ms/op) | view (ms/op) | delta |
     |---|---|---|---|
     | theta | ~25 | ~5 | **−80%** |
     | cpc   | ~485 | ~500 | ~0% (heapify dominates) |
     | tuple | ~606 | ~528 | **−13%** |
   
     `BenchmarkRawForwardIndexReader` (commit 1): ~55% improvement 
PASS_THROUGH, ~22% LZ4 at the SPI hot path.
   
     Star-tree build: ~10% improvement PASS_THROUGH (build is allocation-bound).
   
     Theta dominates because `Sketch.wrap` is true zero-copy; CPC/Tuple savings 
are smaller because heapify allocates internal state regardless of input shape.
   
     ## Test coverage
   
     - `ForwardIndexBytesViewTest` - V3/V4 view ≡ byte[] across all codecs and 
edge cases including huge values.
     - `SketchViewPathParityTest` - theta/CPC/Tuple distinct count identical 
between PASS_THROUGH and LZ4 segments end-to-end through real query plumbing.
     - `DistinctCount{Theta,CPC,Tuple}SketchValueAggregatorTest` — buffer-API 
path ≡ byte[]-API path per aggregator.
     - `SketchBuildPathParityTest` - star-tree pre-aggregated sketches 
bit-identical between PASS_THROUGH and LZ4 source segments.
     - `ObjectSerDeUtilsBufferParityTest` - deserialize from byte[] ≡ 
deserialize from ByteBuffer (including non-zero array offset slices matching 
`DataTableImplV4.getCustomObject` output).
   
     All pass. `BenchmarkValueAggregatorBufferApi` verifies no regression on 
the default SPI fallback for non-overriding aggregators.
   
     ## Out of scope (deferred follow-ups)
   
     - **Theta multi-argument form** (`distinctCountThetaSketchWithFilter`, 
etc.) - `FilterEvaluator` re-reads value arrays per row and casts to 
`byte[][]`; switching it requires interface
     changes. Gated off in this PR.
     - **KLL doubles sketch** - uses `KllDoublesSketch.wrap` (also a 
retain-style wrap); mechanically straightforward but out of this PR's scope.
     - **HLL / HLL+** - HLL already reads via `IntBuffer`; HLL+ delegates to a 
third-party `HyperLogLogPlus.Builder.build(byte[])` that would need a fork.
     - **TDigest**  - already optimal (passes the buffer directly to 
`MergingDigest.fromBytes`).
     - **Compressed columns getting the inner-loop win** - would require 
restructuring aggregation functions to consume views per-row instead of 
materialising a view array. Larger refactor; not
      pursued here because the codec-gated approach is safe everywhere and free 
on PASS_THROUGH where it matters most.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to