icexelloss opened a new pull request, #36253:
URL: https://github.com/apache/arrow/pull/36253

   ### Rationale for this change
   
   In https://github.com/apache/arrow/issues/35515, I implemented a Scalar version of the non-decomposable aggregate UDF (Scalar as in SCALAR_AGGREGATE). This PR adds support for the Hash version (Hash as in HASH_AGGREGATE).
   
   With this PR, users can register an aggregate UDF once with `pc.register_aggregate_function` and use it both as a scalar aggregate function and as a hash aggregate function.
   
   Example:
   
   ```python
   import numpy as np
   import pyarrow as pa
   import pyarrow.compute as pc

   def median(x):
       return pa.scalar(np.nanmedian(x))

   pc.register_aggregate_function(func=median, func_name='median_udf', ...)

   table = ...
   table.group_by("id").aggregate([("v", "median_udf")])
   ```
   
   
   ### What changes are included in this PR?
   
   The main changes are:
   * In RegisterAggregateFunction (udf.cc), we now register the function both as a scalar aggregate function and as a hash aggregate function (with a signature adjustment for the hash aggregate kernel, because we need to append the grouping key)
   * Implemented PythonUdfHashAggregateImpl, similar to PythonUdfScalarAggregateImpl. In Consume, it accumulates both the input batches and the group id array. In Merge, it merges the input batches and group id arrays (applying the group_id_mapping). In Finalize, it applies groupings to the accumulated batches to create one record batch per group, then applies the UDF over each group.
   
   For table.group_by().aggregate(...), the space complexity is O(n), where n is the size of the table (and therefore this path is not very useful on its own). However, it is more useful in the segmented aggregation case, where the space complexity is O(s), where s is the size of the segments.
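   The O(n) versus O(s) difference comes from when Finalize releases state: a plain group-by buffers every consumed batch until a single Finalize at the end, while segmented aggregation finalizes and drops state at each segment boundary. A toy illustration (hypothetical batch sizes, not from the PR):

   ```python
   def peak_buffered_rows(batch_sizes, segment_boundaries=None):
       """Peak number of rows held in aggregator state.

       batch_sizes: rows per consumed batch.
       segment_boundaries: batch indices after which Finalize runs and state
       is dropped; None models plain group_by().aggregate(), which finalizes
       only once at the end.
       """
       boundaries = set(segment_boundaries or [])
       buffered = peak = 0
       for i, n in enumerate(batch_sizes):
           buffered += n  # Consume accumulates the batch
           peak = max(peak, buffered)
           if i in boundaries:
               buffered = 0  # Finalize at a segment boundary releases state
       return peak

   batches = [100] * 10
   peak_buffered_rows(batches)                                      # O(n): 1000
   peak_buffered_rows(batches, segment_boundaries=[1, 3, 5, 7, 9])  # O(s): 200
   ```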
   
   ### Are these changes tested?
   Added new tests in test_udf.py (with table.group_by().aggregate()) and in test_substrait.py (with segmented aggregation).
   
   
   
   ### Are there any user-facing changes?
   Yes. With this change, users can call a registered aggregate UDF with `table.group_by().aggregate()` or with Acero's segmented aggregation.
   
   ### Checklist
   
   - [ ] Self Review
   - [ ] API Documentation

