Thank you Brian, this looks promising. cc: +Chamikara Jayalath <[email protected]> +Heejong Lee <[email protected]>
On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <[email protected]> wrote:

> Hi everyone,
> I've been doing some investigations into how Arrow might fit into Beam as
> a way to ramp up on the project. As I've gone about this I've prototyped a
> couple of additions to the Python SDK. I think these additions may be
> useful for others so I'm considering cleaning them up and submitting PRs,
> but I wanted to have a discussion here to see if it makes sense first.
>
> Note that the approach I'm using for this work right now is very naive.
> I've just built pipelines where individual elements are actually arrow
> record batches (or pandas DataFrames). This is really only acceptable for
> bounded pipelines without windowing, since it's impossible to define a
> single event time for each element. That being said, I think these tools
> could still be useful for people who want to run batch pipelines using
> parquet, arrow, and pandas.

I agree these will be generally useful.

> Here's what I've implemented so far:
>
> # An option for io.ReadFromParquet to yield arrow record batches instead
> of individual elements
> Currently the python SDK's parquet reader uses pyarrow.parquet to read
> parquet row groups into arrow record batches, and then splits the batches
> into a single dictionary per row [1]. I've added a flag to optionally
> short-circuit this and just yield the arrow record batches directly,
> making it easier for me to build pipelines that process columnar batches.
> If I were to contribute this change I could also split out the record
> batch <-> dictionary conversions as separate transforms, since they could
> be generally useful as well.

I think splitting these into new transforms rather than adding new options
to existing IO transforms would be simpler for users. This is a question
that would be easier to answer with a PR.

> # Custom coders for Arrow Tables and Record Batches
> I found that the default coder (pickle/dill?) slowed down my arrow
> pipelines, particularly in the case where a record batch had been sliced
> into smaller record batches (presumably because the entire original batch
> is getting serialized for each slice). I put together some coders that
> encode arrow tables and record batches with arrow's IPC formats, which
> improves performance substantially.

How did you measure this? It would be good for us to also have relevant
micro benchmarks here.

> Would it make sense to add these things to the Python SDK? Or would they
> fit better in a separate library of utilities for building pipelines with
> batched data?

+1 for adding these to Beam. The Python SDK already has a set of utility
transforms (e.g. BatchElements), and new additions could also live in that
space.

> Brian
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
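For concreteness, here is a rough sketch of what standalone record batch
<-> dictionary transforms could look like. This is illustrative only, not
Brian's prototype; the DoFn names (RecordBatchToRows, RowsToRecordBatch)
are made up, and the second one assumes rows have already been grouped
into lists by something like BatchElements upstream.

import apache_beam as beam
import pyarrow as pa


class RecordBatchToRows(beam.DoFn):
  # Hypothetical transform: expands one arrow RecordBatch into a dict
  # per row, similar to what parquetio does internally today.
  def process(self, batch):
    columns = batch.to_pydict()  # column name -> list of python values
    for i in range(batch.num_rows):
      yield {name: columns[name][i] for name in batch.schema.names}


class RowsToRecordBatch(beam.DoFn):
  # Hypothetical transform: assumes each input element is a non-empty
  # list of dicts with identical keys (e.g. produced by BatchElements).
  def process(self, rows):
    names = list(rows[0].keys())
    arrays = [pa.array([row[name] for row in rows]) for name in names]
    yield pa.RecordBatch.from_arrays(arrays, names)

A pipeline could then move between the two representations with
record_batches | beam.ParDo(RecordBatchToRows()) in one direction and a
BatchElements step followed by beam.ParDo(RowsToRecordBatch()) in the
other.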

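And a minimal sketch of what an IPC-based coder might look like, again
only for illustration and not the coders Brian measured; the class name
ArrowRecordBatchCoder is made up and it handles only RecordBatch, not
Table.

import pyarrow as pa
from apache_beam.coders import Coder


class ArrowRecordBatchCoder(Coder):
  # Hypothetical coder: serializes a pyarrow.RecordBatch with the arrow
  # IPC stream format instead of pickling the python object graph.
  def encode(self, batch):
    sink = pa.BufferOutputStream()
    writer = pa.RecordBatchStreamWriter(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    return sink.getvalue().to_pybytes()

  def decode(self, encoded):
    reader = pa.RecordBatchStreamReader(pa.BufferReader(encoded))
    return reader.read_next_batch()

  def is_deterministic(self):
    # Arrow IPC output is not guaranteed to be byte-stable across
    # pyarrow versions, so don't advertise determinism.
    return False

Such a coder could be hooked up via
beam.coders.registry.register_coder(pa.RecordBatch, ArrowRecordBatchCoder),
which would also be a natural place to hang a micro benchmark comparing it
against the default pickle-based coder.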