baibaichen opened a new pull request, #12188: URL: https://github.com/apache/gluten/pull/12188
### What changes were proposed in this pull request? Closes #12187. Adds a Velox implementation of Spark's `AttachDistributedSequenceExec` (prepends a contiguous `Long` id column to child output). Used by pandas-on-Spark distributed-sequence index and `DataFrame.zipWithIndex`. ### How is this implemented? **Plan-level** - New abstract base `ColumnarAttachDistributedSequenceBaseExec` in `gluten-substrait/` with factory `from(plan)` delegating to the backend API. - New offload rule case + validator gate in `OffloadSingleNodeRules` / `Validators`. - New backend hook `genColumnarAttachDistributedSequenceExec` on `SparkPlanExecApi`. Velox override returns the columnar impl; the CH override throws `GlutenNotSupportException` until that backend is ported. - Config: `spark.gluten.sql.columnar.attachDistributedSequence` (default `true`) lets users disable the offload. **Velox runtime (`ColumnarAttachDistributedSequenceExec`)** For >1 partition: 1. Materialize the child output once via Gluten's existing `ColumnarCachedBatchSerializer`, persisted at `MEMORY_AND_DISK_SER`. The cache blob is Velox-native serialization (`CachedColumnarBatch`) — kryo-friendly and typically much more compact than unsafe-row SER. 2. **Count pass**: read `CachedColumnarBatch.numRows` of partitions `[0, numPartitions - 1)` — no native deserialization. 3. **Assign pass**: `convertCachedBatchToColumnarBatch` → Velox-native batch → `ColumnarBatches.load` (zero-copy Arrow C-Data ABI handoff) → prepend one `ArrowWritableColumnVector` with the id column. Single-partition queries skip caching entirely (`startOffset = 0`). **Memory hygiene** - The base class exposes a `doColumnarCleanup()` hook called from `cleanupResources()`. The Velox impl uses it to `unpersist` the cached RDD when the query finishes, so BlockManager does not hold the serialized batches beyond the operator's lifetime. - The persisted RDD is cached behind a `synchronized` accessor so repeated `doExecuteColumnar()` calls share a single `persist()` handle. - `assignIds` wraps the per-batch build in a `try/catch` that closes the freshly-loaded heavy batch on failure, so a mid-build OOM (e.g. while allocating the id vector) cannot leak Arrow buffers. ### Alternative considered We considered the row-mode pipeline `Velox → C2R → RDD[InternalRow] cached → R2C`. For Gluten that costs a full C2R/R2C transition on every row, and the unsafe-row serialized cache is typically 2–5× larger than Velox-native serialization for wide / nested data. The columnar path keeps everything columnar and pays serialization only once. ### How was this patch tested? New `VeloxAttachDistributedSequenceExecSuite` in `backends-velox`. ### Does this PR introduce any user-facing change? Yes — a new config: - `spark.gluten.sql.columnar.attachDistributedSequence` (default `true`). When enabled, `df.zipWithIndex` and pandas-on-Spark `distributed-sequence` index materialize the id column columnarly on Velox instead of falling back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
