On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles <[email protected]> wrote:
> Thinking about Arrow + Beam SQL + schemas:
>
>  - Obviously many SQL operations could be usefully accelerated by arrow /
> columnar. Especially in the analytical realm this is the new normal. For
> ETL, perhaps less so.
>
>  - Beam SQL planner (pipeline construction) is implemented in Java, and so
> the various DoFns/CombineFns that implement projection, filter, etc., are
> also in Java.
>  - Arrow is of course available in Java.
>  - Presumably there is a pandas counterpart in Java. Is there? Do you
> know?
>  - But perhaps if these building blocks emitted by the planner had
> well-defined URNs, we could use SIMD+columnar Java or Python
> implementations opportunistically to avoid cross-language data channels.
> (thinking ahead to when cross-language allows Python pipelines to invoke
> the construction-time planner implemented in Java)
>

I think cross-language data channels can be avoided as long as the steps
(constructed by the planner) can be fused by a runner without any
interleaving Python steps, irrespective of the data format. Whenever a
cross-language boundary is met, we'll have to use a coder that is
compatible across languages (avro, arrow, proto, etc.). Hopefully Beam
schemas will make this simpler by introducing compatible Row types in each
language.

>
>  - Is it valuable for Beam to invent its own schemas? I'd love for Beam to
> have identical schema affordances to either protobuf or arrow or avro, with
> everything layered on that as logical types (including SQL types). What
> would it look like if Beam schemas were more-or-less Arrow schemas?
>
>  - For the event timestamp issue, there are two levels of abstraction at
> which I could imagine improvements:
>     - at the model layer (aka portability protos) we could make Beam
> columnar-batch aware. That's a huge change and would need a massive
> justification IMO in the form of performance numbers.
>     - at the SDK layer, some language might make it pretty easy to
> overload the "GroupByKey" transform to understand that for elements that
> are really batches there are multiple timestamps contained within, so it
> may need to window & group differently. The model doesn't need to know in
> this case.
>
> Kenn
>
> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay <[email protected]> wrote:
>
>> Thank you Brian, this looks promising.
>>
>> cc: +Chamikara Jayalath <[email protected]> +Heejong Lee
>> <[email protected]>
>>
>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <[email protected]>
>> wrote:
>>
>>> Hi everyone,
>>> I've been doing some investigations into how Arrow might fit into Beam
>>> as a way to ramp up on the project. As I've gone about this, I've
>>> prototyped a couple of additions to the Python SDK. I think these
>>> additions may be useful for others, so I'm considering cleaning them up
>>> and submitting PRs, but I wanted to have a discussion here to see if it
>>> makes sense first.
>>>
>>> Note that the approach I'm using for this work right now is very naive.
>>> I've just built pipelines where individual elements are actually arrow
>>> record batches (or pandas DataFrames). This is really only acceptable
>>> for bounded pipelines without windowing, since it's impossible to define
>>> a single event time for each element. That being said, I think these
>>> tools could still be useful for people who want to run batch pipelines
>>> using parquet, arrow, and pandas.
>>>
>>
>> I agree these will be generally useful.
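For concreteness, the kind of bounded pipeline Brian describes might look
roughly like this (a toy sketch, not his actual prototype; the column names
and the per-batch sum are made up for illustration):

    import apache_beam as beam
    import pyarrow as pa

    def sum_value_column(batch):
        # Work happens per RecordBatch (columnar), not per row; here we
        # just sum one column by round-tripping through pandas.
        return batch.to_pandas()['value'].sum()

    with beam.Pipeline() as p:
        batches = p | beam.Create([
            pa.RecordBatch.from_arrays(
                [pa.array([1, 2, 3]), pa.array(['a', 'b', 'c'])],
                ['value', 'label'])
        ])
        sums = batches | beam.Map(sum_value_column)

Each element carries many rows, which is exactly why a single event
timestamp per element stops making sense once windowing enters the picture.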
>>
>>
>>>
>>> Here's what I've implemented so far:
>>> # An option for io.ReadFromParquet to yield arrow record batches
>>> instead of individual elements
>>> Currently the Python SDK's parquet reader uses pyarrow.parquet to read
>>> parquet row groups into arrow record batches, and then splits the
>>> batches into a single dictionary per row [1]. I've added a flag to
>>> optionally short-circuit this and just yield the arrow record batches
>>> directly, making it easier for me to build pipelines that process
>>> columnar batches. If I were to contribute this change, I could also
>>> split out the record batch <-> dictionary conversions as separate
>>> transforms, since they could be generally useful as well.
>>>
>>
>> I think splitting into new transforms rather than adding new options to
>> existing IO transforms would be simpler for users. I think this would be
>> a question that could be easier to answer with a PR.
>>
>

+1. I think it's better to introduce a new transform as well. It'll be
confusing to have an option that changes the type of PCollection returned
by the same transform.

>
>>
>>> # Custom coders for Arrow Tables and Record Batches
>>> I found that the default coder (pickle/dill?) slowed down my arrow
>>> pipelines, particularly in the case where a record batch had been
>>> sliced into smaller record batches (presumably because the entire
>>> original batch is getting serialized for each slice). I put together
>>> some coders that encode arrow tables and record batches with arrow's
>>> IPC formats, which improves performance substantially.
>>>
>>
>> How did you measure this? It would be good for us to also have relevant
>> micro benchmarks here.
>>
>>
>>>
>>> Would it make sense to add these things to the Python SDK? Or would
>>> they fit better in a separate library of utilities for building
>>> pipelines with batched data?
>>>
>>
>> +1 for adding to Beam. Python SDK has a list of utility transforms
>> (e.g. BatchElements), new additions could also live in that space.
>>
>>
>>>
>>> Brian
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
>>>
>>
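On the coder idea: an Arrow-IPC-based coder for record batches could look
roughly like the sketch below. This is my own illustration against
pyarrow's stream IPC API, not Brian's code; the class name is made up, and
as Ahmet says, the actual win should be quantified with micro benchmarks.

    import pyarrow as pa
    from apache_beam import coders

    class ArrowRecordBatchCoder(coders.Coder):
        """Illustrative coder that (de)serializes a pyarrow.RecordBatch
        using Arrow's IPC stream format instead of pickling it."""

        def encode(self, batch):
            sink = pa.BufferOutputStream()
            writer = pa.RecordBatchStreamWriter(sink, batch.schema)
            writer.write_batch(batch)
            writer.close()
            return sink.getvalue().to_pybytes()

        def decode(self, encoded):
            reader = pa.RecordBatchStreamReader(pa.py_buffer(encoded))
            return reader.read_next_batch()

        def is_deterministic(self):
            # Assumed, not verified; would need checking before relying on
            # it for anything that groups by encoded bytes.
            return True

    # Register it so Beam can pick it up for PCollections whose declared
    # element type is pa.RecordBatch:
    coders.registry.register_coder(pa.RecordBatch, ArrowRecordBatchCoder)

If Brian's presumption is right that pickling a sliced batch drags the
whole parent batch along, writing only the slice through the IPC writer
would explain the improvement he's seeing.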
