Thanks Weston.

I think I found the code you pointed me to earlier:
https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118
I will see whether I can adapt it for the streaming case.

> I know you recently added [1] and I'm maybe a little uncertain what
> the difference is between this ask and the capabilities added in [1].

In [1], I implemented the scalar aggregate UDF, which just concatenates all the
input batches and applies the UDF. Now I am trying to implement the grouped/hash
aggregate version. The idea is that the group by node will call:
https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/aggregate_node.cc#L800

So I am looking into implementing a hash aggregate kernel backed by a
Python UDF, which, in turn, needs to implement the logic that I asked about above.
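
To make the streaming logic concrete, here is a minimal plain-Python sketch of
what I have in mind. It is not Arrow code: each batch is a dict of column lists
standing in for a RecordBatch, and the names group_rows, builders, and indices
are only illustrative. It buckets row indices per group once per batch and then
copies column-wise, rather than appending row by row.

```python
from collections import defaultdict


def group_rows(batches, k):
    """Split a stream of batches into k batches, one per group id.

    Assumption: each batch is a dict mapping column name -> list of values
    (a stand-in for an Arrow RecordBatch), and the last column is the group id.
    """
    columns = None
    builders = None  # builders[g][name] accumulates column `name` for group g
    for batch in batches:
        if columns is None:
            # Take the schema from the first batch seen.
            columns = list(batch)
            builders = [{name: [] for name in columns} for _ in range(k)]
        group_ids = batch[columns[-1]]
        # Bucket the row indices of this batch by group id.
        indices = defaultdict(list)
        for row, g in enumerate(group_ids):
            indices[g].append(row)
        # Copy the selected rows column by column (a "take" by index list).
        for g, rows in indices.items():
            for name in columns:
                col = batch[name]
                builders[g][name].extend(col[r] for r in rows)
    # With no input batches there is no schema, so return k empty batches.
    return builders if builders is not None else [{} for _ in range(k)]


batches = [
    {"x": [1, 2, 3], "group": [0, 1, 0]},
    {"x": [4, 5], "group": [1, 1]},
]
out = group_rows(batches, 2)
```

In the real kernel, the per-group index lists would presumably drive a take or
filter on each incoming batch instead of a Python loop.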

On Tue, Jun 13, 2023 at 3:11 PM Weston Pace <weston.p...@gmail.com> wrote:

> Are you looking for something in C++ or python?  We have a thing called the
> "grouper" (arrow::compute::Grouper in arrow/compute/row/grouper.h) which
> (if memory serves) is the heart of the functionality in C++.  It would be
> nice to add some python bindings for this functionality as this ask comes
> up from pyarrow users pretty regularly.
>
> The grouper is used in src/arrow/dataset/partition.h to partition a record
> batch into groups of batches.  This is how the dataset writer writes a
> partitioned dataset.  It's a good example of how you would use the grouper
> for a "one batch in, one batch per group out" use case.
>
> The grouper can also be used in a streaming situation (many batches in, one
> batch per group out).  In fact, the grouper is what is used by the group by
> node.  I know you recently added [1] and I'm maybe a little uncertain what
> the difference is between this ask and the capabilities added in [1].
>
> [1] https://github.com/apache/arrow/pull/35514
>
> On Tue, Jun 13, 2023 at 8:23 AM Li Jin <ice.xell...@gmail.com> wrote:
>
> > Hi,
> >
> > I am trying to write a function that takes a stream of record batches
> > (where the last column is group id), and produces k record batches, where
> > record batch k_i contains all the rows with group id == i.
> >
> > Pseudocode is something like:
> >
> > def group_rows(batches, k) -> array[RecordBatch]:
> >     # One builder per group id
> >     builders = array[RecordBatchBuilder](k)
> >     for batch in batches:
> >         # Assuming last column is the group id
> >         group_ids = batch.column(-1)
> >         for i in range(batch.num_rows):
> >             k_i = group_ids[i]
> >             builders[k_i].append(batch[i])
> >
> >     # Finish each builder into its output batch
> >     result = array[RecordBatch](k)
> >     for i in range(k):
> >         result[i] = builders[i].build()
> >     return result
> >
> > I wonder if there is some existing code that does something like this?
> > (Specifically, I didn't find code that can append a row or rows to a
> > RecordBatchBuilder, either one row given a row index or multiple rows
> > given a list of row indices.)
> >
>
