Well, I did not do a proper perf test. What I am saying is that my observations are:
* Flink native job does use copy of inputs, but looking at stack trace perf
snapshots, the CPU is most of the time engaged by inflating bytes read from
Kafka
* Running the Beam pipeline on Flink, the Coder copy trace pops up among the
top CPU usages

I am just speculating here. Flink's "coders" do have a serialize/deserialize
option and a copy option. Plus they have an isImmutableType() hint, so they
have more potential to be effective.

On Fri, May 3, 2019 at 2:01 PM Maximilian Michels <m...@apache.org> wrote:

> Misread your post. You're saying that Kryo is more efficient than a
> roundtrip obj->bytes->obj_copy. Still, most types use Flink's
> serializers, which also do the above roundtrip. So I'm not sure this
> performance advantage holds true for other Flink jobs.
>
> On 02.05.19 20:01, Maximilian Michels wrote:
> >> I am not sure what you are referring to here. What do you mean Kryo is
> >> simply slower ... Beam Kryo or Flink Kryo or?
> >
> > Flink uses Kryo as a fallback serializer when its own type serialization
> > system can't analyze the type. I'm just guessing here that this could be
> > slower.
> >
> > On 02.05.19 16:51, Jozef Vilcek wrote:
> >> On Thu, May 2, 2019 at 3:41 PM Maximilian Michels <m...@apache.org
> >> <mailto:m...@apache.org>> wrote:
> >>
> >>     Thanks for the JIRA issues Jozef!
> >>
> >>     > So the feature in Flink is operator chaining and Flink per
> >>     default initiates a copy of input elements. In the case of Beam,
> >>     the coder copy seems to be more noticeable than in native Flink.
> >>
> >>     Copying between chained operators can be turned off in the
> >>     FlinkPipelineOptions (if you know what you're doing).
> >>
> >> Yes, I know that it can be instructed to reuse objects (if you are
> >> referring to this). I am just not sure I want to open this door in
> >> general :)
> >> But it is interesting to learn that with portability this will be
> >> turned on per default. Quite an important finding imho.
> >>
> >>     Beam coders should not be slower than Flink's. They are simply
> >>     wrapped. It seems Kryo is simply slower, which we could fix by
> >>     providing more type hints to Flink.
> >>
> >> I am not sure what you are referring to here. What do you mean Kryo is
> >> simply slower ... Beam Kryo or Flink Kryo or?
> >>
> >>     -Max
> >>
> >> On 02.05.19 13:15, Robert Bradshaw wrote:
> >> > Thanks for filing those.
> >> >
> >> > As for how not doing a copy is "safe," it's not really. Beam simply
> >> > asserts that you MUST NOT mutate your inputs (and direct runners,
> >> > which are used during testing, do perform extra copies and checks
> >> > to catch violations of this requirement).
> >> >
> >> > On Thu, May 2, 2019 at 1:02 PM Jozef Vilcek
> >> > <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >> >>
> >> >> I have created
> >> >> https://issues.apache.org/jira/browse/BEAM-7204
> >> >> https://issues.apache.org/jira/browse/BEAM-7206
> >> >>
> >> >> to track these topics further
> >> >>
> >> >> On Wed, May 1, 2019 at 1:24 PM Jozef Vilcek
> >> >> <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >> >>>
> >> >>> On Tue, Apr 30, 2019 at 5:42 PM Kenneth Knowles
> >> >>> <k...@apache.org <mailto:k...@apache.org>> wrote:
> >> >>>>
> >> >>>> On Tue, Apr 30, 2019, 07:05 Reuven Lax <re...@google.com
> >> >>>> <mailto:re...@google.com>> wrote:
> >> >>>>>
> >> >>>>> In that case, Robert's point is quite valid. The old Flink
> >> >>>>> runner I believe had no knowledge of fusion, which was known
> >> >>>>> to make it extremely slow. A lot of work went into making the
> >> >>>>> portable runner fusion aware, so we don't need to round trip
> >> >>>>> through coders on every ParDo.
> >> >>>>
> >> >>>> The old Flink runner got fusion for free, since Flink does it.
> >> >>>> The new fusion in portability exists because fusing the runner
> >> >>>> side of portability steps does not achieve real fusion.
> >> >>>
> >> >>> Aha, I see.
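[Editor's note: the isImmutableType() hint and the "when is skipping the copy safe" question above can be illustrated with a minimal sketch. All names below are illustrative, not Flink's actual TypeSerializer API: the point is only that a runner needs to defensively copy an element between chained operators only when its type is mutable.]

```java
// Minimal sketch (illustrative names, not Flink's actual API) of how an
// immutability hint lets a runner elide per-element copies between
// chained operators.
interface MiniSerializer<T> {
    boolean isImmutableType();  // hint: values of T can never be mutated
    T deepCopy(T value);        // object-level copy for mutable types
}

class ChainingCopy {
    // A runner would call this when handing an element to the next chained
    // operator: immutable values are passed through without any copy.
    static <T> T copyIfNeeded(MiniSerializer<T> s, T value) {
        return s.isImmutableType() ? value : s.deepCopy(value);
    }
}

class StringSerializer implements MiniSerializer<String> {
    public boolean isImmutableType() { return true; }   // String is immutable
    public String deepCopy(String v) { return v; }
}

class IntArraySerializer implements MiniSerializer<int[]> {
    public boolean isImmutableType() { return false; }  // arrays are mutable
    public int[] deepCopy(int[] v) { return v.clone(); }
}
```

In this sketch an immutable type costs nothing per element, while a mutable type pays only an object-level copy, never an encode/decode roundtrip.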
> >> >>> So the feature in Flink is operator chaining, and Flink per
> >> >>> default initiates a copy of input elements. In the case of Beam,
> >> >>> the coder copy seems to be more noticeable than in native Flink.
> >> >>> So do I get it right that in the portable runner scenario, you do
> >> >>> similar chaining via this "fusion of stages"? Curious here... how
> >> >>> is it different from chaining, so that the runner can be sure that
> >> >>> not doing a copy is "safe" with respect to user-defined functions
> >> >>> and their behaviour over inputs?
> >> >>>
> >> >>>>>
> >> >>>>> Reuven
> >> >>>>>
> >> >>>>> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek
> >> >>>>> <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >> >>>>>>
> >> >>>>>> It was not a portable Flink runner.
> >> >>>>>>
> >> >>>>>> Thanks all for the thoughts. I will create JIRAs, as
> >> >>>>>> suggested, with my findings and send them out.
> >> >>>>>>
> >> >>>>>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax
> >> >>>>>> <re...@google.com <mailto:re...@google.com>> wrote:
> >> >>>>>>>
> >> >>>>>>> Jozef, did you use the portable Flink runner or the old one?
> >> >>>>>>>
> >> >>>>>>> Reuven
> >> >>>>>>>
> >> >>>>>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw
> >> >>>>>>> <rober...@google.com <mailto:rober...@google.com>> wrote:
> >> >>>>>>>>
> >> >>>>>>>> Thanks for starting this investigation. As mentioned, most
> >> >>>>>>>> of the work to date has been on feature parity, not
> >> >>>>>>>> performance parity, but we're at the point that the latter
> >> >>>>>>>> should be tackled as well. Even if there is a slight
> >> >>>>>>>> overhead (and there's talk about integrating more deeply
> >> >>>>>>>> with the Flume DAG that could elide even that), I'd expect
> >> >>>>>>>> it should be nowhere near the 3x that you're seeing. Aside
> >> >>>>>>>> from the timer issue, it sounds like the cloning via coders
> >> >>>>>>>> is a huge drag that needs to be addressed.
> >> >>>>>>>> I wonder if this is one of those cases where using the
> >> >>>>>>>> portability framework could be a performance win
> >> >>>>>>>> (specifically, no cloning would happen between operators of
> >> >>>>>>>> fused stages, and the cloning between stages could be done
> >> >>>>>>>> on the raw byte[], if needed at all, because we know they
> >> >>>>>>>> wouldn't be mutated).
> >> >>>>>>>>
> >> >>>>>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles
> >> >>>>>>>> <k...@apache.org <mailto:k...@apache.org>> wrote:
> >> >>>>>>>>>
> >> >>>>>>>>> Specifically, a lot of shared code assumes that repeatedly
> >> >>>>>>>>> setting a timer is nearly free / the same cost as
> >> >>>>>>>>> determining whether or not to set the timer. ReduceFnRunner
> >> >>>>>>>>> has been refactored in a way such that it would be very easy
> >> >>>>>>>>> to set the GC timer once per window that occurs in a bundle,
> >> >>>>>>>>> but there's probably some underlying inefficiency around why
> >> >>>>>>>>> this isn't cheap that would be a bigger win.
> >> >>>>>>>>>
> >> >>>>>>>>> Kenn
> >> >>>>>>>>>
> >> >>>>>>>>> On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax
> >> >>>>>>>>> <re...@google.com <mailto:re...@google.com>> wrote:
> >> >>>>>>>>>>
> >> >>>>>>>>>> I think the short answer is that folks working on the
> >> >>>>>>>>>> Beam Flink runner have mostly been focused on getting
> >> >>>>>>>>>> everything working, and so have not dug into this
> >> >>>>>>>>>> performance too deeply. I suspect that there is
> >> >>>>>>>>>> low-hanging fruit to optimize as a result.
> >> >>>>>>>>>>
> >> >>>>>>>>>> You're right that ReduceFnRunner schedules a timer for
> >> >>>>>>>>>> each element. I think this code dates back to before Beam;
> >> >>>>>>>>>> on Dataflow, timers are identified by tag, so this simply
> >> >>>>>>>>>> overwrites the existing timer, which is very cheap in
> >> >>>>>>>>>> Dataflow. If it is not cheap on Flink, this might be
> >> >>>>>>>>>> something to optimize.
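[Editor's note: the tag-identified timers Reuven describes can be sketched as below. This is a toy illustration, not Dataflow's or Flink's actual timer service: when timers are keyed by tag, re-setting a timer is a single map put that overwrites the old deadline, instead of a delete plus insert in a timer heap.]

```java
import java.util.HashMap;
import java.util.Map;

// Toy timer service keyed by tag: setting a timer with an existing tag
// overwrites the previous deadline, so repeatedly (re)setting the same
// timer stays cheap and never accumulates stale entries.
class TagTimerService {
    private final Map<String, Long> deadlines = new HashMap<>();

    void setTimer(String tag, long timestamp) {
        deadlines.put(tag, timestamp);  // overwrite; no explicit delete needed
    }

    Long deadline(String tag) {
        return deadlines.get(tag);
    }

    int activeTimers() {
        return deadlines.size();
    }
}
```

With per-element scheduling, a thousand elements in one window still leave exactly one live timer here, which is why the overwrite semantics make the pattern cheap.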
> >> >>>>>>>>>>
> >> >>>>>>>>>> Reuven
> >> >>>>>>>>>>
> >> >>>>>>>>>> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek
> >> >>>>>>>>>> <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Hello,
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I am interested in any knowledge or thoughts on what
> >> >>>>>>>>>>> should be / is the overhead of running Beam pipelines
> >> >>>>>>>>>>> instead of pipelines written on a "bare runner". Is this
> >> >>>>>>>>>>> something which is being tested or investigated by the
> >> >>>>>>>>>>> community? Is there a consensus on what bounds the
> >> >>>>>>>>>>> overhead should typically fall within? I realise this is
> >> >>>>>>>>>>> very runner specific, but certain things are imposed
> >> >>>>>>>>>>> also by the SDK model itself.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I tested a simple streaming pipeline on Flink vs
> >> >>>>>>>>>>> Beam-Flink and found very noticeable differences. I want
> >> >>>>>>>>>>> to stress, it was not a performance test. The job does
> >> >>>>>>>>>>> the following:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Read Kafka -> Deserialize to Proto -> Filter
> >> >>>>>>>>>>> deserialisation errors -> Reshuffle -> Report
> >> >>>>>>>>>>> counter.inc() to metrics for throughput
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Both jobs had the same configuration and the same state
> >> >>>>>>>>>>> backend with the same checkpointing strategy. What I
> >> >>>>>>>>>>> noticed from a few simple test runs:
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> * on the first run, on Flink 1.5.0, from CPU profiles on
> >> >>>>>>>>>>> one worker I found that ~50% of the time was spent
> >> >>>>>>>>>>> either on removing timers from HeapInternalTimerService
> >> >>>>>>>>>>> or in java.io.ByteArrayOutputStream from
> >> >>>>>>>>>>> CoderUtils.clone()
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> * the problem with timer deletion was addressed by
> >> >>>>>>>>>>> FLINK-9423. I retested on Flink 1.7.2 and not much time
> >> >>>>>>>>>>> is spent in timer deletion now, but the root cause was
> >> >>>>>>>>>>> not removed. It still remains that timers are frequently
> >> >>>>>>>>>>> registered and removed (I believe from
> >> >>>>>>>>>>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in
> >> >>>>>>>>>>> which case it is called per processed element?), which
> >> >>>>>>>>>>> is noticeable in GC activity, Heap and State ...
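[Editor's note: Kenn's suggestion of setting the GC timer once per window per bundle, rather than per element, could look roughly like this. Names are hypothetical, not Beam's actual ReduceFnRunner code: track which windows already had their GC timer set in the current bundle, so only the first element of each window touches the timer backend.]

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: set the window GC timer at most once per window
// per bundle, instead of re-registering it for every processed element.
class GcTimerBatcher {
    private final Set<String> windowsThisBundle = new HashSet<>();
    int backendCalls = 0;  // how often we actually hit the timer backend

    void processElement(String windowId) {
        if (windowsThisBundle.add(windowId)) {
            // first element of this window in the bundle pays the cost
            setGcTimer(windowId);
        }
        // ... per-element user processing would happen here ...
    }

    void finishBundle() {
        windowsThisBundle.clear();  // next bundle starts fresh
    }

    private void setGcTimer(String windowId) {
        backendCalls++;  // stand-in for registering a timer with the runner
    }
}
```

For a bundle of many elements in few windows, this turns per-element timer churn (the register/remove pattern visible in the profiles above) into one registration per window.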
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> * in Flink I use the FileSystem state backend, which
> >> >>>>>>>>>>> keeps state in an in-memory CopyOnWriteStateTable that
> >> >>>>>>>>>>> after some time is full of PaneInfo objects. Maybe they
> >> >>>>>>>>>>> come from PaneInfoTracker activity
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> * the Coder clone is painful. A pure Flink job does copy
> >> >>>>>>>>>>> between operators too, in my case via Kryo.copy(), but
> >> >>>>>>>>>>> this is not noticeable in the CPU profile. Kryo.copy()
> >> >>>>>>>>>>> copies on the object level, not object -> bytes ->
> >> >>>>>>>>>>> object, which is cheaper
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> Overall, my observation is that pure Flink can be
> >> >>>>>>>>>>> roughly 3x faster.
> >> >>>>>>>>>>>
> >> >>>>>>>>>>> I do not know what I am trying to achieve here :)
> >> >>>>>>>>>>> Probably just to start a discussion and collect thoughts
> >> >>>>>>>>>>> and other experiences on the cost of running some data
> >> >>>>>>>>>>> processing on Beam and a particular runner.
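[Editor's note: the two copy strategies compared in the thread can be sketched as follows. This uses plain java.io serialization as a stand-in for a coder roundtrip; the Event class and helper are illustrative, not Beam or Kryo code. An object-level copy builds the new object directly, while a coder-style clone pays for encoding to bytes and decoding back.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class Event implements Serializable {
    final long id;
    final String payload;

    Event(long id, String payload) { this.id = id; this.payload = payload; }

    // Object-level copy, roughly what Kryo.copy() achieves: no byte encoding.
    Event(Event other) { this(other.id, other.payload); }
}

class CopyStrategies {
    // Coder-style clone: obj -> bytes -> obj, the pattern behind a clone
    // through a coder (here simulated with java.io serialization).
    static Event roundTripCopy(Event e) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(e);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()))) {
                return (Event) in.readObject();
            }
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }
}
```

Both paths yield an independent copy that protects against downstream mutation; the roundtrip additionally allocates a buffer and pays encode/decode CPU per element, which is consistent with ByteArrayOutputStream showing up in the profiles above.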