If we added something slow to the core library in order to better test
DirectRunner, that does sound like an unfortunate bug.

On Mon, Jul 9, 2018 at 11:21 PM Vojtech Janota <vojta.jan...@gmail.com>
wrote:

> Hi guys,
>
> Thank you for all of your feedback. I have created a relevant issue in JIRA:
> https://issues.apache.org/jira/browse/BEAM-4750
>
> @Lukasz: my mentioning the DirectRunner was somewhat unfortunate; the
> bottleneck was introduced into the core library, so the Flink and Spark
> runners would be impacted too.
>
> Thanks,
> Vojta
>
> On Mon, Jul 9, 2018 at 5:48 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> Instead of reverting/working around specific checks/tests that the
>> DirectRunner is doing, have you considered using one of the other runners,
>> like Flink or Spark, with a local execution cluster? You won't hit the
>> validation/verification bottlenecks that the DirectRunner specifically imposes.
>>
>> On Mon, Jul 9, 2018 at 8:46 AM Jean-Baptiste Onofré <j...@nanthrax.net>
>> wrote:
>>
>>> Thanks for the update Eugene.
>>>
>>> @Vojta: would you mind creating a Jira? I will tackle a fix for that.
>>>
>>> Regards
>>> JB
>>>
>>> On 09/07/2018 17:33, Eugene Kirpichov wrote:
>>> > Hi -
>>> >
>>> > If I remember correctly, the reason for this change was to ensure that
>>> > the state is encodable at all. Prior to the change, there had been
>>> > situations where the coder specified on a state cell was buggy, absent,
>>> > or set incorrectly (due to some issue in coder inference), but the direct
>>> > runner did not detect this because it never tried to encode the state
>>> > cells; this would have blown up in any distributed runner.
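To illustrate why encoding the state catches problems that plain assignment hides, here is a minimal Java sketch; it is not Beam code. The `SimpleCoder` interface, the `GOOD` and `BUGGY` coders and the `roundTrip`/`survivesRoundTrip` helpers are hypothetical stand-ins for a Beam `Coder`. The buggy coder forgets to write its payload: assignment never notices, while an encode/decode round trip does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class EncodabilityCheck {

  // Hypothetical minimal coder interface (a stand-in, not Beam's Coder).
  interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
  }

  // A correct coder: length-prefixed UTF-8 bytes.
  static final SimpleCoder<String> GOOD = new SimpleCoder<String>() {
    @Override
    public void encode(String value, OutputStream out) throws IOException {
      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
      DataOutputStream data = new DataOutputStream(out);
      data.writeInt(bytes.length);
      data.write(bytes);
      data.flush();
    }

    @Override
    public String decode(InputStream in) throws IOException {
      DataInputStream data = new DataInputStream(in);
      byte[] bytes = new byte[data.readInt()];
      data.readFully(bytes);
      return new String(bytes, StandardCharsets.UTF_8);
    }
  };

  // A buggy coder: it never writes the payload, so decoding cannot recover it.
  static final SimpleCoder<String> BUGGY = new SimpleCoder<String>() {
    @Override
    public void encode(String value, OutputStream out) {
      // Bug: nothing is written.
    }

    @Override
    public String decode(InputStream in) {
      return "";
    }
  };

  // Encode to bytes, then decode them again; IOExceptions are rethrown unchecked.
  static <T> T roundTrip(SimpleCoder<T> coder, T value) {
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      coder.encode(value, out);
      return coder.decode(new ByteArrayInputStream(out.toByteArray()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static <T> boolean survivesRoundTrip(SimpleCoder<T> coder, T value) {
    return value.equals(roundTrip(coder, value));
  }

  public static void main(String[] args) {
    String state = "running-total=42";
    String assigned = state; // assignment never exercises a coder
    System.out.println(assigned.equals(state));           // true
    System.out.println(survivesRoundTrip(GOOD, state));   // true
    System.out.println(survivesRoundTrip(BUGGY, state));  // false: bug surfaced
  }
}
```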
>>> >
>>> > I think it should be possible to relax this and clone only values being
>>> > added to the state, rather than cloning the whole state on copy(). I
>>> > don't have time to work on this change myself, but I can review a PR if
>>> > someone else does.
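A minimal sketch of the clone-on-add idea, assuming a hypothetical `ValueCell` class rather than Beam's actual `InMemoryStateInternals` code: the value is cloned once in `write()`, and `copy()` goes back to sharing the reference. The `cloner` function is an assumption standing in for an encode/decode round trip through the cell's coder. Sharing the reference in `copy()` assumes callers do not mutate the object returned by `read()`.

```java
import java.util.function.UnaryOperator;

public class CloneOnWrite {

  // Hypothetical state cell; `cloner` stands in for an encode/decode
  // round trip through the cell's coder.
  static final class ValueCell<T> {
    private final UnaryOperator<T> cloner;
    private T value;

    ValueCell(UnaryOperator<T> cloner) {
      this.cloner = cloner;
    }

    // Clone once, as the value enters the state. A broken coder fails here,
    // and later mutations of the caller's object cannot leak into the state.
    void write(T input) {
      this.value = cloner.apply(input);
    }

    T read() {
      return value;
    }

    // No clone on copy: the stored value was already cloned on write.
    // Assumes callers do not mutate the object returned by read().
    ValueCell<T> copy() {
      ValueCell<T> that = new ValueCell<>(cloner);
      that.value = this.value;
      return that;
    }
  }

  public static void main(String[] args) {
    int[] clones = {0};
    UnaryOperator<StringBuilder> cloner = sb -> {
      clones[0]++;
      return new StringBuilder(sb);
    };

    ValueCell<StringBuilder> cell = new ValueCell<>(cloner);
    StringBuilder input = new StringBuilder("a");
    cell.write(input);
    input.append("b"); // the caller mutates its own object after writing

    for (int i = 0; i < 1000; i++) {
      cell.copy(); // copying is now just a reference assignment
    }
    System.out.println(cell.read()); // a
    System.out.println(clones[0]);   // 1
  }
}
```

Under this scheme a cell that is copied many times but written once pays for a single clone, while the coder is still exercised on every write.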
>>> >
>>> > On Mon, Jul 9, 2018 at 8:28 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>> >
>>> >     Hi Vojta,
>>> >
>>> >     I fully agree; that's why it makes sense to wait for Eugene's feedback.
>>> >
>>> >     I remember we had a performance regression on the direct runner,
>>> >     identified thanks to Nexmark, but it was addressed by reverting a
>>> >     change.
>>> >
>>> >     Good catch anyway!
>>> >
>>> >     Regards
>>> >     JB
>>> >
>>> >     On 09/07/2018 17:20, Vojtech Janota wrote:
>>> >     > Hi Reuven,
>>> >     >
>>> >     > I'm not really complaining about the DirectRunner. In fact, it seems to
>>> >     > me as if what was previously considered part of the "expensive extra
>>> >     > checks" done by the DirectRunner is now done within the
>>> >     > beam-runners-core-java library. Considering that all objects involved
>>> >     > are immutable (in our case at least) and simple assignment is
>>> >     > sufficient, the serialization-deserialization really seems like an
>>> >     > unwanted and hugely expensive correctness check. If there was a
>>> >     > problem with the identity copy, wasn't the DirectRunner supposed to
>>> >     > reveal it?
>>> >     >
>>> >     > Regards,
>>> >     > Vojta
>>> >     >
>>> >     > On Mon, Jul 9, 2018 at 4:46 PM, Reuven Lax <re...@google.com> wrote:
>>> >     >
>>> >     >     Hi Vojta,
>>> >     >
>>> >     >     One problem is that the DirectRunner is designed for testing,
>>> >     >     not for performance. The DirectRunner currently does many
>>> >     >     purposely inefficient things, the point of which is to better
>>> >     >     expose potential bugs in tests. For example, the DirectRunner
>>> >     >     will randomly shuffle the order of elements in PCollections to
>>> >     >     ensure that your code does not rely on ordering. All of this
>>> >     >     adds cost, because the current runner is designed for testing.
>>> >     >     There have been requests in the past for an "optimized" local
>>> >     >     runner; however, we don't currently have such a thing.
>>> >     >
>>> >     >     In this case, using coders to clone values is more correct. In a
>>> >     >     distributed environment, encoding/decoding is the only way to
>>> >     >     copy values, and the DirectRunner is trying to ensure that your
>>> >     >     code is correct in a distributed environment.
>>> >     >
>>> >     >     Reuven
>>> >     >
>>> >     >     On Mon, Jul 9, 2018 at 7:22 AM Vojtech Janota
>>> >     >     <vojta.jan...@gmail.com> wrote:
>>> >     >
>>> >     >         Hi,
>>> >     >
>>> >     >         We have been using Apache Beam in our project for some
>>> >     >         time now. Since our datasets are of modest size, we have so
>>> >     >         far used the DirectRunner, as the computation easily fits
>>> >     >         onto a single machine. Recently we upgraded Beam from 2.2 to
>>> >     >         2.4 and found that the performance of our pipelines had
>>> >     >         deteriorated drastically. Pipelines that took ~3 minutes
>>> >     >         with 2.2 now do not finish within hours. We tried to
>>> >     >         isolate the change that causes the slowdown and traced it
>>> >     >         to these commits to the "InMemoryStateInternals" class:
>>> >     >
>>> >     >         * https://github.com/apache/beam/commit/32a427c
>>> >     >         * https://github.com/apache/beam/commit/8151d82
>>> >     >
>>> >     >         In a nutshell, where previously the copy() method simply
>>> >     >         assigned:
>>> >     >
>>> >     >           that.value = this.value
>>> >     >
>>> >     >         There is now a coder encode/decode combination hidden behind:
>>> >     >
>>> >     >           that.value = uncheckedClone(coder, this.value)
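The two copy() strategies can be contrasted in a small, self-contained Java sketch. Java serialization is swapped in for a Beam `Coder` so that the example needs no Beam dependency, and `uncheckedCloneSketch` is a hypothetical name, not Beam's `uncheckedClone` (in Beam itself, a coder round trip of this kind is available as `CoderUtils.clone(coder, value)`). Assignment returns the same object without doing any work; the round trip pays for a full serialize/deserialize on every call and returns a distinct but equal object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

public class CopyStrategies {

  // Before: copy() shared the reference (that.value = this.value).
  static <T> T copyByAssignment(T value) {
    return value;
  }

  // After (sketch): copy() round-trips the value through a serializer.
  // Java serialization stands in for a Beam Coder; hypothetical helper.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T uncheckedCloneSketch(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      // Checked exceptions are rethrown unchecked.
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ArrayList<Integer> state = new ArrayList<>(Arrays.asList(1, 2, 3));

    ArrayList<Integer> assigned = copyByAssignment(state);
    ArrayList<Integer> cloned = uncheckedCloneSketch(state);

    System.out.println(assigned == state);     // true: same object, no work done
    System.out.println(cloned == state);       // false: a fresh object
    System.out.println(cloned.equals(state));  // true: equal contents
  }
}
```

With immutable values both variants behave the same from the pipeline's point of view; the difference is that the round trip exercises the serializer on every copy, which is where the extra cost comes from.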
>>> >     >
>>> >     >         Can somebody explain the purpose of this change? Is it
>>> >     >         meant as an additional "enforcement" point, similar to the
>>> >     >         DirectRunner's enforceImmutability and enforceEncodability?
>>> >     >         Or is it something that is genuinely needed to provide
>>> >     >         correct behaviour of the pipeline?
>>> >     >
>>> >     >         Any hints or thoughts are appreciated.
>>> >     >
>>> >     >         Regards,
>>> >     >         Vojta
>>> >     >
>>> >
>>> >     --
>>> >     Jean-Baptiste Onofré
>>> >     jbono...@apache.org
>>> >     http://blog.nanthrax.net
>>> >     Talend - http://www.talend.com
>>> >
>>>
>>>
>>
>
