I'm presently working on building general streaming / iterative function execution machinery that should eventually serve this use case (see recent patches in arrow/compute/* and related JIRAs), but there are not yet APIs available that do exactly what you're looking for. Depending on your appetite for low-level development, you can look at how functions like DictionaryEncode are executed on chunked data (see ExecBatchIterator and VectorExecutor in compute/exec_internal.h / compute/exec.cc).
On Mon, Jun 8, 2020 at 2:21 AM Yue Ni <[email protected]> wrote:
>
> Hi there,
>
> I am experimenting with some computation over a stream of record batches, and
> would like to use some functions in arrow::compute. Currently, the functions
> in arrow::compute accept the *Datum* data structure in their APIs, which
> allows users to pass: 1) Array 2) ChunkedArray 3) Table. However, in my case,
> I have a stream of record batches to read from a Java-iterator-like
> interface; basically, it allows you to read a new batch at a time using the
> "next" function.
>
> I can adapt it to the arrow::RecordBatchReader interface, and I wonder how I
> can apply arrow compute functions like "sum" / "dictionary encode" to record
> batch streams like this.
>
> Is this possible, and what is the recommended way to do it in Arrow? I am
> aware that I can put multiple non-contiguous arrays into a ChunkedArray and
> consume it with the arrow compute functions, but that requires consuming the
> whole stream and buffering it all in memory, because users need to construct
> a vector of Array from the record batch stream (if I understand ChunkedArray
> correctly), which is not necessary in many cases. For example, for "sum", I
> think only the global sum state and the specific array in the current batch
> need to be in memory, so I would like to know if there is an alternative
> approach. Thanks.
>
> Regards,
> Yue
