Hi everyone,
I've been investigating how Arrow might fit into Beam as a way to ramp up
on the project. As I've gone about this I've prototyped a couple of
additions to the Python SDK. I think these additions may be useful to
others, so I'm considering cleaning them up and submitting PRs, but I
wanted to have a discussion here first to see if they make sense.

Note that the approach I'm using for this work right now is very naive:
I've just built pipelines where individual elements are actually Arrow
record batches (or pandas DataFrames). This is really only acceptable for
bounded pipelines without windowing, since it's impossible to define a
single event time for each element. That said, I think these tools could
still be useful for people who want to run batch pipelines using Parquet,
Arrow, and pandas.
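
To make that concrete, here's a minimal sketch of the kind of pipeline I
mean, where each element is a whole record batch. The 'value' column and
the sum_values function are made up purely for illustration:

    import apache_beam as beam
    import pyarrow as pa

    def sum_values(batch):
        # Operate on a whole columnar batch at once, not row-by-row.
        return sum(batch.to_pydict()['value'])

    with beam.Pipeline() as p:
        batch = pa.RecordBatch.from_arrays(
            [pa.array([1, 2, 3])], names=['value'])
        (p
         | beam.Create([batch])  # each element is an Arrow RecordBatch
         | beam.Map(sum_values)
         | beam.Map(print))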

Here's what I've implemented so far:
# An option for io.ReadFromParquet to yield Arrow record batches instead of individual elements
Currently the Python SDK's Parquet reader uses pyarrow.parquet to read
Parquet row groups into Arrow record batches, and then splits each batch
into a single dictionary per row [1]. I've added a flag that optionally
short-circuits this and yields the Arrow record batches directly, which
makes it easier to build pipelines that process columnar batches. If I
were to contribute this change, I could also split out the record batch
<-> dictionary conversions as separate transforms, since they could be
generally useful on their own.
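
Roughly, the usage I have in mind looks like the sketch below. The flag
name (return_record_batches) and the RecordBatchToRows transform are
placeholders I made up for illustration, not settled API:

    import apache_beam as beam
    from apache_beam.io import ReadFromParquet

    class RecordBatchToRows(beam.DoFn):
        # Splits one Arrow record batch into a dict per row, mirroring
        # what parquetio already does internally today.
        def process(self, batch):
            columns = batch.to_pydict()  # {column name: [values, ...]}
            names = list(columns)
            for values in zip(*(columns[n] for n in names)):
                yield dict(zip(names, values))

    with beam.Pipeline() as p:
        batches = p | ReadFromParquet(
            '/path/to/data.parquet',
            return_record_batches=True)  # hypothetical flag name
        rows = batches | beam.ParDo(RecordBatchToRows())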

# Custom coders for Arrow Tables and Record Batches
I found that the default coder (pickle/dill?) slowed down my Arrow
pipelines, particularly when a record batch had been sliced into smaller
record batches (presumably because the entire original batch gets
serialized for each slice). I put together coders that encode Arrow tables
and record batches with Arrow's IPC formats, which improves performance
substantially.
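
As a rough sketch, the record batch coder is essentially the following
(simplified; the class name and registration details are illustrative):

    import apache_beam as beam
    import pyarrow as pa

    class ArrowRecordBatchCoder(beam.coders.Coder):
        # Serializes a RecordBatch with Arrow's IPC stream format,
        # avoiding the pickle behavior described above.
        def encode(self, batch):
            sink = pa.BufferOutputStream()
            writer = pa.ipc.new_stream(sink, batch.schema)
            writer.write_batch(batch)
            writer.close()
            return sink.getvalue().to_pybytes()

        def decode(self, encoded):
            return pa.ipc.open_stream(encoded).read_next_batch()

    beam.coders.registry.register_coder(
        pa.RecordBatch, ArrowRecordBatchCoder)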


Would it make sense to add these things to the Python SDK? Or would they
fit better in a separate library of utilities for building pipelines with
batched data?

Brian

[1]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
