Hi,
I am trying to write a function that takes a stream of record batches
(where the last column is the group id) and produces k record batches, where
record batch i contains all the rows with group id == i.
The pseudocode is something like:
def group_rows(batches, k) -> array[RecordBatch]:
    builders = array[RecordBatchBuilder](k)
    for batch in batches:
        # Assuming last column is the group id
        group_ids = batch.column(-1)
        for i in range(batch.num_rows()):
            k_i = group_ids[i]
            # Row-level append: this is the operation I could not find
            builders[k_i].append(batch[i])
    result = array[RecordBatch](k)
    for i in range(k):
        result[i] = builders[i].build()
    return result
I wonder if there is some existing code that does something like this?
Specifically, I didn't find code that can append a row or rows to a
RecordBatchBuilder (either one row given a row index, or multiple rows
given a list of row indices).