All right, I can test it out if possible. How do I deploy a pipeline on the Flink portable runner? Should I follow this to be able to do it? https://beam.apache.org/documentation/runners/flink/
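For reference, a portable-runner submission (per the linked Flink runner page) generally means starting a Beam job server that targets the Flink cluster and then submitting the pipeline against its job endpoint. The file name, port, and endpoint below are assumptions for illustration, not taken from this thread; the linked page has the exact artifact and version to use for the job server.

```shell
# Sketch only -- file name, port and endpoint are assumptions.
# 1. Start a Beam job server pointed at your Flink cluster
#    (see the linked Flink runner page for the exact artifact/version).
# 2. Submit the pipeline against that job endpoint, e.g. with the Python SDK:
python my_pipeline.py \
    --runner=PortableRunner \
    --job_endpoint=localhost:8099
```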
On Tue, Apr 30, 2019 at 4:05 PM Reuven Lax <re...@google.com> wrote:

> In that case, Robert's point is quite valid. The old Flink runner, I
> believe, had no knowledge of fusion, which was known to make it extremely
> slow. A lot of work went into making the portable runner fusion-aware, so
> we don't need to round-trip through coders on every ParDo.
>
> Reuven
>
> On Tue, Apr 30, 2019 at 6:58 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>> It was not a portable Flink runner.
>>
>> Thanks all for the thoughts. I will create JIRAs, as suggested, with my
>> findings and send them out.
>>
>> On Tue, Apr 30, 2019 at 11:34 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Jozef, did you use the portable Flink runner or the old one?
>>>
>>> Reuven
>>>
>>> On Tue, Apr 30, 2019 at 1:03 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>>> Thanks for starting this investigation. As mentioned, most of the work
>>>> to date has been on feature parity, not performance parity, but we're
>>>> at the point where the latter should be tackled as well. Even if there
>>>> is a slight overhead (and there's talk about integrating more deeply
>>>> with the Flume DAG that could elide even that), I'd expect it to be
>>>> nowhere near the 3x that you're seeing. Aside from the timer issue, it
>>>> sounds like the cloning via coders is a huge drag that needs to be
>>>> addressed. I wonder if this is one of those cases where using the
>>>> portability framework could be a performance win (specifically, no
>>>> cloning would happen between operators of fused stages, and the
>>>> cloning between operators could be done on the raw byte[]s, if needed
>>>> at all, because we know they wouldn't be mutated).
>>>>
>>>> On Tue, Apr 30, 2019 at 12:31 AM Kenneth Knowles <k...@apache.org> wrote:
>>>> >
>>>> > Specifically, a lot of shared code assumes that repeatedly setting a
>>>> > timer is nearly free / the same cost as determining whether or not to
>>>> > set the timer.
>>>> > ReduceFnRunner has been refactored in a way such that it would be
>>>> > very easy to set the GC timer once per window that occurs in a
>>>> > bundle, but there's probably some underlying inefficiency around why
>>>> > this isn't cheap that would be a bigger win.
>>>> >
>>>> > Kenn
>>>> >
>>>> > On Mon, Apr 29, 2019 at 10:05 AM Reuven Lax <re...@google.com> wrote:
>>>> >>
>>>> >> I think the short answer is that folks working on the Beam Flink
>>>> >> runner have mostly been focused on getting everything working, and
>>>> >> so have not dug into this performance too deeply. I suspect that
>>>> >> there is low-hanging fruit to optimize as a result.
>>>> >>
>>>> >> You're right that ReduceFnRunner schedules a timer for each element.
>>>> >> I think this code dates back to before Beam; on Dataflow, timers are
>>>> >> identified by tag, so this simply overwrites the existing timer,
>>>> >> which is very cheap on Dataflow. If it is not cheap on Flink, this
>>>> >> might be something to optimize.
>>>> >>
>>>> >> Reuven
>>>> >>
>>>> >> On Mon, Apr 29, 2019 at 3:48 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>> >>>
>>>> >>> Hello,
>>>> >>>
>>>> >>> I am interested in any knowledge or thoughts on what the overhead
>>>> >>> of running Beam pipelines should be / is, compared to pipelines
>>>> >>> written on a "bare" runner. Is this something which is being tested
>>>> >>> or investigated by the community? Is there a consensus on what
>>>> >>> bounds the overhead should typically fall within? I realise this is
>>>> >>> very runner-specific, but certain things are also imposed by the
>>>> >>> SDK model itself.
>>>> >>>
>>>> >>> I tested a simple streaming pipeline on Flink vs Beam-on-Flink and
>>>> >>> found very noticeable differences. I want to stress that it was not
>>>> >>> a performance test.
>>>> >>> The job does the following:
>>>> >>>
>>>> >>> Read Kafka -> Deserialize to Proto -> Filter deserialisation errors
>>>> >>> -> Reshuffle -> Report counter.inc() to metrics for throughput
>>>> >>>
>>>> >>> Both jobs had the same configuration and the same state backend
>>>> >>> with the same checkpointing strategy. What I noticed from a few
>>>> >>> simple test runs:
>>>> >>>
>>>> >>> * In the first run, on Flink 1.5.0, CPU profiles on one worker
>>>> >>> showed that ~50% of the time was spent either on removing timers
>>>> >>> from HeapInternalTimerService or in java.io.ByteArrayOutputStream
>>>> >>> from CoderUtils.clone()
>>>> >>>
>>>> >>> * The problem with timer deletion was addressed by FLINK-9423. I
>>>> >>> retested on Flink 1.7.2 and not much time is spent in timer
>>>> >>> deletion now, but the root cause was not removed. It still remains
>>>> >>> that timers are frequently registered and removed (I believe from
>>>> >>> ReduceFnRunner.scheduleGarbageCollectionTimer(), in which case it
>>>> >>> is called per processed element?), which is noticeable in GC
>>>> >>> activity, heap and state ...
>>>> >>>
>>>> >>> * In Flink I use the FileSystem state backend, which keeps state in
>>>> >>> an in-memory CopyOnWriteStateTable that after some time is full of
>>>> >>> PaneInfo objects. Maybe they come from PaneInfoTracker activity
>>>> >>>
>>>> >>> * Coder clone is painful. A pure Flink job does copy between
>>>> >>> operators too, in my case via Kryo.copy(), but this is not
>>>> >>> noticeable in the CPU profile. Kryo.copy() copies at the object
>>>> >>> level, not object -> bytes -> object, which is cheaper
>>>> >>>
>>>> >>> Overall, my observation is that pure Flink can be roughly 3x faster.
>>>> >>>
>>>> >>> I do not know what I am trying to achieve here :) Probably just to
>>>> >>> start a discussion and collect thoughts and other experiences on
>>>> >>> the cost of running some data processing on Beam and a particular
>>>> >>> runner.
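The timer point discussed above (tag-keyed timers on Dataflow being overwritten for free, vs. register/delete churn in a heap-backed timer service) can be sketched as follows. This is an illustrative toy model, not Beam or Flink code; the class and method names are invented for the sketch.

```python
# Toy model of the timer discussion: setting a timer keyed by tag is an O(1)
# overwrite, while resetting a timer in a naive heap-backed service means
# scanning out the stale entry and re-heapifying on every set.
import heapq


class TagTimerService:
    """Timers keyed by tag: setting the same tag again just overwrites."""

    def __init__(self):
        self.timers = {}  # tag -> fire_time

    def set_timer(self, tag, fire_time):
        self.timers[tag] = fire_time  # O(1); the old timer simply vanishes


class HeapTimerService:
    """Heap-backed timers: resetting requires removing the stale entry."""

    def __init__(self):
        self.heap = []  # (fire_time, tag) entries

    def set_timer(self, tag, fire_time):
        # Filtering out the old entry is O(n) plus a re-heapify -- this is
        # the kind of delete cost FLINK-9423 reduced, but the set/delete
        # churn per element remains visible in GC activity.
        self.heap = [(t, g) for (t, g) in self.heap if g != tag]
        heapq.heapify(self.heap)
        heapq.heappush(self.heap, (fire_time, tag))


tagged = TagTimerService()
heaped = HeapTimerService()
for i in range(1000):  # e.g. one GC-timer reset per processed element
    tagged.set_timer("gc", i)
    heaped.set_timer("gc", i)

assert len(tagged.timers) == 1  # only the latest timer survives
assert len(heaped.heap) == 1    # same result, but at O(n) cost per set
```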
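The coder-clone observation can be illustrated the same way. The sketch below is not Beam's CoderUtils or Kryo; it uses Python's pickle and copy modules as stand-ins to show the structural difference between cloning through bytes (object -> bytes -> object, as a coder round trip does) and an object-level copy of the graph (as Kryo.copy() does).

```python
# Stand-in illustration of the two cloning strategies from the thread:
# a serialization round trip vs. a direct object-graph copy.
import copy
import pickle

record = {"key": "user-42", "values": list(range(100))}


def clone_via_bytes(obj):
    # Analogous to a coder round trip: encode to a byte buffer, then decode.
    # Every field is serialized and re-materialized on each clone.
    return pickle.loads(pickle.dumps(obj))


def clone_object_level(obj):
    # Analogous to an object-level copy: the object graph is duplicated
    # directly, with no byte encoding in between.
    return copy.deepcopy(obj)


a = clone_via_bytes(record)
b = clone_object_level(record)

# Both produce an equal but distinct object; the difference is the cost of
# the byte round trip, which showed up in the CPU profile above.
assert a == record and a is not record
assert b == record and b is not record
```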