First off, huge +1 to a good integration between Arrow and Beam. I think that to fully realize the benefits we need deeper integration than simply treating Arrow record batches as opaque elements, i.e. SDKs should be augmented to understand Arrow record batches as batches of individual elements, each with (possibly) their own timestamps and windows, correctly updating element counts, and allowing user operations to operate on batches rather than just elementwise. (IMHO this, along with an (optional) more pandas-like API for manipulating PCollections, is actually one of the critical missing pieces in Python.) Some thought needs to go into how to automatically handle batching, progress, and liquid sharding in this case.
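(As a rough illustration of what "batches of individual elements, each with their own timestamps" could mean for a user today, here is a hand-written DoFn that explodes a record batch into per-row, per-timestamp elements. The "event_time" column of epoch seconds is just an assumption for the example; the point above is that the SDK harness could eventually do this bookkeeping itself and keep the batched representation end to end.)

    import apache_beam as beam
    from apache_beam.transforms.window import TimestampedValue

    class ExplodeRecordBatch(beam.DoFn):
        # Treat a pyarrow.RecordBatch as a batch of logical elements, each
        # carrying its own event timestamp taken from a (hypothetical)
        # "event_time" column holding epoch seconds.
        def process(self, batch):
            for row in batch.to_pandas().to_dict("records"):
                yield TimestampedValue(row, row["event_time"])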
For the most part, I don't think this impacts the model, though of course we'd want to support using the arrow format to send batches of elements across the FnAPI barriers. There don't seem to be any Java libraries (yet) that have the widespread use or maturity of Pandas, but that may come in the future. Certainly it makes sense for the Beam primitives used in SQL (such as projection, filtering, possibly even simple expression-based computations) to have language-agnostic representations which could be implemented in any SDK (and possibly even a runner) to maximize fusion and minimize data transfer.

Also, I agree that support for large iterables makes a separate Beam schema desirable. That being said, we shouldn't unnecessarily diverge, and could possibly share implementations as well (for increased interoperability with the larger ecosystems).

On Fri, Mar 29, 2019 at 5:48 AM Kenneth Knowles <[email protected]> wrote:
>
> On Thu, Mar 28, 2019 at 12:24 PM Brian Hulette <[email protected]> wrote:
>>
>> > - Presumably there is a pandas counterpart in Java. Is there? Do you know?
>>
>> I think there are some dataframe libraries in Java we could look into. I'm not aware of anything that has the same popularity and arrow integration as pandas though. Within the arrow project there is Gandiva [1], which has Java bindings. It generates optimized LLVM code for processing arrow data based on an expression tree. I think that could be a valuable tool for SQL.
>
> Gandiva looks to be similar to what we get from Calcite today, but I wonder if it is higher performance due to being lower level or more flexible (for example Calcite's codegen is pretty hardcoded to millisecond precision datetimes). Worth learning about. Since another big benefit of Calcite's expression compiler is implementation of "all" builtin functions for free, I'd look closely at how to provide a builtin function catalog to Gandiva.
>
>> > - Is it valuable for Beam to invent its own schemas? I'd love for Beam to have identical schema affordances to either protobuf or arrow or avro, with everything layered on that as logical types (including SQL types). What would it look like if Beam schemas were more-or-less Arrow schemas?
>>
>> As it stands right now there is a very clear mapping from Beam schemas to Arrow schemas. Both define similar primitive types, as well as nested types like row (beam) -> struct (arrow), array (beam) -> list (arrow). In addition Arrow schemas have a binary representation and implementations in many languages.
>>
>> I had some offline discussion with Reuven about this - and he pointed out that eventually we'd like Beam schemas to have a type for large iterables as well, so that even a PCollection<KV<K,Iterable<V>>> can have a schema, and that's certainly a concept that wouldn't make sense for Arrow. So I think the answer is yes it is valuable for Beam to have its own schemas - that way we can represent Beam-only concepts, but still be able to map to other schemas when it makes sense (For example in the KV<K, Iterable<V>> case we could map V's beam schema to an arrow schema and encode it as arrow record batches).
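(To make the mapping Brian describes concrete, here's a rough sketch in pyarrow. The Beam side is mocked up with plain Python values, since this is not the actual Beam schema API; it's just meant to show where row -> struct, array -> list, and the Beam-only types land.)

    import pyarrow as pa

    # Hypothetical, simplified stand-in for Beam field types (illustration only).
    _PRIMITIVES = {
        "INT32": pa.int32(), "INT64": pa.int64(), "DOUBLE": pa.float64(),
        "STRING": pa.string(), "BOOLEAN": pa.bool_(),
    }

    def to_arrow_type(beam_type):
        if isinstance(beam_type, str):
            return _PRIMITIVES[beam_type]
        kind, arg = beam_type
        if kind == "ARRAY":  # Beam array -> Arrow list
            return pa.list_(to_arrow_type(arg))
        if kind == "ROW":    # Beam row -> Arrow struct
            return pa.struct([(name, to_arrow_type(t)) for name, t in arg])
        raise ValueError("no Arrow analogue for %r" % (kind,))

    # e.g. a Beam row schema {id: INT64, tags: ARRAY<STRING>} as an Arrow schema:
    arrow_schema = pa.schema([("id", to_arrow_type("INT64")),
                              ("tags", to_arrow_type(("ARRAY", "STRING")))])

The interesting case is the ValueError branch: something like a large-iterable type is exactly where a Beam-only schema earns its keep.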
> This convinces me that Beam should have its own schema definition. There are things in Beam - and could be novelties created in Beam - that might not fit Arrow. And we don't want to have such a tight coupling. If the mapping is straightforward enough then there's not that much work to just convert to/from. But the piece I would think about is that any change to Beam or Arrow could introduce something that doesn't translate well, so we just need to be cognizant of that.
>
> Kenn
>
>> Brian
>>
>> [1] http://arrow.apache.org/blog/2018/12/05/gandiva-donation/
>>
>> On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>> Thinking about Arrow + Beam SQL + schemas:
>>>
>>> - Obviously many SQL operations could be usefully accelerated by arrow / columnar. Especially in the analytical realm this is the new normal. For ETL, perhaps less so.
>>>
>>> - Beam SQL planner (pipeline construction) is implemented in Java, and so the various DoFns/CombineFns that implement projection, filter, etc, are also in Java.
>>>   - Arrow is of course available in Java.
>>>   - Presumably there is a pandas counterpart in Java. Is there? Do you know?
>>>   - But perhaps if these building blocks emitted by the planner had well-defined URNs we could use SIMD+columnar Java or Python implementations opportunistically to avoid cross-language data channels. (thinking ahead to when cross-language allows Python pipelines to invoke the construction-time planner implemented in Java)
>>>
>>> - Is it valuable for Beam to invent its own schemas? I'd love for Beam to have identical schema affordances to either protobuf or arrow or avro, with everything layered on that as logical types (including SQL types). What would it look like if Beam schemas were more-or-less Arrow schemas?
>>>
>>> - For the event timestamp issue, there are two levels of abstraction at which I could imagine improvements:
>>>   - at the model layer (aka portability protos) we could make Beam columnar-batch aware. That's a huge change and would need a massive justification IMO in the form of performance numbers.
>>>   - at the SDK layer, some language might make it pretty easy to overload the "GroupByKey" transform to understand that for elements that are really batches there are multiple timestamps contained within, so it may need to window & group differently. The model doesn't need to know in this case.
>>>
>>> Kenn
>>>
>>> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay <[email protected]> wrote:
>>>>
>>>> Thank you Brian, this looks promising.
>>>>
>>>> cc: +Chamikara Jayalath +Heejong Lee
>>>>
>>>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <[email protected]> wrote:
>>>>>
>>>>> Hi everyone,
>>>>> I've been doing some investigations into how Arrow might fit into Beam as a way to ramp up on the project. As I've gone about this I've prototyped a couple of additions to the Python SDK. I think these additions may be useful for others, so I'm considering cleaning them up and submitting PRs, but I wanted to have a discussion here to see if it makes sense first.
>>>>>
>>>>> Note that the approach I'm using for this work right now is very naive. I've just built pipelines where individual elements are actually arrow record batches (or pandas DataFrames). This is really only acceptable for bounded pipelines without windowing, since it's impossible to define a single event time for each element. That being said, I think these tools could still be useful for people who want to run batch pipelines using parquet, arrow, and pandas.
>>>>
>>>> I agree these will be generally useful.
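(To make the "naive" approach above concrete: each element is a whole pandas DataFrame, and event time / windowing is simply ignored. Purely illustrative; it deliberately avoids any hypothetical IO options.)

    import apache_beam as beam
    import pandas as pd

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([pd.DataFrame({"x": [1, 2, 3]}),
                           pd.DataFrame({"x": [4, 5]})])
            | "ColumnarOp" >> beam.Map(lambda df: df.assign(y=df.x * 2))
            | "RowsPerBatch" >> beam.Map(len)
            | beam.Map(print)
        )

This only makes sense for bounded pipelines in the global window, which is exactly the limitation the per-element timestamp discussion above is trying to address.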
>>>>>
>>>>> Here's what I've implemented so far:
>>>>>
>>>>> # An option for io.ReadFromParquet to yield arrow record batches instead of individual elements
>>>>> Currently the python SDK's parquet reader uses pyarrow.parquet to read parquet row groups into arrow record batches, and then splits the batches into a single dictionary per row [1]. I've added a flag to optionally short-circuit this and just yield the arrow record batches directly, making it easier for me to build pipelines that process columnar batches. If I were to contribute this change I could also split out the record batch <-> dictionary conversions as separate transforms, since they could be generally useful as well.
>>>>
>>>> I think splitting these out into new transforms, rather than adding new options to existing IO transforms, would be simpler for users. This is the kind of question that would be easier to answer with a PR.
>>>>
>>>>> # Custom coders for Arrow Tables and Record Batches
>>>>> I found that the default coder (pickle/dill?) slowed down my arrow pipelines, particularly in the case where a record batch had been sliced into smaller record batches (presumably because the entire original batch is getting serialized for each slice). I put together some coders that encode arrow tables and record batches with arrow's IPC formats, which improves performance substantially.
>>>>
>>>> How did you measure this? It would be good for us to also have relevant micro benchmarks here.
>>>>
>>>>> Would it make sense to add these things to the Python SDK? Or would they fit better in a separate library of utilities for building pipelines with batched data?
>>>>
>>>> +1 for adding to Beam. The Python SDK has a set of utility transforms (e.g. BatchElements); new additions could also live in that space.
>>>>
>>>>> Brian
>>>>>
>>>>> [1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
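Regarding the IPC-based coders: the implementation isn't included in this thread, but a minimal sketch of the idea for record batches (using Arrow's streaming IPC format; the class name and the registration are illustrative, not the actual code) would look roughly like:

    import pyarrow as pa
    from apache_beam import coders

    class ArrowRecordBatchCoder(coders.Coder):
        # Encode a single pyarrow.RecordBatch with Arrow's IPC stream format,
        # which carries the schema alongside the data and avoids pickling the
        # underlying buffers.
        def encode(self, batch):
            sink = pa.BufferOutputStream()
            writer = pa.RecordBatchStreamWriter(sink, batch.schema)
            writer.write_batch(batch)
            writer.close()
            return sink.getvalue().to_pybytes()

        def decode(self, encoded):
            reader = pa.ipc.open_stream(encoded)
            return reader.read_next_batch()

        def is_deterministic(self):
            # Equal batches are not guaranteed to produce identical bytes, so
            # don't claim determinism (e.g. for use as GroupByKey keys).
            return False

    # Hypothetical registration so the coder is used wherever the type hint
    # says pyarrow.RecordBatch:
    # coders.registry.register_coder(pa.RecordBatch, ArrowRecordBatchCoder)

A Table coder would be analogous (one stream containing all of the table's batches), and it would be a natural place to hang the micro-benchmarks Ahmet asks about.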
