Boyuan, your suggestion seems at odds with Jan's. Are you saying it *is* necessary to close the reader on checkpoint, so the only solution is to reduce the checkpoint frequency?
On Thu, Dec 17, 2020 at 10:39 AM Boyuan Zhang <[email protected]> wrote:

Thanks for your investigation, Steve! It seems like preventing the checkpoint from happening so frequently would be one workaround for you. Making the checkpoint frequency configurable from a pipeline option seems like the way to go.

On Thu, Dec 17, 2020 at 7:35 AM Jan Lukavský <[email protected]> wrote:

Hi Steve,

I didn't mean we should deliberately make DirectRunner slow, or that we should not fix performance issues if they can be fixed. What I meant was that if we have to choose between a short checkpoint time (and few elements processed before a checkpoint is taken) and performance, we should prefer the better-tested code, in this particular case.

> After a bunch of debugging, I think I finally figured out what the problem is though. During a checkpoint (in trySplit), the UnboundedSourceViaSDF wrapper will close the current source reader and create a new one.

That is actually a great example. The problem should be fixed there (the reader probably does not need to be closed on checkpoint). And it is DirectRunner that manifested this, due to short checkpointing.

Jan

On 12/17/20 4:14 PM, Steve Niemitz wrote:

> Primary purpose of DirectRunner is testing, not performance

That's one argument, but it's very difficult to effectively test a pipeline when I need to wait 15+ minutes for the first element to go through it. I also disagree in general that we shouldn't care about the performance of the DirectRunner. It's likely the first runner new users of Beam try (I know it was for us), and if it doesn't provide enough performance to actually run a representative pipeline, users may extrapolate that performance onto other runners (I know we did). Anecdotally, the fact that the DirectRunner didn't work for some of our initial test pipelines (because of performance problems) probably delayed our adoption of Beam by at least 6 months.

> Steve, based on your findings, it seems like it takes more time for the SDF pipeline to actually start to read from PubSub and more time to output records.

Pubsub reads start ~instantly, but I'm not able to see any elements actually output from it for a LONG time, sometimes 30+ minutes. I see the reader acking back to pubsub, so it IS committing, but no elements are output.

After a bunch of debugging, I think I finally figured out what the problem is though. During a checkpoint (in trySplit), the UnboundedSourceViaSDF wrapper will close the current source reader and create a new one. The problem is, the pubsub reader needs some time to correctly estimate its watermark [1], and because it gets closed and recreated so frequently due to checkpointing (on either number of elements or duration), it can never actually provide accurate estimates and always returns the min watermark. This seems to prevent some internal timers from ever firing, effectively holding all elements in the pipeline state. I can confirm this also by looking at WatermarkManager, where I see all the bundles pending.

[1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L959
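(For context, a minimal sketch, assuming a simplified sampling scheme, of the watermark-estimation behavior Steve describes. This is an illustration only, not the actual PubsubUnboundedSource code linked in [1]. The point is that a freshly created reader has no timestamp samples yet, so the only safe estimate it can report is the minimum watermark; a reader that is closed and recreated on every checkpoint therefore never advances past it.)

import java.util.ArrayDeque;
import java.util.Deque;
import org.joda.time.Instant;

/**
 * Simplified illustration of a reader that estimates its watermark from
 * recently observed message timestamps. Not Beam's actual implementation.
 */
class SamplingWatermarkEstimator {
  private static final Instant MIN_WATERMARK = new Instant(Long.MIN_VALUE);
  private static final int MIN_SAMPLES = 100;    // history required before trusting the estimate
  private static final int MAX_SAMPLES = 10_000; // bound on retained history

  private final Deque<Instant> recentTimestamps = new ArrayDeque<>();

  /** Record the timestamp of each message as it is read. */
  void observe(Instant messageTimestamp) {
    recentTimestamps.addLast(messageTimestamp);
    if (recentTimestamps.size() > MAX_SAMPLES) {
      recentTimestamps.removeFirst();
    }
  }

  /** Conservative watermark estimate: the minimum of the retained samples. */
  Instant getWatermark() {
    if (recentTimestamps.size() < MIN_SAMPLES) {
      // A reader recreated on every checkpoint always lands here,
      // so the watermark stays at the minimum and downstream timers never fire.
      return MIN_WATERMARK;
    }
    Instant min = recentTimestamps.peekFirst();
    for (Instant timestamp : recentTimestamps) {
      if (timestamp.isBefore(min)) {
        min = timestamp;
      }
    }
    return min;
  }
}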
On Thu, Dec 17, 2020 at 9:43 AM Jan Lukavský <[email protected]> wrote:

Hi Ismaël,

What I meant by the performance vs. testing argument is that when choosing default values for certain (possibly configurable) options, we should prefer choices that result in better-tested code, not better performance. DirectRunner actually does quite a few things that are suboptimal performance-wise but are good to do for test purposes (immutability checks, as an example).

Regarding SDF in general, I can confirm we see some issues with Flink, most recently [1] (which I'm trying to fix right now). That is actually a correctness issue, not a performance issue. I personally haven't noticed any performance issues so far.

Jan

[1] https://issues.apache.org/jira/browse/BEAM-11481

On 12/17/20 3:24 PM, Ismaël Mejía wrote:

The influence of checkpointing on the output of the results should be minimal, in particular for the Direct Runner. What Steve reports here seems to be something different. Jan, have you or others already checked the influence of this on Flink, which is now using this new translation path?

I think the argument that the Direct runner is mostly about testing and not about performance plays out badly for Beam; one should not necessarily exclude the other. The Direct runner is our most used runner: basically every Beam user relies on it, so every regression or improvement on it affects everyone. But that's a subject worth its own thread.

On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský <[email protected]> wrote:

Hi,

From my point of view the numbers in DirectRunner are set correctly. The primary purpose of DirectRunner is testing, not performance, so DirectRunner intentionally makes frequent checkpoints to easily exercise potential bugs in user code. It might be possible to make the frequency configurable, though.

Jan

On 12/17/20 12:20 AM, Boyuan Zhang wrote:

It's not a portable execution on DirectRunner, so I would expect outputs from OutputAndTimeBoundedSplittableProcessElementInvoker to be emitted immediately. For SDF execution on DirectRunner, the overhead could come from the SDF expansion, the SDF wrapper, and the invoker.

Steve, based on your findings, it seems like it takes more time for the SDF pipeline to actually start to read from PubSub and more time to output records. Are you able to tell how much time each part is taking?

On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw <[email protected]> wrote:

If all it takes is bumping these numbers up a bit, that seems like a reasonable thing to do ASAP. (I would argue that perhaps they shouldn't be static; e.g., it might be preferable to start emitting results right away, but use larger batches for the steady state if there are performance benefits.)

That being said, it sounds like there's something deeper going on here. We should also verify that this performance impact is limited to the direct runner.
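(A rough sketch of the non-static limits Robert suggests. This is hypothetical, not existing Beam code: the idea is to emit the first results immediately and then grow the per-checkpoint output limit toward a steady-state cap.)

import org.joda.time.Duration;

/**
 * Hypothetical illustration of adaptive checkpoint limits: start with a tiny
 * output limit so the first results appear right away, then double it after
 * each checkpoint until a steady-state cap is reached.
 */
class AdaptiveCheckpointLimits {
  private static final int INITIAL_MAX_OUTPUTS = 1;
  private static final int STEADY_STATE_MAX_OUTPUTS = 10_000;
  private static final Duration MAX_BUNDLE_DURATION = Duration.standardSeconds(10);

  private int currentMaxOutputs = INITIAL_MAX_OUTPUTS;

  /** Output limit to apply to the next process-element invocation. */
  int nextMaxOutputs() {
    return currentMaxOutputs;
  }

  /** Time limit to apply to the next process-element invocation. */
  Duration maxBundleDuration() {
    return MAX_BUNDLE_DURATION;
  }

  /** Called after each checkpoint is taken. */
  void onCheckpointTaken() {
    currentMaxOutputs = Math.min(currentMaxOutputs * 2, STEADY_STATE_MAX_OUTPUTS);
  }
}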
On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz <[email protected]> wrote:

I tried changing my build locally to 10 seconds and 10,000 elements, but it didn't seem to make much of a difference; it still takes a few minutes for elements to begin actually showing up in downstream stages from the Pubsub read. I can see elements being emitted from OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being committed by ParDoEvaluator.finishBundle, but after that they seem to just kind of disappear somewhere.

On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <[email protected]> wrote:

Making it a PipelineOption was another proposal of mine, but it might take some time to do. On the other hand, tuning the numbers to something acceptable is low-hanging fruit.

On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <[email protected]> wrote:

It sounds reasonable. I am also wondering about the consequences of these parameters for other runners (where it is every 10 seconds or 10000 elements), plus their own configuration, e.g. checkpointInterval, checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It is not clear to me what would be chosen now in this case.

I know we are a bit anti-knobs, but maybe it makes sense to make this configurable via PipelineOptions, at least for the Direct runner.

On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <[email protected]> wrote:

I agree, Ismaël.

From my current investigation, the performance overhead comes mostly from the frequency of checkpointing in OutputAndTimeBoundedSplittableProcessElementInvoker [1], which is hardcoded in the DirectRunner (every 1 second or 100 elements) [2]. I believe configuring these numbers on DirectRunner should improve the cases reported so far. My last proposal was to change the limits to every 5 seconds or 10000 elements. What do you think?

[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
[2] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181

On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <[email protected]> wrote:

I can guess that the same issues mentioned here will probably affect the usability for people trying Beam's interactive SQL on unbounded IO too.

We should really take into account that the performance of the SDF-based path should be as good as or better than the previous version before considering its removal (--experiments=use_deprecated_read), and we should probably have consensus when that happens.

On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <[email protected]> wrote:

> From what I've seen, the direct runner initiates a checkpoint after every element output.

That seems like the 1 second limit kicks in before the output reaches 100 elements.

I think the original purpose of having DirectRunner use a small limit on issuing checkpoint requests was to exercise SDF better on a small data set, but it brings overhead on a larger set owing to too many checkpoints. It would be ideal to make this limit configurable from the pipeline, but the easiest approach is to figure out a number that works for most common cases. Do you think raising the limit to 1000 elements or every 5 seconds would help?
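(A sketch of what the configurability discussed above might look like as a pipeline options interface. The option names below are invented for illustration; no such options exist in Beam at this point in the thread. The values would then be read where the hard-coded limits in SplittableProcessElementsEvaluatorFactory, linked above, are set today.)

import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/** Hypothetical options for the DirectRunner SDF checkpoint limits; not part of Beam. */
public interface DirectCheckpointOptions extends PipelineOptions {

  @Description("Maximum number of outputs before the DirectRunner requests an SDF checkpoint.")
  @Default.Integer(10000)
  int getDirectCheckpointMaxOutputs();

  void setDirectCheckpointMaxOutputs(int maxOutputs);

  @Description("Maximum time, in seconds, before the DirectRunner requests an SDF checkpoint.")
  @Default.Integer(5)
  int getDirectCheckpointDurationSeconds();

  void setDirectCheckpointDurationSeconds(int seconds);
}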
On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <[email protected]> wrote:

From what I've seen, the direct runner initiates a checkpoint after every element output.

On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <[email protected]> wrote:

Hi Antonio,

Thanks for the details! Which version of the Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline?

ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as its input element; a KafkaConsumer is assigned to this topic+partition and then polls records continuously. The consumer stops polling and the process fn returns (to be resumed later) when:

- there are currently no available records (this is an SDF feature, the self-initiated checkpoint), or
- the OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint request to ReadFromKafkaDoFn to get partial results. The checkpoint frequency for DirectRunner is every 100 output records or every 1 second.

It seems like either the self-initiated checkpoint or the DirectRunner-issued checkpoint gives you the performance regression, since there is overhead when rescheduling residuals. In your case, it looks like the checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200 elements per batch. I want to understand what kind of performance regression you are noticing: is it slower to output the same number of records?

On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <[email protected]> wrote:

Hi Boyuan,

This is Antonio. I reported the KafkaIO.read() performance issue on the slack channel a few days ago.

I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline and I would like to provide some observations.

It looks like in my case ReadFromKafkaDoFn.processElement() was invoked within the same thread, and every time kafkaConsumer.poll() is called it returns some records, from 1 up to 200 records. It then proceeds to run the pipeline steps. Each kafkaConsumer.poll() takes about 0.8 ms. So, in this case, the polling and the running of the pipeline are executed sequentially within a single thread, and after processing a batch of records it needs to wait about 0.8 ms before it can process the next batch.

Any suggestions would be appreciated.

Hope that helps.

Thanks and regards,

Antonio.

On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]> wrote:

Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
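(A much-simplified sketch of the read loop described above, for readers unfamiliar with SDF. This is an illustration of the pattern, not the actual ReadFromKafkaDoFn; PartitionDescriptor, Record, and pollOnce() are placeholders standing in for the Kafka consumer machinery.)

import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;

/** Simplified sketch of an SDF-style Kafka read; not the real ReadFromKafkaDoFn. */
@DoFn.UnboundedPerElement
class SimplifiedKafkaReadDoFn
    extends DoFn<SimplifiedKafkaReadDoFn.PartitionDescriptor, SimplifiedKafkaReadDoFn.Record> {

  static class PartitionDescriptor implements Serializable {}

  static class Record implements Serializable {
    long offset() {
      return 0L; // placeholder
    }
  }

  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element PartitionDescriptor partition) {
    // Read from the beginning of the partition, with no upper bound.
    return new OffsetRange(0, Long.MAX_VALUE);
  }

  @ProcessElement
  public ProcessContinuation processElement(
      @Element PartitionDescriptor partition,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<Record> out) {
    long nextOffset = tracker.currentRestriction().getFrom();
    while (true) {
      // Placeholder for KafkaConsumer.poll() on the partition assigned to this element.
      List<Record> records = pollOnce(partition, nextOffset);
      if (records.isEmpty()) {
        // Nothing available right now: self-initiated checkpoint, resume later.
        return ProcessContinuation.resume().withResumeDelay(Duration.millis(100));
      }
      for (Record record : records) {
        if (!tracker.tryClaim(record.offset())) {
          // A runner-initiated checkpoint (e.g. from
          // OutputAndTimeBoundedSplittableProcessElementInvoker) truncated the range;
          // the unclaimed remainder is rescheduled as a residual.
          return ProcessContinuation.stop();
        }
        nextOffset = record.offset() + 1;
        out.output(record);
      }
    }
  }

  private List<Record> pollOnce(PartitionDescriptor partition, long fromOffset) {
    throw new UnsupportedOperationException("placeholder for KafkaConsumer.poll()");
  }
}

The two return paths correspond to the two checkpoint cases listed above: resume() is the self-initiated checkpoint, and a failed tryClaim() is how a runner-initiated split from the invoker surfaces inside the DoFn.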
On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <[email protected]> wrote:

Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and the SDF wrapper are different. It's highly possible that the regression comes either from the invocation path for the SDF wrapper or from the implementation of the SDF wrapper itself.

On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <[email protected]> wrote:

Coincidentally, someone else in the ASF slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read with the SDF wrapper vs the unbounded source. They mentioned they were using Flink 1.9.

[1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900

On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <[email protected]> wrote:

Hi Steve,

I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker [1], which will checkpoint the DoFn based on a time/output limit and use timers/state to reschedule work.

[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <[email protected]> wrote:

I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of Beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc.

However, after upgrading to Beam 2.25, I noticed that it could take on the order of 5-10 minutes for messages to get from the pubsub read step to the next step in the pipeline (deserializing them, etc). The subscription being read from has on the order of 100,000 elements/sec arriving in it.

Setting --experiments=use_deprecated_read fixes it, and makes the pipeline behave as it did before.

It seems like the SDF implementation in the DirectRunner here is causing some kind of issue, either buffering a very large amount of data before emitting it in a bundle, or something else. Has anyone else run into this?
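(For anyone wanting to try the workaround Steve mentions, a minimal sketch of setting the experiment when constructing the pipeline. The --experiments=use_deprecated_read flag is the one discussed in this thread; the surrounding pipeline scaffolding is just an assumed example, and the flag can equally be passed on the command line.)

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DeprecatedReadWorkaround {
  public static void main(String[] args) {
    // Equivalent to passing --experiments=use_deprecated_read on the command line:
    // this keeps Read transforms on the pre-SDF UnboundedSource translation path.
    ExperimentalOptions options =
        PipelineOptionsFactory.fromArgs("--experiments=use_deprecated_read")
            .withValidation()
            .as(ExperimentalOptions.class);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the pubsub-reading pipeline here ...
    pipeline.run().waitUntilFinish();
  }
}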
