andygrove opened a new pull request, #4782:

URL: https://github.com/apache/datafusion-comet/pull/4782
## Which issue does this PR close?

Closes #3970.

## Rationale for this change

The `mode` aggregate (the most frequent value in a group) is a mainstream statistical aggregate that previously fell back to Spark, preventing native execution of any query that uses it. Adding native support keeps these queries in Comet's pipeline.

## What changes are included in this PR?

This PR was scaffolded with the `implement-comet-expression` project skill.

- Native Rust implementation in `native/spark-expr/src/agg_funcs/mode.rs`: a `Mode` aggregate UDF with both a global `Accumulator` and a vectorized `GroupsAccumulator`. State is a frequency map keyed by `ScalarValue`, serialized as a single `struct<values: array<T>, counts: array<bigint>>` buffer column so that the partial/final buffer schemas stay aligned with Spark's single-attribute `TypedImperativeAggregate` buffer.
- Protobuf `Mode` message and `AggExpr` oneof entry, plus the planner arm in `planner.rs`.
- `CometMode` serde and registration in `QueryPlanSerde.aggrSerdeMap`.
- A `Mode` branch in `adjustOutputForNativeState` (`operators.scala`) mapping the Spark binary buffer type to the native struct state type.
- A `modeHasUnsupportedOrdering` shim in `CometTypeShim` (spark-3.x / spark-4.x), because `Mode.reverseOpt` only exists on Spark 4.0+.
- Docs: `mode` is marked as supported in the expressions guide.

Scope and compatibility:

- Only the plain `mode(col)` form is supported. The `mode(col, deterministic)` and `mode() WITHIN GROUP (ORDER BY col)` forms (Spark 4.0+, which set `reverseOpt`) fall back to Spark.
- Registered as `Incompatible` (opt-in via `spark.comet.expression.Mode.allowIncompatible=true`): Spark breaks ties non-deterministically based on JVM hash-map iteration order, which a native hash map cannot reproduce bit-for-bit. Comet instead returns the smallest tied value deterministically.
- NULLs are ignored, empty input returns NULL, and float keys are normalized (`-0.0` to `0.0`, canonical `NaN`) to match Spark's counting.
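The accumulator semantics described above (a frequency map, NULL skipping, float-key normalization, smallest-value tie-breaking, and merging partial `values`/`counts` states) can be illustrated with a minimal Rust sketch. It assumes `f64` input only, and `ModeState`, `normalize`, and the method names are hypothetical. The actual code keys on DataFusion's `ScalarValue` and implements the `Accumulator` and `GroupsAccumulator` traits, which this sketch does not attempt.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for the native `Mode` state: a frequency
/// map from a normalized value to its count. This sketch handles `f64` only.
#[derive(Default, Debug)]
struct ModeState {
    // Keyed by the bit pattern of the normalized value, because `f64` is not `Eq`/`Hash`.
    counts: HashMap<u64, i64>,
}

/// Normalize float keys: `-0.0` becomes `0.0`, and every NaN becomes one canonical NaN.
fn normalize(v: f64) -> f64 {
    if v.is_nan() {
        f64::NAN
    } else if v == 0.0 {
        0.0 // true for both `0.0` and `-0.0`, so the sign is dropped
    } else {
        v
    }
}

impl ModeState {
    /// Count a batch of input values; `None` models SQL NULL and is skipped.
    fn update(&mut self, values: &[Option<f64>]) {
        for v in values.iter().flatten() {
            *self.counts.entry(normalize(*v).to_bits()).or_insert(0) += 1;
        }
    }

    /// Export the partial state as parallel arrays, mirroring the
    /// `struct<values: array<T>, counts: array<bigint>>` buffer shape.
    fn state(&self) -> (Vec<f64>, Vec<i64>) {
        self.counts
            .iter()
            .map(|(bits, count)| (f64::from_bits(*bits), *count))
            .unzip()
    }

    /// Merge a partial state exported by `state()` from another partition.
    fn merge(&mut self, values: &[f64], counts: &[i64]) {
        for (v, count) in values.iter().zip(counts.iter()) {
            *self.counts.entry(normalize(*v).to_bits()).or_insert(0) += *count;
        }
    }

    /// Return the most frequent value, breaking ties toward the smallest value.
    /// Returns `None` (SQL NULL) when nothing non-NULL was counted.
    fn evaluate(&self) -> Option<f64> {
        self.counts
            .iter()
            .map(|(bits, count)| (f64::from_bits(*bits), *count))
            .max_by(|(va, ca), (vb, cb)| {
                // A higher count wins; on equal counts, the smaller value must
                // rank higher, so the value comparison is reversed.
                ca.cmp(cb).then_with(|| vb.total_cmp(va))
            })
            .map(|(value, _)| value)
    }
}
```

For example, updating with `[Some(1.0), Some(2.0), Some(2.0), None]` and evaluating yields `Some(2.0)`. Ordering tied values with `f64::total_cmp` is an assumption of this sketch, chosen so that `NaN` has a defined position in the ordering.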

Supported input types are numeric, boolean, decimal, date, timestamp, timestamp_ntz, and default-collation string; other types fall back.

## How are these changes tested?

- Rust unit tests in `mode.rs` covering the most-frequent value, tie-break to smallest, NULL handling, empty input, float normalization, and partial/final merge equivalence for both the accumulator and the groups accumulator.
- A `mode.sql` file test exercising global and grouped aggregation, NULLs, all-NULL groups, mixed aggregates, HAVING, and boolean/integer/double/decimal/string/date/timestamp inputs, plus an unsupported-type fallback assertion.

Verified on Spark 3.5 and Spark 4.1.
