icexelloss opened a new pull request, #36253: URL: https://github.com/apache/arrow/pull/36253
### Rationale for this change

In https://github.com/apache/arrow/issues/35515, I implemented a Scalar version of the non-decomposable UDF (Scalar as in SCALAR_AGGREGATE). I would like to support the Hash version of it as well (Hash as in HASH_AGGREGATE).

With this PR, a user can register an aggregate UDF once with `pc.register_aggregate_function`, and it can be used as both a scalar aggregate function and a hash aggregate function.

Example:
```
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc

def median(x):
    return pa.scalar(np.nanmedian(x))

pc.register_aggregate_function(func=median, func_name='median_udf', ...)

table = ...
# aggregate() takes (column, function name) pairs
table.group_by("id").aggregate([("v", "median_udf")])
```

### What changes are included in this PR?

The main changes are:
* In RegisterAggregateFunction (udf.cc), we now register the function both as a scalar aggregate function and as a hash aggregate function (with a signature adjustment for the hash aggregate kernel, because we need to append the grouping key).
* Implemented PythonUdfHashAggregateImpl, similar to PythonUdfScalarAggregateImpl. In Consume, it accumulates both the input batches and the group id array. In Merge, it merges the input batches and the group id array (using the group_id_mapping). In Finalize, it applies groupings to the accumulated batches to create one record batch per group, then applies the UDF over each group.

For table.group_by().aggregate(...), the space complexity is O(n), where n is the size of the table, so this is not very useful there. It is more useful in the segmented aggregation case, where the space complexity is O(s), where s is the size of a segment.

### Are these changes tested?
Added new tests in test_udf.py (with table.group_by().aggregate()) and test_substrait.py (with segmented aggregation).

### Are there any user-facing changes?

Yes. With this change, users can use a registered aggregate UDF with `table.group_by().aggregate()` or Acero's segmented aggregation.

### Checklist

* [ ] Self Review
* [ ] API Documentation
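
For readers who want a mental model of the Consume / Merge / Finalize flow described under "What changes are included in this PR?", here is a minimal pure-Python sketch. It is not the C++ code in udf.cc: the class `HashAggregateModel` and its method signatures are hypothetical, plain lists stand in for Arrow batches and group id arrays, and it assumes group ids are dense (0 to num_groups - 1) with at least one value per group.

```python
from statistics import median


class HashAggregateModel:
    """Toy model of a non-decomposable hash aggregate (hypothetical, not Arrow's API)."""

    def __init__(self, udf):
        self.udf = udf          # called once per group in finalize()
        self.values = []        # every input value seen so far
        self.group_ids = []     # group id of each accumulated value
        self.num_groups = 0

    def consume(self, batch_values, batch_group_ids):
        # Accumulate both the input batch and its group id array.
        if len(batch_values) != len(batch_group_ids):
            raise ValueError("each value needs exactly one group id")
        self.values.extend(batch_values)
        self.group_ids.extend(batch_group_ids)
        self.num_groups = max([self.num_groups] + [g + 1 for g in batch_group_ids])

    def merge(self, other, group_id_mapping):
        # group_id_mapping[i] is this state's group id for the other state's group i.
        remapped = [group_id_mapping[g] for g in other.group_ids]
        self.consume(other.values, remapped)

    def finalize(self):
        # Split the accumulated values into one list per group, then apply the UDF.
        groups = [[] for _ in range(self.num_groups)]
        for value, group_id in zip(self.values, self.group_ids):
            groups[group_id].append(value)
        return [self.udf(group) for group in groups]


left = HashAggregateModel(median)
left.consume([1.0, 5.0, 3.0], [0, 1, 0])

right = HashAggregateModel(median)
right.consume([7.0, 9.0, 2.0], [0, 0, 1])

# The other state's group 0 is this state's group 1; its group 1 is a new group 2.
left.merge(right, [1, 2])
result = left.finalize()
```

Because the UDF cannot be decomposed into partial results, the model keeps every value until `finalize`, which is where the O(n) space cost mentioned above comes from.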