On Thu, Mar 28, 2019 at 12:24 PM Brian Hulette <[email protected]> wrote:
> > - Presumably there is a pandas counterpart in Java. Is there? Do you know?
>
> I think there are some dataframe libraries in Java we could look into. I'm not aware of anything that has the same popularity and arrow integration as pandas, though. Within the arrow project there is Gandiva [1], which has Java bindings. It generates optimized LLVM code for processing arrow data based on an expression tree. I think that could be a valuable tool for SQL.

Gandiva looks to be similar to what we get from Calcite today, but I wonder if it is higher performance due to being lower level, or more flexible (for example, Calcite's codegen is pretty hardcoded to millisecond-precision datetimes). Worth learning about. Since another big benefit of Calcite's expression compiler is that it implements "all" the builtin functions for free, I'd look closely at how to provide a builtin function catalog to Gandiva.
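For concreteness, here is a minimal sketch of what driving Gandiva's expression compiler looks like from Python through the pyarrow.gandiva bindings (the Java bindings have the same shape). The schema, the "add" expression, and the assumption of a pyarrow build with Gandiva enabled are illustrative only; nothing here is tied to what Beam SQL does today.

    # Build an expression tree, compile a projector, evaluate an arrow batch.
    # Assumes pyarrow was built with the optional Gandiva bindings.
    import pyarrow as pa
    import pyarrow.gandiva as gandiva

    field_a = pa.field('a', pa.int64())
    field_b = pa.field('b', pa.int64())
    schema = pa.schema([field_a, field_b])

    # Expression tree for "a + b"; Gandiva JIT-compiles it to LLVM.
    builder = gandiva.TreeExprBuilder()
    sum_node = builder.make_function(
        'add',
        [builder.make_field(field_a), builder.make_field(field_b)],
        pa.int64())
    expr = builder.make_expression(sum_node, pa.field('a_plus_b', pa.int64()))
    projector = gandiva.make_projector(schema, [expr], pa.default_memory_pool())

    batch = pa.RecordBatch.from_arrays(
        [pa.array([1, 2, 3], type=pa.int64()),
         pa.array([10, 20, 30], type=pa.int64())],
        ['a', 'b'])
    # One output array per expression; here [11, 22, 33].
    print(projector.evaluate(batch))

A builtin function catalog for Beam SQL would presumably map SQL operators onto the function names Gandiva's registry understands (like the 'add' above), which is where the comparison with Calcite's expression compiler gets interesting.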
> > - Is it valuable for Beam to invent its own schemas? I'd love for Beam to have identical schema affordances to either protobuf or arrow or avro, with everything layered on that as logical types (including SQL types). What would it look like if Beam schemas were more-or-less Arrow schemas?
>
> As it stands right now there is a very clear mapping from Beam schemas to Arrow schemas. Both define similar primitive types, as well as nested types like row (beam) -> struct (arrow) and array (beam) -> list (arrow). In addition, Arrow schemas have a binary representation and implementations in many languages.
>
> I had some offline discussion with Reuven about this - and he pointed out that eventually we'd like Beam schemas to have a type for large iterables as well, so that even a PCollection<KV<K, Iterable<V>>> can have a schema, and that's certainly a concept that wouldn't make sense for Arrow. So I think the answer is yes, it is valuable for Beam to have its own schemas - that way we can represent Beam-only concepts, but still be able to map to other schemas when it makes sense (for example, in the KV<K, Iterable<V>> case we could map V's beam schema to an arrow schema and encode it as arrow record batches).

This convinces me that Beam should have its own schema definition. There are things in Beam - and could be novelties created in Beam - that might not fit Arrow. And we don't want to have such a tight coupling. If the mapping is straightforward enough then there's not that much work to just convert to/from. But the piece I would think about is that any change to Beam or Arrow could introduce something that doesn't translate well, so we just need to be cognizant of that.

Kenn

> Brian
>
> [1] http://arrow.apache.org/blog/2018/12/05/gandiva-donation/
>
> On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles <[email protected]> wrote:
>
>> Thinking about Arrow + Beam SQL + schemas:
>>
>> - Obviously many SQL operations could be usefully accelerated by arrow / columnar. Especially in the analytical realm this is the new normal. For ETL, perhaps less so.
>>
>> - Beam SQL planner (pipeline construction) is implemented in Java, and so the various DoFns/CombineFns that implement projection, filter, etc. are also in Java.
>>   - Arrow is of course available in Java.
>>   - Presumably there is a pandas counterpart in Java. Is there? Do you know?
>>   - But perhaps if these building blocks emitted by the planner had well-defined URNs, we could use a SIMD+columnar Java or Python implementation opportunistically to avoid cross-language data channels. (Thinking ahead to when cross-language allows Python pipelines to invoke the construction-time planner implemented in Java.)
>>
>> - Is it valuable for Beam to invent its own schemas? I'd love for Beam to have identical schema affordances to either protobuf or arrow or avro, with everything layered on that as logical types (including SQL types). What would it look like if Beam schemas were more-or-less Arrow schemas?
>>
>> - For the event timestamp issue, there are two levels of abstraction at which I could imagine improvements:
>>   - At the model layer (aka portability protos) we could make Beam columnar-batch aware. That's a huge change and would need a massive justification IMO, in the form of performance numbers.
>>   - At the SDK layer, some language might make it pretty easy to overload the "GroupByKey" transform to understand that elements that are really batches contain multiple timestamps, so it may need to window & group differently. The model doesn't need to know in this case.
>>
>> Kenn
>>
>> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay <[email protected]> wrote:
>>
>>> Thank you Brian, this looks promising.
>>>
>>> cc: +Chamikara Jayalath <[email protected]> +Heejong Lee <[email protected]>
>>>
>>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <[email protected]> wrote:
>>>
>>>> Hi everyone,
>>>> I've been doing some investigations into how Arrow might fit into Beam as a way to ramp up on the project. As I've gone about this I've prototyped a couple of additions to the Python SDK. I think these additions may be useful for others, so I'm considering cleaning them up and submitting PRs, but I wanted to have a discussion here to see if it makes sense first.
>>>>
>>>> Note that the approach I'm using for this work right now is very naive. I've just built pipelines where individual elements are actually arrow record batches (or pandas DataFrames). This is really only acceptable for bounded pipelines without windowing, since it's impossible to define a single event time for each element. That being said, I think these tools could still be useful for people who want to run batch pipelines using parquet, arrow, and pandas.
>>>
>>> I agree these will be generally useful.
>>>
>>>> Here's what I've implemented so far:
>>>>
>>>> # An option for io.ReadFromParquet to yield arrow record batches instead of individual elements
>>>> Currently the python SDK's parquet reader uses pyarrow.parquet to read parquet row groups into arrow record batches, and then splits the batches into a single dictionary per row [1]. I've added a flag to optionally short-circuit this and just yield the arrow record batches directly, making it easier for me to build pipelines that process columnar batches. If I were to contribute this change I could also split out the record batch <-> dictionary conversions as separate transforms, since they could be generally useful as well.
>>>
>>> I think splitting these out into new transforms, rather than adding new options to existing IO transforms, would be simpler for users. I think this is a question that would be easier to answer with a PR.
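To make the record batch <-> dictionary idea concrete, a pair of standalone transforms could look roughly like the sketch below. The transform names, the BatchElements-based batching for the reverse direction, and the column handling are my assumptions rather than anything from the prototype.

    # Rough sketch of standalone record batch <-> dict transforms.
    import apache_beam as beam
    import pyarrow as pa

    class _BatchToDicts(beam.DoFn):
        """Explode a pyarrow.RecordBatch into one dict per row."""

        def process(self, batch):
            columns = {name: batch.column(i).to_pylist()
                       for i, name in enumerate(batch.schema.names)}
            for i in range(batch.num_rows):
                yield {name: values[i] for name, values in columns.items()}

    class _DictsToBatch(beam.DoFn):
        """Assemble a list of row dicts into a single pyarrow.RecordBatch."""

        def process(self, rows):
            names = list(rows[0].keys())
            arrays = [pa.array([row[name] for row in rows]) for name in names]
            yield pa.RecordBatch.from_arrays(arrays, names)

    class RecordBatchesToRows(beam.PTransform):
        def expand(self, pcoll):
            return pcoll | beam.ParDo(_BatchToDicts())

    class RowsToRecordBatches(beam.PTransform):
        def __init__(self, max_batch_size=10000):
            super().__init__()
            self._max_batch_size = max_batch_size

        def expand(self, pcoll):
            return (pcoll
                    | beam.BatchElements(max_batch_size=self._max_batch_size)
                    | beam.ParDo(_DictsToBatch()))

With something along these lines the parquet source could keep yielding record batches, and anyone who wants per-row dicts would just append RecordBatchesToRows().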
>>>> # Custom coders for Arrow Tables and Record Batches
>>>> I found that the default coder (pickle/dill?) slowed down my arrow pipelines, particularly in the case where a record batch had been sliced into smaller record batches (presumably because the entire original batch is getting serialized for each slice). I put together some coders that encode arrow tables and record batches with arrow's IPC formats, which improves performance substantially.
>>>
>>> How did you measure this? It would be good for us to also have relevant micro benchmarks here.
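For reference, a record batch coder along the lines described here might look roughly like the following; the class name and the specific pyarrow IPC calls are one plausible implementation, not the actual prototype.

    # Minimal sketch of a RecordBatch coder built on Arrow's IPC stream format.
    import apache_beam as beam
    import pyarrow as pa

    class ArrowRecordBatchCoder(beam.coders.Coder):
        """Encodes a single pyarrow.RecordBatch as an Arrow IPC stream."""

        def encode(self, batch):
            sink = pa.BufferOutputStream()
            writer = pa.RecordBatchStreamWriter(sink, batch.schema)
            writer.write_batch(batch)
            writer.close()
            return sink.getvalue().to_pybytes()

        def decode(self, encoded):
            reader = pa.RecordBatchStreamReader(pa.BufferReader(encoded))
            return reader.read_next_batch()

        def is_deterministic(self):
            # Arrow IPC output is not guaranteed to be byte-for-byte stable,
            # so don't claim determinism (which keyed operations would rely on).
            return False

    # Registering the coder lets Beam use it for RecordBatch-typed
    # PCollections instead of falling back to pickling.
    beam.coders.registry.register_coder(pa.RecordBatch, ArrowRecordBatchCoder)

A table coder would look much the same, writing each of the table's record batches and reading them back with the stream reader's read_all().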
>>>> Would it make sense to add these things to the Python SDK? Or would they fit better in a separate library of utilities for building pipelines with batched data?
>>>
>>> +1 for adding to Beam. The Python SDK has a list of utility transforms (e.g. BatchElements); new additions could also live in that space.
>>>
>>>> Brian
>>>>
>>>> [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
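Tying back to the event-timestamp discussion earlier in the thread (a record batch spans many event times, so it cannot carry a single one): one purely SDK-level workaround is to explode batches into per-row elements and give each row its own timestamp before any windowing or GroupByKey. The sketch below is illustrative only; the event-time column name and the seconds-since-epoch convention are assumptions.

    # Illustrative only: per-row event timestamps for previously-batched data.
    import apache_beam as beam
    import pyarrow as pa
    from apache_beam.transforms.window import TimestampedValue

    class AssignRowTimestamps(beam.DoFn):
        """Explode a pyarrow.RecordBatch into per-row TimestampedValues."""

        def __init__(self, event_time_column):
            self._event_time_column = event_time_column

        def process(self, batch):
            # to_pandas() keeps the sketch short; a real implementation would
            # probably stay columnar instead of materializing row dicts.
            for row in batch.to_pandas().to_dict('records'):
                # Assumes the event-time column holds seconds since the epoch.
                yield TimestampedValue(row, row[self._event_time_column])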