I agree, Ismael. From my current investigation, the performance overhead should come mainly from the checkpoint frequency of OutputAndTimeBoundedSplittableProcessElementInvoker[1], which is hardcoded in the DirectRunner (every 1 second or every 100 elements)[2]. I believe tuning these numbers in the DirectRunner should improve the cases reported so far. My last proposal was to change the numbers to every 5 seconds or every 10000 elements. What do you think?
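To put rough numbers on this proposal, here is a minimal sketch in Java. It is not Beam code: the class `CheckpointFrequencySketch` and its `countCheckpoints` method are hypothetical, and the model assumes a perfectly steady output rate and ignores processing cost. It only reproduces the rule described above, that a checkpoint is requested as soon as either the output limit or the time limit since the previous checkpoint is reached.

```java
/**
 * Hypothetical model (not Beam code) of an output-and-time-bounded checkpoint policy:
 * a checkpoint fires as soon as EITHER maxOutputs outputs have been emitted OR maxMillis
 * of (simulated) time has elapsed since the previous checkpoint.
 * Assumes a perfectly steady output rate and zero processing cost.
 */
public class CheckpointFrequencySketch {

  /** Counts the checkpoints requested while emitting elementsPerSecond outputs for runSeconds. */
  public static long countCheckpoints(
      long elementsPerSecond, long runSeconds, long maxOutputs, long maxMillis) {
    long totalElements = elementsPerSecond * runSeconds;
    long maxNanos = maxMillis * 1_000_000L;
    long checkpoints = 0;
    long outputsSinceLast = 0;
    long lastCheckpointNanos = 0;
    for (long i = 1; i <= totalElements; i++) {
      // Simulated clock: the i-th output is emitted at i / elementsPerSecond seconds.
      long nowNanos = i * 1_000_000_000L / elementsPerSecond;
      outputsSinceLast++;
      boolean outputLimitHit = outputsSinceLast >= maxOutputs;
      boolean timeLimitHit = nowNanos - lastCheckpointNanos >= maxNanos;
      if (outputLimitHit || timeLimitHit) {
        // Both counters restart after every checkpoint.
        checkpoints++;
        outputsSinceLast = 0;
        lastCheckpointNanos = nowNanos;
      }
    }
    return checkpoints;
  }

  public static void main(String[] args) {
    long[] rates = {1, 10, 100_000};
    for (long rate : rates) {
      long current = countCheckpoints(rate, 10, 100, 1_000);     // 100 elements or 1 s
      long proposed = countCheckpoints(rate, 10, 10_000, 5_000); // 10000 elements or 5 s
      System.out.printf(
          "%d elements/s over 10 s: current=%d, proposed=%d checkpoints%n",
          rate, current, proposed);
    }
  }
}
```

Under these assumptions, at roughly 100,000 elements/sec (the rate Steve reported for his subscription) the current limits request 10,000 checkpoints in 10 seconds versus 100 with the proposed limits, because the element limit is reached long before the time limit. At 1 element/sec, the 1-second limit fires after every single output, which is consistent with Steve's observation that the DirectRunner checkpoints after every element.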
[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
[2] https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181

On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> I can guess that the same issues mentioned here probably will affect the usability for people trying Beam's interactive SQL on Unbounded IO too.
>
> We should really take into account that the performance of the SDF based path should be as good or better than the previous version before considering its removal (--experiments=use_deprecated_read) and probably have consensus when this happens.
>
>
> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com> wrote:
> >
> > > From what I've seen, the direct runner initiates a checkpoint after every element output.
> > That seems like the 1 second limit kicks in before the output reaches 100 elements.
> >
> > I think the original purpose for DirectRunner to use a small limit on issuing checkpoint requests is for exercising SDF better in a small data set. But it brings overhead on a larger set owing to too many checkpoints. It would be ideal to make this limit configurable from pipeline but the easiest approach is that we figure out a number for most common cases. Do you think we raise the limit to 1000 elements or every 5 seconds will help?
> >
> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org> wrote:
> >>
> >> From what I've seen, the direct runner initiates a checkpoint after every element output.
> >>
> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com> wrote:
> >>>
> >>> Hi Antonio,
> >>>
> >>> Thanks for the details! Which version of Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline?
> >>>
> >>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka topic+partition as input element and a KafkaConsumer will be assigned to this topic+partition then poll records continuously. The Kafka consumer will resume reading and return from the process fn when
> >>>
> >>> - There are no available records currently (this is a feature of SDF which calls SDF self-initiated checkpoint)
> >>> - The OutputAndTimeBoundedSplittableProcessElementInvoker issues checkpoint request to ReadFromKafkaDoFn for getting partial results. The checkpoint frequency for DirectRunner is every 100 output records or every 1 second.
> >>>
> >>> It seems like either the self-initiated checkpoint or DirectRunner issued checkpoint gives you the performance regression since there is overhead when rescheduling residuals. In your case, it's more like that the checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200 elements a batch. I want to understand what kind of performance regression you are noticing? Is it slower to output the same amount of records?
> >>>
> >>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
> >>>>
> >>>> Hi Boyuan,
> >>>>
> >>>> This is Antonio. I reported the KafkaIO.read() performance issue on the slack channel a few days ago.
> >>>>
> >>>> I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline and I would like to provide some observations.
> >>>>
> >>>> It looks like in my case the ReadFromKafkaDoFn.processElement() was invoked within the same thread and every time kafkaconsumer.poll() is called, it returns some records, from 1 up to 200 records. So, it will proceed to run the pipeline steps. Each kafkaconsumer.poll() takes about 0.8ms. So, in this case, the polling and running of the pipeline are executed sequentially within a single thread. So, after processing a batch of records, it will need to wait for 0.8ms before it can process the next batch of records again.
> >>>>
> >>>> Any suggestions would be appreciated.
> >>>>
> >>>> Hope that helps.
> >>>>
> >>>> Thanks and regards,
> >>>>
> >>>> Antonio.
> >>>>
> >>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
> >>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
> >>>> >
> >>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
> >>>> >
> >>>> > > Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and SDF wrapper are different. It's highly possible that the regression either comes from the invocation path for SDF wrapper, or the implementation of SDF wrapper itself.
> >>>> > >
> >>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
> >>>> > >
> >>>> > >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read w/ the SDF wrapper vs the unbounded source. They mentioned they were using flink 1.9.
> >>>> > >>
> >>>> > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> >>>> > >>
> >>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
> >>>> > >>
> >>>> > >>> Hi Steve,
> >>>> > >>>
> >>>> > >>> I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will checkpoint the DoFn based on time/output limit and use timers/state to reschedule work.
> >>>> > >>>
> >>>> > >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> >>>> > >>>
> >>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
> >>>> > >>>
> >>>> > >>>> I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc.
> >>>> > >>>>
> >>>> > >>>> However, after upgrading to beam 2.25, I noticed that it could take on the order of 5-10 minutes for messages to get from the pubsub read step to the next step in the pipeline (deserializing them, etc). The subscription being read from has on the order of 100,000 elements/sec arriving in it.
> >>>> > >>>>
> >>>> > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the pipeline behave as it did before.
> >>>> > >>>>
> >>>> > >>>> It seems like the SDF implementation in the DirectRunner here is causing some kind of issue, either buffering a very large amount of data before emitting it in a bundle, or something else. Has anyone else run into this?