baibaichen opened a new pull request, #12188:
URL: https://github.com/apache/gluten/pull/12188

   ### What changes were proposed in this pull request?
   
   Closes #12187.
   
   Adds a Velox implementation of Spark's `AttachDistributedSequenceExec`, which prepends a contiguous `Long` id column to its child's output. The operator is used by the pandas-on-Spark `distributed-sequence` index and by `DataFrame.zipWithIndex`.
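
Here "contiguous" means three things. Ids start at 0. They are numbered in partition order and, within a partition, in row order. They leave no gaps, even across empty partitions. This differs from `monotonically_increasing_id()`, whose ids are unique and increasing but not consecutive.

The following is a plain-Scala reference model of the expected output, not Gluten code. Partitions are modeled as nested `Seq`s, and `SequenceSemantics` is a hypothetical name.

```scala
// Reference model only: partitions are plain Scala collections, not Spark RDDs.
object SequenceSemantics {
  /** Expected operator output: each row paired with a 0-based Long id,
    * numbered in partition order, then in row order within a partition. */
  def expected[R](partitions: Seq[Seq[R]]): Seq[(Long, R)] =
    partitions.flatten.zipWithIndex.map { case (row, i) => (i.toLong, row) }
}
```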
   
   ### How is this implemented?
   
   **Plan-level**
   
   - New abstract base class `ColumnarAttachDistributedSequenceBaseExec` in `gluten-substrait/`, with a factory `from(plan)` that delegates to the backend API.
   - New offload-rule case and validator gate in `OffloadSingleNodeRules` / `Validators`.
   - New backend hook `genColumnarAttachDistributedSequenceExec` on 
`SparkPlanExecApi`. Velox override returns the columnar impl; the CH override 
throws `GlutenNotSupportException` until that backend is ported.
   - Config: `spark.gluten.sql.columnar.attachDistributedSequence` (default 
`true`) lets users disable the offload.
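
The plan-level wiring can be pictured with a heavily simplified, dependency-free model. Every type here is hypothetical:

- `AttachDistributedSequence`, `ColumnarExec`, `ExecApi`, `VeloxExecApi`, `ClickHouseExecApi`, `NotSupported`, `ColumnarExecFactory` and `OffloadSketch` stand in for Spark's plan node, the columnar base class, `SparkPlanExecApi`, and `GlutenNotSupportException`.
- A `Map` plays the role of the session conf.

Only the config key, its `true` default, and the hook name `genColumnarAttachDistributedSequenceExec` come from this PR. The sketch also assumes that a backend refusal simply keeps the vanilla node. That may not match exactly how Gluten's validators surface it.

```scala
// Hypothetical, dependency-free model of the plan-level wiring.
// None of these types are Gluten or Spark classes; they only mirror the roles.

/** Stand-in for GlutenNotSupportException. */
final class NotSupported(msg: String) extends RuntimeException(msg)

/** Stand-in for Spark's row-based AttachDistributedSequenceExec node. */
final case class AttachDistributedSequence(child: String)

/** Stand-in for the columnar base class; `backend` records who built it. */
trait ColumnarExec { def backend: String }

/** Stand-in for SparkPlanExecApi: one generator hook per offloadable operator. */
trait ExecApi {
  def genColumnarAttachDistributedSequenceExec(plan: AttachDistributedSequence): ColumnarExec
}

object VeloxExecApi extends ExecApi {
  def genColumnarAttachDistributedSequenceExec(plan: AttachDistributedSequence): ColumnarExec =
    new ColumnarExec { val backend = "velox" }
}

object ClickHouseExecApi extends ExecApi {
  // Not ported yet: refuse instead of returning a columnar node.
  def genColumnarAttachDistributedSequenceExec(plan: AttachDistributedSequence): ColumnarExec =
    throw new NotSupported("AttachDistributedSequence is not supported by this backend")
}

/** Factory in the spirit of `from(plan)`; here the backend API is passed explicitly. */
object ColumnarExecFactory {
  def from(plan: AttachDistributedSequence, api: ExecApi): ColumnarExec =
    api.genColumnarAttachDistributedSequenceExec(plan)
}

object OffloadSketch {
  val ConfKey = "spark.gluten.sql.columnar.attachDistributedSequence"

  /** Some(columnar node) when offloaded, None when the vanilla node is kept. */
  def offload(plan: AttachDistributedSequence, conf: Map[String, String], api: ExecApi): Option[ColumnarExec] =
    if (!conf.getOrElse(ConfKey, "true").toBoolean) None // config gate, default true
    else
      try Some(ColumnarExecFactory.from(plan, api))
      catch { case _: NotSupported => None } // assumption: unsupported => fall back
}
```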
   
   **Velox runtime (`ColumnarAttachDistributedSequenceExec`)**
   
   For inputs with more than one partition:
   
   1. Materialize the child output once via Gluten's existing `ColumnarCachedBatchSerializer`, persisted at `MEMORY_AND_DISK_SER`. The cached blob uses Velox-native serialization (`CachedColumnarBatch`), which is Kryo-friendly and typically much more compact than serialized `UnsafeRow`s.
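   2. **Count pass**: read `CachedColumnarBatch.numRows` for partitions `[0, numPartitions - 1)`, with no native deserialization. Partition `i` starts at the sum of the row counts of partitions `0` through `i - 1`, so the last partition's count is never needed.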
   3. **Assign pass**: `convertCachedBatchToColumnarBatch` → Velox-native batch → `ColumnarBatches.load` (zero-copy Arrow C-Data ABI handoff) → prepend one `ArrowWritableColumnVector` holding the ids, numbered from the partition's start offset.
   
   Single-partition inputs skip caching and the count pass entirely, since the only partition starts at `startOffset = 0`.
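
The two passes amount to an exclusive prefix sum over per-partition row counts, followed by a per-batch id fill. The following plain-Scala sketch models only that arithmetic. It ignores caching, serialization, and Arrow vectors.

`Batch`, `Partition`, `startOffsets`, `assignIds`, and `attach` are hypothetical names. The sketch assumes each batch exposes its row count without decoding, as the PR describes for `CachedColumnarBatch.numRows`.

```scala
// Hypothetical model of the count pass and assign pass; not Gluten code.
object DistributedSequenceSketch {
  /** Stand-in for a cached columnar batch whose row count is readable without decoding. */
  final case class Batch(numRows: Int, columns: Vector[Vector[Any]])

  type Partition = Seq[Batch]

  /** Count pass: start offset of every partition (exclusive prefix sum).
    * Only partitions [0, n - 1) are counted; the last one never shifts an offset. */
  def startOffsets(partitions: IndexedSeq[Partition]): IndexedSeq[Long] =
    if (partitions.size <= 1) IndexedSeq.fill(partitions.size)(0L) // nothing to count
    else {
      val counts = partitions.init.map(p => p.map(_.numRows.toLong).sum)
      counts.scanLeft(0L)(_ + _) // length n: 0, c0, c0 + c1, ...
    }

  /** Assign pass: prepend a Long id column to each batch, starting at `start`. */
  def assignIds(partition: Partition, start: Long): Seq[Batch] = {
    val batchStarts = partition.scanLeft(start)((acc, b) => acc + b.numRows)
    partition.zip(batchStarts).map { case (b, s) =>
      val ids: Vector[Any] = Vector.tabulate(b.numRows)(i => s + i)
      Batch(b.numRows, ids +: b.columns)
    }
  }

  /** Runs both passes over all partitions. */
  def attach(partitions: IndexedSeq[Partition]): IndexedSeq[Seq[Batch]] =
    partitions.zip(startOffsets(partitions)).map { case (p, s) => assignIds(p, s) }
}
```

Offsets and ids are `Long` because row counts summed across partitions can exceed `Int.MaxValue`, even when each individual batch's count fits in an `Int`.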
   
   **Memory hygiene**
   
   - The base class exposes a `doColumnarCleanup()` hook called from 
`cleanupResources()`. The Velox impl uses it to `unpersist` the cached RDD when 
the query finishes, so BlockManager does not hold the serialized batches beyond 
the operator's lifetime.
   - The persisted RDD is cached behind a `synchronized` accessor so repeated 
`doExecuteColumnar()` calls share a single `persist()` handle.
   - `assignIds` wraps the per-batch build in a `try/catch` that closes the 
freshly-loaded heavy batch on failure, so a mid-build OOM (e.g. while 
allocating the id vector) cannot leak Arrow buffers.
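
The three hygiene rules map onto familiar patterns:

- a template-method cleanup hook;
- a lazily initialized, shared handle guarded by `synchronized`;
- close-on-failure ownership of a freshly loaded batch.

The following sketch uses hypothetical stand-ins (`FakeCachedRdd`, `FakeBatch`, `BaseExec`, `VeloxLikeExec`, `buildWithIds`) rather than Spark's `RDD` or Gluten's batch classes. It also simplifies `cleanupResources()` to call only the hook.

```scala
import java.util.concurrent.atomic.AtomicInteger

/** Stand-in for a persisted RDD: counts persist()/unpersist() calls. */
final class FakeCachedRdd {
  val persists = new AtomicInteger(0)
  val unpersists = new AtomicInteger(0)
  def persist(): FakeCachedRdd = { persists.incrementAndGet(); this }
  def unpersist(): Unit = { unpersists.incrementAndGet(); () }
}

/** Stand-in for a batch that owns native buffers and must be closed. */
final class FakeBatch extends AutoCloseable {
  @volatile var closed = false
  override def close(): Unit = closed = true
}

/** Base class owns the lifecycle; subclasses plug in via doColumnarCleanup(). */
abstract class BaseExec {
  final def cleanupResources(): Unit = doColumnarCleanup()
  protected def doColumnarCleanup(): Unit = ()
}

final class VeloxLikeExec(newCache: () => FakeCachedRdd) extends BaseExec {
  private var cached: Option[FakeCachedRdd] = None

  /** Synchronized accessor: concurrent and repeated callers share one persist() handle. */
  def persisted: FakeCachedRdd = synchronized {
    cached match {
      case Some(rdd) => rdd
      case None =>
        val rdd = newCache().persist()
        cached = Some(rdd)
        rdd
    }
  }

  /** Releases the cache when the query finishes. */
  override protected def doColumnarCleanup(): Unit = synchronized {
    cached.foreach(_.unpersist())
    cached = None
  }

  /** On failure, closes the freshly loaded batch before rethrowing; on success,
    * ownership passes to whatever `build` returns. */
  def buildWithIds[T](loaded: FakeBatch)(build: FakeBatch => T): T =
    try build(loaded)
    catch {
      case t: Throwable =>
        loaded.close()
        throw t
    }
}
```

The success path deliberately does not close the loaded batch. The sketch assumes its buffers become part of the output batch, so only the failure path takes responsibility for closing it.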
   
   ### Alternative considered
   
   We considered the row-mode pipeline `Velox → C2R → RDD[InternalRow] cached → R2C`. For Gluten, that costs a columnar-to-row conversion of every row before caching and a row-to-columnar conversion after it. The serialized `UnsafeRow` cache is also typically 2–5× larger than Velox-native serialization for wide or nested data. The columnar path stays columnar end to end and pays for serialization only once.
   
   ### How was this patch tested?
   
   New `VeloxAttachDistributedSequenceExecSuite` in `backends-velox`.
   
   ### Does this PR introduce any user-facing change?
   
   Yes, a new config:
   
   - `spark.gluten.sql.columnar.attachDistributedSequence` (default `true`).
   
   When enabled, `df.zipWithIndex` and the pandas-on-Spark `distributed-sequence` index compute the id column in columnar form on Velox instead of falling back to vanilla Spark.
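
The following sketch shows how to opt out per session. It assumes a Gluten-enabled `SparkSession` named `spark`. The key and its `true` default come from this PR. Toggling it at runtime via `spark.conf.set` is an assumption: it presumes the flag is read from the session conf when each query is planned.

```scala
// Assumes `spark` is an active SparkSession with the Gluten Velox backend enabled.
// Turn the columnar AttachDistributedSequence offload off for subsequent queries,
// so the operator runs on vanilla Spark.
spark.conf.set("spark.gluten.sql.columnar.attachDistributedSequence", "false")

// Restore the default behavior.
spark.conf.set("spark.gluten.sql.columnar.attachDistributedSequence", "true")
```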
   

