Thanks for your explanation, Jan. Now I see what you mean. I can put together a PR for this optimization. Would you mind sharing your GitHub ID with me so I can add you as a reviewer later?
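Roughly what I have in mind, just as a sketch (the helper class and its names are mine, not existing wrapper code): keep the open reader together with the CheckpointMark it was created from, and reuse it only when the new mark equals the last one, as you describe. Where exactly this state would live in the SDF wrapper is still to be worked out; the sketch only shows the equality check.

    import java.io.IOException;
    import java.util.Objects;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical helper: cache the open reader together with the
    // CheckpointMark it was created from, and reuse it only on the happy path
    // where the new mark equals the last one.
    class CachedReaderHolder<T, MarkT extends UnboundedSource.CheckpointMark> {
      private UnboundedSource.UnboundedReader<T> reader;
      private MarkT lastMark;

      UnboundedSource.UnboundedReader<T> getOrCreate(
          UnboundedSource<T, MarkT> source, MarkT mark, PipelineOptions options)
          throws IOException {
        if (reader != null && Objects.equals(lastMark, mark)) {
          // Same mark we last checkpointed from -> safe to keep reading.
          return reader;
        }
        if (reader != null) {
          // Mark differs (e.g. after recovery from a checkpoint) -> start fresh.
          reader.close();
        }
        reader = source.createReader(options, mark);
        lastMark = mark;
        return reader;
      }
    }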
On Mon, Dec 21, 2020 at 11:15 AM Robert Bradshaw <[email protected]> wrote:

> If readers are expensive to create, this seems like an important (and not
> too difficult) optimization.
>
> On Mon, Dec 21, 2020 at 11:04 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Boyuan,
>>
>> I think your analysis is correct - with one exception. It should be
>> possible to reuse the reader if and only if the last taken CheckpointMark
>> equals the new CheckpointMark the reader would be created from. But this
>> equality holds on the happy path and should be satisfied for the vast
>> majority of invocations, so it will spare many calls to createReader.
>> Actually, it should be non-equal only after recovery from a checkpoint,
>> but then there should be no reader. So to be technically correct, we
>> should keep the last CheckpointMark along with the open reader, though
>> that might turn out to be unnecessary (I'm not sure about that, and I
>> would definitely keep the last CheckpointMark, because it is better safe
>> than sorry :))
>>
>> Jan
>>
>> On 12/21/20 7:54 PM, Boyuan Zhang wrote:
>>
>> Hi Jan,
>>
>>> it seems that what we would want is to couple the lifecycle of the
>>> Reader not with the restriction but with the particular instance of
>>> (Un)boundedSource (after being split). That could be done in the
>>> processing DoFn, if it contained a cache mapping the instance of the
>>> source to the (possibly null, i.e. not yet open) reader. In @NewTracker
>>> we could assign (or create) the reader to the tracker, as the tracker is
>>> created for each restriction.
>>>
>>> WDYT?
>>
>> I was thinking about this, but it seems like it is not applicable to the
>> way UnboundedSource and UnboundedReader work together.
>> Please correct me if I'm wrong. The UnboundedReader is created from the
>> UnboundedSource per CheckpointMark [1], which means that for certain
>> sources the CheckpointMark could affect some attributes, like the start
>> position of the reader when resuming. So a single UnboundedSource could be
>> mapped to multiple readers because of different instances of
>> CheckpointMark. That's also the reason why we use CheckpointMark as the
>> restriction.
>>
>> Please let me know if I misunderstand your suggestion.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
>>
>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <[email protected]> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Sorry for my late reply. I was off for a few days.
>>>
>>> I didn't use DirectRunner. I am using FlinkRunner.
>>>
>>> We measured the number of Kafka messages that we can process per second.
>>> With Beam v2.26 with --experiments=use_deprecated_read and
>>> --fasterCopy=true, we are able to consume 13K messages per second, but
>>> with Beam v2.26 without the use_deprecated_read option, we are only able
>>> to process 10K messages per second for the same pipeline.
>>>
>>> Thanks and regards,
>>>
>>> Antonio.
>>>
>>> On 2020/12/11 22:19:40, Boyuan Zhang <[email protected]> wrote:
>>>> Hi Antonio,
>>>>
>>>> Thanks for the details! Which version of the Beam SDK are you using?
>>>> And are you using --experiments=beam_fn_api with DirectRunner to launch
>>>> your pipeline?
>>>>
>>>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
>>>> topic+partition as the input element, and a KafkaConsumer will be
>>>> assigned to this topic+partition and then poll records continuously.
>>>> The Kafka consumer will resume reading and return from the process fn
>>>> when:
>>>>
>>>> - There are no available records currently (this is a feature of SDF,
>>>> called an SDF self-initiated checkpoint).
>>>> - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>>>> checkpoint request to ReadFromKafkaDoFn for getting partial results.
>>>> The checkpoint frequency for DirectRunner is every 100 output records
>>>> or every 1 second.
>>>>
>>>> It seems like either the self-initiated checkpoint or the
>>>> DirectRunner-issued checkpoint gives you the performance regression,
>>>> since there is overhead when rescheduling residuals. In your case, it's
>>>> more likely that the checkpoint behavior of
>>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
>>>> elements per batch. I want to understand what kind of performance
>>>> regression you are noticing. Is it slower to output the same amount of
>>>> records?
>>>>
>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <[email protected]> wrote:
>>>>
>>>>> Hi Boyuan,
>>>>>
>>>>> This is Antonio. I reported the KafkaIO.read() performance issue on
>>>>> the Slack channel a few days ago.
>>>>>
>>>>> I am not sure if this is helpful, but I have been doing some debugging
>>>>> on the SDK KafkaIO performance issue for our pipeline and I would like
>>>>> to provide some observations.
>>>>>
>>>>> It looks like in my case ReadFromKafkaDoFn.processElement() was
>>>>> invoked within the same thread, and every time kafkaConsumer.poll() is
>>>>> called, it returns some records, from 1 up to 200 records. So it will
>>>>> proceed to run the pipeline steps. Each kafkaConsumer.poll() takes
>>>>> about 0.8 ms. So, in this case, the polling and running of the
>>>>> pipeline are executed sequentially within a single thread. So, after
>>>>> processing a batch of records, it will need to wait for 0.8 ms before
>>>>> it can process the next batch of records again.
>>>>>
>>>>> Any suggestions would be appreciated.
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> Thanks and regards,
>>>>>
>>>>> Antonio.
>>>>>
>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]> wrote:
>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>>>>>
>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks for the pointer, Steve! I'll check it out. The execution
>>>>>>> paths for UnboundedSource and the SDF wrapper are different. It's
>>>>>>> highly possible that the regression either comes from the invocation
>>>>>>> path for the SDF wrapper, or from the implementation of the SDF
>>>>>>> wrapper itself.
>>>>>>>
>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <[email protected]> wrote:
>>>>>>>
>>>>>>>> Coincidentally, someone else in the ASF Slack mentioned [1]
>>>>>>>> yesterday that they were seeing significantly reduced performance
>>>>>>>> using KafkaIO.Read with the SDF wrapper vs. the unbounded source.
>>>>>>>> They mentioned they were using Flink 1.9.
>>>>>>>>
>>>>>>>> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>>>>>>
>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Steve,
>>>>>>>>>
>>>>>>>>> I think the major performance regression comes from
>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker [1], which will
>>>>>>>>> checkpoint the DoFn based on a time/output limit and use
>>>>>>>>> timers/state to reschedule work.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>>>>>>>
>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I have a pipeline that reads from Pub/Sub, does some aggregation,
>>>>>>>>>> and writes to various places. Previously, in older versions of
>>>>>>>>>> Beam, when running this in the DirectRunner, messages would go
>>>>>>>>>> through the pipeline almost instantly, making it very easy to
>>>>>>>>>> debug locally, etc.
>>>>>>>>>>
>>>>>>>>>> However, after upgrading to Beam 2.25, I noticed that it could
>>>>>>>>>> take on the order of 5-10 minutes for messages to get from the
>>>>>>>>>> Pub/Sub read step to the next step in the pipeline (deserializing
>>>>>>>>>> them, etc). The subscription being read from has on the order of
>>>>>>>>>> 100,000 elements/sec arriving in it.
>>>>>>>>>>
>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and makes the
>>>>>>>>>> pipeline behave as it did before.
>>>>>>>>>>
>>>>>>>>>> It seems like the SDF implementation in the DirectRunner here is
>>>>>>>>>> causing some kind of issue, either buffering a very large amount
>>>>>>>>>> of data before emitting it in a bundle, or something else. Has
>>>>>>>>>> anyone else run into this?
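P.S. For anyone skimming the thread above: the checkpointing behavior being discussed boils down to something like the following (a simplified, illustrative sketch with made-up names; the real logic lives in OutputAndTimeBoundedSplittableProcessElementInvoker). Each processElement call is cut off once either an output-count limit or a time limit is hit, and the remaining work is rescheduled as a residual, which is where the overhead comes from.

    import java.time.Duration;
    import java.time.Instant;

    // Illustrative policy only, not the actual invoker code: decide when to
    // checkpoint the current ProcessElement call based on output count and
    // elapsed time.
    class OutputAndTimeLimiter {
      private final int maxOutputs;        // e.g. 100 outputs on DirectRunner
      private final Duration maxDuration;  // e.g. 1 second on DirectRunner
      private final Instant start = Instant.now();
      private int outputs = 0;

      OutputAndTimeLimiter(int maxOutputs, Duration maxDuration) {
        this.maxOutputs = maxOutputs;
        this.maxDuration = maxDuration;
      }

      // Called after each emitted record; returning true means "checkpoint
      // now", i.e. split the restriction and reschedule the residual.
      boolean shouldCheckpoint() {
        outputs++;
        return outputs >= maxOutputs
            || Duration.between(start, Instant.now()).compareTo(maxDuration) >= 0;
      }
    }

With the DirectRunner numbers mentioned above, this would correspond to new OutputAndTimeLimiter(100, Duration.ofSeconds(1)).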
