real-mj-song opened a new issue, #18629: URL: https://github.com/apache/pinot/issues/18629
### Summary

This issue proposes adding a column-major ingestion path for Apache Arrow IPC files by implementing the `ColumnReader` SPI (introduced in #16727) inside the existing `pinot-arrow` plugin. The new `ArrowColumnReaderFactory` would let `SegmentIndexCreationDriverImpl.buildColumnar()` consume an Arrow IPC file directly, one `FieldVector` at a time, instead of pivoting through per-row `GenericRow` materialization.

---

### Background

`pinot-arrow` today exposes Arrow IPC files only through `ArrowRecordReader`, a row-major reader. The file is opened with `ArrowFileReader`, and each row is copied out of the columnar `VectorSchemaRoot` into a `GenericRow` for the driver. The standard `SegmentIndexCreationDriverImpl.build()` then routes each row back into per-column stats collectors and forward-index creators. The source format is columnar and the segment storage is columnar, but the ingestion path between them is row-major.

The `ColumnReader` / `ColumnReaderFactory` SPI added in #16727 enables a column-major alternative: `SegmentIndexCreationDriverImpl.buildColumnar()` walks one column at a time, handing each value directly to the per-column writers. At present this SPI has only one in-tree consumer, the segment-to-segment converter, and no consumer that reads from a non-Pinot source.

`pinot-arrow` is a natural first non-segment consumer. An Arrow `FieldVector` already exposes positional access (`getObject(int)`, typed accessors, null bitmap), which lines up directly with the `ColumnReader` interface. No restructuring of the source is required.

---

### Proposal

Add two source files under `pinot-plugins/pinot-input-format/pinot-arrow`:

- **`ArrowColumnReaderFactory implements ColumnReaderFactory`**: opens an Arrow IPC file via `ArrowFileReader`, accumulates the file's record batches into per-column `FieldVector`s via `TransferPair.copyValueSafe`, and exposes one `ColumnReader` per requested target-schema column.
  The Arrow allocator limit is configurable via `arrowAllocatorLimit`.
- **`ArrowColumnReader implements ColumnReader`**: wraps a single `FieldVector` and implements the three documented access patterns:
  - generic sequential `next()` with null checks
  - typed sequential `nextInt()` / `nextLong()` / ... with `isNextNull()` and `skipNext()`
  - random access by `docId`

  Single-value and multi-value (`List` of primitives) variants are covered.

The row-major and column-major paths must agree on Arrow → Pinot type conversion:

- `Utf8` / `LargeUtf8` values must be unwrapped from `org.apache.arrow.vector.util.Text` to `String`.
- `List` values must be materialized to `Object[]`.
- Temporal types must follow the `extractRawTimeValues` flag.

To keep the two paths from drifting apart, the proposal extracts `ArrowRecordExtractor.convert()` and its helpers into a shared utility (`ArrowToPinotTypeConverter`). Both `ArrowRecordExtractor.extract` and `ArrowColumnReader.getValue` are then routed through it. This is a behavior-preserving refactor for the row-major path.

| Layer | Today | Proposed |
|---|---|---|
| Driver entry | `driver.build()` | `driver.build()` → `buildColumnar()` |
| Input adapter | `ArrowRecordReader` (per-row `GenericRow`) | `ArrowColumnReaderFactory` (per-column `ColumnReader`) |
| Arrow → Pinot type conversion | inline in `ArrowRecordExtractor` | shared `ArrowToPinotTypeConverter` (used by both paths) |

---

### Arrow caveat: type assumptions downstream of `buildColumnar()`

The only existing in-tree consumer of `buildColumnar()` is the segment-to-segment converter. It reads from an already-built Pinot segment and emits canonical JDK types (`String` for strings, `Object[]` for MV columns, etc.). Several downstream cast sites cast directly to `String` without a type check; examples are `StringColumnPreIndexStatsCollector.collect` and `NoDictColumnStatisticsCollector.collect`. This holds for the existing caller but is fragile under any non-segment source.
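The following sketch makes this fragility concrete. It is a minimal, self-contained Java model, not Pinot or Arrow code.

- `Utf8Bytes` is a hypothetical stand-in for a `byte[]`-backed string wrapper.
- `convert` is a hypothetical simplification of what a shared converter might do: wrapper to `String`, `List` to `Object[]`.
- `SketchColumnReader` is a hypothetical reader that mimics only the sequential (`next()` / `isNextNull()` / `skipNext()`) and random-access (`getValue(docId)`) patterns described for `ArrowColumnReader`. Its methods are not the real `ColumnReader` signatures, and `hasNext()` is an assumption.
- `totalLength` plays the role of a downstream collector that casts to `String` without a type check.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

public class ArrowConversionSketch {

  /** Hypothetical stand-in for a byte[]-backed string wrapper (NOT Arrow's real Text class). */
  public static final class Utf8Bytes {
    private final byte[] bytes;

    public Utf8Bytes(String s) {
      this.bytes = s.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String toString() {
      return new String(bytes, StandardCharsets.UTF_8);
    }
  }

  /** Hypothetical shared converter: normalizes source values to canonical JDK types. */
  public static Object convert(Object value) {
    if (value instanceof Utf8Bytes) {
      return value.toString(); // wrapper -> String
    }
    if (value instanceof List) {
      List<?> list = (List<?>) value; // List -> Object[], converting each element
      Object[] out = new Object[list.size()];
      for (int i = 0; i < out.length; i++) {
        out[i] = convert(list.get(i));
      }
      return out;
    }
    return value; // null and already-canonical values pass through unchanged
  }

  /** Hypothetical column reader over one "vector", modeled here as a plain Object[]. */
  public static final class SketchColumnReader {
    private final Object[] vector;
    private int pos;

    public SketchColumnReader(Object[] vector) {
      this.vector = vector;
    }

    public boolean hasNext() { return pos < vector.length; }

    public boolean isNextNull() { return vector[pos] == null; }

    public void skipNext() { pos++; }

    /** Sequential access: returns the converted value and advances the cursor. */
    public Object next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return convert(vector[pos++]);
    }

    /** Random access by docId; does not move the sequential cursor. */
    public Object getValue(int docId) { return convert(vector[docId]); }
  }

  /** Mimics a downstream collector that casts to String without a type check. */
  public static int totalLength(SketchColumnReader reader) {
    int total = 0;
    while (reader.hasNext()) {
      if (reader.isNextNull()) {
        reader.skipNext();
        continue;
      }
      total += ((String) reader.next()).length(); // safe only because next() already converted
    }
    return total;
  }

  public static void main(String[] args) {
    Object[] names = {new Utf8Bytes("pinot"), null, new Utf8Bytes("arrow")};
    System.out.println(totalLength(new SketchColumnReader(names))); // prints 10

    SketchColumnReader mvReader = new SketchColumnReader(new Object[] {Arrays.asList(1, 2, 3)});
    System.out.println(Arrays.toString((Object[]) mvReader.getValue(0))); // prints [1, 2, 3]
  }
}
```

Passing a raw `Utf8Bytes` value straight to the `(String)` cast would throw `ClassCastException`. Normalizing values in one shared converter, rather than patching each cast site, keeps the row-major and column-major paths consistent.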
An Arrow `FieldVector` for `Utf8` / `LargeUtf8` emits `org.apache.arrow.vector.util.Text` (a `byte[]`-backed string wrapper), not `String`. The shared converter unwraps `Text` to `String`, so this proposal works correctly today. However, the underlying cast assumptions remain unverified. Tightening them is out of scope here and is best addressed in a follow-up.

---

### How this can be leveraged

This is a building block for batch ingestion pipelines whose upstream stage operates on columnar data. Pipelines that can emit Arrow IPC shards can hand those shards to `ArrowColumnReaderFactory` and drive segment construction via `buildColumnar()`. Today they would instead replay each shard row by row through `ArrowRecordReader`. The structural change is that the segment-build phase consumes one column at a time in bulk rather than one row at a time. As a result, per-row `GenericRow` allocation and per-primitive boxing inside the driver are no longer on the path.

This proposal does not change the `pinot-batch-ingestion-spark-3` plugin, which currently does not read Arrow at all. Wiring this path into that plugin (or any other batch framework) is a separate follow-up.

---

### Compatibility and rollout

The change is purely additive:

- `ArrowRecordReader` and the row-major path are unchanged.
- Users opt in by constructing the driver via the `ColumnReaderFactory`-accepting overload of `SegmentIndexCreationDriverImpl.init`.
- The shared converter extraction does not change row-major behavior.
- No SPI changes: the new code consumes the already-merged `ColumnReader` / `ColumnReaderFactory` interfaces.

---

### References

- `ColumnReader` SPI: #16727
- Row-major `ArrowRecordExtractor` refactor: #18434

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
