I tried changing my build locally to 10 seconds and 10,000 elements, but it didn't seem to make much of a difference; it still takes a few minutes for elements to begin actually showing up in downstream stages from the Pubsub read. I can see elements being emitted from OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being committed by ParDoEvaluator.finishBundle, but after that they seem to just disappear somewhere.
On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang <boyu...@google.com> wrote:

> Making it a PipelineOption was another proposal of mine, but it might take some time to do so. On the other hand, tuning the numbers into something acceptable is low-hanging fruit.
>
> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>
>> It sounds reasonable. I am also wondering about the consequences of these parameters for other runners (where it is every 10 seconds or 10000 elements), plus their own configuration, e.g. checkpointInterval, checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It is not clear to me what would be chosen now in this case.
>>
>> I know we are a bit anti-knobs, but maybe it makes sense to make this configurable via PipelineOptions, at least for the Direct runner.
>>
>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang <boyu...@google.com> wrote:
>> >
>> > I agree, Ismaël.
>> >
>> > From my current investigation, the performance overhead should mostly come from the frequency of checkpointing in OutputAndTimeBoundedSplittableProcessElementInvoker [1], which is hardcoded in the DirectRunner (every 1 second or 100 elements) [2]. I believe configuring these numbers on DirectRunner should improve the cases reported so far. My last proposal was to change the limit to every 5 seconds or 10,000 elements. What do you think?
>> > [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>> > [2] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
>> >
>> > On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>> >>
>> >> I can guess that the same issues mentioned here will probably affect usability for people trying Beam's interactive SQL on Unbounded IO too.
>> >>
>> >> We should really take into account that the performance of the SDF-based path should be as good as or better than the previous version before considering its removal (--experiments=use_deprecated_read), and we should probably have consensus when that happens.
>> >>
>> >> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com> wrote:
>> >> >
>> >> > > From what I've seen, the direct runner initiates a checkpoint after every element output.
>> >> > That seems like the 1 second limit kicks in before the output reaches 100 elements.
>> >> >
>> >> > I think the original purpose of having DirectRunner use a small limit on issuing checkpoint requests was to exercise SDF better on small data sets, but it brings overhead on larger sets owing to too many checkpoints. It would be ideal to make this limit configurable from the pipeline, but the easiest approach is to figure out a number that works for most common cases. Do you think raising the limit to 1000 elements or every 5 seconds would help?
>> >> >
>> >> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org> wrote:
>> >> >>
>> >> >> From what I've seen, the direct runner initiates a checkpoint after every element output.
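To put rough numbers on the checkpoint-frequency discussion above: the following is a hypothetical back-of-the-envelope model in plain Java (not Beam source code). The rates and limits are the ones quoted in this thread — the hardcoded 1 second / 100 elements, the proposed 5 seconds / 10,000 elements, and the ~100,000 elements/sec subscription mentioned later.

```java
// Hypothetical model, not Beam code: estimates how often the DirectRunner's
// output/time bound forces a checkpoint at a given steady output rate.
public class CheckpointRate {
    // elementsPerSec: steady output rate of the splittable DoFn.
    // elementLimit / timeLimitSec: the invoker's bounds.
    static double checkpointsPerSecond(double elementsPerSec, int elementLimit, double timeLimitSec) {
        // Whichever bound fires first dominates: the element bound at high
        // rates, the time bound at low rates.
        return Math.max(elementsPerSec / elementLimit, 1.0 / timeLimitSec);
    }

    public static void main(String[] args) {
        // Current hardcoded bound (1 second / 100 elements) at 100,000
        // elements/sec: the element bound fires ~1,000 times per second,
        // each time paying the cost of rescheduling the residual.
        System.out.println(checkpointsPerSecond(100_000, 100, 1.0));
        // Proposed bound (5 seconds / 10,000 elements): ~10 per second.
        System.out.println(checkpointsPerSecond(100_000, 10_000, 5.0));
    }
}
```

Under these assumed numbers, raising the limits cuts the checkpoint rate by two orders of magnitude, which is why tuning them looks like low-hanging fruit.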
>> >> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com> wrote:
>> >> >>>
>> >> >>> Hi Antonio,
>> >> >>>
>> >> >>> Thanks for the details! Which version of the Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline?
>> >> >>>
>> >> >>> ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as its input element, and a KafkaConsumer is assigned to that topic+partition to poll records continuously. The Kafka consumer will stop polling and return from the process fn (to be resumed later) when:
>> >> >>>
>> >> >>> - there are no records currently available (this is a feature of SDF, which triggers an SDF self-initiated checkpoint), or
>> >> >>> - the OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint request to ReadFromKafkaDoFn to get partial results. The checkpoint frequency for DirectRunner is every 100 output records or every 1 second.
>> >> >>>
>> >> >>> It seems like either the self-initiated checkpoint or the DirectRunner-issued checkpoint is giving you the performance regression, since there is overhead when rescheduling residuals. In your case, it looks more like the checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker is giving you 200 elements per batch. I want to understand what kind of performance regression you are noticing: is it slower to output the same number of records?
>> >> >>>
>> >> >>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>> >> >>>>
>> >> >>>> Hi Boyuan,
>> >> >>>>
>> >> >>>> This is Antonio. I reported the KafkaIO.read() performance issue on the slack channel a few days ago.
>> >> >>>>
>> >> >>>> I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline and I would like to provide some observations.
>> >> >>>> It looks like in my case ReadFromKafkaDoFn.processElement() was invoked within the same thread, and every time kafkaConsumer.poll() is called it returns some records, from 1 up to 200 records, after which it proceeds to run the pipeline steps. Each kafkaConsumer.poll() takes about 0.8 ms. So, in this case, the polling and the running of the pipeline are executed sequentially within a single thread, and after processing a batch of records it needs to wait about 0.8 ms before it can process the next batch.
>> >> >>>>
>> >> >>>> Any suggestions would be appreciated.
>> >> >>>>
>> >> >>>> Hope that helps.
>> >> >>>>
>> >> >>>> Thanks and regards,
>> >> >>>>
>> >> >>>> Antonio.
>> >> >>>>
>> >> >>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>> >> >>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>> >> >>>> >
>> >> >>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
>> >> >>>> >
>> >> >>>> > > Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and the SDF wrapper are different, so it's highly possible that the regression comes either from the invocation path for the SDF wrapper or from the implementation of the SDF wrapper itself.
>> >> >>>> > >
>> >> >>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
>> >> >>>> > >
>> >> >>>> > >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read with the SDF wrapper vs. the unbounded source. They mentioned they were using Flink 1.9.
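Antonio's single-threaded observation above can be sketched with a small hypothetical model (plain Java, not Beam or Kafka client code; the ~0.8 ms poll latency and 200-record batches are the figures from his message, and the per-record processing cost is an invented illustration):

```java
// Hypothetical single-threaded model of the behavior described above:
// poll() and pipeline processing alternate on one thread, so every batch
// pays the full poll latency before the next batch can start.
public class SequentialPollModel {
    // Records processed per second when each batch of `batchSize` records
    // costs `pollMs` to fetch plus `processMsPerRecord * batchSize` to process.
    static double recordsPerSecond(int batchSize, double pollMs, double processMsPerRecord) {
        double batchMs = pollMs + processMsPerRecord * batchSize;
        return batchSize * 1000.0 / batchMs;
    }

    public static void main(String[] args) {
        // With the ~0.8 ms poll reported above and 200-record batches, even
        // with zero processing cost the single thread tops out around
        // 200 / 0.0008 s = 250,000 records/sec.
        System.out.println(recordsPerSecond(200, 0.8, 0.0));
        // Any per-record processing cost comes straight off that ceiling,
        // because nothing overlaps with the poll.
        System.out.println(recordsPerSecond(200, 0.8, 0.05));
    }
}
```

The point of the sketch is that with polling and processing serialized on one thread, poll latency is a fixed per-batch tax rather than something hidden behind concurrent processing.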
>> >> >>>> > >>
>> >> >>>> > >> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>> >> >>>> > >>
>> >> >>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
>> >> >>>> > >>
>> >> >>>> > >>> Hi Steve,
>> >> >>>> > >>>
>> >> >>>> > >>> I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker [1], which will checkpoint the DoFn based on a time/output limit and use timers/state to reschedule work.
>> >> >>>> > >>>
>> >> >>>> > >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>> >> >>>> > >>>
>> >> >>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
>> >> >>>> > >>>
>> >> >>>> > >>>> I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc.
>> >> >>>> > >>>>
>> >> >>>> > >>>> However, after upgrading to beam 2.25, I noticed that it could take on the order of 5-10 minutes for messages to get from the pubsub read step to the next step in the pipeline (deserializing them, etc). The subscription being read from has on the order of 100,000 elements/sec arriving in it.
>> >> >>>> > >>>>
>> >> >>>> > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the pipeline behave as it did before.
>> >> >>>> > >>>>
>> >> >>>> > >>>> It seems like the SDF implementation in the DirectRunner here is causing some kind of issue, either buffering a very large amount of data before emitting it in a bundle, or something else. Has anyone else run into this?
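For anyone wanting to try the workaround mentioned in this thread, --experiments=use_deprecated_read is passed like any other pipeline argument. A sketch of a launch command (the jar name, main class, and other flags are placeholders; only the experiments flag itself comes from the thread):

```shell
# Hypothetical launch command -- my-pipeline.jar and com.example.MyPipeline
# are placeholders for your own pipeline. The experiments flag switches the
# IO back to the pre-SDF UnboundedSource read path.
ARGS="--runner=DirectRunner --experiments=use_deprecated_read"
echo "java -cp my-pipeline.jar com.example.MyPipeline $ARGS"
```

The flag is read by PipelineOptionsFactory like any other option, so it works the same way whether the pipeline is launched from the command line or with hardcoded arguments.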