I am not sure what you are referring to here. What do you mean by "Kryo is simply
slower" ... Beam Kryo or Flink Kryo?

Flink uses Kryo as a fallback serializer when its own type serialization system can't analyze the type. I'm just guessing here that this could be slower.

On 02.05.19 16:51, Jozef Vilcek wrote:


On Thu, May 2, 2019 at 3:41 PM Maximilian Michels wrote:

    Thanks for the JIRA issues Jozef!

     > So the feature in Flink is operator chaining, and Flink by
    default initiates a copy of input elements. In the case of Beam coders,
    the copy seems to be more noticeable than in native Flink.

    Copying between chained operators can be turned off in the
    FlinkPipelineOptions (if you know what you're doing).


Yes, I know that it can be instructed to reuse objects (if that is what you are referring to). I am just not sure I want to open this door in general :) But it is interesting to learn that, with portability, this will be turned on by default. Quite an important finding imho.

    Beam coders should
    not be slower than Flink's. They are simply wrapped. It seems Kryo is
    simply slower, which we could fix by providing more type hints to Flink.


I am not sure what you are referring to here. What do you mean by "Kryo is simply slower" ... Beam Kryo or Flink Kryo?

    -Max

    On 02.05.19 13:15, Robert Bradshaw wrote:
     > Thanks for filing those.
     >
     > As for how not doing a copy is "safe," it's not really. Beam simply
     > asserts that you MUST NOT mutate your inputs (and direct runners,
     > which are used during testing, do perform extra copies and checks to
     > catch violations of this requirement).
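Why skipping the copy is only "safe" when inputs are never mutated can be illustrated with a small, self-contained sketch. It does not use Beam or Flink; the class ChainedCopySketch, the Event type and the fanOut method are hypothetical names invented for illustration. One element is handed to two downstream functions, and the first one (incorrectly) mutates its input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

public class ChainedCopySketch {

    /** Mutable element, standing in for a user-defined record (hypothetical). */
    static final class Event {
        String payload;

        Event(String payload) {
            this.payload = payload;
        }

        Event copy() {
            return new Event(payload);
        }
    }

    /**
     * Passes one element to two downstream functions in turn and returns
     * what each of them observed. The first function mutates its input,
     * which is exactly the behaviour the model forbids.
     */
    static List<String> fanOut(String payload, boolean copyBetweenOperators) {
        Function<Event, String> mutating = e -> {
            e.payload = e.payload.toUpperCase(Locale.ROOT); // in-place mutation
            return e.payload;
        };
        Function<Event, String> reading = e -> e.payload;

        Event input = new Event(payload);
        List<String> seen = new ArrayList<>();
        // With copying, each consumer gets its own instance; without, they share one.
        seen.add(mutating.apply(copyBetweenOperators ? input.copy() : input));
        seen.add(reading.apply(copyBetweenOperators ? input.copy() : input));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(fanOut("abc", true));  // second consumer still sees "abc"
        System.out.println(fanOut("abc", false)); // second consumer sees the mutated value
    }
}
```

With the defensive copy the mutation stays local to the first consumer; without it the second consumer observes the changed value. That is the trade-off behind turning copying off: it saves work but relies on user code honouring the no-mutation rule.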
     >
     > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek wrote:
     >>
     >> I have created
     >> https://issues.apache.org/jira/browse/BEAM-7204
     >> https://issues.apache.org/jira/browse/BEAM-7206
     >>
     >> to track these topics further
     >>
     >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek wrote:
     >>>
     >>>
     >>>
     >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles wrote:
     >>>>
     >>>>
     >>>>
     >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax wrote:
     >>>>>
     >>>>> In that case, Robert's point is quite valid. The old Flink
    runner I believe had no knowledge of fusion, which was known to make
    it extremely slow. A lot of work went into making the portable
    runner fusion aware, so we don't need to round trip through coders
    on every ParDo.
     >>>>
     >>>>
     >>>> The old Flink runner got fusion for free, since Flink does it.
    The new fusion in portability is because fusing the runner side of
    portability steps does not achieve real fusion
     >>>
     >>>
     >>> Aha, I see. So the feature in Flink is operator chaining, and
    Flink by default initiates a copy of input elements. In the case of Beam
    coders, the copy seems to be more noticeable than in native Flink.
     >>> So do I get it right that in the portable runner scenario, you do
    similar chaining via this "fusion of stages"? Curious here... how is
    it different from chaining, so that the runner can be sure that not
    copying is "safe" with respect to user-defined functions and their
    behaviour over inputs?
     >>>
     >>>>>
     >>>>>
     >>>>> Reuven
     >>>>>
     >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek wrote:
     >>>>>>
     >>>>>> It was not a portable Flink runner.
     >>>>>>
     >>>>>> Thanks all for the thoughts, I will create JIRAs, as
    suggested, with my findings and send them out
     >>>>>>
     >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax wrote:
     >>>>>>>
     >>>>>>> Jozef did you use the portable Flink runner or the old one?
     >>>>>>>
     >>>>>>> Reuven
     >>>>>>>
     >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw wrote:
     >>>>>>>>
     >>>>>>>> Thanks for starting this investigation. As mentioned, most of the work
     >>>>>>>> to date has been on feature parity, not performance parity, but we're
     >>>>>>>> at the point that the latter should be tackled as well. Even if there
     >>>>>>>> is a slight overhead (and there's talk about integrating more deeply
     >>>>>>>> with the Flume DAG that could elide even that) I'd expect it should be
     >>>>>>>> nowhere near the 3x that you're seeing. Aside from the timer issue,
     >>>>>>>> sounds like the cloning via coders is a huge drag that needs to be
     >>>>>>>> addressed. I wonder if this is one of those cases where using the
     >>>>>>>> portability framework could be a performance win (specifically, no
     >>>>>>>> cloning would happen between operators of fused stages, and the
     >>>>>>>> cloning between operators could be on the raw bytes[] (if needed at
     >>>>>>>> all, because we know they wouldn't be mutated)).
     >>>>>>>>
     >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles wrote:
     >>>>>>>>>
     >>>>>>>>> Specifically, a lot of shared code assumes that
    repeatedly setting a timer is nearly free / the same cost as
    determining whether or not to set the timer. ReduceFnRunner has been
    refactored in a way so it would be very easy to set the GC timer
    once per window that occurs in a bundle, but there's probably some
    underlying inefficiency around why this isn't cheap that would be a
    bigger win.
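The idea of setting the GC timer once per window that occurs in a bundle, rather than once per element, can be sketched roughly as follows. This is not ReduceFnRunner code: GcTimerSketch, CountingTimers and ALLOWED_LATENESS are hypothetical names, and the toy timer service only counts calls so the difference is visible.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class GcTimerSketch {

    /** Toy timer service keyed by window end; counts how often setTimer is called. */
    static final class CountingTimers {
        final Map<Long, Long> timers = new HashMap<>();
        int setCalls = 0;

        void setTimer(long windowEnd, long fireAt) {
            setCalls++;
            timers.put(windowEnd, fireAt); // setting again simply overwrites
        }
    }

    /** Assumed lateness added to the window end to get the GC time (illustrative value). */
    static final long ALLOWED_LATENESS = 10;

    /** Sets the GC timer for every element, as described in the thread. */
    static CountingTimers perElement(List<Long> windowEndPerElement) {
        CountingTimers t = new CountingTimers();
        for (long windowEnd : windowEndPerElement) {
            t.setTimer(windowEnd, windowEnd + ALLOWED_LATENESS);
        }
        return t;
    }

    /** Sets the GC timer only the first time a window is seen in the bundle. */
    static CountingTimers oncePerWindow(List<Long> windowEndPerElement) {
        CountingTimers t = new CountingTimers();
        Set<Long> seenInBundle = new HashSet<>();
        for (long windowEnd : windowEndPerElement) {
            if (seenInBundle.add(windowEnd)) {
                t.setTimer(windowEnd, windowEnd + ALLOWED_LATENESS);
            }
        }
        return t;
    }
}
```

Both variants end up with the same set of timers; only the number of calls into the timer service differs. Whether that is the bigger win, or whether making each call cheap matters more, is the open question raised above.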
     >>>>>>>>>
     >>>>>>>>> Kenn
     >>>>>>>>>
     >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax wrote:
     >>>>>>>>>>
     >>>>>>>>>> I think the short answer is that folks working on the
    Beam Flink runner have mostly been focused on getting everything
    working, and so have not dug into this performance too deeply. I
    suspect that there is low-hanging fruit to optimize as a result.
     >>>>>>>>>>
     >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for
    each element. I think this code dates back to before Beam; on
    Dataflow timers are identified by tag, so this simply overwrites the
    existing timer which is very cheap in Dataflow. If it is not cheap
    on Flink, this might be something to optimize.
     >>>>>>>>>>
     >>>>>>>>>> Reuven
     >>>>>>>>>>
     >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek wrote:
     >>>>>>>>>>>
     >>>>>>>>>>> Hello,
     >>>>>>>>>>>
     >>>>>>>>>>> I am interested in any knowledge or thoughts on what
    the overhead is, or should be, of running Beam pipelines instead of
    pipelines written for the "bare runner". Is this something that is being
    tested or investigated by the community? Is there a consensus on what
    bounds the overhead should typically fall within? I realise this is very
    runner specific, but certain things are also imposed by the SDK model
    itself.
     >>>>>>>>>>>
     >>>>>>>>>>> I tested a simple streaming pipeline on Flink vs
    Beam-Flink and found very noticeable differences. I want to stress
    that it was not a performance test. The job does the following:
     >>>>>>>>>>>
     >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter
    deserialisation errors -> Reshuffle -> Report counter.inc() to
    metrics for throughput
     >>>>>>>>>>>
     >>>>>>>>>>> Both jobs had the same configuration and the same state
    backend with the same checkpointing strategy. What I noticed from a few
    simple test runs:
     >>>>>>>>>>>
     >>>>>>>>>>> * first run on Flink 1.5.0: from CPU profiles on one
    worker I found that ~50% of the time was spent either on removing
    timers from HeapInternalTimerService or in
    java.io.ByteArrayOutputStream from CoderUtils.clone()
     >>>>>>>>>>>
     >>>>>>>>>>> * problem with timer delete was addressed by
    FLINK-9423. I have retested on Flink 1.7.2 and not much time is
    spent in timer delete now, but the root cause was not removed.
    It still remains that timers are frequently registered and removed
    (I believe from ReduceFnRunner.scheduleGarbageCollectionTimer(), in
    which case it is called per processed element?), which is
    noticeable in GC activity, Heap and State ...
     >>>>>>>>>>>
     >>>>>>>>>>> * in Flink I use the FileSystem state backend, which keeps
    state in an in-memory CopyOnWriteStateTable which after some time is full
    of PaneInfo objects. Maybe they come from PaneInfoTracker activity
     >>>>>>>>>>> * Coder clone is painful. The pure Flink job copies
    between operators too, in my case via Kryo.copy(), but this is
    not noticeable in the CPU profile. Kryo.copy() copies at the object level,
    not object -> bytes -> object, which is cheaper
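The difference between the two copy strategies in the last bullet can be sketched without Beam or Kryo. The names below (CloneSketch, Message, cloneViaBytes, cloneDirect) are hypothetical, and the hand-written encode/decode pair merely stands in for a coder; it is an illustration of the two shapes of copy, not the actual CoderUtils or Kryo implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Objects;

public class CloneSketch {

    /** Immutable record standing in for a deserialized message (hypothetical). */
    static final class Message {
        final long id;
        final String body;

        Message(long id, String body) {
            this.id = id;
            this.body = body;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Message)) {
                return false;
            }
            Message other = (Message) o;
            return id == other.id && body.equals(other.body);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, body);
        }
    }

    /** Stand-in for a coder's encode step: object -> bytes. */
    static byte[] encode(Message m) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(m.id);
            out.writeUTF(m.body);
        }
        return bytes.toByteArray();
    }

    /** Stand-in for a coder's decode step: bytes -> object. */
    static Message decode(byte[] encoded) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
            return new Message(in.readLong(), in.readUTF());
        }
    }

    /** Copy by a full serialization round trip (object -> bytes -> object). */
    static Message cloneViaBytes(Message m) {
        try {
            return decode(encode(m));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Copy at the object level, field by field, with no intermediate buffer. */
    static Message cloneDirect(Message m) {
        return new Message(m.id, m.body);
    }
}
```

Both methods return an equal but distinct instance. The round-trip variant additionally allocates a stream and a byte array per element, which is consistent with ByteArrayOutputStream showing up in the CPU profile; the sketch makes no claim about the size of the difference.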
     >>>>>>>>>>>
     >>>>>>>>>>> Overall, my observation is that pure Flink can be
    roughly 3x faster.
     >>>>>>>>>>>
     >>>>>>>>>>> I do not know what I am trying to achieve here :)
    Probably just to start a discussion and collect thoughts and other
    experiences on the cost of running some data processing on Beam and
    a particular runner.
     >>>>>>>>>>>
