Hi Dong, Thanks for the update!
Best regards, Jing On Sun, Jul 9, 2023 at 3:26 AM Dong Lin <lindon...@gmail.com> wrote: > Hi Jing, > > Thanks for the suggestions. Please see my reply inline. > > On Fri, Jul 7, 2023 at 3:50 PM Jing Ge <j...@ververica.com.invalid> wrote: > > > Hi Dong, > > > > Thanks for your clarification. > > > > > > > Actually, I think it could make sense to toggle isBacklog between true > > and > > > false while the job is running. > > > > > > > If isBacklog is toggled too often back and forth (e.g. by unexpected > > mistake, unstable system, etc), a large amount of RecordAttributes might > be > > triggered, which will lead to performance issues. This should not be the > > right way to use RecordAttributes right? Devs and users should be aware > of > > it and know how to monitor, maintain, and fix issues. > > > > Your reply contains valuable information. It might make sense to add them > > into the FLIP: > > > > 1. It is up to the operator to decide when to emit RecordAttributes. But > > devs and users should be aware that the number of RecordAttributes should > > not be too high to cause performance issues. > > > > Sure, I have updated the FLIP to include the following statement: > > "Note: It is up to the operator implementation to decide when (and how > often) to emit RecordAttributes, similar to how operators emit > Watermarks. The overhead of emitting RecordAttributes is similar to the > overhead of emitting Watermarks" > > > > 2. Although users can decide how to configure them, the end-to-end > latency > > should be (commonly?) configured lower than the checkpoint interval. > > > > Since this is related to the performance tuning rather than > correctness/functionality of the core APIs, I added the following sentence > in the Appendix section: > > "We expect that end-to-end latency will typically be configured with a > value lower than the checkpoint interval" > > 3. The three ways you mentioned for how to derive isBacklog. 
> > > > Sure, I have updated the FLIP to include the following information: > > "In the future, we expect IsProcessingBacklog can very likely be determined > using the following strategies ..." > > Best, > Dong > > > > > > WDYT? > > > > Best regards, > > Jing > > > > > > On Fri, Jul 7, 2023 at 3:13 AM Dong Lin <lindon...@gmail.com> wrote: > > > > > Hi Jing, > > > > > > Thanks for the comments. Please see my reply inline. > > > > > > On Fri, Jul 7, 2023 at 5:40 AM Jing Ge <j...@ververica.com.invalid> > > wrote: > > > > > > > Hi, > > > > > > > > Thank you all for the inspired discussion. Really appreciate it! > > > > > > > > @Dong I'd like to ask some (stupid) questions to make sure I > understand > > > > your thoughts correctly. > > > > > > > > 1. It will make no sense to send the same type of RecordAttributes > > right? > > > > e.g. if one RecordAttributes(isBacklog=true) has been sent, a new > > > > RecordAttributes will be only sent when isBacklog is changed to be > > false, > > > > and vice versa. In this way, the number of RecordAttributes will be > > very > > > > limited. > > > > > > > > > > Yes, you are right. Actually, this is what we plan to do when we update > > > operators to emit RecordAttributes via `Output#emitRecordAttributes()`. > > > > > > Note that the FLIP does not specify the frequency of how operators > should > > > invoke `Output#emitRecordAttributes()`. It is up to the operator > > > to decide when to emit RecordAttributes. > > > > > > > > > > 2. Since source readers can invoke Output#emitRecordAttributes to > emit > > > > RecordAttributes(isBacklog=true/false), it might be weird to send > > > > RecordAttributes with different isBacklog back and forth too often. > > Devs > > > > and users should pay attention to it. Something is wrong when such a > > > thing > > > > happens(metrics for monitoring?). Is this correct? 
> > > > > > > > > > > > > > > Actually, I think it could make sense to toggle isBacklog between true > > and > > > false while the job is running. > > > > > > > > > > > Suppose the job is reading from user-action data from Kafka and there > is > > a > > > traffic spike for 2 hours. If the job keeps running in pure stream > mode, > > > the watermark lag might keep increasing during this period because the > > > job's processing capability can not catch up with the Kafka input > > > throughput. In this case, it can be beneficial to dynamically switch > > > isBacklog to true when watermarkLag exceeds a given threshold (e.g. 5 > > > minutes), and switch isBacklog to false again when the watermarkLag is > > low > > > enough (30 seconds). > > > > > > > > > > 3. Is there any relationship between end-to-end-latency and > checkpoint > > > > interval that users should pay attention to? In the example described > > in > > > > the FLIP, both have the same value, 2 min. What if > > end-to-end-latency > > > is > > > > configured bigger than checkpoint interval? Could checkpoint between > > > > end-to-end-latency be skipped? > > > > > > > > > > This FLIP would not enforce any relationship between end-to-end latency > > and > > > checkpoint interval. Users are free to configure end-to-end latency to > be > > > bigger than checkpoint interval. > > > > > > I don't think there exists any use-case which requires end-to-end > latency > > > to be higher than the checkpoint interval. Note that introducing a > > > relationship between these two configs would increase code complexity > and > > > also make the documentation of these configs a bit more complex for > users > > > to understand. > > > > > > Since there is no correctness issue when a user sets end-to-end latency to be > > > bigger than the checkpointing interval, I think it is simpler to just > let > > > the user decide how to configure them. > > > > > > > > > > 4. 
Afaiu, one major discussion point is that isBacklog can be derived > > > from > > > > back pressure and there will be no need of RecordAttributes. In case > a > > > > Flink job has rich resources that there is no back pressure (it will > be > > > > difficult to perfectly have just enough resources that everything is > > fine > > > > but will have back pressure only for backlog) but we want to improve > > the > > > > throughput. We then need some other ways to derive isBacklog. That is > > the > > > > reason why RecordAttributes has been introduced. Did I understand it > > > > correctly? > > > > > > > > > > I think there can be multiple ways to derive isBacklog, including: > > > 1) Based on the source operator's state. For example, when MySQL CDC > > source > > > is reading snapshot, it can claim isBacklog=true. > > > 2) Based on the watermarkLag in the source. For example, when > > system_time - > > > watermark > user_specified_threshold, then isBacklog=true. > > > 3) Based on metrics. For example, when busyTimeMsPerSecond (or > > > backPressuredTimeMsPerSecond) > user_specified_threshold, then > > > isBacklog=true. > > > > > > Note that there are pros/cons between these choices and none of them > can > > > best fit all use-cases. For example, since option-1 does not require > any > > > extra user-specified threshold, it can be the best choice when we want > to > > > improve user's existing job (e.g. the one in the motivation section) > > > without extra user configuration. > > > > > > For use-cases which want to increase throughput when reading backlog > data > > > from Kafka, option-2 can be the best choice because a threshold based > on > > > the watermark lag is easier to understand and configure than configuring > a > > > threshold based on the backPressuredTimeMsPerSecond. > > > > > > option-3 might be the only choice when option-1 and option-2 are not > > > available for the given use-cases. 
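To make options 2 and 3 above concrete: both reduce to comparing a lag signal against a user-specified threshold. A minimal sketch follows; the class and method names are hypothetical and are not part of the FLIP's proposed API.

```java
// Hedged sketch of options 2 and 3 above: isBacklog is derived by comparing
// a lag signal against a user-specified threshold. Names are illustrative
// only, not part of the FLIP's proposed API.
public class BacklogThresholdStrategy {
    private final long thresholdMs;

    public BacklogThresholdStrategy(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    // Option 2: backlog when system_time - watermark > user_specified_threshold.
    public boolean isBacklogByWatermarkLag(long systemTimeMs, long watermarkMs) {
        return systemTimeMs - watermarkMs > thresholdMs;
    }

    // Option 3: backlog when busyTimeMsPerSecond (or
    // backPressuredTimeMsPerSecond) > user_specified_threshold.
    public boolean isBacklogByMetric(long metricMsPerSecond) {
        return metricMsPerSecond > thresholdMs;
    }
}
```

For example, with a 5-minute (300,000 ms) threshold, a 400-second watermark lag would report isBacklog=true, while a 100-second lag would not.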
But it might be harder to configure > a > > > threshold against backPressuredTimeMsPerSecond. This is because the > > choice > > > of the percentage (or ms per second) threshold will mostly be empirical > > and > > > approximate. For example, should the user configure this to be 100%, > 99%, > > > or 90%? I would prefer not to have users worry about this if option-1 or > > > option-2 can be used. > > > > > > RecordAttributes would be necessary in order to support option-1 and > > > option-2 well. > > > > > > > > > > 5. NIT: Just like we talked about in another thread, JavaBean naming > > > > convention is recommended, i.e. isBacklog() & setBacklog() instead of > > > > getIsBacklog() and setIsBacklog(). > > > > > > > > > > Yeah, thanks for the suggestion. I have updated the FLIP as suggested. > > > > > > Best, > > > Dong > > > > > > > > > > > > > > Best regards, > > > > Jing > > > > > > > > On Thu, Jul 6, 2023 at 2:38 PM Dong Lin <lindon...@gmail.com> wrote: > > > > > > > > > Hi Shammon, > > > > > > > > > > Thanks for your comments. Please see my reply inline. > > > > > > > > > > > > > > > On Thu, Jul 6, 2023 at 12:47 PM Shammon FY <zjur...@gmail.com> > > wrote: > > > > > > > > > > > Hi, > > > > > > > > > > > > Thanks for your reply @Dong. I really agree with Piotr's points > > and > > > I > > > > > > would like to share some thoughts from my side. > > > > > > > > > > > > About the latency for mini-batch mechanism in Flink SQL, I still > > > think > > > > > the > > > > > > description in the FLIP is not right. If there are N operators > and > > > the > > > > > > whole process time for data in the job is `t`, then the latency > in > > > > > > mini-batch will be `table.exec.mini-batch.allow-latency`+`t`, > not ` > > > > > > table.exec.mini-batch.allow-latency`*N. I think this is one of > the > > > > > > foundations of this FLIP, and you may need to confirm it again. 
> > > > > > > > > > > > > > > Given that we agree to have a mechanism to support end-to-end > latency > > > for > > > > > DataStream programs, I think the exact semantics of > > > > > table.exec.mini-batch.allow-latency will not affect the motivation > or > > > API > > > > > design of this FLIP. I have updated the FLIP to remove any mention > of > > > > > table.exec.mini-batch.allow-latency. > > > > > > > > > > > > > > > > > > > > > > I think supporting similar mechanisms in the runtime and balance > > > > latency > > > > > > and throughput dynamically for all flink jobs is a very good > idea, > > > and > > > > I > > > > > > have some questions for that. > > > > > > > > > > > > 1. We encounter a situation where the workload is high when > > > processing > > > > > > snapshot data and we need mini-batch in sql for performance > reason. > > > But > > > > > the > > > > > > workload is low when processing delta data, we need to > > automatically > > > > > adjust > > > > > > the mini-batch SQL for them, or even cancel the mini-batch during > > > delta > > > > > > processing. I think this FLIP meets our needs, but I think we > need > > a > > > > > > general solution which covers all source types in flink, and the > > > > > > `isBacklog` in the FLIP is only one strategy. > > > > > > > > > > > > > > > > The focus of this FLIP is to allow Flink runtime to adjust the > > behavior > > > > of > > > > > operators (e.g. the buffer time) based on the IsBacklog status of > > > sources > > > > > and the user-specified execution.end-to-end-latency (effective only > > > when > > > > > there is no backlog). The FLIP assumes there is already a strategy > > for > > > > > sources to determine the IsProcessingBacklog status without adding > > more > > > > > strategies. > > > > > > > > > > I agree it is useful to introduce more strategies to determine the > > > > > IsProcessingBacklog status for sources. 
We can determine the > > > > > IsProcessingBacklog status based on the backpressure metrics, the > > > > > event-time watermark lag, or anything we find reasonable. I would > > like > > > to > > > > > work on this in follow-up FLIPs and that we don't work on too many > > > things > > > > > in the same FLIP. > > > > > > > > > > Would this be OK with you? > > > > > > > > > > > > > > > > From the FLIP I think there should be two parts: dynamic trigger > > > flush > > > > > > event in JM and dynamic trigger flush operations in Operator. We > > need > > > > to > > > > > > introduce much more general interfaces for them, such as > > > > > > `DynamicFlushStrategy` in JM and `DynamicFlushOperation` in TM? > As > > > > Piotr > > > > > > mentioned above, we can collect many information from TM locally > > such > > > > as > > > > > > backpressure, queue size and `Operator` can decide whether to > > buffer > > > > data > > > > > > or process it immediately. JM is also the same, it can decide to > > > send > > > > > > flush events on a regular basis or send them based on the > collected > > > > > metrics > > > > > > information and other information, such as the isBacklog in the > > FLIP. > > > > > > > > > > > > 2. I really don't get enough benefits for `RecordAttribute` in > the > > > FLIP > > > > > and > > > > > > as Piotr mentioned above too, it will generate a large number of > > > > > messages, > > > > > > > > > > > > > > > > If there is any sentence in the FLIP that suggests we will emit a > lot > > > of > > > > > RecordAttribute, sorry for that and I would fix it. > > > > > > > > > > Currently, the FLIP provides the `Output#emitRecordAttributes()` > for > > > > > operators (e.g. source reader) to emit RecordAttributes. The FLIP > > > leaves > > > > > the operator to decide the frequency and value of the emitted > > > > > RecordAttributes. > > > > > > > > > > Our plan is to let SourceReader emit RecordAttributes only when its > > > value > > > > > (e.g. 
isBacklog) differs from the value of the RecordAttributes it > > has > > > > > emitted earlier. It should avoid resending RecordAttributes with > the > > > same > > > > > value, similar to how Flink currently avoids resending > > > > > Watermark/WatermarkStatus with the same value. > > > > > > > > > > Would it address your concern? > > > > > > > > > > > > > > > > affecting performance. FLIP mentions that it will be applied to > > > > Operator > > > > > > and Sink, I try to understand its role and please correct me if > > I'm > > > > > wrong. > > > > > > a) It tells the Operator and Sink that current most of data they > > are > > > > > > processing are from snapshot and are "insert" data? For the out > of > > > > order > > > > > in > > > > > > flink, the Operator and Sink may receive "upsert" data from other > > > > > sources. > > > > > > > > > > > > > > > > The RecordAttributes currently proposed in the FLIP only provides > the > > > > > IsBacklog information, which tells the operator (including sink > > > operator) > > > > > whether the records received after this RecordAttributes event are > > > > > "backlog". Note that snapshot (e.g. MySQL CDC snapshot) is one > > > particular > > > > > case which can be classified as backlog. But we might introduce > more > > > > > strategies to classify records as backlog in the future. > > > > > > > > > > Currently, RecordAttributes does not specify whether the following > > > > records > > > > > are insert-only or upsert. We might introduce such an attribute if > > > there > > > > > is a good use-case for having it. > > > > > > > > > > > > > > > > b) Do Operators and Sink perform any very special operations in > the > > > > above > > > > > > situations? What are the benefits of these special operations for > > > "most > > > > > data > > > > > > are insert"? > > > > > > > > > > > > > > > > Hmm.. I don't think the FLIP says something like "most data are > > > insert". 
> > > > > Could you clarify which part of the FLIP you are talking about? > > > > > > > > > > I was told that Hudi Sink can have much higher throughput if all > its > > > > inputs > > > > > are insert-only. One point in the FLIP is that Hudi Sink can take > > > > advantage > > > > > of the features proposed in the FLIP to increase its throughput > when > > > > source > > > > > are reading backlog data (e.g. MySQL CDC snapshot). > > > > > > > > > > > > > > > > > > > > > c) I think the operator and sink can collect the above > information > > > > > locally > > > > > > when it receives each record without the need for > `RecordAttribute` > > > > even > > > > > > when we need some special operations. > > > > > > > > > > > > > > > > Let's say the job is reading from MySQL CDC, we want to process > > records > > > > in > > > > > snapshot phase with high throughput (and likely high latency), and > > > > process > > > > > records in the binlog phase with low processing latency. > > > > > > > > > > We can achieve this goal by propagating RecordAttributes from > source > > to > > > > > downstream operators. This allows an operator to know exactly that > > > those > > > > > records received before a RecordAttributes(isBacklog=false) can be > > > > > processed with high latency, and those records received after a > > > > > RecordAttributes(isBacklog=false) should be processed with low > > latency. > > > > > > > > > > Could you help explain how to achieve this goal without > > > > `RecordAttribute`? > > > > > > > > > > > > > > > > d) Even if we do need a `RecordAttribute` events in Operator and > > > Sink, > > > > I > > > > > > think broadcast them from JM is a better choice. > > > > > > > > > > > > > > > > Suppose we broadcast it from JM. 
When MySQL CDC source switches > from > > > the > > > > > snapshot phase to binlog phase, the > RecordAttributes(IsBacklog=false) > > > > might > > > > > arrive at an operator while it is still processing a queue of > records > > > > from > > > > > the snapshot phase, causing the operator to reduce its buffer time > > (and > > > > > throughput) earlier than expected. It might not be a big deal when > > this > > > > > interval is short. But given that propagating RecordAttributes from > > > > sources > > > > > to downstream operators does not have much overhead (no more than > the > > > > > periodic Watermark), it seems useful to make the operator behavior > > more > > > > > accurate. > > > > > > > > > > What do you think? > > > > > > > > > > > > > > > > > > > > > > 3. For the flush event, I also have some questions. What type of > > > > > operators > > > > > > need to buffer data and flush them based on the flush events? In > > SQL > > > > > > > > > > > > > > > > As you mentioned above, typically those operators that rely heavily > > on > > > > > statebackend (e.g. co-group, aggregate, join) can increase > throughput > > > by > > > > > buffering data. Operators which can not benefit from flush, such as > > > Map, > > > > do > > > > > not need to be updated. > > > > > > > > > > It is mentioned in the FLIP that we will "update operators in Flink > > to > > > > > override processRecordAttributes() if that helps improve job > > > performance > > > > > without sacrificing correctness (e.g. processing latency)". > > > > > > > > > > > > > > > > mini-batch mechanism, similar processing will be added for the > > > > aggregate > > > > > > and join operators, while for operators such as map, it is not > > > > necessary. > > > > > > How can we identify different operator in the runtime layer > > (`Input` > > > > and > > > > > > `TwoInputStreamOperator`)? I think buffer data in > > Map/FlatMap/Filter > > > > > > > > > > > > > > > > I am not sure I understand this question. 
Are you asking how to > > > > > automatically identify the operators that can benefit from the > flush > > > > > operation? > > > > > > > > > > I think we will need to manually identify and optimize operators > on a > > > > > case-by-case basis. There is no good systematic way to > automatically > > > > > identify and optimize all such operators. An operator's > > implementation > > > > > needs to be manually updated and benchmarked before we commit the > > > > > corresponding code change. > > > > > > > > > > What do you think? > > > > > > > > > > Best, > > > > > Dong > > > > > > > > > > > > > > > > Operator is not a good idea which makes data no longer flowing. > > > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > Shammon FY > > > > > > > > > > > > > > > > > > On Thu, Jul 6, 2023 at 1:54 AM Piotr Nowojski < > > > > piotr.nowoj...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > Thanks for this proposal, this is a very much needed thing that > > > > should > > > > > be > > > > > > > addressed in Flink. > > > > > > > > > > > > > > I think there is one thing that hasn't been discussed either > > here > > > > or > > > > > in > > > > > > > FLIP-309. Given that we have > > > > > > > three dimensions: > > > > > > > - e2e latency/checkpointing interval > > > > > > > - enabling some kind of batching/buffering on the operator > level > > > > > > > - how much resources we want to allocate to the job > > > > > > > > > > > > > > How do we want Flink to adjust itself between those three? For > > > > example: > > > > > > > a) Should we assume that a given Job has a fixed amount of > assigned > > > > > > > resources and make it paramount that > > > > > > > Flink doesn't exceed those available resources? So in case of > > > > > > > backpressure, we > > > > > > > should extend checkpointing intervals, emit records less > > > frequently > > > > > and > > > > > > > in batches. 
> > > > > > > b) Or should we assume that the amount of resources is flexible > > (up > > > > to > > > > > a > > > > > > > point?), and the desired e2e latency > > > > > > > is the paramount aspect? So in case of backpressure, we > should > > > > still > > > > > > > adhere to the configured e2e latency, > > > > > > > and wait for the user or autoscaler to scale up the job? > > > > > > > > > > > > > > In case of a), I think the concept of "isProcessingBacklog" is > > not > > > > > > needed, > > > > > > > we could steer the behaviour only > > > > > > > using the backpressure information. > > > > > > > > > > > > > > On the other hand, in case of b), "isProcessingBacklog" > > information > > > > > might > > > > > > > be helpful, to let Flink know that > > > > > > > we can safely decrease the e2e latency/checkpoint interval even > > if > > > > > there > > > > > > > is no backpressure, to use fewer > > > > > > > resources (and let the autoscaler scale down the job). > > > > > > > > > > > > > > Do we want to have both, or only one of those? Do a) and b) > > > > complement > > > > > > one > > > > > > > another? If job is backpressured, > > > > > > > we should follow a) and expose to autoscaler/users information > > > "Hey! > > > > > I'm > > > > > > > barely keeping up! I need more resources!". > > > > > > > While, when there is no backpressure and latency doesn't matter > > > > > > > (isProcessingBacklog=true), we can limit the resource > > > > > > > usage. > > > > > > > > > > > > > > And a couple of more concrete remarks about the current > proposal. > > > > > > > > > > > > > > 1. > > > > > > > > > > > > > > > I think the goal is to allow users to specify an end-to-end > > > latency > > > > > > > budget for the job. > > > > > > > > > > > > > > I fully agree with this, but in that case, why are you > proposing > > to > > > > add > > > > > > > `execution.flush.interval`? 
That's > > > > > > > yet another parameter that would affect e2e latency, without > > > actually > > > > > > > defining it. We already have things > > > > > > > like: execution.checkpointing.interval, > execution.buffer-timeout. > > > I'm > > > > > > > pretty sure very few Flink users would be > > > > > > > able to configure or understand all of them. > > > > > > > > > > > > > > I think we should simplify configuration and try to define > > > > > > > "execution.end-to-end-latency" so the runtime > > > > > > > could derive other things from this new configuration. > > > > > > > > > > > > > > 2. How do you envision `#flush()` and `#snapshotState()` to be > > > > > connected? > > > > > > > So far, `#snapshotState()` > > > > > > > was considered as a kind of `#flush()` signal. Do we need both? > > > > > Shouldn't > > > > > > > `#flush()` be implicitly or > > > > > > > explicitly attached to the `#snapshotState()` call? > > > > > > > > > > > > > > 3. What about unaligned checkpoints if we have separate > > `#flush()` > > > > > > > event/signal? > > > > > > > > > > > > > > 4. How should this be working in at-least-once mode (especially > > > > sources > > > > > > > that are configured to be working > > > > > > > in at-least-once mode)? > > > > > > > > > > > > > > 5. How is this FLIP connected with FLIP-327? I think they are > > trying > > > > to > > > > > > > achieve basically the same thing: > > > > > > > optimise when data should be flushed/committed to balance > between > > > > > > > throughput and latency. > > > > > > > > > > > > > > 6. > > > > > > > > > > > > > > > Add RecordAttributesBuilder and RecordAttributes that extends > > > > > > > StreamElement to provide operator with essential > > > > > > > > information about the records they receive, such as whether > the > > > > > records > > > > > > > are already stale due to backlog. 
> > > > > > > > > > > > > > Passing along `RecordAttribute` for every `StreamElement` would > > be > > > an > > > > > > > extremely inefficient solution. > > > > > > > > > > > > > > If at all, this should be a marker propagated through the > > JobGraph > > > > via > > > > > > > Events or sent from JM to TMs via an RPC > > > > > > > that would mark "backlog processing started/ended". Note that > > > Events > > > > > > might > > > > > > > be costly, as they need to be > > > > > > > broadcasted. So with a job having 5 keyBy exchanges and > > parallelism > > > > of > > > > > > > 1000, the number of events sent is > > > > > > > ~4 000 000, while the number of RPCs would be only 5000. > > > > > > > > > > > > > > In case we want to only check for the backpressure, we don't > need > > > any > > > > > > > extra signal. Operators/subtasks can > > > > > > > get that information very easily from the TMs runtime. > > > > > > > > > > > > > > Best, > > > > > > > Piotrek > > > > > > > > > > > > > > On Thu, Jun 29, 2023 at 5:19 PM Dong Lin <lindon...@gmail.com> > > > wrote: > > > > > > > > > > > > > >> Hi Shammon, > > > > > > >> > > > > > > >> Thanks for your comments. Please see my reply inline. > > > > > > >> > > > > > > >> On Thu, Jun 29, 2023 at 6:01 PM Shammon FY <zjur...@gmail.com > > > > > > wrote: > > > > > > >> > > > > > > >> > Hi Dong and Yunfeng, > > > > > > >> > > > > > > > >> > Thanks for bringing up this discussion. > > > > > > >> > > > > > > > >> > As described in the FLIP, the differences between > `end-to-end > > > > > latency` > > > > > > >> and > > > > > > >> > `table.exec.mini-batch.allow-latency` are: "It allows users > to > > > > > specify > > > > > > >> the > > > > > > >> > end-to-end latency, whereas > > table.exec.mini-batch.allow-latency > > > > > > applies > > > > > > >> to > > > > > > >> > each operator. 
If there are N operators on the path from > > source > to > > > > > > sink, > > > > > >> > the end-to-end latency could be up to > > > > > > >> table.exec.mini-batch.allow-latency * > > > > > > >> > N". > > > > > > >> > > > > > > > >> > If I understand correctly, > > `table.exec.mini-batch.allow-latency` > > > > is > > > > > > also > > > > > > >> > applied to the end-to-end latency for a job, maybe @Jack Wu > > can > > > > give > > > > > > >> more > > > > > > >> > information. > > > > > > >> > > > > > > > >> > > > > > > >> Based on what I can tell from the doc/code and offline > > > discussion, I > > > > > > >> believe table.exec.mini-batch.allow-latency is not applied to > > the > > > > > > >> end-to-end latency for a job. > > > > > > >> > > > > > > >> It is mentioned here > > > > > > >> < > > > > > > >> > > > > > > > > > > > > > > > > > > > > > https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/ > > > > > > >> > > > > > > > >> that > > > > > > >> table.exec.mini-batch.allow-latency is "the maximum latency can > > be > > > > > used > > > > > > >> for > > > > > > >> MiniBatch to buffer input records". I think we should have > > > mentioned > > > > > > that > > > > > > >> the config is applied to the end-to-end latency in this doc if > > it > > > is > > > > > > >> indeed > > > > > > >> the case. > > > > > > >> > > > > > > >> > > > > > > >> > So, from my perspective, and please correct me if I > > > > misunderstand, > > > > > > the > > > > > > >> > targets of this FLIP may include the following: > > > > > > >> > > > > > > > >> > 1. Support a mechanism like `mini-batch` in SQL for > > > `DataStream`, > > > > > > which > > > > > > >> > will collect data in the operator and flush data when it > > > receives > > > > a > > > > > > >> `flush` > > > > > > >> > event, in the FLIP it is `FlushEvent`. 
> > > > > > >> I think the goal is to allow users to specify an end-to-end latency budget for the job. IMO it is quite different from the `mini-batch` in SQL.
> > > > > > >>
> > > > > > >> > 2. Support dynamic `latency` according to the progress of the job, such as the snapshot stage and after that.
> > > > > > >> >
> > > > > > >> > To do that, I have some questions:
> > > > > > >> >
> > > > > > >> > 1. I didn't understand the purpose of the public interface `RecordAttributes`. I think `FlushEvent` in the FLIP is enough, and different `DynamicFlushStrategy` implementations can be added to generate flush events to address different needs, such as a static interval similar to mini-batch in SQL, or collecting the `isProcessingBacklog` information and metrics to generate `FlushEvent` as described in your FLIP. If the Hudi sink needs the `isBacklog` flag, the Hudi `SplitEnumerator` can create an operator event and send it to the Hudi source reader.
> > > > > > >>
> > > > > > >> Suppose we only have FlushEvent. Then operators (e.g. Hudi Sink) will not know they can buffer data in the following scenario:
> > > > > > >>
> > > > > > >> - execution.allowed-latency is not configured and uses the default value null.
> > > > > > >> - The job is reading from HybridSource and HybridSource says isBacklog=true.
> > > > > > >>
> > > > > > >> Also note that Hudi Sink might not be the only operator that can benefit from knowing isBacklog=true. Other sinks and aggregation operators (e.g. CoGroup) can also increase throughput by buffering/sorting records when there is backlog. So it seems simpler to pass RecordAttributes to these operators than to ask every operator developer to create an operator event etc.
> > > > > > >>
> > > > > > >> > 2. How is this new mechanism unified with SQL's mini-batch mechanism? As far as I am concerned, SQL implements the mini-batch mechanism based on watermarks; I think it is very unreasonable to have two different implementations in SQL and DataStream.
> > > > > > >>
> > > > > > >> I think we can deprecate table.exec.mini-batch.allow-latency later once execution.allowed-latency is ready for production usage. This is mentioned in the "Compatibility, Deprecation, and Migration Plan" section.
> > > > > > >>
> > > > > > >> If there is a config that lets users specify the e2e latency, it is probably reasonable for this config to work for both DataStream and SQL.
> > > > > > >>
> > > > > > >> > 3. I notice that the `CheckpointCoordinator` will generate `FlushEvent`. Will information about the `FlushEvent` be stored in the `Checkpoint`? What is the alignment strategy for FlushEvent in the operator? Will the operator flush the data when it receives all `FlushEvent`s from upstream with the same ID, or flush for each `FlushEvent`? Can you give a more detailed proposal about that? We also have a demand for this piece, thanks.
> > > > > > >>
> > > > > > >> CheckpointCoordinator might need to send FlushEvent before triggering a checkpoint in order to deal with two-phase commit sinks. The algorithm is specified in the "Proposed Changes" section.
> > > > > > >>
> > > > > > >> After an operator has received a FlushEvent:
> > > > > > >>
> > > > > > >> - If the ID of the received FlushEvent is larger than the largest ID this operator has received, then flush() is triggered for this operator and the operator should broadcast the FlushEvent to downstream operators.
> > > > > > >> - Otherwise, this FlushEvent is ignored.
> > > > > > >>
> > > > > > >> This behavior is specified in the Javadoc of the FlushEvent.
> > > > > > >>
> > > > > > >> Can you see if this answers your questions?
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Dong
> > > > > > >>
> > > > > > >> > Best,
> > > > > > >> > Shammon FY
> > > > > > >> >
> > > > > > >> > On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser <martijnvis...@apache.org> wrote:
> > > > > > >> >
> > > > > > >> >> Hi Dong and Yunfeng,
> > > > > > >> >>
> > > > > > >> >> Thanks for the FLIP. What's not clear to me is the expected behaviour when the allowed latency can't be met, for whatever reason. Given that we're talking about an "allowed latency", it implies that something has gone wrong and should fail? Isn't this more a minimum latency that you're proposing?
> > > > > > >> >>
> > > > > > >> >> There's also the part about the Hudi Sink processing records immediately upon arrival.
> > > > > > >> >> Given that the SinkV2 API provides the ability for custom post- and pre-commit topologies [1], specifically targeted at avoiding the generation of many small files, why isn't that sufficient for the Hudi Sink? It would be great to see that added under Rejected Alternatives if it is indeed not sufficient.
> > > > > > >> >>
> > > > > > >> >> Best regards,
> > > > > > >> >>
> > > > > > >> >> Martijn
> > > > > > >> >>
> > > > > > >> >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> > > > > > >> >>
> > > > > > >> >> On Sun, Jun 25, 2023 at 4:25 AM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > > > > >> >>
> > > > > > >> >> > Hi all,
> > > > > > >> >> >
> > > > > > >> >> > Dong (cc'ed) and I are opening this thread to discuss our proposal to support configuring an end-to-end allowed latency for Flink jobs, which has been documented in FLIP-325 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Support+configuring+end-to-end+allowed+latency>.
> > > > > > >> >> >
> > > > > > >> >> > By configuring the latency requirement for a Flink job, users would be able to optimize the throughput and overhead of the job while keeping the increase in latency acceptable. This approach is particularly useful when dealing with records that do not require immediate processing and emission upon arrival.
> > > > > > >> >> >
> > > > > > >> >> > Please refer to the FLIP document for more details about the proposed design and implementation. We welcome any feedback and opinions on this proposal.
> > > > > > >> >> >
> > > > > > >> >> > Best regards,
> > > > > > >> >> > Dong and Yunfeng
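[Editor's note] Dong's FlushEvent handling rule above (flush and broadcast only when the event ID is strictly larger than any ID seen so far, otherwise ignore) can be made concrete with a minimal sketch. This is plain Java with hypothetical class and method names, not actual Flink APIs; it only models the ID comparison, standing in for flush() and the downstream broadcast with a counter:

```java
// Sketch of the FlushEvent deduplication rule described in the thread.
// An operator with multiple input channels receives the same FlushEvent
// once per channel, but must flush and re-broadcast it only once.
public class FlushEventDemo {
    static class FlushTrackingOperator {
        private long largestSeenId = Long.MIN_VALUE;
        private int flushCount = 0;

        /** Returns true if this event triggers flush() and a downstream broadcast. */
        boolean onFlushEvent(long flushEventId) {
            if (flushEventId > largestSeenId) {
                largestSeenId = flushEventId;
                flushCount++; // stand-in for flush() + broadcast to downstream operators
                return true;
            }
            return false; // duplicate ID from another input channel: ignored
        }

        int getFlushCount() {
            return flushCount;
        }
    }

    public static void main(String[] args) {
        FlushTrackingOperator op = new FlushTrackingOperator();
        // With two upstream channels, each FlushEvent ID arrives twice.
        boolean first = op.onFlushEvent(1); // new ID -> flush
        boolean dup = op.onFlushEvent(1);   // same ID again -> ignored
        op.onFlushEvent(2);                 // next ID -> flush again
        System.out.println(first + " " + dup + " " + op.getFlushCount());
        // prints: true false 2
    }
}
```

Under this rule the number of flushes is bounded by the number of distinct FlushEvent IDs, regardless of operator fan-in.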