Re: Performance issue in Beam 2.4 onwards

2018-07-17 Thread Jean-Baptiste Onofré
Hi,

I'm on it, still investigating/digging.

Regards
JB

On 17/07/2018 09:44, Ismaël Mejía wrote:
> Given that the 2.6.0 cut is supposed to be today (or next days), what
> is the status on this, has it been identified / reverted ? or is there
> any other plan ?
> On Tue, Jul 10, 2018 at 2:50 PM Reuven Lax  wrote:
>>
>> If we added something slow to the core library in order to better test 
>> DirectRunner, that does sound like an unfortunate bug.
>>
>> On Mon, Jul 9, 2018 at 11:21 PM Vojtech Janota  
>> wrote:
>>>
>>> Hi guys,
>>>
>>> Thank you for all of your feedback. I have created relevant issue in JIRA: 
>>> https://issues.apache.org/jira/browse/BEAM-4750
>>>
>>> @Lukasz: me mentioning the DirectRunner was somewhat unfortunate - the 
>>> bottleneck was introduced into the core library and so Flink and Spark 
>>> runners would be impacted too
>>>
>>> Thanks,
>>> Vojta
>>>
>>> On Mon, Jul 9, 2018 at 5:48 PM, Lukasz Cwik  wrote:

>>>> Instead of reverting/working around specific checks/tests that the
>>>> DirectRunner is doing, have you considered using one of the other runners
>>>> like Flink or Spark with a local execution cluster. You won't hit the
>>>> validation/verification bottlenecks that DirectRunner specifically imposes.
>>>>
>>>> On Mon, Jul 9, 2018 at 8:46 AM Jean-Baptiste Onofré wrote:
>
> Thanks for the update Eugene.
>
> @Vojta: do you mind to create a Jira ? I will tackle a fix for that.
>
> Regards
> JB
>
> On 09/07/2018 17:33, Eugene Kirpichov wrote:
>> Hi -
>>
>> If I remember correctly, the reason for this change was to ensure that
>> the state is encodable at all. Prior to the change, there had been
>> situations where the coder specified on a state cell is buggy, absent or
>> set incorrectly (due to some issue in coder inference), but direct
>> runner did not detect this because it never tried to encode the state
>> cells - this would have blown up in any distributed runner.
>>
>> I think it should be possible to relax this and clone only values being
>> added to the state, rather than cloning the whole state on copy(). I
>> don't have time to work on this change myself, but I can review a PR if
>> someone else does.
>>
>> On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré wrote:
>>
>> Hi Vojta,
>>
>> I fully agree, that's why it makes sense to wait Eugene's feedback.
>>
>> I remember we had some performance regression on the direct runner
>> identified thanks to Nexmark, but it has been addressed by reverting 
>> a
>> change.
>>
>> Good catch anyway !
>>
>> Regards
>> JB
>>
>> On 09/07/2018 17:20, Vojtech Janota wrote:
>> > Hi Reuven,
>> >
>> > I'm not really complaining about DirectRunner. In fact it seems to
>> me as
>> > if what previously was considered as part of the "expensive extra
>> > checks" done by the DirectRunner is now done within the
>> > beam-runners-core-java library. Considering that all objects 
>> involved
>> > are immutable (in our case at least) and simple assignment is
>> > sufficient, the serialization-deserialization really seems as 
>> unwanted
>> > and hugely expensive correctness check. If there was a problem with
>> > identity copy, wasn't DirectRunner supposed to reveal it?
>> >
>> > Regards,
>> > Vojta
>> >
>> > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
>> >
>> > Hi Vojta,
>> >
>> > One problem is that the DirectRunner is designed for testing, 
>> not
>> > for performance. The DirectRunner currently does many
>> > purposely-inefficient things, the point of which is to better
>> expose
>> > potential bugs in tests. For example, the DirectRunner will
>> randomly
>> > shuffle the order of PCollections to ensure that your code
>> does not
>> > rely on ordering.  All of this adds cost, because the current
>> runner
>> > is designed for testing. There have been requests in the past
>> for an
>> > "optimized" local runner, however we don't currently have such
>> a thing.
>> >
>> > In this case, using coders to clone values is more correct. In 
>> a
>> > distributed environment using encode/decode is the only way to
>> copy
>> > values, and the DirectRunner is trying to ensure that your 
>> code is
>> > correct in a distributed environment.
>> >
>> > Reuven
>> >
>> > On Mon, Jul 9, 2018 at 

Re: Performance issue in Beam 2.4 onwards

2018-07-17 Thread Ismaël Mejía
Given that the 2.6.0 cut is supposed to be today (or in the next few days), what
is the status of this? Has the cause been identified or reverted, or is there
another plan?
On Tue, Jul 10, 2018 at 2:50 PM Reuven Lax  wrote:
>
> If we added something slow to the core library in order to better test 
> DirectRunner, that does sound like an unfortunate bug.
>
> On Mon, Jul 9, 2018 at 11:21 PM Vojtech Janota  wrote:
>>
>> Hi guys,
>>
>> Thank you for all of your feedback. I have created relevant issue in JIRA: 
>> https://issues.apache.org/jira/browse/BEAM-4750
>>
>> @Lukasz: me mentioning the DirectRunner was somewhat unfortunate - the 
>> bottleneck was introduced into the core library and so Flink and Spark 
>> runners would be impacted too
>>
>> Thanks,
>> Vojta
>>
>> On Mon, Jul 9, 2018 at 5:48 PM, Lukasz Cwik  wrote:
>>>
>>> Instead of reverting/working around specific checks/tests that the 
>>> DirectRunner is doing, have you considered using one of the other runners 
>>> like Flink or Spark with a local execution cluster. You won't hit the 
>>> validation/verification bottlenecks that DirectRunner specifically imposes.
>>>
>>> On Mon, Jul 9, 2018 at 8:46 AM Jean-Baptiste Onofré  
>>> wrote:

>>>> Thanks for the update Eugene.
>>>>
>>>> @Vojta: do you mind to create a Jira ? I will tackle a fix for that.
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 09/07/2018 17:33, Eugene Kirpichov wrote:
 > Hi -
 >
 > If I remember correctly, the reason for this change was to ensure that
 > the state is encodable at all. Prior to the change, there had been
 > situations where the coder specified on a state cell is buggy, absent or
 > set incorrectly (due to some issue in coder inference), but direct
 > runner did not detect this because it never tried to encode the state
 > cells - this would have blown up in any distributed runner.
 >
 > I think it should be possible to relax this and clone only values being
 > added to the state, rather than cloning the whole state on copy(). I
 > don't have time to work on this change myself, but I can review a PR if
 > someone else does.
 >
 > On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré wrote:
 >
 > Hi Vojta,
 >
 > I fully agree, that's why it makes sense to wait Eugene's feedback.
 >
 > I remember we had some performance regression on the direct runner
 > identified thanks to Nexmark, but it has been addressed by reverting 
 > a
 > change.
 >
 > Good catch anyway !
 >
 > Regards
 > JB
 >
 > On 09/07/2018 17:20, Vojtech Janota wrote:
 > > Hi Reuven,
 > >
 > > I'm not really complaining about DirectRunner. In fact it seems to
 > me as
 > > if what previously was considered as part of the "expensive extra
 > > checks" done by the DirectRunner is now done within the
 > > beam-runners-core-java library. Considering that all objects 
 > involved
 > > are immutable (in our case at least) and simple assignment is
 > > sufficient, the serialization-deserialization really seems as 
 > unwanted
 > > and hugely expensive correctness check. If there was a problem with
 > > identity copy, wasn't DirectRunner supposed to reveal it?
 > >
 > > Regards,
 > > Vojta
 > >
 > > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
 > >
 > > Hi Vojta,
 > >
 > > One problem is that the DirectRunner is designed for testing, 
 > not
 > > for performance. The DirectRunner currently does many
 > > purposely-inefficient things, the point of which is to better
 > expose
 > > potential bugs in tests. For example, the DirectRunner will
 > randomly
 > > shuffle the order of PCollections to ensure that your code
 > does not
 > > rely on ordering.  All of this adds cost, because the current
 > runner
 > > is designed for testing. There have been requests in the past
 > for an
 > > "optimized" local runner, however we don't currently have such
 > a thing.
 > >
 > > In this case, using coders to clone values is more correct. In 
 > a
 > > distributed environment using encode/decode is the only way to
 > copy
 > > values, and the DirectRunner is trying to ensure that your 
 > code is
 > > correct in a distributed environment.
 > >
 > > Reuven
 > >
 > > On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota
 > > mailto:vojta.jan...@gmail.com>
 > >> 

Re: Performance issue in Beam 2.4 onwards

2018-07-10 Thread Reuven Lax
If we added something slow to the core library in order to better test
DirectRunner, that does sound like an unfortunate bug.

On Mon, Jul 9, 2018 at 11:21 PM Vojtech Janota 
wrote:

> Hi guys,
>
> Thank you for all of your feedback. I have created relevant issue in JIRA:
> https://issues.apache.org/jira/browse/BEAM-4750
>
> @Lukasz: me mentioning the DirectRunner was somewhat unfortunate - the
> bottleneck was introduced into the core library and so Flink and Spark
> runners would be impacted too
>
> Thanks,
> Vojta
>
> On Mon, Jul 9, 2018 at 5:48 PM, Lukasz Cwik  wrote:
>
>> Instead of reverting/working around specific checks/tests that the
>> DirectRunner is doing, have you considered using one of the other runners
>> like Flink or Spark with a local execution cluster. You won't hit the
>> validation/verification bottlenecks that DirectRunner specifically imposes.
>>
>> On Mon, Jul 9, 2018 at 8:46 AM Jean-Baptiste Onofré 
>> wrote:
>>
>>> Thanks for the update Eugene.
>>>
>>> @Vojta: do you mind to create a Jira ? I will tackle a fix for that.
>>>
>>> Regards
>>> JB
>>>
>>> On 09/07/2018 17:33, Eugene Kirpichov wrote:
>>> > Hi -
>>> >
>>> > If I remember correctly, the reason for this change was to ensure that
>>> > the state is encodable at all. Prior to the change, there had been
>>> > situations where the coder specified on a state cell is buggy, absent
>>> or
>>> > set incorrectly (due to some issue in coder inference), but direct
>>> > runner did not detect this because it never tried to encode the state
>>> > cells - this would have blown up in any distributed runner.
>>> >
>>> > I think it should be possible to relax this and clone only values being
>>> > added to the state, rather than cloning the whole state on copy(). I
>>> > don't have time to work on this change myself, but I can review a PR if
>>> > someone else does.
>>> >
>>> > On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré wrote:
>>> >
>>> > Hi Vojta,
>>> >
>>> > I fully agree, that's why it makes sense to wait Eugene's feedback.
>>> >
>>> > I remember we had some performance regression on the direct runner
>>> > identified thanks to Nexmark, but it has been addressed by
>>> reverting a
>>> > change.
>>> >
>>> > Good catch anyway !
>>> >
>>> > Regards
>>> > JB
>>> >
>>> > On 09/07/2018 17:20, Vojtech Janota wrote:
>>> > > Hi Reuven,
>>> > >
>>> > > I'm not really complaining about DirectRunner. In fact it seems
>>> to
>>> > me as
>>> > > if what previously was considered as part of the "expensive extra
>>> > > checks" done by the DirectRunner is now done within the
>>> > > beam-runners-core-java library. Considering that all objects
>>> involved
>>> > > are immutable (in our case at least) and simple assignment is
>>> > > sufficient, the serialization-deserialization really seems as
>>> unwanted
>>> > > and hugely expensive correctness check. If there was a problem
>>> with
>>> > > identity copy, wasn't DirectRunner supposed to reveal it?
>>> > >
>>> > > Regards,
>>> > > Vojta
>>> > >
>>> > > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
>>> > >
>>> > > Hi Vojta,
>>> > >
>>> > > One problem is that the DirectRunner is designed for
>>> testing, not
>>> > > for performance. The DirectRunner currently does many
>>> > > purposely-inefficient things, the point of which is to better
>>> > expose
>>> > > potential bugs in tests. For example, the DirectRunner will
>>> > randomly
>>> > > shuffle the order of PCollections to ensure that your code
>>> > does not
>>> > > rely on ordering.  All of this adds cost, because the current
>>> > runner
>>> > > is designed for testing. There have been requests in the past
>>> > for an
>>> > > "optimized" local runner, however we don't currently have
>>> such
>>> > a thing.
>>> > >
>>> > > In this case, using coders to clone values is more correct.
>>> In a
>>> > > distributed environment using encode/decode is the only way
>>> to
>>> > copy
>>> > > values, and the DirectRunner is trying to ensure that your
>>> code is
>>> > > correct in a distributed environment.
>>> > >
>>> > > Reuven
>>> > >
>>> > > On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota wrote:
>>> > >
>>> > > Hi,
>>> > >
>>> > > We are using Apache Beam in our project for some time
>>> now.
>>> > Since
>>> > > our datasets are of modest size, we have so far used
>>> > > DirectRunner as the computation easily fits onto a single
>>> > > machine. 

Re: Performance issue in Beam 2.4 onwards

2018-07-10 Thread Vojtech Janota
Hi guys,

Thank you for all of your feedback. I have created the relevant issue in JIRA:
https://issues.apache.org/jira/browse/BEAM-4750

@Lukasz: my mentioning the DirectRunner was somewhat unfortunate - the
bottleneck was introduced into the core library, so the Flink and Spark
runners would be impacted too.

Thanks,
Vojta

On Mon, Jul 9, 2018 at 5:48 PM, Lukasz Cwik  wrote:

> Instead of reverting/working around specific checks/tests that the
> DirectRunner is doing, have you considered using one of the other runners
> like Flink or Spark with a local execution cluster. You won't hit the
> validation/verification bottlenecks that DirectRunner specifically imposes.
>
> On Mon, Jul 9, 2018 at 8:46 AM Jean-Baptiste Onofré 
> wrote:
>
>> Thanks for the update Eugene.
>>
>> @Vojta: do you mind to create a Jira ? I will tackle a fix for that.
>>
>> Regards
>> JB
>>
>> On 09/07/2018 17:33, Eugene Kirpichov wrote:
>> > Hi -
>> >
>> > If I remember correctly, the reason for this change was to ensure that
>> > the state is encodable at all. Prior to the change, there had been
>> > situations where the coder specified on a state cell is buggy, absent or
>> > set incorrectly (due to some issue in coder inference), but direct
>> > runner did not detect this because it never tried to encode the state
>> > cells - this would have blown up in any distributed runner.
>> >
>> > I think it should be possible to relax this and clone only values being
>> > added to the state, rather than cloning the whole state on copy(). I
>> > don't have time to work on this change myself, but I can review a PR if
>> > someone else does.
>> >
>> > On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré wrote:
>> >
>> > Hi Vojta,
>> >
>> > I fully agree, that's why it makes sense to wait Eugene's feedback.
>> >
>> > I remember we had some performance regression on the direct runner
>> > identified thanks to Nexmark, but it has been addressed by
>> reverting a
>> > change.
>> >
>> > Good catch anyway !
>> >
>> > Regards
>> > JB
>> >
>> > On 09/07/2018 17:20, Vojtech Janota wrote:
>> > > Hi Reuven,
>> > >
>> > > I'm not really complaining about DirectRunner. In fact it seems to
>> > me as
>> > > if what previously was considered as part of the "expensive extra
>> > > checks" done by the DirectRunner is now done within the
>> > > beam-runners-core-java library. Considering that all objects
>> involved
>> > > are immutable (in our case at least) and simple assignment is
>> > > sufficient, the serialization-deserialization really seems as
>> unwanted
>> > > and hugely expensive correctness check. If there was a problem
>> with
>> > > identity copy, wasn't DirectRunner supposed to reveal it?
>> > >
>> > > Regards,
>> > > Vojta
>> > >
>> > > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
>> > >
>> > > Hi Vojta,
>> > >
>> > > One problem is that the DirectRunner is designed for testing,
>> not
>> > > for performance. The DirectRunner currently does many
>> > > purposely-inefficient things, the point of which is to better
>> > expose
>> > > potential bugs in tests. For example, the DirectRunner will
>> > randomly
>> > > shuffle the order of PCollections to ensure that your code
>> > does not
>> > > rely on ordering.  All of this adds cost, because the current
>> > runner
>> > > is designed for testing. There have been requests in the past
>> > for an
>> > > "optimized" local runner, however we don't currently have such
>> > a thing.
>> > >
>> > > In this case, using coders to clone values is more correct.
>> In a
>> > > distributed environment using encode/decode is the only way to
>> > copy
>> > > values, and the DirectRunner is trying to ensure that your
>> code is
>> > > correct in a distributed environment.
>> > >
>> > > Reuven
>> > >
>> > > On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota wrote:
>> > >
>> > > Hi,
>> > >
>> > > We are using Apache Beam in our project for some time now.
>> > Since
>> > > our datasets are of modest size, we have so far used
>> > > DirectRunner as the computation easily fits onto a single
>> > > machine. Recently we upgraded Beam from 2.2 to 2.4 and
>> > found out
>> > > that performance of our pipelines drastically
>> deteriorated.
>> > > Pipelines that took ~3 minutes with 2.2 do not finish
>> within
>> > > hours now. We tried to isolate the change that causes the
>> > > slowdown and 

Re: Performance issue in Beam 2.4 onwards

2018-07-09 Thread Lukasz Cwik
Instead of reverting/working around specific checks/tests that the
DirectRunner is doing, have you considered using one of the other runners,
like Flink or Spark, with a local execution cluster? You won't hit the
validation/verification bottlenecks that DirectRunner specifically imposes.

On Mon, Jul 9, 2018 at 8:46 AM Jean-Baptiste Onofré  wrote:

> Thanks for the update Eugene.
>
> @Vojta: do you mind to create a Jira ? I will tackle a fix for that.
>
> Regards
> JB
>
> On 09/07/2018 17:33, Eugene Kirpichov wrote:
> > Hi -
> >
> > If I remember correctly, the reason for this change was to ensure that
> > the state is encodable at all. Prior to the change, there had been
> > situations where the coder specified on a state cell is buggy, absent or
> > set incorrectly (due to some issue in coder inference), but direct
> > runner did not detect this because it never tried to encode the state
> > cells - this would have blown up in any distributed runner.
> >
> > I think it should be possible to relax this and clone only values being
> > added to the state, rather than cloning the whole state on copy(). I
> > don't have time to work on this change myself, but I can review a PR if
> > someone else does.
> >
> > On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré wrote:
> >
> > Hi Vojta,
> >
> > I fully agree, that's why it makes sense to wait Eugene's feedback.
> >
> > I remember we had some performance regression on the direct runner
> > identified thanks to Nexmark, but it has been addressed by reverting
> a
> > change.
> >
> > Good catch anyway !
> >
> > Regards
> > JB
> >
> > On 09/07/2018 17:20, Vojtech Janota wrote:
> > > Hi Reuven,
> > >
> > > I'm not really complaining about DirectRunner. In fact it seems to
> > me as
> > > if what previously was considered as part of the "expensive extra
> > > checks" done by the DirectRunner is now done within the
> > > beam-runners-core-java library. Considering that all objects
> involved
> > > are immutable (in our case at least) and simple assignment is
> > > sufficient, the serialization-deserialization really seems as
> unwanted
> > > and hugely expensive correctness check. If there was a problem with
> > > identity copy, wasn't DirectRunner supposed to reveal it?
> > >
> > > Regards,
> > > Vojta
> > >
> > > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
> > >
> > > Hi Vojta,
> > >
> > > One problem is that the DirectRunner is designed for testing,
> not
> > > for performance. The DirectRunner currently does many
> > > purposely-inefficient things, the point of which is to better
> > expose
> > > potential bugs in tests. For example, the DirectRunner will
> > randomly
> > > shuffle the order of PCollections to ensure that your code
> > does not
> > > rely on ordering.  All of this adds cost, because the current
> > runner
> > > is designed for testing. There have been requests in the past
> > for an
> > > "optimized" local runner, however we don't currently have such
> > a thing.
> > >
> > > In this case, using coders to clone values is more correct. In
> a
> > > distributed environment using encode/decode is the only way to
> > copy
> > > values, and the DirectRunner is trying to ensure that your
> code is
> > > correct in a distributed environment.
> > >
> > > Reuven
> > >
> > > On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota wrote:
> > >
> > > Hi,
> > >
> > > We are using Apache Beam in our project for some time now.
> > Since
> > > our datasets are of modest size, we have so far used
> > > DirectRunner as the computation easily fits onto a single
> > > machine. Recently we upgraded Beam from 2.2 to 2.4 and
> > found out
> > > that performance of our pipelines drastically deteriorated.
> > > Pipelines that took ~3 minutes with 2.2 do not finish
> within
> > > hours now. We tried to isolate the change that causes the
> > > slowdown and came to the commits into the
> > > "InMemoryStateInternals" class:
> > >
> > > * https://github.com/apache/beam/commit/32a427c
> > > 
> > > * https://github.com/apache/beam/commit/8151d82
> > > 
> > >
> > > In a nutshell where previously the copy() method simply
> > assigned:
> > >
> > >   that.value = 

Re: Performance issue in Beam 2.4 onwards

2018-07-09 Thread Jean-Baptiste Onofré
Thanks for the update, Eugene.

@Vojta: would you mind creating a Jira? I will tackle a fix for that.

Regards
JB

On 09/07/2018 17:33, Eugene Kirpichov wrote:
> Hi -
> 
> If I remember correctly, the reason for this change was to ensure that
> the state is encodable at all. Prior to the change, there had been
> situations where the coder specified on a state cell is buggy, absent or
> set incorrectly (due to some issue in coder inference), but direct
> runner did not detect this because it never tried to encode the state
> cells - this would have blown up in any distributed runner.
> 
> I think it should be possible to relax this and clone only values being
> added to the state, rather than cloning the whole state on copy(). I
> don't have time to work on this change myself, but I can review a PR if
> someone else does.
> 
> On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré wrote:
> 
> Hi Vojta,
> 
> I fully agree, that's why it makes sense to wait Eugene's feedback.
> 
> I remember we had some performance regression on the direct runner
> identified thanks to Nexmark, but it has been addressed by reverting a
> change.
> 
> Good catch anyway !
> 
> Regards
> JB
> 
> On 09/07/2018 17:20, Vojtech Janota wrote:
> > Hi Reuven,
> >
> > I'm not really complaining about DirectRunner. In fact it seems to
> me as
> > if what previously was considered as part of the "expensive extra
> > checks" done by the DirectRunner is now done within the
> > beam-runners-core-java library. Considering that all objects involved
> > are immutable (in our case at least) and simple assignment is
> > sufficient, the serialization-deserialization really seems as unwanted
> > and hugely expensive correctness check. If there was a problem with
> > identity copy, wasn't DirectRunner supposed to reveal it? 
> >
> > Regards,
> > Vojta
> >
> > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
> >
> >     Hi Vojta,
> >
> >     One problem is that the DirectRunner is designed for testing, not
> >     for performance. The DirectRunner currently does many
> >     purposely-inefficient things, the point of which is to better
> expose
> >     potential bugs in tests. For example, the DirectRunner will
> randomly
> >     shuffle the order of PCollections to ensure that your code
> does not
> >     rely on ordering.  All of this adds cost, because the current
> runner
> >     is designed for testing. There have been requests in the past
> for an
> >     "optimized" local runner, however we don't currently have such
> a thing.
> >
> >     In this case, using coders to clone values is more correct. In a
> >     distributed environment using encode/decode is the only way to
> copy
> >     values, and the DirectRunner is trying to ensure that your code is
> >     correct in a distributed environment.
> >
> >     Reuven
> >
> >     On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota wrote:
> >
> >         Hi,
> >
> >         We are using Apache Beam in our project for some time now.
> Since
> >         our datasets are of modest size, we have so far used
> >         DirectRunner as the computation easily fits onto a single
> >         machine. Recently we upgraded Beam from 2.2 to 2.4 and
> found out
> >         that performance of our pipelines drastically deteriorated.
> >         Pipelines that took ~3 minutes with 2.2 do not finish within
> >         hours now. We tried to isolate the change that causes the
> >         slowdown and came to the commits into the
> >         "InMemoryStateInternals" class:
> >
> >         * https://github.com/apache/beam/commit/32a427c
> >         
> >         * https://github.com/apache/beam/commit/8151d82
> >         
> >
> >         In a nutshell where previously the copy() method simply
> assigned:
> >
> >           that.value = this.value
> >
> >         There is now coder encode/decode combo hidden behind:
> >
> >           that.value = uncheckedClone(coder, this.value)
> >
> >         Can somebody explain the purpose of this change? Is it
> meant as
> >         an additional "enforcement" point, similar to DirectRunner's
> >         enforceImmutability and enforceEncodability? Or is it
> something
> >         that is genuinely needed to provide correct behaviour of the
> >         pipeline?
> >
> >         Any hints or thoughts are 

Re: Performance issue in Beam 2.4 onwards

2018-07-09 Thread Eugene Kirpichov
Hi -

If I remember correctly, the reason for this change was to ensure that the
state is encodable at all. Prior to the change, there had been situations
where the coder specified on a state cell is buggy, absent or set
incorrectly (due to some issue in coder inference), but direct runner did
not detect this because it never tried to encode the state cells - this
would have blown up in any distributed runner.

I think it should be possible to relax this and clone only values being
added to the state, rather than cloning the whole state on copy(). I don't
have time to work on this change myself, but I can review a PR if someone
else does.
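
The relaxation suggested above can be sketched roughly as follows. This is only a minimal, self-contained illustration and not Beam's actual code: `SimpleCoder`, `BagState`, `cloneViaCoder` and the `roundTrips` counter are hypothetical names invented for the example, and the real `InMemoryStateInternals` is structured differently. The assumption is that each value goes through one encode/decode round trip when it is added, so an unencodable value or a broken coder still fails early, while copy() only copies references.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; none of these types are Beam classes.
class CloneOnAddSketch {

    /** Stand-in for a coder: writes a value to bytes and reads it back. */
    interface SimpleCoder<T> {
        void encode(T value, DataOutputStream out) throws IOException;
        T decode(DataInputStream in) throws IOException;
    }

    static final SimpleCoder<String> STRING_CODER = new SimpleCoder<String>() {
        @Override
        public void encode(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value);
        }

        @Override
        public String decode(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    /** Counts encode/decode round trips so their cost is visible. */
    static int roundTrips = 0;

    /** Clones a value by encoding it and decoding the resulting bytes. */
    static <T> T cloneViaCoder(SimpleCoder<T> coder, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            coder.encode(value, new DataOutputStream(bytes));
            roundTrips++;
            return coder.decode(
                new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new IllegalStateException("value could not be encoded/decoded", e);
        }
    }

    /** Bag-like state cell that validates encodability on add, not on copy. */
    static final class BagState<T> {
        private final SimpleCoder<T> coder;
        private final List<T> contents;

        BagState(SimpleCoder<T> coder) {
            this(coder, new ArrayList<T>());
        }

        private BagState(SimpleCoder<T> coder, List<T> contents) {
            this.coder = coder;
            this.contents = contents;
        }

        /** One round trip per added value: a bad coder is detected here. */
        void add(T value) {
            contents.add(cloneViaCoder(coder, value));
        }

        /** Copies the list of references only; no values are re-encoded. */
        BagState<T> copy() {
            return new BagState<T>(coder, new ArrayList<T>(contents));
        }

        int size() {
            return contents.size();
        }
    }

    public static void main(String[] args) {
        BagState<String> bag = new BagState<>(STRING_CODER);
        bag.add("a");
        bag.add("b");
        BagState<String> snapshot = bag.copy();
        System.out.println(snapshot.size() + " values, " + roundTrips + " round trips");
    }
}
```

In this sketch the cost of the coder is proportional to the number of values added, not to the size of the state multiplied by the number of copies. Sharing references in copy() is only safe if the stored values are never mutated afterwards, which matches the immutable-values case described elsewhere in this thread.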

On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré  wrote:

> Hi Vojta,
>
> I fully agree, that's why it makes sense to wait Eugene's feedback.
>
> I remember we had some performance regression on the direct runner
> identified thanks to Nexmark, but it has been addressed by reverting a
> change.
>
> Good catch anyway !
>
> Regards
> JB
>
> On 09/07/2018 17:20, Vojtech Janota wrote:
> > Hi Reuven,
> >
> > I'm not really complaining about DirectRunner. In fact it seems to me as
> > if what previously was considered as part of the "expensive extra
> > checks" done by the DirectRunner is now done within the
> > beam-runners-core-java library. Considering that all objects involved
> > are immutable (in our case at least) and simple assignment is
> > sufficient, the serialization-deserialization really seems as unwanted
> > and hugely expensive correctness check. If there was a problem with
> > identity copy, wasn't DirectRunner supposed to reveal it?
> >
> > Regards,
> > Vojta
> >
> > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
> >
> > Hi Vojta,
> >
> > One problem is that the DirectRunner is designed for testing, not
> > for performance. The DirectRunner currently does many
> > purposely-inefficient things, the point of which is to better expose
> > potential bugs in tests. For example, the DirectRunner will randomly
> > shuffle the order of PCollections to ensure that your code does not
> > rely on ordering.  All of this adds cost, because the current runner
> > is designed for testing. There have been requests in the past for an
> > "optimized" local runner, however we don't currently have such a
> thing.
> >
> > In this case, using coders to clone values is more correct. In a
> > distributed environment using encode/decode is the only way to copy
> > values, and the DirectRunner is trying to ensure that your code is
> > correct in a distributed environment.
> >
> > Reuven
> >
> > On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota wrote:
> >
> > Hi,
> >
> > We are using Apache Beam in our project for some time now. Since
> > our datasets are of modest size, we have so far used
> > DirectRunner as the computation easily fits onto a single
> > machine. Recently we upgraded Beam from 2.2 to 2.4 and found out
> > that performance of our pipelines drastically deteriorated.
> > Pipelines that took ~3 minutes with 2.2 do not finish within
> > hours now. We tried to isolate the change that causes the
> > slowdown and came to the commits into the
> > "InMemoryStateInternals" class:
> >
> > * https://github.com/apache/beam/commit/32a427c
> > 
> > * https://github.com/apache/beam/commit/8151d82
> > 
> >
> > In a nutshell where previously the copy() method simply assigned:
> >
> >   that.value = this.value
> >
> > There is now coder encode/decode combo hidden behind:
> >
> >   that.value = uncheckedClone(coder, this.value)
> >
> > Can somebody explain the purpose of this change? Is it meant as
> > an additional "enforcement" point, similar to DirectRunner's
> > enforceImmutability and enforceEncodability? Or is it something
> > that is genuinely needed to provide correct behaviour of the
> > pipeline?
> >
> > Any hints or thoughts are appreciated.
> >
> > Regards,
> > Vojta
> >
> >
> >
> >
> >
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Performance issue in Beam 2.4 onwards

2018-07-09 Thread Jean-Baptiste Onofré
Hi Vojta,

I fully agree, that's why it makes sense to wait for Eugene's feedback.

I remember we had a performance regression on the direct runner,
identified thanks to Nexmark, but it was addressed by reverting a
change.

Good catch anyway!

Regards
JB

On 09/07/2018 17:20, Vojtech Janota wrote:
> Hi Reuven,
> 
> I'm not really complaining about DirectRunner. In fact it seems to me as
> if what previously was considered as part of the "expensive extra
> checks" done by the DirectRunner is now done within the
> beam-runners-core-java library. Considering that all objects involved
> are immutable (in our case at least) and simple assignment is
> sufficient, the serialization-deserialization really seems as unwanted
> and hugely expensive correctness check. If there was a problem with
> identity copy, wasn't DirectRunner supposed to reveal it? 
> 
> Regards,
> Vojta
> 
> On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:
> 
> Hi Vojta,
> 
> One problem is that the DirectRunner is designed for testing, not
> for performance. The DirectRunner currently does many
> purposely-inefficient things, the point of which is to better expose
> potential bugs in tests. For example, the DirectRunner will randomly
> shuffle the order of PCollections to ensure that your code does not
> rely on ordering.  All of this adds cost, because the current runner
> is designed for testing. There have been requests in the past for an
> "optimized" local runner, however we don't currently have such a thing.
> 
> In this case, using coders to clone values is more correct. In a
> distributed environment using encode/decode is the only way to copy
> values, and the DirectRunner is trying to ensure that your code is
> correct in a distributed environment.
> 
> Reuven
> 
> On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota
> <vojta.jan...@gmail.com> wrote:
> 
> Hi,
> 
> We are using Apache Beam in our project for some time now. Since
> our datasets are of modest size, we have so far used
> DirectRunner as the computation easily fits onto a single
> machine. Recently we upgraded Beam from 2.2 to 2.4 and found out
> that performance of our pipelines drastically deteriorated.
> Pipelines that took ~3 minutes with 2.2 do not finish within
> hours now. We tried to isolate the change that causes the
> slowdown and came to the commits into the
> "InMemoryStateInternals" class:
> 
> * https://github.com/apache/beam/commit/32a427c
> 
> * https://github.com/apache/beam/commit/8151d82
> 
> 
> In a nutshell where previously the copy() method simply assigned:
> 
>   that.value = this.value
> 
> There is now coder encode/decode combo hidden behind:
> 
>   that.value = uncheckedClone(coder, this.value)
> 
> Can somebody explain the purpose of this change? Is it meant as
> an additional "enforcement" point, similar to DirectRunner's
> enforceImmutability and enforceEncodability? Or is it something
> that is genuinely needed to provide correct behaviour of the
> pipeline?
> 
> Any hints or thoughts are appreciated.
> 
> Regards,
> Vojta

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Performance issue in Beam 2.4 onwards

2018-07-09 Thread Vojtech Janota
Hi Reuven,

I'm not really complaining about the DirectRunner. In fact, it seems to me
that what was previously considered part of the "expensive extra checks"
done by the DirectRunner is now done within the beam-runners-core-java
library. Considering that all objects involved are immutable (in our case
at least) and simple assignment is sufficient, the
serialization-deserialization really seems like an unwanted and hugely
expensive correctness check. If there was a problem with identity copy,
wasn't the DirectRunner supposed to reveal it?

Regards,
Vojta

On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax wrote:

> Hi Vojta,
>
> One problem is that the DirectRunner is designed for testing, not for
> performance. The DirectRunner currently does many purposely-inefficient
> things, the point of which is to better expose potential bugs in tests. For
> example, the DirectRunner will randomly shuffle the order of PCollections
> to ensure that your code does not rely on ordering.  All of this adds cost,
> because the current runner is designed for testing. There have been
> requests in the past for an "optimized" local runner, however we don't
> currently have such a thing.
>
> In this case, using coders to clone values is more correct. In a
> distributed environment using encode/decode is the only way to copy values,
> and the DirectRunner is trying to ensure that your code is correct in a
> distributed environment.
>
> Reuven
>
> On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota wrote:
>
>> Hi,
>>
>> We are using Apache Beam in our project for some time now. Since our
>> datasets are of modest size, we have so far used DirectRunner as the
>> computation easily fits onto a single machine. Recently we upgraded Beam
>> from 2.2 to 2.4 and found out that performance of our pipelines drastically
>> deteriorated. Pipelines that took ~3 minutes with 2.2 do not finish within
>> hours now. We tried to isolate the change that causes the slowdown and came
>> to the commits into the "InMemoryStateInternals" class:
>>
>> * https://github.com/apache/beam/commit/32a427c
>> * https://github.com/apache/beam/commit/8151d82
>>
>> In a nutshell where previously the copy() method simply assigned:
>>
>>   that.value = this.value
>>
>> There is now coder encode/decode combo hidden behind:
>>
>>   that.value = uncheckedClone(coder, this.value)
>>
>> Can somebody explain the purpose of this change? Is it meant as an
>> additional "enforcement" point, similar to DirectRunner's
>> enforceImmutability and enforceEncodability? Or is it something that is
>> genuinely needed to provide correct behaviour of the pipeline?
>>
>> Any hints or thoughts are appreciated.
>>
>> Regards,
>> Vojta


Re: Performance issue in Beam 2.4 onwards

2018-07-09 Thread Reuven Lax
Hi Vojta,

One problem is that the DirectRunner is designed for testing, not for
performance. It currently does many purposely inefficient things in order
to better expose potential bugs in tests. For example, the DirectRunner
will randomly shuffle the order of elements in PCollections to ensure that
your code does not rely on ordering. All of this adds cost. There have been
requests in the past for an "optimized" local runner, but we don't
currently have such a thing.

In this case, using coders to clone values is more correct. In a
distributed environment using encode/decode is the only way to copy values,
and the DirectRunner is trying to ensure that your code is correct in a
distributed environment.
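
For readers of the archive, here is a minimal sketch of why an encode/decode
round trip can expose a bug that an identity copy hides. It is plain Java
and not Beam code: `SimpleCoder`, `LossyEventCoder`, `Event` and `roundTrip`
are hypothetical names invented for this illustration, and the coder is
deliberately buggy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

// Hypothetical sketch; none of these names are Beam APIs.
final class CoderCheckSketch {

  /** A small immutable value with two fields. */
  static final class Event {
    final String id;
    final long timestamp;

    Event(String id, long timestamp) {
      this.id = id;
      this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Event)) {
        return false;
      }
      Event other = (Event) o;
      return id.equals(other.id) && timestamp == other.timestamp;
    }

    @Override
    public int hashCode() {
      return Objects.hash(id, timestamp);
    }
  }

  /** Stand-in for a coder: writes a value as bytes and reads it back. */
  interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;

    T decode(DataInputStream in) throws IOException;
  }

  /** Deliberately buggy coder: the timestamp is never written. */
  static final class LossyEventCoder implements SimpleCoder<Event> {
    @Override
    public void encode(Event value, DataOutputStream out) throws IOException {
      out.writeUTF(value.id);
    }

    @Override
    public Event decode(DataInputStream in) throws IOException {
      return new Event(in.readUTF(), 0L);
    }
  }

  /** Copies a value the way a distributed runner must: through bytes. */
  static <T> T roundTrip(SimpleCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      coder.encode(value, out);
      out.flush();
      return coder.decode(
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Event original = new Event("e1", 42L);
    Event sameReference = original; // identity copy: the bug stays hidden
    Event decoded = roundTrip(new LossyEventCoder(), original);
    System.out.println(original.equals(sameReference)); // true
    System.out.println(original.equals(decoded)); // false: timestamp was lost
  }
}
```

With the identity copy the pipeline appears to work locally; only the round
trip shows that the value would not survive being shipped between workers.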

Reuven

On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota wrote:

> Hi,
>
> We are using Apache Beam in our project for some time now. Since our
> datasets are of modest size, we have so far used DirectRunner as the
> computation easily fits onto a single machine. Recently we upgraded Beam
> from 2.2 to 2.4 and found out that performance of our pipelines drastically
> deteriorated. Pipelines that took ~3 minutes with 2.2 do not finish within
> hours now. We tried to isolate the change that causes the slowdown and came
> to the commits into the "InMemoryStateInternals" class:
>
> * https://github.com/apache/beam/commit/32a427c
> * https://github.com/apache/beam/commit/8151d82
>
> In a nutshell where previously the copy() method simply assigned:
>
>   that.value = this.value
>
> There is now coder encode/decode combo hidden behind:
>
>   that.value = uncheckedClone(coder, this.value)
>
> Can somebody explain the purpose of this change? Is it meant as an
> additional "enforcement" point, similar to DirectRunner's
> enforceImmutability and enforceEncodability? Or is it something that is
> genuinely needed to provide correct behaviour of the pipeline?
>
> Any hints or thoughts are appreciated.
>
> Regards,
> Vojta


Re: Performance issue in Beam 2.4 onwards

2018-07-09 Thread Jean-Baptiste Onofré
Hi,

Do you use specific/complex coders in your pipeline?

I'm sure Eugene can offer some insight into this change. AFAIR, the
purpose is to have a cleaner use of coders and to identify identity copies.

Regards
JB

On 09/07/2018 16:22, Vojtech Janota wrote:
> Hi,
> 
> We are using Apache Beam in our project for some time now. Since our
> datasets are of modest size, we have so far used DirectRunner as the
> computation easily fits onto a single machine. Recently we upgraded Beam
> from 2.2 to 2.4 and found out that performance of our pipelines
> drastically deteriorated. Pipelines that took ~3 minutes with 2.2 do not
> finish within hours now. We tried to isolate the change that causes the
> slowdown and came to the commits into the "InMemoryStateInternals" class:
> 
> * https://github.com/apache/beam/commit/32a427c
> * https://github.com/apache/beam/commit/8151d82
> 
> In a nutshell where previously the copy() method simply assigned:
> 
>   that.value = this.value
> 
> There is now coder encode/decode combo hidden behind:
> 
>   that.value = uncheckedClone(coder, this.value)
> 
> Can somebody explain the purpose of this change? Is it meant as an
> additional "enforcement" point, similar to DirectRunner's
> enforceImmutability and enforceEncodability? Or is it something that is
> genuinely needed to provide correct behaviour of the pipeline?
> 
> Any hints or thoughts are appreciated.
> 
> Regards,
> Vojta

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Performance issue in Beam 2.4 onwards

2018-07-09 Thread Vojtech Janota
Hi,

We have been using Apache Beam in our project for some time now. Since our
datasets are of modest size, we have so far used the DirectRunner, as the
computation easily fits onto a single machine. Recently we upgraded Beam
from 2.2 to 2.4 and found that the performance of our pipelines
deteriorated drastically. Pipelines that took ~3 minutes with 2.2 now do
not finish within hours. We tried to isolate the change that causes the
slowdown and traced it to these commits to the "InMemoryStateInternals"
class:

* https://github.com/apache/beam/commit/32a427c
* https://github.com/apache/beam/commit/8151d82

In a nutshell, where previously the copy() method simply assigned:

  that.value = this.value

there is now a coder encode/decode combo hidden behind:

  that.value = uncheckedClone(coder, this.value)

Can somebody explain the purpose of this change? Is it meant as an
additional "enforcement" point, similar to DirectRunner's
enforceImmutability and enforceEncodability? Or is it something that is
genuinely needed to provide correct behaviour of the pipeline?
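
To make the difference concrete, here is a minimal sketch of the two copy
strategies in plain Java. It is not the code from the commits above:
`SimpleCoder`, `StringListCoder`, `identityCopy` and `cloneViaCoder` are
hypothetical names used only for illustration, and `cloneViaCoder` merely
assumes that a coder-based clone means "encode to bytes, then decode".

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these names are Beam APIs.
final class StateCopySketch {

  /** Stand-in for a coder: writes a value as bytes and reads it back. */
  interface SimpleCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;

    T decode(DataInputStream in) throws IOException;
  }

  /** Encodes a list of strings as its size followed by each string. */
  static final class StringListCoder implements SimpleCoder<List<String>> {
    @Override
    public void encode(List<String> value, DataOutputStream out) throws IOException {
      out.writeInt(value.size());
      for (String s : value) {
        out.writeUTF(s);
      }
    }

    @Override
    public List<String> decode(DataInputStream in) throws IOException {
      int size = in.readInt();
      List<String> result = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        result.add(in.readUTF());
      }
      return result;
    }
  }

  /** Plain assignment: the copy is the same object, at constant cost. */
  static <T> T identityCopy(T value) {
    return value;
  }

  /** Coder-based clone: the copy is rebuilt from the encoded bytes. */
  static <T> T cloneViaCoder(SimpleCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      coder.encode(value, out);
      out.flush();
      return coder.decode(
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    List<String> state = new ArrayList<>(Arrays.asList("a", "b", "c"));
    List<String> shared = identityCopy(state);
    List<String> cloned = cloneViaCoder(new StringListCoder(), state);
    System.out.println(shared == state); // true: same object
    System.out.println(cloned == state); // false: a new object
    System.out.println(cloned.equals(state)); // true: same contents
  }
}
```

The identity copy costs the same regardless of the value, while the coder
path does work proportional to the encoded size on every copy, which is
consistent with a large slowdown when state is copied often.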

Any hints or thoughts are appreciated.

Regards,
Vojta