Thanks Weston. I think I found what you pointed out to me before, which is this bit of code: https://github.com/apache/arrow/blob/main/cpp/src/arrow/dataset/partition.cc#L118 I will see if I can adapt this to be used in a streaming situation.
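For what it's worth, the pattern in that partition code is roughly "map each row to a group id, gather the row indices for each group, then take those rows in one pass" rather than appending row by row. A minimal plain-Python sketch of that idea (this is schematic, not the Arrow API; the helper name is mine, and rows stand in for record-batch rows with the group id as the last column):

```python
from collections import defaultdict

def partition_by_group_id(rows):
    """Split rows into per-group batches, mirroring the grouper pattern:
    first gather row indices per group id, then 'take' each group's rows."""
    # Phase 1: collect row indices per group id (last column is the group id).
    indices = defaultdict(list)
    for i, row in enumerate(rows):
        indices[row[-1]].append(i)
    # Phase 2: take the rows for each group in one pass.
    return {gid: [rows[i] for i in idxs] for gid, idxs in indices.items()}

rows = [("a", 1, 0), ("b", 2, 1), ("c", 3, 0)]
parts = partition_by_group_id(rows)
```

In Arrow terms, phase 2 would correspond to a take/selection per group rather than per-row builder appends, which is why there is no "append one row by index to a RecordBatchBuilder" helper on the hot path.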
> I know you recently added [1] and I'm maybe a little uncertain what
> the difference is between this ask and the capabilities added in [1].

In [1], I implemented a scalar aggregate UDF, which simply concatenates all
the input batches and applies the UDF. Now I am trying to implement the
grouped/hash aggregate version. The idea is that the group by node will call:
https://github.com/apache/arrow/blob/main/cpp/src/arrow/acero/aggregate_node.cc#L800

So I am looking into implementing a hash aggregate kernel backed by a Python
UDF, which, in turn, needs to implement the logic I asked about above.

On Tue, Jun 13, 2023 at 3:11 PM Weston Pace <weston.p...@gmail.com> wrote:

> Are you looking for something in C++ or python? We have a thing called the
> "grouper" (arrow::compute::Grouper in arrow/compute/row/grouper.h) which
> (if memory serves) is the heart of the functionality in C++. It would be
> nice to add some python bindings for this functionality as this ask comes
> up from pyarrow users pretty regularly.
>
> The grouper is used in src/arrow/dataset/partition.h to partition a record
> batch into groups of batches. This is how the dataset writer writes a
> partitioned dataset. It's a good example of how you would use the grouper
> for a "one batch in, one batch per group out" use case.
>
> The grouper can also be used in a streaming situation (many batches in, one
> batch per group out). In fact, the grouper is what is used by the group by
> node. I know you recently added [1] and I'm maybe a little uncertain what
> the difference is between this ask and the capabilities added in [1].
>
> [1] https://github.com/apache/arrow/pull/35514
>
> On Tue, Jun 13, 2023 at 8:23 AM Li Jin <ice.xell...@gmail.com> wrote:
>
> > Hi,
> >
> > I am trying to write a function that takes a stream of record batches
> > (where the last column is the group id) and produces k record batches,
> > where record batch k_i contains all the rows with group id == i.
> >
> > Pseudocode is sth like:
> >
> > def group_rows(batches, k) -> array[RecordBatch] {
> >     builders = array[RecordBatchBuilder](k)
> >     for batch in batches:
> >         # Assuming last column is the group id
> >         group_ids = batch.column(-1)
> >         for i in range(batch.num_rows()):
> >             k_i = group_ids[i]
> >             builders[k_i].append(batch[i])
> >
> >     batches = array[RecordBatch](k)
> >     for i in range(k):
> >         batches[i] = builders[i].build()
> >     return batches
> > }
> >
> > I wonder if there is some existing code that does something like this?
> > (Specifically, I didn't find code that can append row/rows to a
> > RecordBatchBuilder, either one row given a row index, or multiple rows
> > given a list of row indices.)
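For reference, here is a runnable plain-Python sketch of the streaming "many batches in, one batch per group out" flow described in the quoted pseudocode, with a consume/finalize shape loosely modeled on how the group by node drives the grouper. Lists and dicts stand in for RecordBatchBuilder and RecordBatch; this is schematic, not the Arrow Grouper API:

```python
from collections import defaultdict

class StreamingGrouper:
    """Buffer rows from many incoming batches under their group id,
    then emit one combined batch per group at the end of the stream."""
    def __init__(self):
        self._groups = defaultdict(list)  # group id -> buffered rows

    def consume(self, batch):
        # batch: list of row tuples; last element of each row is the group id
        for row in batch:
            self._groups[row[-1]].append(row)

    def finalize(self):
        # one batch (list of rows) per group id
        return dict(self._groups)

g = StreamingGrouper()
g.consume([("x", 0), ("y", 1)])
g.consume([("z", 0)])
out = g.finalize()
```

The real grouper avoids the per-row appends by computing group ids for a whole batch at once and gathering rows with a take-style selection, but the consume/finalize lifecycle is the same shape.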