Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2021-01-12 Thread Boyuan Zhang
Hi, I proposed making the runner-issued checkpoint frequency configurable by the pipeline author here: https://docs.google.com/document/d/18jNLtTyyApx0N2ytp1ytOMmUPLouj2h08N3-4SyWGgQ/edit?usp=sharing. I believe it will also help with the performance issue. Please feel free to leave any comments.

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2021-01-06 Thread Jan Lukavský
Sorry for the typo in your name. :-) On 1/6/21 10:11 AM, Jan Lukavský wrote: Hi Antonie, yes, for instance. I'd just like to rule out the possibility that a single DoFn processing multiple partitions (restrictions) brings some overhead in your case. Jan On 12/31/20 10:36 PM, Antonio Si wrote:

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2021-01-06 Thread Jan Lukavský
Hi Antonie, yes, for instance. I'd just like to rule out the possibility that a single DoFn processing multiple partitions (restrictions) brings some overhead in your case. Jan On 12/31/20 10:36 PM, Antonio Si wrote: Hi Jan, Sorry for the late reply. My topic has 180 partitions. Do you mean

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2021-01-05 Thread Antonio Si
Hi Jan, Sorry for the late reply. My topic has 180 partitions. Do you mean running with parallelism set to 900? Thanks. Antonio. On 2020/12/23 20:30:34, Jan Lukavský wrote: > OK, > > could you make an experiment and increase the parallelism to something > significantly higher than the total

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-28 Thread Boyuan Zhang
Hi Steve, We have one wrapper optimization[1] merged in and it will be released with 2.27.0. Would you like to verify whether it helps improve the performance on DirectRunner? [1] https://github.com/apache/beam/pull/13592 On Mon, Dec 28, 2020 at 12:17 PM Boyuan Zhang wrote: > Hi Antonio, > >

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-28 Thread Boyuan Zhang
Hi Antonio, Thanks for the data! I want to elaborate more on where the overhead could come from when running on Flink. - with --experiments=use_deprecated_read --fasterCopy=true, I am able to > achieve 13K TPS This execution uses the UnboundedSource path, where the checkpoint frequency for source

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-23 Thread Jan Lukavský
OK, could you make an experiment and increase the parallelism to something significantly higher than the total number of partitions? Say 5 times higher? Would that have impact on throughput in your case? Jan On 12/23/20 7:03 PM, Antonio Si wrote: Hi Jan, The performance data that I

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-23 Thread Antonio Si
Hi Jan, The performance data that I reported was run with parallelism = 8. We also ran with parallelism = 15 and we observed similar behaviors although I don't have the exact numbers. I can get you the numbers if needed. Regarding number of partitions, since we have multiple topics, the number

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-23 Thread Jan Lukavský
Hi Antonio, can you please clarify a few things: a) what parallelism you use for your sources; b) how many partitions there are in your topic(s). Thanks, Jan On 12/22/20 10:07 PM, Antonio Si wrote: Hi Boyuan, Let me clarify, I have tried with and without using

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-22 Thread Antonio Si
Hi Boyuan, Let me clarify: I have tried with and without the --experiments=beam_fn_api,use_sdf_kafka_read option: - with --experiments=use_deprecated_read --fasterCopy=true, I am able to achieve 13K TPS - with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true, I am able to

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-21 Thread Boyuan Zhang
Hi Antonio, I have one more question about your Kafka experiment on FlinkRunner. I'm wondering what the checkpoint interval is for your Flink application. The reason I ask is that, IIUC, creating connections in Kafka should be really cheap. So I would imagine the overhead here should be

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-21 Thread Jan Lukavský
Sure. My ID is je-ik. Thanks,  Jan On 12/21/20 8:43 PM, Boyuan Zhang wrote: Thanks for your explanation, Jan. Now I can see what you mean here. I can try to have a PR to do such optimization. Would you like to share your github ID with me to review the PR later? On Mon, Dec 21, 2020 at

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-21 Thread Boyuan Zhang
Thanks for your explanation, Jan. Now I can see what you mean here. I can try to put together a PR for this optimization. Would you like to share your GitHub ID so you can review the PR later? On Mon, Dec 21, 2020 at 11:15 AM Robert Bradshaw wrote: > If readers are expensive to create, this seems

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-21 Thread Robert Bradshaw
If readers are expensive to create, this seems like an important (and not too difficult) optimization. On Mon, Dec 21, 2020 at 11:04 AM Jan Lukavský wrote: > Hi Boyuan, > > I think your analysis is correct - with one exception. It should be > possible to reuse the reader if and only if the

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-21 Thread Jan Lukavský
Hi Boyuan, I think your analysis is correct - with one exception. It should be possible to reuse the reader if and only if the last taken CheckpointMark equals the new CheckpointMark the reader would be created from. But - this equality is on the happy path and should be satisfied for
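
Jan's reuse condition can be illustrated with a small simulation. This is a minimal sketch, not Beam code: `CheckpointMark`, `Reader` and `ReaderCache` are hypothetical stand-ins, and the mark is assumed to be a (partition, next offset) pair with value equality. The cache keeps the reader open across a checkpoint and hands it back only when the restriction being resumed starts at exactly the mark that reader last produced; any other mark (for example after a split or a rewind) closes the old reader and builds a new one.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class CheckpointMark:
    """Hypothetical checkpoint: partition plus the next offset to read."""
    partition: int
    next_offset: int


class Reader:
    """Hypothetical reader; constructing one is the expensive step."""
    instances_created = 0

    def __init__(self, mark: CheckpointMark):
        Reader.instances_created += 1
        self.partition = mark.partition
        self.offset = mark.next_offset
        self.closed = False

    def read(self, n: int) -> List[int]:
        records = list(range(self.offset, self.offset + n))
        self.offset += n
        return records

    def checkpoint(self) -> CheckpointMark:
        return CheckpointMark(self.partition, self.offset)

    def close(self) -> None:
        self.closed = True


class ReaderCache:
    """Reuses a reader only if the resumed mark equals its last checkpoint."""

    def __init__(self) -> None:
        self._cached: Optional[Reader] = None
        self._last_mark: Optional[CheckpointMark] = None

    def acquire(self, mark: CheckpointMark) -> Reader:
        if self._cached is not None and self._last_mark == mark:
            reader = self._cached  # happy path: resume exactly where we stopped
        else:
            if self._cached is not None:
                self._cached.close()  # stale reader: marks differ
            reader = Reader(mark)
        self._cached, self._last_mark = None, None
        return reader

    def release(self, reader: Reader) -> CheckpointMark:
        mark = reader.checkpoint()
        self._cached, self._last_mark = reader, mark  # keep it open for reuse
        return mark


cache = ReaderCache()
mark = CheckpointMark(partition=0, next_offset=0)
for _ in range(3):  # three checkpoint/resume cycles
    reader = cache.acquire(mark)
    reader.read(100)
    mark = cache.release(reader)
print(Reader.instances_created)  # 1
```

On the happy path the reader is created once regardless of how many checkpoints are taken, which is the saving discussed here for sources whose readers are expensive to create.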

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-21 Thread Antonio Si
Hi Boyuan, Sorry for my late reply. I was off for a few days. I didn't use DirectRunner. I am using FlinkRunner. We measured the number of Kafka messages that we can process per second. With Beam v2.26 with --experiments=use_deprecated_read and --fasterCopy=true, we are able to consume 13K

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-18 Thread Jan Lukavský
Hi Boyuan, I understand that this could be difficult in the current implementation; my intent was just to point out that this should be possible, even in the general case. Off the top of my head (and I didn't walk through this entirely, so please don't take me too literally), it seems that

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Boyuan Zhang
Hi Jan, I'm not saying that it's completely impossible to do so, but I want to explain why it's hard to apply these changes to the existing SDF wrapper. In the current SDF UnboundedSource wrapper[1], the restriction is the . The reader is bound to the UnboundedSourceAsSDFRestrictionTracker[2]. The

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Jan Lukavský
Hi Boyuan, > Several changes could be made to the PubSub SDF implementation specifically. For example, the PubSub SDF can choose not to respond to the checkpoint request when it thinks it's not a good time to do so. Besides, if the expensive connection can be bound to the lifecycle of the SDF

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Boyuan Zhang
Hi Jan, thanks for the quick follow-up! I'm not sure if I see the difference between writing a "native" SDF for > PubSub source and the UnboundedSource wrapper. With regards to the relation > between reader and checkpoint, wouldn't the native implementation be at the > same position? Several

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Jan Lukavský
Hi Boyuan, I'm not sure if I see the difference between writing a "native" SDF for PubSub source and the UnboundedSource wrapper. With regards to the relation between reader and checkpoint, wouldn't the native implementation be at the same position? From my point of view, the decision to

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Boyuan Zhang
Sorry for the confusion. > Are you saying it *is* necessary to close the reader on checkpoint, so > the only solution is to reduce checkpoint frequency? In the PubSub on DirectRunner with SDF wrapper case, my answer is yes based on my understanding. Closing the reader during checkpoint is the

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Brian Hulette
Boyuan your suggestion seems at odds with Jan's. Are you saying it *is* necessary to close the reader on checkpoint, so the only solution is to reduce checkpoint frequency? On Thu, Dec 17, 2020 at 10:39 AM Boyuan Zhang wrote: > Thanks for your investigation, Steve! It seems like preventing the

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Boyuan Zhang
Thanks for your investigation, Steve! It seems like preventing the checkpoint from happening so frequently would be one workaround for you. Making the checkpoint frequency configurable from a pipeline option seems like the way to go. On Thu, Dec 17, 2020 at 7:35 AM Jan Lukavský wrote: > Hi Steve,

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Jan Lukavský
Hi Steve, I didn't mean that we should deliberately make DirectRunner slow, or that we should not fix performance issues if they can be fixed. What I meant was that if we have to choose between a short time between checkpoints (and few elements processed before a checkpoint is taken) and performance, we should prefer

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Steve Niemitz
> Primary purpose of DirectRunner is testing, not performance That's one argument, but it's very difficult to effectively test a pipeline when I need to wait 15+ minutes for the first element to go through it. I also disagree in general that we shouldn't care about the performance of the

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Jan Lukavský
Hi Ismaël, what I meant by the performance vs. testing argument is that when choosing default values for certain (possibly configurable) options, we should prefer choices that result in better-tested code, not better performance. DirectRunner actually does quite a few things that are

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Ismaël Mejía
The influence of checkpointing on the output of the results should be minimal, in particular for the Direct Runner. What Steve reports here seems to be something different. Jan, have you or others already checked the influence of this on Flink, which is now using this new translation path? I

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Jan Lukavský
Hi, from my point of view the numbers in DirectRunner are set correctly. The primary purpose of DirectRunner is testing, not performance, so DirectRunner intentionally makes frequent checkpoints to easily exercise potential bugs in user code. It might be possible to make the frequency

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Boyuan Zhang
It's not a portable execution on DirectRunner, so I would expect that outputs from OutputAndTimeBoundedSplittableProcessElementInvoker should be emitted immediately. For SDF execution on DirectRunner, the overhead could come from the SDF expansion, the SDF wrapper and the invoker. Steve, based on your

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Robert Bradshaw
If all it takes is bumping these numbers up a bit, that seems like a reasonable thing to do ASAP. (I would argue that perhaps they shouldn't be static, e.g. it might be preferable to start emitting results right away, but use larger batches for the steady state if there are performance benefits.)
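
Robert's suggestion of non-static limits could look roughly like the following. This is a hypothetical policy, not an existing Beam option: `AdaptiveCheckpointLimits` and its fields are invented for illustration. The caps of 10 seconds and 10,000 outputs are borrowed from the values Steve tried locally; the starting values and the doubling factor are arbitrary assumptions.

```python
from dataclasses import dataclass


@dataclass
class AdaptiveCheckpointLimits:
    """Hypothetical policy: tiny limits at first so early results surface fast,
    then geometric growth toward steady-state caps."""
    max_outputs: int = 1        # assumed starting output limit
    max_seconds: float = 0.1    # assumed starting time limit
    cap_outputs: int = 10_000   # value Steve tried locally
    cap_seconds: float = 10.0   # value Steve tried locally
    growth: float = 2.0         # assumed growth factor per checkpoint

    def after_checkpoint(self) -> None:
        """Grow both limits after each checkpoint, never exceeding the caps."""
        self.max_outputs = min(self.cap_outputs, int(self.max_outputs * self.growth))
        self.max_seconds = min(self.cap_seconds, self.max_seconds * self.growth)


limits = AdaptiveCheckpointLimits()
schedule = []
for _ in range(16):
    schedule.append(limits.max_outputs)
    limits.after_checkpoint()
print(schedule)
# [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 10000, 10000]
```

The first bundles are tiny, so results appear almost immediately, while the steady state amortizes checkpoint cost over larger batches.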

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Steve Niemitz
I tried changing my build locally to 10 seconds and 10,000 elements but it didn't seem to make much of a difference, it still takes a few minutes for elements to begin actually showing up to downstream stages from the Pubsub read. I can see elements being emitted from

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Boyuan Zhang
Making it a PipelineOptions setting was another proposal of mine, but it might take some time to do so. On the other hand, tuning the numbers to something acceptable is low-hanging fruit. On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía wrote: > It sounds reasonable. I am wondering also on the consequence

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Ismaël Mejía
It sounds reasonable. I am also wondering about the consequences of these parameters for other runners (where it is every 10 seconds or 10000 elements) combined with their own configuration, e.g. checkpointInterval, checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It is not clear to me what would

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Boyuan Zhang
I agree, Ismael. From my current investigation, the performance overhead should come mainly from the checkpoint frequency in OutputAndTimeBoundedSplittableProcessElementInvoker[1], which is hardcoded in the DirectRunner (every 1 second or 100 elements)[2]. I believe configuring these numbers
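
How the two DirectRunner limits (1 second or 100 elements) interact can be modeled with a simplified loop. This is a sketch under stated assumptions, not the implementation of OutputAndTimeBoundedSplittableProcessElementInvoker: `process_until_limit` and `fake_clock` are hypothetical helpers, and the loop consults the clock only after each output, which the real invoker does not necessarily do. The demo reproduces the pattern Steve reports: for a source delivering about one element per second, every checkpoint carries a single element, while a fast source hits the 100-element limit instead.

```python
import time
from typing import Callable, Iterable, List, Tuple


def process_until_limit(
    elements: Iterable[int],
    max_outputs: int = 100,     # DirectRunner default quoted in this thread
    max_seconds: float = 1.0,   # DirectRunner default quoted in this thread
    clock: Callable[[], float] = time.monotonic,
) -> Tuple[List[int], str]:
    """Emit elements until a limit is hit. Returns the outputs and the reason
    processing stopped: 'outputs', 'time', or 'exhausted'."""
    start = clock()
    outputs: List[int] = []
    for element in elements:
        outputs.append(element)
        if len(outputs) >= max_outputs:
            return outputs, "outputs"
        # Simplification: the clock is only consulted after an output.
        if clock() - start >= max_seconds:
            return outputs, "time"
    return outputs, "exhausted"


def fake_clock(step: float) -> Callable[[], float]:
    """Deterministic clock for the demo: each call advances time by `step` seconds."""
    now = 0.0

    def tick() -> float:
        nonlocal now
        current = now
        now += step
        return current

    return tick


slow_outputs, slow_reason = process_until_limit(range(1_000), clock=fake_clock(1.0))
fast_outputs, fast_reason = process_until_limit(range(1_000), clock=fake_clock(0.001))
print(len(slow_outputs), slow_reason)  # 1 time
print(len(fast_outputs), fast_reason)  # 100 outputs
```

Raising either limit alone does not help a slow source much: whichever limit is reached first decides how much work each checkpoint covers.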

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Ismaël Mejía
I suspect the same issues mentioned here will probably affect usability for people trying Beam's interactive SQL on Unbounded IO too. We should really make sure that the performance of the SDF-based path is as good as or better than the previous version before considering

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-11 Thread Boyuan Zhang
> From what I've seen, the direct runner initiates a checkpoint after every element output. It seems like the 1-second limit kicks in before the output reaches 100 elements. I think the original purpose of DirectRunner using a small limit on issuing checkpoint requests is to exercise SDF

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-11 Thread Steve Niemitz
From what I've seen, the direct runner initiates a checkpoint after every element output. On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang wrote: > Hi Antonio, > > Thanks for the details! Which version of Beam SDK are you using? And are > you using --experiments=beam_fn_api with DirectRunner to

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-11 Thread Boyuan Zhang
Hi Antonio, Thanks for the details! Which version of Beam SDK are you using? And are you using --experiments=beam_fn_api with DirectRunner to launch your pipeline? For ReadFromKafkaDoFn.processElement(), it will take a Kafka topic+partition as input element and a KafkaConsumer will be assigned

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-11 Thread Antonio Si
Hi Boyuan, This is Antonio. I reported the KafkaIO.read() performance issue on the Slack channel a few days ago. I am not sure if this is helpful, but I have been doing some debugging on the SDK KafkaIO performance issue for our pipeline and I would like to provide some observations. It

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-04 Thread Boyuan Zhang
Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking. On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang wrote: > Thanks for the pointer, Steve! I'll check it out. The execution paths for > UnboundedSource and SDF wrapper are different. It's highly possible that > the regression

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-04 Thread Boyuan Zhang
Thanks for the pointer, Steve! I'll check it out. The execution paths for UnboundedSource and SDF wrapper are different. It's highly possible that the regression either comes from the invocation path for SDF wrapper, or the implementation of SDF wrapper itself. On Fri, Dec 4, 2020 at 6:33 AM

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-04 Thread Steve Niemitz
Coincidentally, someone else in the ASF Slack mentioned [1] yesterday that they were seeing significantly reduced performance using KafkaIO.Read w/ the SDF wrapper vs the unbounded source. They mentioned they were using Flink 1.9. https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-03 Thread Boyuan Zhang
Hi Steve, I think the major performance regression comes from OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will checkpoint the DoFn based on a time/output limit and use timers/state to reschedule work. [1]
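
The checkpoint-and-reschedule cycle described above can be sketched as a loop over residual restrictions. Assumptions: the restriction is a plain half-open offset range, only the output limit is modeled, and a `deque` stands in for the timers and state the runner actually uses to resume work; `OffsetRange`, `process_with_limit` and `run` are hypothetical names, not Beam classes. Each resumption is a separate call, so if the reader is bound to the restriction (as described for the UnboundedSource wrapper), every one of them pays the cost of reopening it.

```python
from collections import deque
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class OffsetRange:
    """Hypothetical restriction: offsets in [start, end)."""
    start: int
    end: int


def process_with_limit(
    restriction: OffsetRange, max_outputs: int
) -> Tuple[List[int], Optional[OffsetRange]]:
    """Claim offsets until the output limit; on hitting it, checkpoint by
    returning the unprocessed remainder as a residual restriction."""
    outputs: List[int] = []
    position = restriction.start
    while position < restriction.end:
        if len(outputs) >= max_outputs:
            return outputs, OffsetRange(position, restriction.end)  # checkpoint
        outputs.append(position)
        position += 1
    return outputs, None  # restriction fully processed


def run(restriction: OffsetRange, max_outputs: int) -> Tuple[List[int], int]:
    """Drive the DoFn: each residual is queued and resumed later, standing in
    for the runner's timer/state-based rescheduling."""
    pending = deque([restriction])
    emitted: List[int] = []
    resumptions = 0
    while pending:
        outputs, residual = process_with_limit(pending.popleft(), max_outputs)
        emitted.extend(outputs)
        if residual is not None:
            resumptions += 1
            pending.append(residual)
    return emitted, resumptions


emitted, resumptions = run(OffsetRange(0, 1_000), max_outputs=100)
print(len(emitted), resumptions)  # 1000 9
```

With 1,000 offsets and a 100-output limit, the work is split into 10 invocations (9 resumptions); with a limit larger than the range, the restriction finishes in one call.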

Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-03 Thread Steve Niemitz
I have a pipeline that reads from pubsub, does some aggregation, and writes to various places. Previously, in older versions of Beam, when running this in the DirectRunner, messages would go through the pipeline almost instantly, making it very easy to debug locally, etc. However, after