mbutrovich commented on PR #4076:
URL: https://github.com/apache/datafusion-comet/pull/4076#issuecomment-4336987556
Thanks @andygrove! Categorized suggestions below:
## Performance
### One global `take` instead of per-map `sort_to_indices` + `take` + `concat`?
`spark_map_sort` loops over every map in the batch, calling
`sort_to_indices` and `take` on each slice, then a final `concat` over `n`
struct slices. For `n` maps that's roughly `2n + 1` kernel calls and `2n + 1`
allocations (`n` small index arrays, `n` full per-map struct copies from
`take`, one full concat copy).
The version below keeps the `n` `sort_to_indices` calls but drops the rest:
one `Vec<u32>` grow (pre-sized), one `take` over the full entries, no `concat`.
That's `n + 1` kernel calls and roughly `n + 2` allocations, and crucially it
eliminates all per-map struct copies plus the final concat copy, which are the
expensive ones. Peak transient memory drops from roughly `3x` entry volume to
roughly `1x`.
Compare to DataFusion's `sort_batch`
(`datafusion/physical-plan/src/sorts/sort.rs:813`), which is what
`ExternalSorter` uses to sort an in-memory batch:
```rust
let indices = lexsort_to_indices(&sort_columns, fetch)?;
let columns = take_arrays(batch.columns(), &indices, None)?;
```
One global permutation, one take. No per-row work, no trailing concat.
`map_sort` can't call `sort_to_indices` once on the full key column. It
needs to sort *within* each map boundary. But the same shape applies: build one
global index buffer respecting group boundaries, then a single `take` on the
full entries:
```rust
// Pre-size one global index buffer over all entries.
let total = maps_arg_entries.len();
let mut global: Vec<u32> = Vec::with_capacity(total);
for idx in 0..maps_arg.len() {
    let start = maps_arg_offsets[idx] as usize;
    let end = maps_arg_offsets[idx + 1] as usize;
    if end == start {
        continue;
    }
    // Sort only this map's key slice, then shift the local indices
    // back into the global entry coordinate space.
    let keys = maps_arg_entries.column(0).slice(start, end - start);
    let local = sort_to_indices(&keys, Some(sort_options), None)?;
    global.extend(local.values().iter().map(|i| start as u32 + *i));
}
// One take over the full entries struct; offsets and nulls are reused as-is.
let indices = UInt32Array::from(global);
let sorted = take(&maps_arg_entries, &indices, None)?;
```
Offsets and nulls stay untouched. Output is one struct array, matching
`sort_batch`'s pattern of global-permutation-plus-single-take.
### Early outs?
Worth short-circuiting these up front?
```rust
if maps_arg.is_empty() { return Ok(ColumnarValue::Array(arr_arg)); }
if maps_arg.null_count() == maps_arg.len() { return Ok(ColumnarValue::Array(arr_arg)); }
if *is_sorted { return Ok(ColumnarValue::Array(arr_arg)); }
```
### Microbenchmark numbers?
It would be nice to have before/after numbers in the PR description. Spark
4.0 map-shuffle now stays native, so there's a natural story to measure against
the prior fallback path.
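If it helps, a minimal criterion harness along these lines would be enough to get those numbers. The input construction is plain arrow-rs; the `spark_map_sort` call at the bottom assumes it can be driven as a plain function over the map array, so adjust that to the PR's actual entry point:
```rust
// Sketch of a criterion microbenchmark for the PR description numbers.
// The `spark_map_sort` signature is an assumption; swap in the real entry
// point from native/spark-expr/src/map_funcs/map_sort.rs.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Builder, MapBuilder};
use criterion::{black_box, criterion_group, criterion_main, Criterion};

/// Build `num_maps` maps of `entries_per_map` int32 entries with keys in
/// descending order, so the sort has real work to do.
fn build_input(num_maps: usize, entries_per_map: usize) -> ArrayRef {
    let mut builder = MapBuilder::new(None, Int32Builder::new(), Int32Builder::new());
    for _ in 0..num_maps {
        for e in 0..entries_per_map {
            builder.keys().append_value((entries_per_map - e) as i32);
            builder.values().append_value(e as i32);
        }
        builder.append(true).unwrap();
    }
    Arc::new(builder.finish())
}

fn bench_map_sort(c: &mut Criterion) {
    let input = build_input(8192, 16);
    c.bench_function("map_sort 8192 maps x 16 entries", |b| {
        // Hypothetical call; replace with the PR's actual function / UDF invoke.
        b.iter(|| spark_map_sort(black_box(&input)).unwrap())
    });
}

criterion_group!(benches, bench_map_sort);
criterion_main!(benches);
```
Varying `entries_per_map` (say 2 vs 64) would also show how the per-map kernel overhead scales between the two approaches above.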
## Spark compatibility
### NaN / -0.0 on float-keyed maps
Spark's key ordering goes through `Double.compare` (NaN sorts largest, -0.0
== 0.0). Arrow's `sort_to_indices` uses IEEE total ordering (-0.0 < 0.0, NaN at
extremes). Comet already tracks this class of incompatibility via
`spark.comet.exec.strictFloatingPoint`.
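For reference, a tiny standalone repro of the Arrow side of that divergence (arrow-rs sorts floats by IEEE 754 totalOrder), not tied to this PR's code:
```rust
use arrow::array::Float64Array;
use arrow::compute::sort_to_indices;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Arrow's float sort uses totalOrder: -0.0 < 0.0, NaN after +inf ascending.
    let keys = Float64Array::from(vec![0.0, f64::NAN, -0.0, f64::INFINITY]);
    let idx = sort_to_indices(&keys, None, None)?;
    // Prints [2, 0, 3, 1]: ascending order is [-0.0, 0.0, +inf, NaN].
    println!("{:?}", idx.values());
    Ok(())
}
```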
`CometSortOrder` (`serde/CometSortOrder.scala:34`) and `CometSortArray`
(`serde/arrays.scala:150`) both consult `COMET_EXEC_STRICT_FLOATING_POINT` and
return `Incompatible` when the child type contains Float/Double. The MapSort
shim in this PR doesn't. `MapSort` should follow the same pattern:
```scala
case ms: MapSort =>
  val keyType = ms.dataType.asInstanceOf[MapType].keyType
  if (!supportedScalarSortElementType(keyType)) {
    withInfo(ms, s"MapSort on map with key type $keyType is not supported")
    None
  } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() &&
      SupportLevel.containsFloatingPoint(keyType)) {
    withInfo(ms, s"MapSort on floating-point key not compatible with Spark when " +
      s"${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true")
    None
  } else {
    // ... existing conversion
  }
```
Default behavior is unchanged (Comet converts the expression, accepting the
float divergence). Users who set `strictFloatingPoint=true` get the same
fallback behavior they already get for `SortOrder` and `SortArray`.
`supportedRangePartitioningDataType` in `CometShuffleExchangeExec.scala:344`
has the same gap but is out of scope for this PR. I will open an issue to
follow up on that.
## Style nits
- `native/spark-expr/src/map_funcs/map_sort.rs:75,128`. `Arc::<dyn
Array>::clone(array)` and `Arc::<arrow::datatypes::Field>::clone(map_field)`
can be `Arc::clone(array)` / `map_field.clone()`.
- `.unwrap()` on the `downcast_ref` at `:81` and `:121`. An
`.expect("invariant: ...")` documents what's being assumed.
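For concreteness, roughly what both nits look like together (names and types here are stand-ins for the PR's locals, not its actual code):
```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, MapArray};
use arrow::datatypes::Field;

// Stand-in function: `array` and `map_field` mirror the PR's locals in shape only.
fn sketch(array: &ArrayRef, map_field: &Arc<Field>) {
    let _entries: ArrayRef = Arc::clone(array); // instead of Arc::<dyn Array>::clone(array)
    let _field: Arc<Field> = map_field.clone(); // Arc<Field> clone is a cheap refcount bump
    let _map = array
        .as_any()
        .downcast_ref::<MapArray>()
        .expect("map_sort input should be a MapArray"); // documents the assumed invariant
}
```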
## Docs
Worth a mention in `docs/source/user-guide/compatibility.md` that Spark 4.0
map-shuffle / group-by-on-map now stays native (closing #1941, #3171)?