Thank you Brian, this looks promising.

cc: +Chamikara Jayalath <[email protected]> +Heejong Lee
<[email protected]>

On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <[email protected]> wrote:

> Hi everyone,
> I've been doing some investigations into how Arrow might fit into Beam as
> a way to ramp up on the project. As I've gone about this I've prototyped a
> couple of additions to the Python SDK. I think these additions may be
> useful for others so I'm considering cleaning them up and submitting PRs,
> but I wanted to have a discussion here to see if it makes sense first.
>
> Note that the approach I'm using for this work right now is very naive.
> I've just built pipelines where individual elements are actually arrow
> record batches (or pandas DataFrames). This is really only acceptable for
> bounded pipelines without windowing, since it's impossible to define a
> single event time for each element. That being said, I think these tools
> could still be useful for people who want to run batch pipelines using
> parquet, arrow, and pandas.
>

I agree these will be generally useful.
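For anyone skimming, here is a minimal sketch of the kind of bounded pipeline being described, where each element is a whole pandas DataFrame rather than an individual row (the data and column names are purely illustrative):

import apache_beam as beam
import pandas as pd

with beam.Pipeline() as p:
  _ = (
      p
      # Each element is an entire DataFrame, not a single row.
      | beam.Create([pd.DataFrame({'x': [1, 2, 3]}),
                     pd.DataFrame({'x': [4, 5, 6]})])
      # Columnar operations are applied batch-at-a-time.
      | beam.Map(lambda df: df.assign(y=df['x'] * 2))
      | beam.Map(print))

As noted above, this only makes sense for bounded, unwindowed pipelines, since a batch has no single event time.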


>
> Here's what I've implemented so far:
> # An option for io.ReadFromParquet to yield arrow record batches instead
> of individual elements
> Currently the python SDK's parquet reader uses pyarrow.parquet to read
> parquet row groups into arrow record batches, and then splits the batches
> into a single dictionary per row [1]. I've added a flag to optionally
> short-circuit this and just yield the arrow record batches directly, making
> it easier for me to build pipelines that process columnar batches. If I
> were to contribute this change I could also split out the record batch <->
> dictionary conversions as separate transforms, since they could be
> generally useful as well.
>

I think splitting these out into new transforms, rather than adding new
options to existing IO transforms, would be simpler for users. This is
probably a question that would be easier to answer with a PR in hand.
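To make that concrete, a standalone batch-to-rows transform could be roughly along these lines (RecordBatchToRows is just a hypothetical name, not something that exists in the SDK today):

import apache_beam as beam

class RecordBatchToRows(beam.DoFn):
  """Expands one pyarrow.RecordBatch into one dict per row."""

  def process(self, batch):
    # to_pydict() returns {column_name: [values, ...]}; zip it back into rows.
    columns = batch.to_pydict()
    names = list(columns)
    for values in zip(*(columns[name] for name in names)):
      yield dict(zip(names, values))

Users could then chain it after a batch-producing read, e.g.
... | beam.ParDo(RecordBatchToRows()), and the inverse (rows to batches)
could live alongside it.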


> # Custom coders for Arrow Tables and Record Batches
> I found that the default coder (pickle/dill?) slowed down my arrow
> pipelines, particularly in the case where a record batch had been sliced
> into smaller record batches (presumably because the entire original batch
> is getting serialized for each slice). I put together some coders that
> encode arrow tables and record batches with arrow's IPC formats, which
> improves performance substantially.
>

How did you measure this? It would be good for us to also have relevant
micro benchmarks here.
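For reference, a coder along those lines could be a thin wrapper around the IPC stream format, something like the sketch below (ArrowRecordBatchCoder is a hypothetical name, and the pa.ipc calls assume a reasonably recent pyarrow):

import pyarrow as pa
from apache_beam.coders import Coder

class ArrowRecordBatchCoder(Coder):
  """Encodes a single pyarrow.RecordBatch with Arrow's IPC stream format."""

  def encode(self, batch):
    sink = pa.BufferOutputStream()
    writer = pa.ipc.new_stream(sink, batch.schema)
    writer.write_batch(batch)
    writer.close()
    return sink.getvalue().to_pybytes()

  def decode(self, encoded):
    reader = pa.ipc.open_stream(pa.py_buffer(encoded))
    return reader.read_next_batch()

  def is_deterministic(self):
    # Arrow IPC output is not guaranteed to be byte-stable across versions.
    return False

It could be registered with
beam.coders.registry.register_coder(pa.RecordBatch, ArrowRecordBatchCoder)
so it is picked up automatically for RecordBatch-typed PCollections.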


>
> Would it make sense to add these things to the Python SDK? Or would they
> fit better in a separate library of utilities for building pipelines with
> batched data?
>

+1 for adding these to Beam. The Python SDK already has a set of utility
transforms (e.g. BatchElements); new additions could live in that space as
well.
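For example, a batch-producing helper could sit right next to BatchElements, roughly as follows (the data and batch sizes are illustrative; RecordBatch.from_arrays is standard pyarrow):

import apache_beam as beam
import pyarrow as pa

def to_record_batch(rows):
  """Converts a list of dicts (all with the same keys) into a RecordBatch."""
  names = list(rows[0])
  arrays = [pa.array([row[name] for row in rows]) for name in names]
  return pa.RecordBatch.from_arrays(arrays, names=names)

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'id': 1, 'name': 'a'}, {'id': 2, 'name': 'b'}])
      | beam.BatchElements(min_batch_size=2, max_batch_size=1000)
      | beam.Map(to_record_batch))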


>
> Brian
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
>
