Well, I did not do a proper perf test. What I am saying is that my
observation is:

* The native Flink job does copy its inputs, but judging by stack-trace perf
snapshots, the CPU spends most of its time inflating bytes read from Kafka
* When running the Beam pipeline on Flink, the Coder copy trace shows up among
the top CPU consumers

I am just speculating here. Flink's "coders" have both a
serialize/deserialize option and a copy option. Plus they have an
isImmutableType() hint, so they have more potential to be efficient.
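
To illustrate what I mean by the two options, here is a minimal sketch in
plain Java. It is not Flink or Beam code: SketchSerializer, Event,
EventSerializer and CopyStrategies are hypothetical names made up for this
example. Only the isImmutableType() name and the idea of a separate copy()
are borrowed from Flink's serializers, and cloneViaBytes() only mimics the
object -> bytes -> object round trip that shows up as the Coder copy in my
profiles.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a serializer that offers both a byte-level
// path (serialize/deserialize) and an object-level path (copy).
interface SketchSerializer<T> {
    void serialize(T value, DataOutputStream out) throws IOException;
    T deserialize(DataInputStream in) throws IOException;
    boolean isImmutableType(); // hint: instances can be shared without copying
    T copy(T value);           // object-level copy, no bytes involved
}

// Hypothetical mutable element type.
final class Event {
    long id;
    String payload;

    Event(long id, String payload) {
        this.id = id;
        this.payload = payload;
    }
}

final class EventSerializer implements SketchSerializer<Event> {
    @Override
    public void serialize(Event value, DataOutputStream out) throws IOException {
        out.writeLong(value.id);
        out.writeUTF(value.payload);
    }

    @Override
    public Event deserialize(DataInputStream in) throws IOException {
        return new Event(in.readLong(), in.readUTF());
    }

    @Override
    public boolean isImmutableType() {
        return false; // Event has mutable fields, so it must be copied
    }

    @Override
    public Event copy(Event value) {
        return new Event(value.id, value.payload);
    }
}

final class CopyStrategies {
    // Round trip object -> bytes -> object: allocates a buffer and
    // encodes/decodes every field on each hand-over between operators.
    static <T> T cloneViaBytes(SketchSerializer<T> serializer, T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                serializer.serialize(value, out);
            }
            try (DataInputStream in =
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return serializer.deserialize(in);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Object-level hand-over: skip the copy entirely for immutable types,
    // otherwise copy field by field without going through bytes.
    static <T> T copyBetweenOperators(SketchSerializer<T> serializer, T value) {
        return serializer.isImmutableType() ? value : serializer.copy(value);
    }

    public static void main(String[] args) {
        EventSerializer serializer = new EventSerializer();
        Event original = new Event(42L, "hello");
        Event viaBytes = cloneViaBytes(serializer, original);
        Event viaCopy = copyBetweenOperators(serializer, original);
        System.out.println(viaBytes.payload + " / " + viaCopy.payload);
    }
}
```

Both paths hand the downstream operator an independent Event, but only the
first one pays for a byte buffer plus encode/decode per element. Whether
that explains the difference I see is, again, speculation on my part.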


On Fri, May 3, 2019 at 2:01 PM Maximilian Michels <m...@apache.org> wrote:

> Misread your post. You're saying that Kryo is more efficient than a
> roundtrip obj->bytes->obj_copy. Still, most types use Flink's
> serializers which also do the above roundtrip. So I'm not sure this
> performance advantage holds true for other Flink jobs.
>
> On 02.05.19 20:01, Maximilian Michels wrote:
> >> I am not sure what you are referring to here. What do you mean Kryo is
> >> simply slower ... Beam Kryo or Flink Kryo or?
> >
> > Flink uses Kryo as a fallback serializer when its own type serialization
> > system can't analyze the type. I'm just guessing here that this could be
> > slower.
> >
> > On 02.05.19 16:51, Jozef Vilcek wrote:
> >>
> >>
> >> On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <m...@apache.org
> >> <mailto:m...@apache.org>> wrote:
> >>
> >>     Thanks for the JIRA issues Jozef!
> >>
> >>      > So the feature in Flink is operator chaining, and Flink by
> >>     default initiates a copy of input elements. In the case of Beam, the
> >>     coder copy seems to be more noticeable than in native Flink.
> >>
> >>     Copying between chained operators can be turned off in the
> >>     FlinkPipelineOptions (if you know what you're doing).
> >>
> >>
> >> Yes, I know that it can be instructed to reuse objects (if you are
> >> referring to this). I am just not sure I want to open this door in
> >> general :)
> >> But it is interesting to learn that with portability, this will be
> >> turned on by default. Quite an important finding, imho.
> >>
> >>     Beam coders should
> >>     not be slower than Flink's. They are simply wrapped. It seems Kryo is
> >>     simply slower, which we could fix by providing more type hints to
> >>     Flink.
> >>
> >>
> >> I am not sure what you are referring to here. What do you mean Kryo is
> >> simply slower ... Beam Kryo or Flink Kryo or?
> >>
> >>     -Max
> >>
> >>     On 02.05.19 13:15, Robert Bradshaw wrote:
> >>      > Thanks for filing those.
> >>      >
> >>      > As for how not doing a copy is "safe," it's not really. Beam simply
> >>      > asserts that you MUST NOT mutate your inputs (and direct runners,
> >>      > which are used during testing, do perform extra copies and checks to
> >>      > catch violations of this requirement).
> >>      >
> >>      > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
> >>     <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >>      >>
> >>      >> I have created
> >>      >> https://issues.apache.org/jira/browse/BEAM-7204
> >>      >> https://issues.apache.org/jira/browse/BEAM-7206
> >>      >>
> >>      >> to track these topics further
> >>      >>
> >>      >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek
> >>     <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >>      >>>
> >>      >>>
> >>      >>>
> >>      >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles
> >>     <k...@apache.org <mailto:k...@apache.org>> wrote:
> >>      >>>>
> >>      >>>>
> >>      >>>>
> >>      >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com
> >>     <mailto:re...@google.com>> wrote:
> >>      >>>>>
> >>      >>>>> In that case, Robert's point is quite valid. The old Flink
> >>     runner I believe had no knowledge of fusion, which was known to make
> >>     it extremely slow. A lot of work went into making the portable
> >>     runner fusion aware, so we don't need to round trip through coders
> >>     on every ParDo.
> >>      >>>>
> >>      >>>>
> >>      >>>> The old Flink runner got fusion for free, since Flink does it.
> >>     The new fusion in portability is because fusing the runner side of
> >>     portability steps does not achieve real fusion
> >>      >>>
> >>      >>>
> >>      >>> Aha, I see. So the feature in Flink is operator chaining, and
> >>     Flink by default initiates a copy of input elements. In the case of
> >>     Beam, the coder copy seems to be more noticeable than in native Flink.
> >>      >>> So do I get it right that in the portable runner scenario, you do
> >>     similar chaining via this "fusion of stages"? Curious here... how is
> >>     it different from chaining, so that the runner can be sure that not
> >>     copying is "safe" with respect to user-defined functions and their
> >>     behaviour over inputs?
> >>      >>>
> >>      >>>>>
> >>      >>>>>
> >>      >>>>> Reuven
> >>      >>>>>
> >>      >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek
> >>     <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >>      >>>>>>
> >>      >>>>>> It was not a portable Flink runner.
> >>      >>>>>>
> >>      >>>>>> Thanks all for the thoughts, I will create JIRAs, as
> >>     suggested, with my findings and send them out
> >>      >>>>>>
> >>      >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax
> >>     <re...@google.com <mailto:re...@google.com>> wrote:
> >>      >>>>>>>
> >>      >>>>>>> Jozef did you use the portable Flink runner or the old one?
> >>      >>>>>>>
> >>      >>>>>>> Reuven
> >>      >>>>>>>
> >>      >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw
> >>     <rober...@google.com <mailto:rober...@google.com>> wrote:
> >>      >>>>>>>>
> >>      >>>>>>>> Thanks for starting this investigation. As mentioned, most
> >>     of the work
> >>      >>>>>>>> to date has been on feature parity, not performance
> >>     parity, but we're
> >>      >>>>>>>> at the point that the latter should be tackled as well.
> >>     Even if there
> >>      >>>>>>>> is a slight overhead (and there's talk about integrating
> >>     more deeply
> >>      >>>>>>>> with the Flume DAG that could elide even that) I'd expect
> >>     it should be
> >>      >>>>>>>> nowhere near the 3x that you're seeing. Aside from the
> >>     timer issue,
> >>      >>>>>>>> sounds like the cloning via coders is a huge drag that
> >>     needs to be
> >>      >>>>>>>> addressed. I wonder if this is one of those cases where
> >>     using the
> >>      >>>>>>>> portability framework could be a performance win
> >>     (specifically, no
> >>      >>>>>>>> cloning would happen between operators of fused stages,
> >>     and the
> >>      >>>>>>>> cloning between operators could be on the raw bytes[] (if
> >>     needed at
> >>      >>>>>>>> all, because we know they wouldn't be mutated)).
> >>      >>>>>>>>
> >>      >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles
> >>     <k...@apache.org <mailto:k...@apache.org>> wrote:
> >>      >>>>>>>>>
> >>      >>>>>>>>> Specifically, a lot of shared code assumes that
> >>     repeatedly setting a timer is nearly free / the same cost as
> >>     determining whether or not to set the timer. ReduceFnRunner has been
> >>     refactored in a way so it would be very easy to set the GC timer
> >>     once per window that occurs in a bundle, but there's probably some
> >>     underlying inefficiency around why this isn't cheap that would be a
> >>     bigger win.
> >>      >>>>>>>>>
> >>      >>>>>>>>> Kenn
> >>      >>>>>>>>>
> >>      >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax
> >>     <re...@google.com <mailto:re...@google.com>> wrote:
> >>      >>>>>>>>>>
> >>      >>>>>>>>>> I think the short answer is that folks working on the
> >>     Beam Flink runner have mostly been focused on getting everything
> >>     working, and so have not dug into this performance too deeply. I
> >>     suspect that there is low-hanging fruit to optimize as a result.
> >>      >>>>>>>>>>
> >>      >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for
> >>     each element. I think this code dates back to before Beam; on
> >>     Dataflow timers are identified by tag, so this simply overwrites the
> >>     existing timer which is very cheap in Dataflow. If it is not cheap
> >>     on Flink, this might be something to optimize.
> >>      >>>>>>>>>>
> >>      >>>>>>>>>> Reuven
> >>      >>>>>>>>>>
> >>      >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek
> >>     <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> Hello,
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> I am interested in any knowledge or thoughts on what
> >>     the overhead of running Beam pipelines is, or should be, compared to
> >>     pipelines written on a "bare runner". Is this something which is being
> >>     tested or investigated by the community? Is there a consensus on what
> >>     bounds the overhead should typically stay within? I realise this is
> >>     very runner specific, but certain things are also imposed by the SDK
> >>     model itself.
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> I tested a simple streaming pipeline on Flink vs
> >>     Beam-Flink and found very noticeable differences. I want to stress
> >>     that it was not a performance test. The job does the following:
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter
> >>     deserialisation errors -> Reshuffle -> Report counter.inc() to
> >>     metrics for throughput
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> Both jobs had the same configuration and the same state
> >>     backend with the same checkpointing strategy. What I noticed from a
> >>     few simple test runs:
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> * first run on Flink 1.5.0: from CPU profiles on one
> >>     worker I found that ~50% of the time was spent either on removing
> >>     timers from HeapInternalTimerService or in
> >>     java.io.ByteArrayOutputStream from CoderUtils.clone()
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> * the problem with timer deletion was addressed by
> >>     FLINK-9423. I have retested on Flink 1.7.2 and not much time is
> >>     spent in timer deletion now, but the root cause was not removed.
> >>     It still remains that timers are frequently registered and removed
> >>     (I believe from ReduceFnRunner.scheduleGarbageCollectionTimer(), in
> >>     which case it is called per processed element?), which is
> >>     noticeable in GC activity, Heap and State ...
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> * in Flink I use the FileSystem state backend, which keeps
> >>     state in an in-memory CopyOnWriteStateTable, which after some time is
> >>     full of PaneInfo objects. Maybe they come from PaneInfoTracker activity
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> * Coder clone is painful. The pure Flink job does copy
> >>     between operators too, in my case via Kryo.copy(), but this is
> >>     not noticeable in the CPU profile. Kryo.copy() copies on the object
> >>     level, not object -> bytes -> object, which is cheaper
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> Overall, my observation is that pure Flink can be
> >>     roughly 3x faster.
> >>      >>>>>>>>>>>
> >>      >>>>>>>>>>> I do not know what I am trying to achieve here :)
> >>     Probably just start a discussion and collect thoughts and other
> >>     experiences on the cost of running some data processing on Beam and
> >>     a particular runner.
> >>      >>>>>>>>>>>
> >>
>