All right, I can test it out if I get the chance. How do I deploy a pipeline on
the Flink portable runner? Should I follow this guide to do it?
https://beam.apache.org/documentation/runners/flink/
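
From that page, I gather a Java submission would look roughly like the
following (a minimal sketch; it assumes the portable job server is already
running on localhost:8099, and the flag names are my reading of the docs, so
please correct me if they are off):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Point the pipeline at the portable Flink job server.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(
        "--runner=PortableRunner",
        "--jobEndpoint=localhost:8099")
        .create();
    Pipeline p = Pipeline.create(options);
    // ... construct the pipeline ...
    p.run().waitUntilFinish();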

On Tue, Apr 30, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:

> In that case, Robert's point is quite valid. The old Flink runner I
> believe had no knowledge of fusion, which was known to make it extremely
> slow. A lot of work went into making the portable runner fusion aware, so
> we don't need to round trip through coders on every ParDo.
>
> Reuven
>
> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> It was not a portable Flink runner.
>>
>> Thanks all for the thoughts, I will create JIRAs, as suggested, with my
>> findings and send them out
>>
>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Jozef did you use the portable Flink runner or the old one?
>>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>> to date has been on feature parity, not performance parity, but we're
>>>> at the point that the latter should be tackled as well. Even if there
>>>> is a slight overhead (and there's talk about integrating more deeply
>>>> with the Flink DAG that could elide even that), I'd expect it to be
>>>> nowhere near the 3x that you're seeing. Aside from the timer issue, it
>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>> addressed. I wonder if this is one of those cases where using the
>>>> portability framework could be a performance win (specifically, no
>>>> cloning would happen between operators of fused stages, and the
>>>> cloning between operators could be on the raw byte[] (if needed at
>>>> all, because we know the bytes wouldn't be mutated)).
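>>>>
>>>> To make that last point concrete, here is a rough illustration (not
>>>> actual runner code): within a fused stage elements pass by reference with
>>>> no copying, and across a stage boundary only encoded bytes move, so any
>>>> defensive copy is a flat array copy.
>>>>
>>>>     import java.util.Arrays;
>>>>     import org.apache.beam.sdk.util.CoderUtils;
>>>>
>>>>     // Encode once at the stage boundary.
>>>>     byte[] encoded = CoderUtils.encodeToByteArray(coder, value);
>>>>     // If a copy is needed at all, it is byte[] -> byte[],
>>>>     // not an object -> bytes -> object round trip.
>>>>     byte[] copy = Arrays.copyOf(encoded, encoded.length);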
>>>>
>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>> >
>>>> > Specifically, a lot of shared code assumes that repeatedly setting a
>>>> timer is nearly free / the same cost as determining whether or not to set
>>>> the timer. ReduceFnRunner has been refactored in a way that would make it
>>>> very easy to set the GC timer once per window that occurs in a bundle, but
>>>> there's probably some underlying inefficiency behind why setting timers
>>>> isn't cheap, and fixing that would be a bigger win.
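>>>> >
>>>> > A sketch of the once-per-window-per-bundle idea (hypothetical helper
>>>> code, not the actual ReduceFnRunner implementation):
>>>> >
>>>> >     import java.util.HashSet;
>>>> >     import java.util.Set;
>>>> >     import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>> >
>>>> >     // Windows whose GC timer was already set in this bundle;
>>>> >     // cleared at the start of each bundle.
>>>> >     Set<BoundedWindow> gcTimerSet = new HashSet<>();
>>>> >
>>>> >     void onElement(Iterable<BoundedWindow> windows) {
>>>> >       for (BoundedWindow w : windows) {
>>>> >         // Register the GC timer at most once per window, not per element.
>>>> >         if (gcTimerSet.add(w)) {
>>>> >           setGcTimer(w);  // hypothetical: schedules GC at
>>>> >                           // w.maxTimestamp() plus allowed lateness
>>>> >         }
>>>> >       }
>>>> >     }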
>>>> >
>>>> > Kenn
>>>> >
>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:
>>>> >>
>>>> >> I think the short answer is that folks working on the Beam Flink
>>>> runner have mostly been focused on getting everything working, and so have
>>>> not dug into this performance too deeply. I suspect that there is
>>>> low-hanging fruit to optimize as a result.
>>>> >>
>>>> >> You're right that ReduceFnRunner schedules a timer for each element.
>>>> I think this code dates back to before Beam; on Dataflow, timers are
>>>> identified by tag, so this simply overwrites the existing timer, which is
>>>> very cheap there. If it is not cheap on Flink, this might be something to
>>>> optimize.
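>>>> >>
>>>> >> To illustrate the difference (a toy model, not either runner's actual
>>>> implementation): with tag-keyed timers, re-setting a timer is just a map
>>>> overwrite, whereas a timer heap needs a delete followed by an insert.
>>>> >>
>>>> >>     import java.util.HashMap;
>>>> >>     import java.util.Map;
>>>> >>     import org.joda.time.Instant;
>>>> >>
>>>> >>     // Tag-keyed store: setting the same tag again overwrites in place.
>>>> >>     Map<String, Instant> timersByTag = new HashMap<>();
>>>> >>
>>>> >>     void setTimer(String tag, Instant firesAt) {
>>>> >>       timersByTag.put(tag, firesAt);  // O(1); no explicit delete needed
>>>> >>     }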
>>>> >>
>>>> >> Reuven
>>>> >>
>>>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>> wrote:
>>>> >>>
>>>> >>> Hello,
>>>> >>>
>>>> >>> I am interested in any knowledge or thoughts on what the overhead of
>>>> running Beam pipelines should be / is, compared to pipelines written on a
>>>> "bare" runner. Is this something that is being tested or investigated by
>>>> the community? Is there a consensus on what bounds the overhead should
>>>> typically fall within? I realise this is very runner specific, but certain
>>>> things are also imposed by the SDK model itself.
>>>> >>>
>>>> >>> I tested a simple streaming pipeline on Flink vs Beam-on-Flink and
>>>> found very noticeable differences. I want to stress that it was not a
>>>> performance test. The job does the following:
>>>> >>>
>>>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors
>>>> -> Reshuffle -> Report counter.inc() to metrics for throughput
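>>>> >>>
>>>> >>> In Beam terms the job is roughly the following (a sketch; brokers,
>>>> topic and Event are placeholders for my actual config and proto class):
>>>> >>>
>>>> >>>     import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>> >>>     import org.apache.beam.sdk.metrics.Counter;
>>>> >>>     import org.apache.beam.sdk.metrics.Metrics;
>>>> >>>     import org.apache.beam.sdk.transforms.DoFn;
>>>> >>>     import org.apache.beam.sdk.transforms.ParDo;
>>>> >>>     import org.apache.beam.sdk.transforms.Reshuffle;
>>>> >>>     import org.apache.beam.sdk.values.KV;
>>>> >>>     import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>>>> >>>
>>>> >>>     pipeline
>>>> >>>         .apply(KafkaIO.<byte[], byte[]>read()
>>>> >>>             .withBootstrapServers(brokers)
>>>> >>>             .withTopic(topic)
>>>> >>>             .withKeyDeserializer(ByteArrayDeserializer.class)
>>>> >>>             .withValueDeserializer(ByteArrayDeserializer.class)
>>>> >>>             .withoutMetadata())
>>>> >>>         .apply(ParDo.of(new DoFn<KV<byte[], byte[]>, Event>() {
>>>> >>>           @ProcessElement
>>>> >>>           public void process(ProcessContext c) {
>>>> >>>             try {
>>>> >>>               c.output(Event.parseFrom(c.element().getValue()));
>>>> >>>             } catch (Exception e) {
>>>> >>>               // drop deserialisation errors (the "filter" step)
>>>> >>>             }
>>>> >>>           }
>>>> >>>         }))
>>>> >>>         .apply(Reshuffle.viaRandomKey())
>>>> >>>         .apply(ParDo.of(new DoFn<Event, Void>() {
>>>> >>>           private final Counter throughput = Metrics.counter("job", "throughput");
>>>> >>>           @ProcessElement
>>>> >>>           public void process() {
>>>> >>>             throughput.inc();  // report throughput via metrics
>>>> >>>           }
>>>> >>>         }));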
>>>> >>>
>>>> >>> Both jobs had the same configuration and the same state backend with
>>>> the same checkpointing strategy. What I noticed from a few simple test
>>>> runs:
>>>> >>>
>>>> >>> * In the first run, on Flink 1.5.0, CPU profiles on one worker showed
>>>> that ~50% of the time was spent either on removing timers from
>>>> HeapInternalTimerService or in java.io.ByteArrayOutputStream via
>>>> CoderUtils.clone() (see the sketch after this list)
>>>> >>>
>>>> >>> * The problem with timer deletion was addressed by FLINK-9423. I
>>>> retested on Flink 1.7.2 and not much time is spent in timer deletion now,
>>>> but the root cause was not removed. It remains the case that timers are
>>>> frequently registered and removed (I believe from
>>>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it is called
>>>> per processed element?), which is noticeable in GC activity, heap and
>>>> state ...
>>>> >>>
>>>> >>> * In Flink I use the FileSystem state backend, which keeps state in an
>>>> in-memory CopyOnWriteStateTable that after some time is full of PaneInfo
>>>> objects. Maybe they come from PaneInfoTracker activity
>>>> >>>
>>>> >>> * Coder clone is painful. A pure Flink job also copies between
>>>> operators, in my case via Kryo.copy(), but that is not noticeable in the
>>>> CPU profile. Kryo.copy() copies at the object level rather than
>>>> object -> bytes -> object, which is cheaper (see the sketch below)
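>>>> >>>
>>>> >>> For reference, the two copy paths look roughly like this (a sketch;
>>>> MyEvent stands in for my proto class):
>>>> >>>
>>>> >>>     import com.esotericsoftware.kryo.Kryo;
>>>> >>>     import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
>>>> >>>     import org.apache.beam.sdk.util.CoderUtils;
>>>> >>>
>>>> >>>     // Beam: CoderUtils.clone() round-trips through bytes, i.e.
>>>> >>>     // object -> byte[] (ByteArrayOutputStream) -> object, which is
>>>> >>>     // why ByteArrayOutputStream shows up in the CPU profile.
>>>> >>>     MyEvent beamCopy = CoderUtils.clone(ProtoCoder.of(MyEvent.class), event);
>>>> >>>
>>>> >>>     // Flink: Kryo copies at the object level, with no byte round trip.
>>>> >>>     MyEvent flinkCopy = kryo.copy(event);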
>>>> >>>
>>>> >>> Overall, my observation is that pure Flink can be roughly 3x faster.
>>>> >>>
>>>> >>> I do not know exactly what I am trying to achieve here :) Probably
>>>> just to start a discussion and collect thoughts and other experiences on
>>>> the cost of running data processing on Beam and a particular runner.
>>>> >>>
>>>>
>>>
