mbutrovich commented on PR #4076:
URL: https://github.com/apache/datafusion-comet/pull/4076#issuecomment-4336987556

   Thanks @andygrove! Categorized suggestions below:
   
   ## Performance
   
   ### One global `take` instead of per-map `sort_to_indices` + `take` + `concat`?
   
   `spark_map_sort` loops over every map in the batch, calling 
`sort_to_indices` and `take` on each slice, then a final `concat` over `n` 
struct slices. For `n` maps that's roughly `2n + 1` kernel calls and `2n + 1` 
allocations (`n` small index arrays, `n` full per-map struct copies from 
`take`, one full concat copy).
   
   The version below keeps the `n` `sort_to_indices` calls but drops the rest: 
one `Vec<u32>` grow (pre-sized), one `take` over the full entries, no `concat`. 
That's `n + 1` kernel calls and roughly `n + 2` allocations, and crucially it 
eliminates all per-map struct copies plus the final concat copy, which are the 
expensive ones. Peak transient memory drops from roughly `3x` entry volume to 
roughly `1x`.
   
   Compare to DataFusion's `sort_batch` 
(`datafusion/physical-plan/src/sorts/sort.rs:813`), which is what 
`ExternalSorter` uses to sort an in-memory batch:
   
   ```rust
   let indices = lexsort_to_indices(&sort_columns, fetch)?;
   let columns = take_arrays(batch.columns(), &indices, None)?;
   ```
   
   One global permutation, one take. No per-row work, no trailing concat.
   
   `map_sort` can't call `sort_to_indices` once on the full key column. It 
needs to sort *within* each map boundary. But the same shape applies: build one 
global index buffer respecting group boundaries, then a single `take` on the 
full entries:
   
   ```rust
   let total = maps_arg_entries.len();
   let mut global: Vec<u32> = Vec::with_capacity(total);
   
   for idx in 0..maps_arg.len() {
       let start = maps_arg_offsets[idx] as usize;
       let end   = maps_arg_offsets[idx + 1] as usize;
       // empty (or null) map: nothing to permute
       if end == start { continue; }
   
       // argsort this map's keys, then shift the local indices
       // into entry-array coordinates
       let keys  = maps_arg_entries.column(0).slice(start, end - start);
       let local = sort_to_indices(&keys, Some(sort_options), None)?;
       global.extend(local.values().iter().map(|i| start as u32 + *i));
   }
   
   // one global permutation, one gather over the full entries struct
   let indices = UInt32Array::from(global);
   let sorted  = take(&maps_arg_entries, &indices, None)?;
   ```
   
   Offsets and nulls stay untouched. Output is one struct array, matching 
`sort_batch`'s pattern of global-permutation-plus-single-take.
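   The shape is easy to sanity-check without Arrow: argsort each group locally, shift the local indices into global coordinates, gather once at the end. A stand-alone sketch (`sort_within_groups` is an illustrative name, not an actual kernel):
   
   ```rust
   /// Arrow-free sketch of the pattern above: per-group argsort into one
   /// global index buffer, then a single gather (stand-ins for
   /// sort_to_indices + take).
   fn sort_within_groups(keys: &[i64], offsets: &[usize]) -> Vec<usize> {
       let mut global = Vec::with_capacity(keys.len());
       for w in offsets.windows(2) {
           let (start, end) = (w[0], w[1]);
           // local argsort of this group's keys
           let mut local: Vec<usize> = (start..end).collect();
           local.sort_by_key(|&i| keys[i]);
           global.extend(local);
       }
       global
   }
   
   fn main() {
       // two "maps" with keys [3, 1, 2] and [9, 7]
       let keys = [3_i64, 1, 2, 9, 7];
       let offsets = [0, 3, 5];
       let indices = sort_within_groups(&keys, &offsets);
       assert_eq!(indices, vec![1, 2, 0, 4, 3]);
       // the single "take": gather through the global permutation
       let sorted: Vec<i64> = indices.iter().map(|&i| keys[i]).collect();
       assert_eq!(sorted, vec![1, 2, 3, 7, 9]);
   }
   ```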
   
   ### Early outs?
   
   Worth short-circuiting these up front?
   
   ```rust
   if maps_arg.is_empty() { return Ok(ColumnarValue::Array(arr_arg)); }
   if maps_arg.null_count() == maps_arg.len() { return Ok(ColumnarValue::Array(arr_arg)); }
   if *is_sorted { return Ok(ColumnarValue::Array(arr_arg)); }
   ```
   
   ### Microbenchmark numbers?
   
   It would be nice to have before/after numbers in the PR description. Spark 
4.0 map-shuffle now stays native, so there's a natural story to measure against 
the prior fallback path.
   
   ## Spark compatibility
   
   ### NaN / -0.0 on float-keyed maps
   
   Spark's key ordering goes through `Double.compare` (NaN sorts largest, -0.0 
== 0.0). Arrow's `sort_to_indices` uses IEEE total ordering (-0.0 < 0.0, NaN at 
extremes). Comet already tracks this class of incompatibility via 
`spark.comet.exec.strictFloatingPoint`.
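   The divergence is reproducible without Arrow. Rust's `f64::total_cmp` implements the same IEEE 754 total order family as Arrow's comparator; `spark_double_cmp` below is a hypothetical stand-in mimicking Spark's documented semantics, not any real Comet/Spark function:
   
   ```rust
   use std::cmp::Ordering;
   
   // Hypothetical stand-in for Spark's double ordering: NaN sorts largest,
   // -0.0 compares equal to 0.0. Illustrative only.
   fn spark_double_cmp(a: f64, b: f64) -> Ordering {
       match (a.is_nan(), b.is_nan()) {
           (true, true) => Ordering::Equal,
           (true, false) => Ordering::Greater,
           (false, true) => Ordering::Less,
           // IEEE comparison already treats -0.0 == 0.0
           (false, false) => a.partial_cmp(&b).unwrap(),
       }
   }
   
   fn main() {
       // IEEE total order: -0.0 sorts strictly below 0.0
       assert_eq!((-0.0_f64).total_cmp(&0.0), Ordering::Less);
       // Spark: -0.0 and 0.0 are equal keys
       assert_eq!(spark_double_cmp(-0.0, 0.0), Ordering::Equal);
       // total order also places a negative NaN below -inf,
       // a distinction Spark's ordering never makes
       assert_eq!((-f64::NAN).total_cmp(&f64::NEG_INFINITY), Ordering::Less);
       assert_eq!(spark_double_cmp(f64::NAN, f64::INFINITY), Ordering::Greater);
   }
   ```
   
   So maps whose keys differ only in zero sign or NaN payload can sort one way under Spark and another under Arrow, which is exactly the class of divergence `strictFloatingPoint` exists to gate.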
   
   `CometSortOrder` (`serde/CometSortOrder.scala:34`) and `CometSortArray` 
(`serde/arrays.scala:150`) both consult `COMET_EXEC_STRICT_FLOATING_POINT` and 
return `Incompatible` when the child type contains Float/Double. The MapSort 
shim in this PR doesn't. `MapSort` should follow the same pattern:
   
   ```scala
   case ms: MapSort =>
     val keyType = ms.dataType.asInstanceOf[MapType].keyType
     if (!supportedScalarSortElementType(keyType)) {
       withInfo(ms, s"MapSort on map with key type $keyType is not supported")
       None
     } else if (CometConf.COMET_EXEC_STRICT_FLOATING_POINT.get() &&
                SupportLevel.containsFloatingPoint(keyType)) {
       withInfo(ms, s"MapSort on floating-point key not compatible with Spark when " +
         s"${CometConf.COMET_EXEC_STRICT_FLOATING_POINT.key}=true")
       None
     } else {
       // ... existing conversion
     }
   ```
   
   Default behavior is unchanged (Comet converts the expression, accepting the 
float divergence). Users who set `strictFloatingPoint=true` get the same 
fallback behavior they already get for `SortOrder` and `SortArray`.
   
   `supportedRangePartitioningDataType` in `CometShuffleExchangeExec.scala:344` 
has the same gap but is out of scope for this PR. I will open an issue to 
follow up on that.
   
   ## Style nits
   
   - `native/spark-expr/src/map_funcs/map_sort.rs:75,128`. `Arc::<dyn 
Array>::clone(array)` and `Arc::<arrow::datatypes::Field>::clone(map_field)` 
can be `Arc::clone(array)` / `map_field.clone()`.
   - `.unwrap()` on the `downcast_ref` at `:81` and `:121`. An 
`.expect("invariant: ...")` documents what's being assumed.
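   On the second nit, the change is only the panic message, but it turns an anonymous `unwrap` panic into documentation of the invariant. A stand-alone illustration with `std::any` rather than Arrow's `Array` (the shape of the call is the same):
   
   ```rust
   use std::any::Any;
   
   fn main() {
       let col: Box<dyn Any> = Box::new(vec![1_i32, 2, 3]);
       // .expect() states what the downcast assumes, so a failure points
       // at the violated invariant instead of a bare unwrap line number
       let ints = col
           .downcast_ref::<Vec<i32>>()
           .expect("invariant: key column was validated as Int32 at plan time");
       assert_eq!(ints.len(), 3);
   }
   ```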
   
   ## Docs
   
   Worth a mention in `docs/source/user-guide/compatibility.md` that Spark 4.0 
map-shuffle / group-by-on-map now stays native (closing #1941, #3171)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

