(Admittedly, the PR title of [1] doesn't reflect that only the scalar
aggregate UDF is implemented and not the hash one - that is an oversight on
my part - sorry)

On Tue, Jun 13, 2023 at 3:51 PM Li Jin <ice.xell...@gmail.com> wrote:

> Thanks Weston.
>
> I think I found what you pointed out to me before, which is this bit of
> code:
>
> https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118
> I will try to see if I can adapt this to be used in a streaming situation.
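>
> If I'm reading that code right, the single-batch version looks roughly
> like this (a sketch from memory of grouper.h and partition.cc, so the
> exact signatures may differ; SplitByLastColumn is just a name I made up):
>
> #include "arrow/compute/api_vector.h"   // Take
> #include "arrow/compute/exec.h"         // ExecBatch / ExecSpan
> #include "arrow/compute/row/grouper.h"  // arrow::compute::Grouper
> #include "arrow/record_batch.h"
> #include "arrow/result.h"
>
> using arrow::compute::Grouper;
>
> // Split one batch into one batch per group, keyed on the last column.
> arrow::Result<arrow::RecordBatchVector> SplitByLastColumn(
>     const std::shared_ptr<arrow::RecordBatch>& batch) {
>   auto key = batch->column(batch->num_columns() - 1);
>   ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make({key->type()}));
>
>   // Map every row to a group id (a uint32 array).
>   arrow::compute::ExecBatch keys({key}, batch->num_rows());
>   ARROW_ASSIGN_OR_RAISE(arrow::Datum ids,
>                         grouper->Consume(arrow::compute::ExecSpan(keys)));
>
>   // Build a list array of row indices per group...
>   ARROW_ASSIGN_OR_RAISE(
>       auto groupings,
>       Grouper::MakeGroupings(*ids.array_as<arrow::UInt32Array>(),
>                              grouper->num_groups()));
>
>   // ...then gather the rows in group order and slice out each group's
>   // contiguous run (this is what the linked partition.cc helper does).
>   ARROW_ASSIGN_OR_RAISE(arrow::Datum sorted,
>                         arrow::compute::Take(batch, groupings->values()));
>   arrow::RecordBatchVector out(grouper->num_groups());
>   for (int64_t i = 0; i < groupings->length(); ++i) {
>     out[i] = sorted.record_batch()->Slice(groupings->value_offset(i),
>                                           groupings->value_length(i));
>   }
>   return out;
> }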
>
> > I know you recently added [1] and I'm maybe a little uncertain what
> > the difference is between this ask and the capabilities added in [1].
>
> In [1], I implemented the scalar aggregate UDF, which just concatenates
> all the input batches and applies the UDF. Now I am trying to implement
> the grouped/hash aggregate version. The idea is that the group by node
> will call:
>
> https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/aggregate_node.cc#L800
>
> So I am looking into implementing a hash aggregate kernel backed by a
> Python UDF, which, in turn, needs to implement the logic that I asked
> about above.
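>
> Concretely, I picture the kernel buffering each group's values as batches
> arrive and running the UDF once per group at finalize time, mirroring the
> concat-then-apply behavior of [1]. A very rough sketch (all the names
> here are hypothetical, not the actual kernel interface):
>
> #include <vector>
>
> #include "arrow/array.h"
> #include "arrow/array/concatenate.h"
> #include "arrow/datum.h"
> #include "arrow/result.h"
>
> // Hypothetical accumulator for a hash aggregate UDF kernel: buffer each
> // group's values as they arrive, then run the (Python) UDF once per
> // group at finalize time.
> class GroupedAccumulator {
>  public:
>   void Resize(size_t num_groups) { groups_.resize(num_groups); }
>
>   // values_by_group[i] holds the rows of the current batch that belong
>   // to group i, e.g. as produced by Grouper::ApplyGroupings.
>   void Consume(
>       const std::vector<std::shared_ptr<arrow::Array>>& values_by_group) {
>     for (size_t i = 0; i < values_by_group.size(); ++i) {
>       groups_[i].push_back(values_by_group[i]);
>     }
>   }
>
>   // Concatenate each group's chunks and hand the result to the UDF
>   // (Udf stands in for whatever wrapper calls into Python).
>   template <typename Udf>
>   arrow::Result<std::vector<arrow::Datum>> Finalize(Udf&& udf) {
>     std::vector<arrow::Datum> out;
>     out.reserve(groups_.size());
>     for (const auto& chunks : groups_) {
>       ARROW_ASSIGN_OR_RAISE(auto values, arrow::Concatenate(chunks));
>       out.push_back(udf(values));
>     }
>     return out;
>   }
>
>  private:
>   std::vector<std::vector<std::shared_ptr<arrow::Array>>> groups_;
> };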
>
> On Tue, Jun 13, 2023 at 3:11 PM Weston Pace <weston.p...@gmail.com> wrote:
>
>> Are you looking for something in C++ or python?  We have a thing called
>> the "grouper" (arrow::compute::Grouper in arrow/compute/row/grouper.h)
>> which (if memory serves) is the heart of the functionality in C++.  It
>> would be nice to add some python bindings for this functionality as this
>> ask comes up from pyarrow users pretty regularly.
>>
>> The grouper is used in src/arrow/dataset/partition.h to partition a record
>> batch into groups of batches.  This is how the dataset writer writes a
>> partitioned dataset.  It's a good example of how you would use the grouper
>> for a "one batch in, one batch per group out" use case.
>>
>> The grouper can also be used in a streaming situation (many batches in,
>> one batch per group out).  In fact, the grouper is what is used by the
>> group by node.  I know you recently added [1] and I'm maybe a little
>> uncertain what the difference is between this ask and the capabilities
>> added in [1].
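>>
>> The streaming pattern I have in mind looks something like this (a sketch
>> from memory, so the exact signatures may be off; GroupStream is a made-up
>> name and the key is assumed to be the last column): keep a single grouper
>> alive across the whole stream so group ids stay consistent between
>> Consume calls, then route each batch's rows once the stream ends.
>>
>> #include "arrow/compute/api_vector.h"
>> #include "arrow/compute/exec.h"
>> #include "arrow/compute/row/grouper.h"
>> #include "arrow/record_batch.h"
>> #include "arrow/table.h"
>>
>> using arrow::compute::Grouper;
>>
>> // Many batches in, one table per group out.
>> arrow::Result<std::vector<std::shared_ptr<arrow::Table>>> GroupStream(
>>     const arrow::RecordBatchVector& batches,
>>     const std::shared_ptr<arrow::DataType>& key_type) {
>>   ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make({key_type}));
>>
>>   // Pass 1: assign a group id to every row of every batch. Ids are
>>   // stable across Consume() calls because the grouper keeps its hash
>>   // table between calls.
>>   std::vector<arrow::Datum> ids_per_batch;
>>   for (const auto& batch : batches) {
>>     arrow::compute::ExecBatch keys(
>>         {batch->column(batch->num_columns() - 1)}, batch->num_rows());
>>     ARROW_ASSIGN_OR_RAISE(
>>         auto ids, grouper->Consume(arrow::compute::ExecSpan(keys)));
>>     ids_per_batch.push_back(std::move(ids));
>>   }
>>
>>   // Pass 2: route each batch's rows to per-group piles, as in the
>>   // single-batch case, but with the global group count.
>>   const uint32_t k = grouper->num_groups();
>>   std::vector<arrow::RecordBatchVector> pieces(k);
>>   for (size_t b = 0; b < batches.size(); ++b) {
>>     ARROW_ASSIGN_OR_RAISE(
>>         auto groupings,
>>         Grouper::MakeGroupings(
>>             *ids_per_batch[b].array_as<arrow::UInt32Array>(), k));
>>     ARROW_ASSIGN_OR_RAISE(
>>         arrow::Datum taken,
>>         arrow::compute::Take(batches[b], groupings->values()));
>>     for (uint32_t g = 0; g < k; ++g) {
>>       if (groupings->value_length(g) > 0) {
>>         pieces[g].push_back(taken.record_batch()->Slice(
>>             groupings->value_offset(g), groupings->value_length(g)));
>>       }
>>     }
>>   }
>>
>>   // Stitch each group's pieces into a single table.
>>   std::vector<std::shared_ptr<arrow::Table>> out(k);
>>   for (uint32_t g = 0; g < k; ++g) {
>>     ARROW_ASSIGN_OR_RAISE(out[g],
>>                           arrow::Table::FromRecordBatches(pieces[g]));
>>   }
>>   return out;
>> }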
>>
>> [1] https://github.com/apache/arrow/pull/35514
>>
>> On Tue, Jun 13, 2023 at 8:23 AM Li Jin <ice.xell...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > I am trying to write a function that takes a stream of record batches
>> > (where the last column is the group id) and produces k record batches,
>> > where record batch k_i contains all the rows with group id == i.
>> >
>> > Pseudocode is something like:
>> >
>> > def group_rows(batches, k) -> array[RecordBatch]:
>> >     builders = array[RecordBatchBuilder](k)
>> >     for batch in batches:
>> >         # Assuming the last column is the group id
>> >         group_ids = batch.column(-1)
>> >         for i in range(batch.num_rows()):
>> >             k_i = group_ids[i]
>> >             builders[k_i].append(batch[i])
>> >
>> >     out = array[RecordBatch](k)
>> >     for i in range(k):
>> >         out[i] = builders[i].build()
>> >     return out
>> >
>> > I wonder if there is some existing code that does something like this?
>> > (In particular, I didn't find code that can append a row or rows to a
>> > RecordBatchBuilder, either one row given a row index or multiple rows
>> > given a list of row indices.)
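>> >
>> > The closest thing I have found so far is compute::Take, which gathers
>> > multiple rows in one call given an index array, e.g. (rough sketch;
>> > TakeRows is a hypothetical helper name):
>> >
>> > #include "arrow/api.h"
>> > #include "arrow/compute/api_vector.h"
>> >
>> > // Gather several rows of a batch in one call instead of appending
>> > // them one by one to a builder.
>> > arrow::Result<std::shared_ptr<arrow::RecordBatch>> TakeRows(
>> >     const std::shared_ptr<arrow::RecordBatch>& batch,
>> >     const std::vector<int64_t>& row_indices) {
>> >   arrow::Int64Builder builder;
>> >   ARROW_RETURN_NOT_OK(builder.AppendValues(row_indices));
>> >   ARROW_ASSIGN_OR_RAISE(auto indices, builder.Finish());
>> >   ARROW_ASSIGN_OR_RAISE(arrow::Datum out,
>> >                         arrow::compute::Take(batch, indices));
>> >   return out.record_batch();
>> > }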
>> >
>>
>
