I have created
https://issues.apache.org/jira/browse/BEAM-7204
https://issues.apache.org/jira/browse/BEAM-7206

to track these topics further.

On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek <[email protected]> wrote:

>
>
> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles <[email protected]> wrote:
>
>>
>>
>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <[email protected]> wrote:
>>
>>> In that case, Robert's point is quite valid. The old Flink runner I
>>> believe had no knowledge of fusion, which was known to make it extremely
>>> slow. A lot of work went into making the portable runner fusion aware, so
>>> we don't need to round trip through coders on every ParDo.
>>>
>>
>> The old Flink runner got fusion for free, since Flink does it. The new
>> fusion in portability is because fusing the runner side of portability
>> steps does not achieve real fusion.
>>
>
> Aha, I see. So the feature in Flink is operator chaining, and by default
> Flink copies input elements between chained operators. With Beam, the copy
> via coders seems to be more noticeable than the copy in native Flink.
> So do I get it right that in the portable runner scenario, you do similar
> chaining via this "fusion of stages"? Curious here... how is it different
> from chaining, so that the runner can be sure that not copying is "safe"
> with respect to user-defined functions and their behaviour over inputs?
>
>
>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <[email protected]>
>>> wrote:
>>>
>>>> It was not a portable Flink runner.
>>>>
>>>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my
>>>> findings and send them out
>>>>
>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Jozef, did you use the portable Flink runner or the old one?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>>>> to date has been on feature parity, not performance parity, but we're
>>>>>> at the point that the latter should be tackled as well. Even if there
>>>>>> is a slight overhead (and there's talk about integrating more deeply
>>>>>> with the Flume DAG that could elide even that) I'd expect it should be
>>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
>>>>>> it sounds like the cloning via coders is a huge drag that needs to be
>>>>>> addressed. I wonder if this is one of those cases where using the
>>>>>> portability framework could be a performance win (specifically, no
>>>>>> cloning would happen between operators of fused stages, and the
>>>>>> cloning between operators could be on the raw bytes[], if needed at
>>>>>> all, because we know they wouldn't be mutated).
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <[email protected]>
>>>>>> wrote:
>>>>>> >
>>>>>> > Specifically, a lot of shared code assumes that repeatedly setting
>>>>>> > a timer is nearly free / the same cost as determining whether or not
>>>>>> > to set the timer. ReduceFnRunner has been refactored in a way so it
>>>>>> > would be very easy to set the GC timer once per window that occurs in
>>>>>> > a bundle, but there's probably some underlying inefficiency around why
>>>>>> > this isn't cheap that would be a bigger win.
>>>>>> >
>>>>>> > Kenn
>>>>>> >
>>>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <[email protected]> wrote:
>>>>>> >>
>>>>>> >> I think the short answer is that folks working on the Beam Flink
>>>>>> >> runner have mostly been focused on getting everything working, and so
>>>>>> >> have not dug into this performance too deeply. I suspect that there is
>>>>>> >> low-hanging fruit to optimize as a result.
>>>>>> >>
>>>>>> >> You're right that ReduceFnRunner schedules a timer for each
>>>>>> >> element. I think this code dates back to before Beam; on Dataflow,
>>>>>> >> timers are identified by tag, so this simply overwrites the existing
>>>>>> >> timer, which is very cheap in Dataflow. If it is not cheap on Flink,
>>>>>> >> this might be something to optimize.
>>>>>> >>
>>>>>> >> Reuven
>>>>>> >>
>>>>>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <[email protected]> wrote:
>>>>>> >>>
>>>>>> >>> Hello,
>>>>>> >>>
>>>>>> >>> I am interested in any knowledge or thoughts on what the overhead
>>>>>> >>> of running Beam pipelines is, or should be, compared with pipelines
>>>>>> >>> written directly on the "bare runner". Is this something that is being
>>>>>> >>> tested or investigated by the community? Is there a consensus on what
>>>>>> >>> bounds the overhead should typically fall within? I realise this is
>>>>>> >>> very runner specific, but certain things are also imposed by the SDK
>>>>>> >>> model itself.
>>>>>> >>>
>>>>>> >>> I tested a simple streaming pipeline on Flink vs Beam-Flink and
>>>>>> >>> found very noticeable differences. I want to stress that it was not a
>>>>>> >>> performance test. The job does the following:
>>>>>> >>>
>>>>>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation
>>>>>> >>> errors -> Reshuffle -> Report counter.inc() to metrics for throughput
>>>>>> >>>
>>>>>> >>> Both jobs had the same configuration and the same state backend with
>>>>>> >>> the same checkpointing strategy. What I noticed from a few simple test
>>>>>> >>> runs:
>>>>>> >>>
>>>>>> >>> * first run on Flink 1.5.0: from CPU profiles on one worker I
>>>>>> >>> found that ~50% of the time was spent either on removing timers from
>>>>>> >>> HeapInternalTimerService or in java.io.ByteArrayOutputStream from
>>>>>> >>> CoderUtils.clone()
>>>>>> >>>
>>>>>> >>> * the problem with timer deletion was addressed by FLINK-9423. I
>>>>>> >>> retested on Flink 1.7.2 and not much time is spent in timer deletion
>>>>>> >>> now, but the root cause was not removed. Timers are still frequently
>>>>>> >>> registered and removed (I believe from
>>>>>> >>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is
>>>>>> >>> called per processed element?), which is noticeable in GC activity,
>>>>>> >>> heap and state ...
>>>>>> >>>
>>>>>> >>> * in Flink I use the FileSystem state backend, which keeps state in
>>>>>> >>> an in-memory CopyOnWriteStateTable that after some time is full of
>>>>>> >>> PaneInfo objects. Maybe they come from PaneInfoTracker activity
>>>>>> >>>
>>>>>> >>> * Coder clone is painful. A pure Flink job copies between
>>>>>> >>> operators too, in my case via Kryo.copy(), but this is not noticeable
>>>>>> >>> in the CPU profile. Kryo.copy() copies at the object level, not object
>>>>>> >>> -> bytes -> object, which is cheaper
>>>>>> >>>
>>>>>> >>> Overall, my observation is that pure Flink can be roughly 3x
>>>>>> >>> faster.
>>>>>> >>>
>>>>>> >>> I do not know what I am trying to achieve here :) Probably just to
>>>>>> >>> start a discussion and collect thoughts and other experiences on the
>>>>>> >>> cost of running some data processing on Beam and a particular runner.
>>>>>> >>>
>>>>>>
>>>>>
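
The idea raised in the thread of setting the GC timer once per window per
bundle, instead of once per element, can be illustrated with a small sketch.
This is not Beam or Flink code: CountingTimers, GcTimerSketch, and the window
names are hypothetical stand-ins, and the timer service here only counts calls
so that the difference is visible. It assumes that setting the same GC timer
again for a window already seen in the bundle is redundant.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a runner's timer service; it only counts calls.
final class CountingTimers {
    int setCalls = 0;

    void setGcTimer(String window) {
        setCalls++;
    }
}

final class GcTimerSketch {
    // Per-element behaviour described in the thread: one timer call per element.
    static int perElement(List<String> windowsOfElements) {
        CountingTimers timers = new CountingTimers();
        for (String window : windowsOfElements) {
            timers.setGcTimer(window);
        }
        return timers.setCalls;
    }

    // Suggested alternative: one timer call per distinct window in the bundle.
    static int oncePerWindow(List<String> windowsOfElements) {
        CountingTimers timers = new CountingTimers();
        Set<String> seen = new HashSet<>();
        for (String window : windowsOfElements) {
            // Set.add returns true only the first time a window is seen.
            if (seen.add(window)) {
                timers.setGcTimer(window);
            }
        }
        return timers.setCalls;
    }

    public static void main(String[] args) {
        // Each entry is the window of one element in a single bundle.
        List<String> bundle = Arrays.asList("w1", "w1", "w2", "w1", "w2");
        System.out.println(perElement(bundle));    // prints 5
        System.out.println(oncePerWindow(bundle)); // prints 2
    }
}
```

The saving grows with the number of elements per window in a bundle. Whether it
matters in practice depends on how expensive a timer update is on the runner in
question, which is the open question in the thread.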

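The difference between a coder-style clone (object -> bytes -> object) and an
object-level copy, as described in the original report, can be sketched as
follows. This is an illustration under stated assumptions only: Event and
CloneSketch are hypothetical names, the hand-written encoding stands in for a
coder, and the field copy stands in for an object-level copier. It makes no
claim about how CoderUtils.clone() or Kryo.copy() are implemented internally.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical element type; not a Beam or Flink class.
final class Event {
    final String key;
    final long value;

    Event(String key, long value) {
        this.key = key;
        this.value = value;
    }
}

final class CloneSketch {
    // Coder-style clone: encode to a byte buffer, then decode a new object.
    static Event cloneViaBytes(Event in) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                out.writeUTF(in.key);
                out.writeLong(in.value);
            }
            byte[] encoded = buffer.toByteArray();
            try (DataInputStream data =
                    new DataInputStream(new ByteArrayInputStream(encoded))) {
                return new Event(data.readUTF(), data.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Object-level copy: fields are copied directly, with no intermediate buffer.
    static Event cloneViaFields(Event in) {
        return new Event(in.key, in.value);
    }

    public static void main(String[] args) {
        Event original = new Event("user-1", 42L);
        Event viaBytes = cloneViaBytes(original);
        Event viaFields = cloneViaFields(original);
        System.out.println(viaBytes.key + " " + viaBytes.value);
        System.out.println(viaFields.key + " " + viaFields.value);
    }
}
```

Both methods return an independent copy with the same contents. The byte round
trip additionally allocates and fills a buffer for every element, which is
consistent with java.io.ByteArrayOutputStream showing up in the CPU profile
mentioned above.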