Recommendations on reading/writing Avro to HDFS

2017-02-28 Thread Michael Luckey
Hi all,

we are currently using Beam on Spark, reading and writing Avro files to
HDFS.

Until now we have used HDFSFileSource for reading and HadoopIO for writing,
essentially reading and writing PCollections of Avro data.

With the changes introduced by
https://issues.apache.org/jira/browse/BEAM-1497 this no longer seems to be
directly supported by Beam, as the required AvroWrapperCoder has been
deleted.

Since we have to change our code anyway, we are wondering what the
recommended approach would be for reading/writing Avro files from/to HDFS
with Beam on Spark:

- use the new implementation of HDFSFileSource/HDFSFileSink
- use the Spark runner's HadoopIO (and probably reimplement AvroWrapperCoder
ourselves?)

What are the trade-offs here, possibly also considering already-planned
changes to IO? Do we gain anything from using the Spark runner's HadoopIO,
given that our underlying engine is currently Spark, or will it eventually
be deprecated and exist only for 'historical' reasons?
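
For concreteness, with option one we would expect the read side to look
roughly like the sketch below. This is only our guess at the
post-BEAM-1497 API - class and method names may well be off:

  // Assumed classes: HDFSFileSource and Read from beam-sdks-java-io-hdfs /
  // the core SDK, AvroKeyInputFormat and AvroKey from avro-mapred;
  // the input path is a placeholder.
  PCollection<KV<AvroKey<GenericRecord>, NullWritable>> records =
      pipeline.apply(Read.from(
          HDFSFileSource.from(
              "hdfs://namenode/path/to/input",  // placeholder input path
              AvroKeyInputFormat.class,         // Hadoop input format for Avro
              AvroKey.class,
              NullWritable.class)));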

Any thoughts and advice here?

Regards,

michel


Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Robert Bradshaw
The direct runner blows up memory usage as well.

Currently, the size limitations and performance characteristics of BagState
are a property of the runner. I don't know if this kind of thing can or
should be lifted to the model itself, though it might make sense to provide
expectations and recommendations for what it takes to be a "performant"
runner.

I think the comment about regressing is specific to the Dataflow runner, in
that it might get better, but we won't make it worse.

On Tue, Feb 28, 2017 at 1:32 PM, Beau Fabry  wrote:

> Could I get some clarification on "But the property of being able to
> exceed memory won't be regressed, whatever strategy is adopted."? It seems
> like both Dataflow in batch mode and Flink in all modes could currently
> blow up memory usage if you attempt to read a large BagState; should I
> take that to mean that a design that assumes a BagState can be arbitrarily
> large is inappropriate?
>
> On Tue, Feb 28, 2017 at 1:21 PM Kenneth Knowles  wrote:
>
>> Yes, currently the Dataflow runner does read BagState lazily in streaming
>> mode. In batch mode it uses in-memory state at the moment.
>>
>> But I want to re-emphasize that the APIs are designed to allow many
>> possible implementations and styles of optimization, so it can (and likely
>> will) change over time. But the property of being able to exceed memory
>> won't be regressed, whatever strategy is adopted.
>>
>> Kenn
>>
>> On Tue, Feb 28, 2017 at 12:39 PM, Aljoscha Krettek 
>> wrote:
>>
>> Just to confirm: yes, the Flink Runner does not (currently) read lazily.
>>
>> On Tue, 28 Feb 2017 at 21:26 Beau Fabry  wrote:
>>
>> OK, this might be out of the scope of the mailing list, but do you know
>> if the Dataflow runner specifically reads BagState lazily? Our read of
>> the Flink runner code is that it does not.
>>
>> On Tue., 28 Feb. 2017, 10:27 am Robert Bradshaw, 
>> wrote:
>>
>> Yes, the intent is that runners can (should?) support streaming of
>> BagState reads. Of course reading a large state every time could still be
>> expensive... (One of the key features of BagState is that it supports blind
>> writes, i.e. you can append cheaply without reading).
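>>
>> A blind write is just an add() with no read() - a minimal sketch, with
>> state-API signatures as of current snapshots (treat names as approximate):
>>
>>   ParDo.of(new DoFn<KV<String, Long>, Void>() {
>>     @StateId("buffer")
>>     private final StateSpec<Object, BagState<Long>> bufferSpec =
>>         StateSpecs.bag(VarLongCoder.of());
>>
>>     @ProcessElement
>>     public void process(ProcessContext c,
>>         @StateId("buffer") BagState<Long> buffer) {
>>       buffer.add(c.element().getValue());  // append only; nothing is fetched
>>     }
>>   });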
>>
>>
>> On Tue, Feb 28, 2017 at 10:08 AM, Beau Fabry  wrote:
>>
>> Awesome, thanks for the answers. I have one more: Is the Iterable
>> returned by BagState backed by some sort of lazy-loading from disk, like
>> the Iterable result in a GroupByKey, as mentioned here
>> http://stackoverflow.com/questions/34136919/cogbkresult-has-more-than-1-elements-reiteration-which-may-be-slow-is-requ/34140487#34140487 ?
>>
>> We're joining a small amount of infrequently changing data (per-key, but
>> still far too big globally to fit in memory as a side input) to a very
>> large, frequently updated stream, and trying to figure out how to reduce
>> our redundant outputs.
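>>
>> Roughly the shape we have in mind (a sketch, not our real code - the
>> join() helper below is hypothetical):
>>
>>   ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
>>     @StateId("lastOutput")
>>     private final StateSpec<Object, ValueState<String>> lastSpec =
>>         StateSpecs.value(StringUtf8Coder.of());
>>
>>     @ProcessElement
>>     public void process(ProcessContext c,
>>         @StateId("lastOutput") ValueState<String> last) {
>>       String joined = join(c.element());  // hypothetical per-key join
>>       if (!joined.equals(last.read())) {  // only emit when output changes
>>         last.write(joined);
>>         c.output(KV.of(c.element().getKey(), joined));
>>       }
>>     }
>>   });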
>>
>> Cheers,
>> Beau
>>
>> On Tue, Feb 21, 2017 at 8:04 PM Kenneth Knowles  wrote:
>>
>> Hi Beau,
>>
>> I've answered inline.
>>
>> On Tue, Feb 21, 2017 at 5:09 PM, Beau Fabry  wrote:
>>
>> In times when communicating between outputs would be an optimisation, and
>> reducing inputs in parallel would be an optimisation, how do you make the
>> call between using a stateful DoFn or a Combine.perKey?
>>
>>
>> (Standard optimization caveat: you'd make the call by measuring them both)
>>
>> Usually you will be forced to make this decision not based on
>> optimization, but based on whether your use case can be expressed naturally
>> as one or the other.
>>
>> The key difference is just the one I talk about in the blog post, the
>> associative `merge` operation. This enables nice optimizations so if your
>> use case fits nicely as a CombineFn then you should probably use it. There
>> are exceptions, like fine-grained access to just a piece of your
>> accumulator state, but the most common way to combine things is with
>> Combine :-)
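>>
>> For example, a sum-like CombineFn is fully determined by its associative
>> merge (a minimal sketch):
>>
>>   public static class SumFn extends Combine.CombineFn<Long, Long, Long> {
>>     @Override public Long createAccumulator() { return 0L; }
>>     @Override public Long addInput(Long acc, Long input) { return acc + input; }
>>     @Override public Long mergeAccumulators(Iterable<Long> accs) {
>>       long sum = 0;                      // associativity is what lets the
>>       for (Long a : accs) { sum += a; }  // runner merge partial results
>>       return sum;                        // computed on many workers
>>     }
>>     @Override public Long extractOutput(Long acc) { return acc; }
>>   }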
>>
>> You can always follow a Combine.perKey with a stateful ParDo that
>> communicates between outputs, if that also presents a major savings.
>>
>> Is there a case to be made that CombineFn could also get access to state
>> within extractOutput? It seems like the only benefit to a CombineFn now is
>> that the merge and addInput steps can run on multiple workers; is there a
>> rule of thumb to know if we have enough data per key that that is
>> significant?
>>
>>
>> This is a big benefit a lot of the time, especially in large batch
>> executions. Without this optimization opportunity, a Combine.perKey would
>> execute by first doing a GroupByKey - shuffling all of your data - and then
>> applying your CombineFn to each KV. But instead, this can
>> be optimized so that the combine happens before the shuffle, which can
>> reduce the shuffled data to a single accumulator per key. This optimization
>> is general enough that my description applies to all runners, though it has
>> lesser benefits in streaming executions where there is not as much data per
>> key+window+triggering.
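>>
>> In code the difference is just the following (using the SumFn sketch from
>> above; sumValuesFn would be a DoFn you write yourself):
>>
>>   // Pre-combined on the workers before the shuffle:
>>   input.apply(Combine.perKey(new SumFn()));
>>
>>   // Versus shuffling every element and only then combining:
>>   input.apply(GroupByKey.create())
>>        .apply(ParDo.of(sumValuesFn));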

Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Beau Fabry
Could I get some clarification on "But the property of being able to exceed
memory won't be regressed, whatever strategy is adopted."? It seems like
both Dataflow in batch mode and Flink in all modes could currently blow up
memory usage if you attempt to read a large BagState; should I take that to
mean that a design that assumes a BagState can be arbitrarily large is
inappropriate?

On Tue, Feb 28, 2017 at 1:21 PM Kenneth Knowles  wrote:

> Yes, currently the Dataflow runner does read BagState lazily in streaming
> mode. In batch mode it uses in-memory state at the moment.
>
> But I want to re-emphasize that the APIs are designed to allow many
> possible implementations and styles of optimization, so it can (and likely
> will) change over time. But the property of being able to exceed memory
> won't be regressed, whatever strategy is adopted.
>
> Kenn
>
> On Tue, Feb 28, 2017 at 12:39 PM, Aljoscha Krettek 
> wrote:
>
> Just to confirm: yes, the Flink Runner does not (currently) read lazily.
>
> On Tue, 28 Feb 2017 at 21:26 Beau Fabry  wrote:
>
> OK, this might be out of the scope of the mailing list, but do you know if
> the Dataflow runner specifically reads BagState lazily? Our read of the
> Flink runner code is that it does not.
>
> On Tue., 28 Feb. 2017, 10:27 am Robert Bradshaw, 
> wrote:
>
> Yes, the intent is that runners can (should?) support streaming of
> BagState reads. Of course reading a large state every time could still be
> expensive... (One of the key features of BagState is that it supports blind
> writes, i.e. you can append cheaply without reading).
>
>
> On Tue, Feb 28, 2017 at 10:08 AM, Beau Fabry  wrote:
>
> Awesome, thanks for the answers. I have one more: Is the Iterable returned
> by BagState backed by some sort of lazy-loading from disk, like the
> Iterable result in a GroupByKey, as mentioned here
> http://stackoverflow.com/questions/34136919/cogbkresult-has-more-than-1-elements-reiteration-which-may-be-slow-is-requ/34140487#34140487 ?
>
> We're joining a small amount of infrequently changing data (per-key, but
> still far too big globally to fit in memory as a side input) to a very
> large, frequently updated stream, and trying to figure out how to reduce
> our redundant outputs.
>
> Cheers,
> Beau
>
> On Tue, Feb 21, 2017 at 8:04 PM Kenneth Knowles  wrote:
>
> Hi Beau,
>
> I've answered inline.
>
> On Tue, Feb 21, 2017 at 5:09 PM, Beau Fabry  wrote:
>
> In times when communicating between outputs would be an optimisation, and
> reducing inputs in parallel would be an optimisation, how do you make the
> call between using a stateful DoFn or a Combine.perKey?
>
>
> (Standard optimization caveat: you'd make the call by measuring them both)
>
> Usually you will be forced to make this decision not based on
> optimization, but based on whether your use case can be expressed naturally
> as one or the other.
>
> The key difference is just the one I talk about in the blog post, the
> associative `merge` operation. This enables nice optimizations so if your
> use case fits nicely as a CombineFn then you should probably use it. There
> are exceptions, like fine-grained access to just a piece of your
> accumulator state, but the most common way to combine things is with
> Combine :-)
>
> You can always follow a Combine.perKey with a stateful ParDo that
> communicates between outputs, if that also presents a major savings.
>
> Is there a case to be made that CombineFn could also get access to state
> within extractOutput? It seems like the only benefit to a CombineFn now is
> that the merge and addInput steps can run on multiple workers; is there a
> rule of thumb to know if we have enough data per key that that is
> significant?
>
>
> This is a big benefit a lot of the time, especially in large batch
> executions. Without this optimization opportunity, a Combine.perKey would
> execute by first doing a GroupByKey - shuffling all of your data - and then
> applying your CombineFn to each KV. But instead, this can
> be optimized so that the combine happens before the shuffle, which can
> reduce the shuffled data to a single accumulator per key. This optimization
> is general enough that my description applies to all runners, though it has
> lesser benefits in streaming executions where there is not as much data per
> key+window+triggering.
>
> Kenn
>
>
>
> Cheers,
> Beau
>
> On Wed, Feb 15, 2017 at 8:53 AM Ismaël Mejía  wrote:
>
> Great post, I like the use of the same figure style as in previous posts,
> with geometric forms and colors, as well as the table analogy that really
> helps to
> understand the concepts. I am still digesting some of the consequences of
> the State API, in particular the implications of using state that you
> mention at the end. Really good to discuss those also as part of the post.
>
> I found some small typos and formatting issues that I addressed here.
> https://github.com/apache/beam-site/pull/156

Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Kenneth Knowles
Yes, currently the Dataflow runner does read BagState lazily in streaming
mode. In batch mode it uses in-memory state at the moment.

But I want to re-emphasize that the APIs are designed to allow many
possible implementations and styles of optimization, so it can (and likely
will) change over time. But the property of being able to exceed memory
won't be regressed, whatever strategy is adopted.
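
From the user's side the API looks the same either way - you just iterate
the result of read(), and whether that iterable pages data in lazily is the
runner's business. A minimal sketch (assuming a stateful DoFn whose
@ProcessElement takes a BagState<Long> parameter named buffer and a
ProcessContext c):

  long sum = 0;
  for (Long v : buffer.read()) {  // laziness, or not, is a runner property
    sum += v;                     // so don't assume the bag fits in memory
  }
  c.output(sum);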

Kenn

On Tue, Feb 28, 2017 at 12:39 PM, Aljoscha Krettek 
wrote:

> Just to confirm: yes, the Flink Runner does not (currently) read lazily.
>
> On Tue, 28 Feb 2017 at 21:26 Beau Fabry  wrote:
>
>> OK, this might be out of the scope of the mailing list, but do you know
>> if the Dataflow runner specifically reads BagState lazily? Our read of
>> the Flink runner code is that it does not.
>>
>> On Tue., 28 Feb. 2017, 10:27 am Robert Bradshaw, 
>> wrote:
>>
>> Yes, the intent is that runners can (should?) support streaming of
>> BagState reads. Of course reading a large state every time could still be
>> expensive... (One of the key features of BagState is that it supports blind
>> writes, i.e. you can append cheaply without reading).
>>
>>
>> On Tue, Feb 28, 2017 at 10:08 AM, Beau Fabry  wrote:
>>
>> Awesome, thanks for the answers. I have one more: Is the Iterable
>> returned by BagState backed by some sort of lazy-loading from disk, like
>> the Iterable result in a GroupByKey, as mentioned here
>> http://stackoverflow.com/questions/34136919/cogbkresult-has-more-than-1-elements-reiteration-which-may-be-slow-is-requ/34140487#34140487 ?
>>
>> We're joining a small amount of infrequently changing data (per-key, but
>> still far too big globally to fit in memory as a side input) to a very
>> large, frequently updated stream, and trying to figure out how to reduce
>> our redundant outputs.
>>
>> Cheers,
>> Beau
>>
>> On Tue, Feb 21, 2017 at 8:04 PM Kenneth Knowles  wrote:
>>
>> Hi Beau,
>>
>> I've answered inline.
>>
>> On Tue, Feb 21, 2017 at 5:09 PM, Beau Fabry  wrote:
>>
>> In times when communicating between outputs would be an optimisation, and
>> reducing inputs in parallel would be an optimisation, how do you make the
>> call between using a stateful DoFn or a Combine.perKey?
>>
>>
>> (Standard optimization caveat: you'd make the call by measuring them both)
>>
>> Usually you will be forced to make this decision not based on
>> optimization, but based on whether your use case can be expressed naturally
>> as one or the other.
>>
>> The key difference is just the one I talk about in the blog post, the
>> associative `merge` operation. This enables nice optimizations so if your
>> use case fits nicely as a CombineFn then you should probably use it. There
>> are exceptions, like fine-grained access to just a piece of your
>> accumulator state, but the most common way to combine things is with
>> Combine :-)
>>
>> You can always follow a Combine.perKey with a stateful ParDo that
>> communicates between outputs, if that also presents a major savings.
>>
>> Is there a case to be made that CombineFn could also get access to state
>> within extractOutput? It seems like the only benefit to a CombineFn now is
>> that the merge and addInput steps can run on multiple workers; is there a
>> rule of thumb to know if we have enough data per key that that is
>> significant?
>>
>>
>> This is a big benefit a lot of the time, especially in large batch
>> executions. Without this optimization opportunity, a Combine.perKey would
>> execute by first doing a GroupByKey - shuffling all of your data - and then
>> applying your CombineFn to each KV. But instead, this can
>> be optimized so that the combine happens before the shuffle, which can
>> reduce the shuffled data to a single accumulator per key. This optimization
>> is general enough that my description applies to all runners, though it has
>> lesser benefits in streaming executions where there is not as much data per
>> key+window+triggering.
>>
>> Kenn
>>
>>
>>
>> Cheers,
>> Beau
>>
>> On Wed, Feb 15, 2017 at 8:53 AM Ismaël Mejía  wrote:
>>
>> Great post, I like the use of the same figure style as in previous posts,
>> with geometric forms and colors, as well as the table analogy that really
>> helps to
>> understand the concepts. I am still digesting some of the consequences of
>> the State API, in particular the implications of using state that you
>> mention at the end. Really good to discuss those also as part of the post.
>>
>> I found some small typos and formatting issues that I addressed here.
>> https://github.com/apache/beam-site/pull/156
>>
>> Thanks for writing,
>> Ismaël
>>
>>
>> On Tue, Feb 14, 2017 at 11:50 AM, Jean-Baptiste Onofré 
>> wrote:
>>
>> Hey Kenn
>>
>> Just took a quick look and it's a great post!
>>
>> Thanks
>> Regards
>> JB
>> On Feb 13, 2017, at 18:44, Kenneth Knowles  wrote:

Re: New blog post: "Stateful processing with Apache Beam"

2017-02-28 Thread Aljoscha Krettek
Just to confirm: yes, the Flink Runner does not (currently) read lazily.

On Tue, 28 Feb 2017 at 21:26 Beau Fabry  wrote:

> OK, this might be out of the scope of the mailing list, but do you know if
> the Dataflow runner specifically reads BagState lazily? Our read of the
> Flink runner code is that it does not.
>
> On Tue., 28 Feb. 2017, 10:27 am Robert Bradshaw, 
> wrote:
>
> Yes, the intent is that runners can (should?) support streaming of
> BagState reads. Of course reading a large state every time could still be
> expensive... (One of the key features of BagState is that it supports blind
> writes, i.e. you can append cheaply without reading).
>
>
> On Tue, Feb 28, 2017 at 10:08 AM, Beau Fabry  wrote:
>
> Awesome, thanks for the answers. I have one more: Is the Iterable returned
> by BagState backed by some sort of lazy-loading from disk, like the
> Iterable result in a GroupByKey, as mentioned here
> http://stackoverflow.com/questions/34136919/cogbkresult-has-more-than-1-elements-reiteration-which-may-be-slow-is-requ/34140487#34140487 ?
>
> We're joining a small amount of infrequently changing data (per-key, but
> still far too big globally to fit in memory as a side input) to a very
> large, frequently updated stream, and trying to figure out how to reduce
> our redundant outputs.
>
> Cheers,
> Beau
>
> On Tue, Feb 21, 2017 at 8:04 PM Kenneth Knowles  wrote:
>
> Hi Beau,
>
> I've answered inline.
>
> On Tue, Feb 21, 2017 at 5:09 PM, Beau Fabry  wrote:
>
> In times when communicating between outputs would be an optimisation, and
> reducing inputs in parallel would be an optimisation, how do you make the
> call between using a stateful DoFn or a Combine.perKey?
>
>
> (Standard optimization caveat: you'd make the call by measuring them both)
>
> Usually you will be forced to make this decision not based on
> optimization, but based on whether your use case can be expressed naturally
> as one or the other.
>
> The key difference is just the one I talk about in the blog post, the
> associative `merge` operation. This enables nice optimizations so if your
> use case fits nicely as a CombineFn then you should probably use it. There
> are exceptions, like fine-grained access to just a piece of your
> accumulator state, but the most common way to combine things is with
> Combine :-)
>
> You can always follow a Combine.perKey with a stateful ParDo that
> communicates between outputs, if that also presents a major savings.
>
> Is there a case to be made that CombineFn could also get access to state
> within extractOutput? It seems like the only benefit to a CombineFn now is
> that the merge and addInput steps can run on multiple workers; is there a
> rule of thumb to know if we have enough data per key that that is
> significant?
>
>
> This is a big benefit a lot of the time, especially in large batch
> executions. Without this optimization opportunity, a Combine.perKey would
> execute by first doing a GroupByKey - shuffling all of your data - and then
> applying your CombineFn to each KV. But instead, this can
> be optimized so that the combine happens before the shuffle, which can
> reduce the shuffled data to a single accumulator per key. This optimization
> is general enough that my description applies to all runners, though it has
> lesser benefits in streaming executions where there is not as much data per
> key+window+triggering.
>
> Kenn
>
>
>
> Cheers,
> Beau
>
> On Wed, Feb 15, 2017 at 8:53 AM Ismaël Mejía  wrote:
>
> Great post, I like the use of the same figure style as in previous posts,
> with geometric forms and colors, as well as the table analogy that really
> helps to
> understand the concepts. I am still digesting some of the consequences of
> the State API, in particular the implications of using state that you
> mention at the end. Really good to discuss those also as part of the post.
>
> I found some small typos and formatting issues that I addressed here.
> https://github.com/apache/beam-site/pull/156
>
> Thanks for writing,
> Ismaël
>
>
> On Tue, Feb 14, 2017 at 11:50 AM, Jean-Baptiste Onofré 
> wrote:
>
> Hey Kenn
>
> Just took a quick look and it's a great post!
>
> Thanks
> Regards
> JB
> On Feb 13, 2017, at 18:44, Kenneth Knowles  wrote:
>
> Hi all,
>
> I've just published a blog post about Beam's new stateful processing
> capabilities:
>
> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> The blog post covers stateful processing from a few angles: how it works,
> how it fits into the Beam model, what you might use it for, and finally
> some examples of stateful Beam code.
>
> I'd love for you to take a look and see how this feature might apply to
> your use of Beam. And, of course, I'd also love to hear from you about it.
>
> Kenn
>
>
>
>