(Admittedly, the PR title of [1] doesn't reflect that only the scalar aggregate UDF is implemented and not the hash one; that is an oversight on my part, sorry.)
On Tue, Jun 13, 2023 at 3:51 PM Li Jin <ice.xell...@gmail.com> wrote:

> Thanks Weston.
>
> I think I found what you pointed out to me before, which is this bit of
> code:
>
> https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118
>
> I will try to see if I can adapt this to be used in a streaming situation.
>
> > I know you recently added [1] and I'm maybe a little uncertain what
> > the difference is between this ask and the capabilities added in [1].
>
> In [1], I implemented the scalar aggregate UDF, which just concatenates
> all the input batches and applies the UDF. Now I am trying to implement
> the grouped/hash aggregate version. The idea is that the group by node
> will call:
>
> https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/aggregate_node.cc#L800
>
> So I am looking into implementing a hash aggregate kernel backed by a
> Python UDF, which in turn needs to implement the logic that I asked
> about above.
>
> On Tue, Jun 13, 2023 at 3:11 PM Weston Pace <weston.p...@gmail.com> wrote:
>
>> Are you looking for something in C++ or Python? We have a thing called
>> the "grouper" (arrow::compute::Grouper in arrow/compute/row/grouper.h)
>> which (if memory serves) is the heart of the functionality in C++. It
>> would be nice to add some Python bindings for this functionality, as
>> this ask comes up from pyarrow users pretty regularly.
>>
>> The grouper is used in src/arrow/dataset/partition.h to partition a
>> record batch into groups of batches. This is how the dataset writer
>> writes a partitioned dataset. It's a good example of how you would use
>> the grouper for a "one batch in, one batch per group out" use case.
>>
>> The grouper can also be used in a streaming situation (many batches in,
>> one batch per group out). In fact, the grouper is what is used by the
>> group by node. I know you recently added [1] and I'm maybe a little
>> uncertain what the difference is between this ask and the capabilities
>> added in [1].
>>
>> [1] https://github.com/apache/arrow/pull/35514
>>
>> On Tue, Jun 13, 2023 at 8:23 AM Li Jin <ice.xell...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > I am trying to write a function that takes a stream of record batches
>> > (where the last column is the group id) and produces k record batches,
>> > where record batch k_i contains all the rows with group id == i.
>> >
>> > Pseudocode is something like:
>> >
>> > def group_rows(batches, k) -> array[RecordBatch] {
>> >     builders = array[RecordBatchBuilder](k)
>> >     for batch in batches:
>> >         # Assuming the last column is the group id
>> >         group_ids = batch.column(-1)
>> >         for i in batch.num_rows():
>> >             k_i = group_ids[i]
>> >             builders[k_i].append(batch[i])
>> >
>> >     batches = array[RecordBatch](k)
>> >     for i in range(k):
>> >         batches[i] = builders[i].build()
>> >     return batches
>> > }
>> >
>> > I wonder if there is some existing code that does something like this?
>> > (Specifically, I didn't find code that can append rows to a
>> > RecordBatchBuilder, either one row given a row index or multiple rows
>> > given a list of row indices.)