On Wed, Mar 27, 2019 at 9:19 PM Kenneth Knowles <[email protected]> wrote:

> Thinking about Arrow + Beam SQL + schemas:
>
>  - Obviously many SQL operations could be usefully accelerated by arrow /
> columnar. Especially in the analytical realm this is the new normal. For
> ETL, perhaps less so.
>
>  - Beam SQL planner (pipeline construction) is implemented in Java, and so
> the various DoFns/CombineFns that implement projection, filter, etc, are
> also in Java.
>     - Arrow is of course available in Java.
>     - Presumably there is a pandas counterpart in Java. Is there? Do you
> know?
>     - But perhaps if these building blocks emitted by the planner had
> well-defined URNs, we could opportunistically use SIMD+columnar Java or
> Python implementations to avoid cross-language data channels. (thinking
> ahead to when cross-language allows Python pipelines to invoke the
> construction-time planner implemented in Java)
>

I think cross-language data channels can be avoided as long as the steps
constructed by the planner can be fused by a runner without any interleaved
Python steps, irrespective of the data format. Whenever a cross-language
boundary is crossed, we'll have to use a coder that is compatible across
languages (Avro, Arrow, proto, etc.). Hopefully Beam schemas will make this
simpler by introducing compatible Row types in each language.
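
As a rough sketch of what a compatible Row type might look like from the
Python side (treat the NamedTuple + RowCoder registration below as
illustrative of the idea, not a settled API):

import typing

import apache_beam as beam

# Illustrative only: declare a schema as a typed NamedTuple and register a
# schema-aware Row coder for it, so the same rows could be understood across
# language boundaries.
class Purchase(typing.NamedTuple):
    user_id: str
    item_id: int
    amount: float

beam.coders.registry.register_coder(Purchase, beam.coders.RowCoder)
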


>
>  - Is it valuable for Beam to invent its own schemas? I'd love for Beam to
> have identical schema affordances to either protobuf or arrow or avro, with
> everything layered on that as logical types (including SQL types). What
> would it look like if Beam schemas were more-or-less Arrow schemas?
>
>  - For the event timestamp issue, there are two levels of abstraction at
> which I could imagine improvements:
>     - at the model layer (aka portability protos) we could make Beam
> columnar batch aware. That's a huge change and would need a massive
> justification IMO in the form of performance numbers.
>     - at the SDK layer, some language might make it pretty easy to
> overload the "GroupByKey" transform to understand that elements which are
> really batches contain multiple timestamps, so it may need to window &
> group differently. The model doesn't need to know in this case.
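
A cruder SDK-level workaround (not the GroupByKey overload described above,
but related): explode each record batch into per-row TimestampedValues
before windowing, so downstream grouping works as usual. Sketch only; the
'event_time' column name is an assumption for illustration:

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue

class ExplodeBatchWithTimestamps(beam.DoFn):
    # Sketch: assumes each pyarrow.RecordBatch has an 'event_time' column
    # holding epoch seconds; emits one timestamped dict per row.
    def process(self, batch):
        columns = batch.to_pydict()  # {column_name: [values, ...]}
        for i in range(batch.num_rows):
            row = {name: values[i] for name, values in columns.items()}
            yield TimestampedValue(row, row['event_time'])
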
>
> Kenn
>
> On Wed, Mar 27, 2019 at 4:42 PM Ahmet Altay <[email protected]> wrote:
>
>> Thank you Brian, this looks promising.
>>
>> cc: +Chamikara Jayalath <[email protected]> +Heejong Lee
>> <[email protected]>
>>
>> On Wed, Mar 27, 2019 at 1:22 PM Brian Hulette <[email protected]>
>> wrote:
>>
>>> Hi everyone,
>>> I've been doing some investigations into how Arrow might fit into Beam
>>> as a way to ramp up on the project. As I've gone about this I've prototyped
>>> a couple of additions to the Python SDK. I think these additions may be
>>> useful for others so I'm considering cleaning them up and submitting PRs,
>>> but I wanted to have a discussion here to see if it makes sense first.
>>>
>>> Note that the approach I'm using for this work right now is very naive.
>>> I've just built pipelines where individual elements are actually arrow
>>> record batches (or pandas DataFrames). This is really only acceptable for
>>> bounded pipelines without windowing, since it's impossible to define a
>>> single event time for each element. That being said, I think these tools
>>> could still be useful for people who want to run batch pipelines using
>>> parquet, arrow, and pandas.
>>>
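
To make the shape of this concrete, a toy sketch of the kind of bounded,
unwindowed pipeline meant here, with whole pandas DataFrames as elements
(file names are made up):

import apache_beam as beam
import pandas as pd

# Each element is an entire DataFrame, so there is no per-element event time
# and no windowing; this only makes sense for bounded inputs.
with beam.Pipeline() as p:
    (p
     | beam.Create(['part-0.parquet', 'part-1.parquet'])
     | 'ReadFrame' >> beam.Map(pd.read_parquet)
     | 'CountRows' >> beam.Map(len)
     | beam.Map(print))
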
>>
>> I agree these will be generally useful.
>>
>>
>>>
>>> Here's what I've implemented so far:
>>> # An option for io.ReadFromParquet to yield arrow record batches instead
>>> of individual elements
>>> Currently the Python SDK's parquet reader uses pyarrow.parquet to read
>>> parquet row groups into arrow record batches, and then splits the batches
>>> into a single dictionary per row [1]. I've added a flag to optionally
>>> short-circuit this and just yield the arrow record batches directly, making
>>> it easier for me to build pipelines that process columnar batches. If I
>>> were to contribute this change, I could also split out the record batch <->
>>> dictionary conversions as separate transforms, since they could be
>>> generally useful as well.
>>>
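
If it helps the discussion, the batch-to-rows direction could look roughly
like the following as a standalone DoFn (the class name is made up):

import apache_beam as beam

class RecordBatchToRows(beam.DoFn):
    # Sketch of the record batch -> dictionary-per-row conversion described
    # above, factored out as its own transform.
    def process(self, batch):
        columns = batch.to_pydict()  # {column_name: [values, ...]}
        for i in range(batch.num_rows):
            yield {name: values[i] for name, values in columns.items()}

# usage: record_batches | beam.ParDo(RecordBatchToRows())
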
>>
>> I think splitting these into new transforms rather than adding new options
>> to existing IO transforms would be simpler for users. This would be a
>> question that is easier to answer with a PR.
>>
>
+1. I think it's better to introduce a new transform as well. It would be
confusing to have an option that changes the type of the PCollection
returned by the same transform.
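
For illustration (the batched transform name is hypothetical), something
along these lines, where each transform has a single, predictable output
type:

import apache_beam as beam
from apache_beam.io.parquetio import ReadFromParquet

with beam.Pipeline() as p:
    # Existing transform: a PCollection with one dict per row.
    rows = p | 'ReadRows' >> ReadFromParquet('gs://bucket/data*.parquet')
    # Hypothetical separate transform: a PCollection of pyarrow.RecordBatch.
    # batches = p | 'ReadBatches' >> ReadFromParquetBatched('gs://bucket/data*.parquet')
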


>
>>
>>> # Custom coders for Arrow Tables and Record Batches
>>> I found that the default coder (pickle/dill?) slowed down my arrow
>>> pipelines, particularly in the case where a record batch had been sliced
>>> into smaller record batches (presumably because the entire original batch
>>> is getting serialized for each slice). I put together some coders that
>>> encode arrow tables and record batches with arrow's IPC formats, which
>>> improves performance substantially.
>>>
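
For concreteness, a minimal sketch of such a coder using the Arrow IPC
stream format (not necessarily what was actually implemented here):

import pyarrow as pa
from apache_beam.coders import Coder

class RecordBatchCoder(Coder):
    # Sketch: encode/decode a single pyarrow.RecordBatch with the Arrow IPC
    # stream format instead of pickling the underlying buffers.
    def encode(self, batch):
        sink = pa.BufferOutputStream()
        writer = pa.ipc.new_stream(sink, batch.schema)
        writer.write_batch(batch)
        writer.close()
        return sink.getvalue().to_pybytes()

    def decode(self, encoded):
        reader = pa.ipc.open_stream(pa.py_buffer(encoded))
        return reader.read_next_batch()

    def is_deterministic(self):
        return False
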
>>
>> How did you measure this? It would be good for us to also have relevant
>> micro benchmarks here.
>>
>>
>>>
>>> Would it make sense to add these things to the Python SDK? Or would they
>>> fit better in a separate library of utilities for building pipelines with
>>> batched data?
>>>
>>
>> +1 for adding to Beam. The Python SDK has a set of utility transforms
>> (e.g. BatchElements); new additions could also live in that space.
>>
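
As an aside, a small sketch of how a batch-building utility could compose
with the existing BatchElements transform (the Arrow conversion step here
is illustrative):

import apache_beam as beam
import pandas as pd
import pyarrow as pa
from apache_beam.transforms import util

def dicts_to_record_batch(rows):
    # Convert a list of row dicts into a single pyarrow.RecordBatch.
    return pa.RecordBatch.from_pandas(pd.DataFrame(rows), preserve_index=False)

with beam.Pipeline() as p:
    (p
     | beam.Create([{'user': 'a', 'amount': 1.0}, {'user': 'b', 'amount': 2.0}])
     | util.BatchElements(min_batch_size=2, max_batch_size=1000)
     | 'ToRecordBatch' >> beam.Map(dicts_to_record_batch))
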
>>
>>>
>>> Brian
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/parquetio.py#L239
>>>
>>
