> From what I've seen, the direct runner initiates a checkpoint after every element output.

That suggests the 1-second limit kicks in before the output reaches 100 elements.
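To make the interaction of the two bounds concrete, here is a minimal, hypothetical model of a "checkpoint after N outputs or after T elapsed" policy. It is illustrative only, not the actual OutputAndTimeBoundedSplittableProcessElementInvoker code; the class and method names are made up. With a 100-element / 1-second policy, the time bound fires first whenever the DoFn produces fewer than roughly 100 elements per second.

    import java.time.Duration;
    import java.time.Instant;

    // Hypothetical sketch of a count-or-time checkpoint trigger.
    class CheckpointBoundsModel {
      private final int maxOutputs;        // e.g. 100 in the DirectRunner today
      private final Duration maxDuration;  // e.g. 1 second in the DirectRunner today
      private int outputsSoFar = 0;
      private final Instant start = Instant.now();

      CheckpointBoundsModel(int maxOutputs, Duration maxDuration) {
        this.maxOutputs = maxOutputs;
        this.maxDuration = maxDuration;
      }

      // Called once per element output; true means a checkpoint would be requested now.
      boolean shouldCheckpointAfterOutput() {
        outputsSoFar++;
        boolean hitCountBound = outputsSoFar >= maxOutputs;
        boolean hitTimeBound =
            Duration.between(start, Instant.now()).compareTo(maxDuration) >= 0;
        return hitCountBound || hitTimeBound;
      }
    }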
I think the original purpose of having DirectRunner use a small limit on issuing checkpoint requests was to exercise SDF better on small data sets, but it adds overhead on larger sets owing to too many checkpoints. It would be ideal to make this limit configurable from the pipeline, but the easiest approach is to figure out a number that works for most common cases. Do you think raising the limit to 1000 elements or every 5 seconds would help?

On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <[email protected]> wrote:

> From what I've seen, the direct runner initiates a checkpoint after every element output.
>
> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <[email protected]> wrote:
>
>> Hi Antonio,
>>
>> Thanks for the details! Which version of the Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline?
>>
>> For ReadFromKafkaDoFn.processElement(), it takes a Kafka topic+partition as its input element, a KafkaConsumer is assigned to that topic+partition, and it then polls records continuously. The Kafka consumer will stop reading and return from the process fn when:
>>
>> - There are currently no available records (this is the SDF self-initiated checkpoint feature).
>> - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a checkpoint request to ReadFromKafkaDoFn to get partial results. The checkpoint frequency for DirectRunner is every 100 output records or every 1 second.
>>
>> It seems like either the self-initiated checkpoint or the DirectRunner-issued checkpoint is giving you the performance regression, since there is overhead when rescheduling residuals. In your case, it's more likely the checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker that gives you 200 elements per batch. I want to understand what kind of performance regression you are noticing: is it slower to output the same amount of records?
>>
>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <[email protected]> wrote:
>>
>>> Hi Boyuan,
>>>
>>> This is Antonio. I reported the KafkaIO.read() performance issue on the Slack channel a few days ago.
>>>
>>> I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline and I would like to share some observations.
>>>
>>> It looks like in my case ReadFromKafkaDoFn.processElement() was invoked within the same thread, and every time kafkaconsumer.poll() is called it returns some records, from 1 up to 200, after which the pipeline steps run on them. Each kafkaconsumer.poll() takes about 0.8ms. So, in this case, the polling and the running of the pipeline are executed sequentially within a single thread, and after processing a batch of records it needs to wait about 0.8ms before it can process the next batch.
>>>
>>> Any suggestions would be appreciated.
>>>
>>> Hope that helps.
>>>
>>> Thanks and regards,
>>>
>>> Antonio.
>>>
>>> On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]> wrote:
>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>> >
>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <[email protected]> wrote:
>>> >
>>> > > Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and the SDF wrapper are different.
>>> > > It's highly possible that the regression comes either from the invocation path for the SDF wrapper or from the implementation of the SDF wrapper itself.
>>> > >
>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <[email protected]> wrote:
>>> > >
>>> > >> Coincidentally, someone else in the ASF Slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read with the SDF wrapper vs the unbounded source. They mentioned they were using Flink 1.9.
>>> > >>
>>> > >> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>> > >>
>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <[email protected]> wrote:
>>> > >>
>>> > >>> Hi Steve,
>>> > >>>
>>> > >>> I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker [1], which checkpoints the DoFn based on a time/output limit and uses timers/state to reschedule work.
>>> > >>>
>>> > >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>> > >>>
>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <[email protected]> wrote:
>>> > >>>
>>> > >>>> I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of Beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc.
>>> > >>>>
>>> > >>>> However, after upgrading to Beam 2.25, I noticed that it could take on the order of 5-10 minutes for messages to get from the pubsub read step to the next step in the pipeline (deserializing them, etc.). The subscription being read from has on the order of 100,000 elements/sec arriving in it.
>>> > >>>>
>>> > >>>> Setting --experiments=use_deprecated_read fixes it and makes the pipeline behave as it did before.
>>> > >>>>
>>> > >>>> It seems like the SDF implementation in the DirectRunner here is causing some kind of issue, either buffering a very large amount of data before emitting it in a bundle, or something else. Has anyone else run into this?
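For reference, the single-threaded poll-then-process shape Antonio describes looks roughly like the sketch below. It is illustrative only and not the real ReadFromKafkaDoFn; the class and the process() placeholder are made up, and only the standard Kafka consumer API is assumed.

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical sketch: polling and downstream processing share one thread, so every
    // batch of 1-200 records pays the ~0.8ms poll latency before processing can start.
    class SequentialPollLoopSketch {
      void run(Consumer<byte[], byte[]> consumer, TopicPartition partition) {
        consumer.assign(Collections.singletonList(partition));
        while (!Thread.currentThread().isInterrupted()) {
          // ~0.8ms per call in Antonio's measurements; nothing else runs while we wait.
          ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(10));
          for (ConsumerRecord<byte[], byte[]> record : batch) {
            process(record); // the rest of the pipeline steps execute here, sequentially
          }
        }
      }

      private void process(ConsumerRecord<byte[], byte[]> record) {
        // placeholder for downstream pipeline work
      }
    }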

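For anyone hitting the same slowdown, the --experiments=use_deprecated_read workaround Steve mentions is passed like any other pipeline option. Below is a minimal, hypothetical launcher showing where the flag goes; the actual read/aggregate/write steps are omitted.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class UseDeprecatedReadExample {
      public static void main(String[] args) {
        // Run with e.g.: --runner=DirectRunner --experiments=use_deprecated_read
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        // ... build the Pub/Sub or Kafka read and the rest of the pipeline here ...
        pipeline.run().waitUntilFinish();
      }
    }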