Hi Dong,

Thanks for the update!

Best regards,
Jing

On Sun, Jul 9, 2023 at 3:26 AM Dong Lin <lindon...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for the suggestions. Please see my reply inline.
>
> On Fri, Jul 7, 2023 at 3:50 PM Jing Ge <j...@ververica.com.invalid> wrote:
>
> > Hi Dong,
> >
> > Thanks for your clarification.
> >
> >
> > > Actually, I think it could make sense to toggle isBacklog between true
> > and
> > > false while the job is running.
> > >
> >
> > If isBacklog is toggled too often back and forth(e.g. by unexpected
> > mistake, unstable system, etc), a large amount of RecordAttributes might
> be
> > triggered, which will lead to performance issues. This should not be the
> > right way to use RecordAttributes right? Devs and users should be aware
> of
> > it and know how to monitor, maintain, and fix issues.
> >
> > Your reply contains valuable information. It might make sense to add them
> > into the FLIP:
> >
> > 1. It is up to the operator to decide when to emit RecordAttributes. But
> > devs and users should be aware that the number of RecordAttributes should
> > not be too high to cause performance issues.
> >
>
> Sure, I have updated the FLIP to include the following statement:
>
> "Note: It is up to the operator implementation to decide when (and how
> often) to emit RecordAttributes, similar to how operators emit
> Watermark. The overhead of emitting RecordAttributes is similar to the
> overhead of emitting Watermark"
>
>
> > 2. Although users can decide how to configure them, the end-to-end
> latency
> > should be (commonly?) configured lower than the checkpoint interval.
> >
>
> Since this is related to the performance tuning rather than
> correctness/functionality of the core APIs, I added the following sentence
> in the Appendix section:
>
> "We expect that end-to-end latency will typically be configured with a
> value lower than the checkpoint interval"
>
> > 3. The three ways you mentioned for how to derive isBacklog.
> >
>
> Sure, I have updated the FLIP to include the following information:
>
> "In the future, we expect IsProcessingBacklog can very likely be determined
> using the following strategies ..."
>
> Best,
> Dong
>
>
> >
> > WDYT?
> >
> > Best regards,
> > Jing
> >
> >
> > On Fri, Jul 7, 2023 at 3:13 AM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Jing,
> > >
> > > Thanks for the comments. Please see my reply inline.
> > >
> > > On Fri, Jul 7, 2023 at 5:40 AM Jing Ge <j...@ververica.com.invalid>
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > > Thank you all for the inspiring discussion. Really appreciate it!
> > > >
> > > > @Dong I'd like to ask some (stupid) questions to make sure I
> understand
> > > > your thoughts correctly.
> > > >
> > > > 1. It will make no sense to send the same type of RecordAttributes
> > right?
> > > > e.g.  if one RecordAttributes(isBacklog=true) has been sent, a new
> > > > RecordAttributes will be only sent when isBacklog is changed to be
> > false,
> > > > and vice versa. In this way, the number of RecordAttributes will be
> > very
> > > > limited.
> > > >
> > >
> > > Yes, you are right. Actually, this is what we plan to do when we update
> > > operators to emit RecordAttributes via `Output#emitRecordAttributes()`.
> > >
> > > Note that the FLIP does not specify the frequency of how operators
> should
> > > invoke `Output#emitRecordAttributes()`. It is up to the operator
> > > to decide when to emit RecordAttributes.
> > >
> > >
> > > > 2. Since source readers can invoke Output#emitRecordAttributes to
> emit
> > > > RecordAttributes(isBacklog=true/false), it might be weird to send
> > > > RecordAttributes with different isBacklog back and forth too often.
> > Devs
> > > > and users should pay attention to it. Something is wrong when such a
> > > thing
> > > > happens(metrics for monitoring?). Is this correct?
> > > >
> > >
> > >
> >
> > > Actually, I think it could make sense to toggle isBacklog between true
> > and
> > > false while the job is running.
> > >
> > >
> >
> > > Suppose the job is reading from user-action data from Kafka and there
> is
> > a
> > > traffic spike for 2 hours. If the job keeps running in pure stream
> mode,
> > > the watermark lag might keep increasing during this period because the
> > > job's processing capability can not catch up with the Kafka input
> > > throughput. In this case, it can be beneficial to dynamically switch
> > > isBacklog to true when watermarkLag exceeds a given threshold (e.g. 5
> > > minutes), and switch isBacklog to false again when the watermarkLag is
> > low
> > > enough (30 seconds).
> > >
> > >
> > > > 3. Is there any relationship between end-to-end-latency and
> checkpoint
> > > > interval that users should pay attention to? In the example described
> > in
> > > > the FLIP, both have the same value, 2 min. What about
> > end-to-end-latency
> > > is
> > > > configured bigger than checkpoint interval? Could checkpoint between
> > > > end-to-end-latency be skipped?
> > > >
> > >
> > > This FLIP would not enforce any relationship between end-to-end latency
> > and
> > > checkpoint interval. Users are free to configure end-to-end latency to
> be
> > > bigger than checkpoint interval.
> > >
> > > I don't think there exists any use-case which requires end-to-end
> latency
> > > to be higher than the checkpoint interval. Note that introducing a
> > > relationship between these two configs would increase code complexity
> and
> > > also make the documentation of these configs a bit more complex for
> users
> > > to understand.
> > >
> > > > Since there is no correctness issue when a user sets end-to-end latency to be
> > > bigger than the checkpointing interval, I think it is simpler to just
> let
> > > the user decide how to configure them.
> > >
> > >
> > > > 4. Afaiu, one major discussion point is that isBacklog can be derived
> > > from
> > > > back pressure and there will be no need of RecordAttributes. In case
> a
> > > > Flink job has rich resources that there is no back pressure (it will
> be
> > > > difficult to perfectly have just enough resources that everything is
> > fine
> > > > but will have back pressure only for backlog) but we want to improve
> > the
> > > > throughput. We then need some other ways to derive isBacklog. That is
> > the
> > > > reason why RecordAttributes has been introduced. Did I understand it
> > > > correctly?
> > > >
> > >
> > > > I think there can be multiple ways to derive isBacklog, including:
> > > 1) Based on the source operator's state. For example, when MySQL CDC
> > source
> > > is reading snapshot, it can claim isBacklog=true.
> > > 2) Based on the watermarkLag in the source. For example, when
> > system_time -
> > > watermark > user_specified_threshold, then isBacklog=true.
> > > 3) Based on metrics. For example, when busyTimeMsPerSecond (or
> > > backPressuredTimeMsPerSecond) > user_specified_threshold, then
> > > isBacklog=true.
> > >
> > > Note that there are pros/cons between these choices and none of them
> can
> > > best fit all use-cases. For example, since option-1 does not require
> any
> > > extra user-specified threshold, it can be the best choice when we want
> to
> > > improve user's existing job (e.g. the one in the motivation section)
> > > without extra user configuration.
> > >
> > > For use-cases which want to increase throughput when reading backlog
> data
> > > from Kafka, option-2 can be the best choice because a threshold based
> on
> > > the watermark lag is easier to understand and configure than
> configuring
> > > threshold based on the backPressuredTimeMsPerSecond.
> > >
> > > option-3 might be the only choice when option-1 and option-2 are not
> > > available for the given use-cases. But it might be harder to configure
> a
> > > threshold against backPressuredTimeMsPerSecond. This is because the
> > choice
> > > of the percentage (or ms per second) threshold will mostly be empirical
> > and
> > > approximate. For example, should the user configure this to be 100%,
> 99%,
> > > or 90%? I would prefer not to have user worry about this if option-1 or
> > > option-2 can be used.
> > >
> > > RecordAttributes would be necessary in order to support option-1 and
> > > option-2 well.
> > >
> > >
> > > > 5. NIT: Just like we talked about in another thread, JavaBean naming
> > > > convention is recommended, i.e. isBacklog() & setBacklog() instead of
> > > > getIsBacklog() and setIsBacklog().
> > > >
> > >
> > > Yeah, thanks for the suggestion. I have updated the FLIP as suggested.
> > >
> > > Best,
> > > Dong
> > >
> > >
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > > On Thu, Jul 6, 2023 at 2:38 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hi Shammon,
> > > > >
> > > > > Thanks for your comments. Please see my reply inline.
> > > > >
> > > > >
> > > > > On Thu, Jul 6, 2023 at 12:47 PM Shammon FY <zjur...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > > Thanks for your reply @Dong. I really agree with Piotr's points
> > and
> > > I
> > > > > > would like to share some thoughts from my side.
> > > > > >
> > > > > > About the latency for mini-batch mechanism in Flink SQL, I still
> > > think
> > > > > the
> > > > > > description in the FLIP is not right. If there are N operators
> and
> > > the
> > > > > > whole process time for data in the job is `t`, then the latency
> in
> > > > > > mini-batch will be `table.exec.mini-batch.allow-latency`+`t`,
> not `
> > > > > > table.exec.mini-batch.allow-latency`*N. I think this is one of
> the
> > > > > > foundations of this FLIP, and you may need to confirm it again.
> > > > > >
> > > > >
> > > > > Given that we agree to have a mechanism to support end-to-end
> latency
> > > for
> > > > > DataStream programs, I think the exact semantics of
> > > > > table.exec.mini-batch.allow-latency will not affect the motivation
> or
> > > API
> > > > > design of this FLIP. I have updated the FLIP to remove any mention
> of
> > > > > table.exec.mini-batch.allow-latency.
> > > > >
> > > > >
> > > > > >
> > > > > > > I think supporting similar mechanisms in the runtime and balancing
> > > > latency
> > > > > > and throughput dynamically for all flink jobs is a very good
> idea,
> > > and
> > > > I
> > > > > > have some questions for that.
> > > > > >
> > > > > > 1. We encounter a situation where the workload is high when
> > > processing
> > > > > > snapshot data and we need mini-batch in sql for performance
> reason.
> > > But
> > > > > the
> > > > > > workload is low when processing delta data, we need to
> > automatically
> > > > > adjust
> > > > > > the mini-batch SQL for them, or even cancel the mini-batch during
> > > delta
> > > > > > processing. I think this FLIP meets our needs, but I think we
> need
> > a
> > > > > > general solution which covers all source types in flink, and the
> > > > > > `isBacklog` in the FLIP is only one strategy.
> > > > > >
> > > > >
> > > > > The focus of this FLIP is to allow Flink runtime to adjust the
> > behavior
> > > > of
> > > > > operators (e.g. the buffer time) based on the IsBacklog status of
> > > sources
> > > > > and the user-specified execution.end-to-end-latency (effective only
> > > when
> > > > > there is no backlog). The FLIP assumes there is already a strategy
> > for
> > > > > sources to determine the IsProcessingBacklog status without adding
> > more
> > > > > strategies.
> > > > >
> > > > > > I agree it is useful to introduce more strategies to determine the
> > > > > IsProcessingBacklog status for sources. We can determine the
> > > > > IsProcessingBacklog status based on the backpressure metrics, the
> > > > > > event-time watermark lag, or anything we find reasonable. I would like to
> > > > > > work on this in follow-up FLIPs so that we don't work on too many things
> > > > > > in the same FLIP.
> > > > >
> > > > > Would this be OK with you?
> > > > >
> > > > >
> > > > > > From the FLIP I think there should be two parts: dynamic trigger
> > > flush
> > > > > > event in JM and dynamic trigger flush operations in Operator. We
> > need
> > > > to
> > > > > > introduce much more general interfaces for them, such as
> > > > > > `DynamicFlushStrategy` in JM and `DynamicFlushOperation` in TM?
> As
> > > > Piotr
> > > > > > > mentioned above, we can collect much information from TM locally
> > such
> > > > as
> > > > > > backpressure, queue size and `Operator` can decide whether to
> > buffer
> > > > data
> > > > > > or process it immediately.  JM is also the same, it can decide to
> > > send
> > > > > > flush events on a regular basis or send them based on the
> collected
> > > > > metrics
> > > > > > information and other information, such as the isBacklog in the
> > FLIP.
> > > > > >
> > > > > > 2. I really don't get enough benefits for `RecordAttribute` in
> the
> > > FLIP
> > > > > and
> > > > > > as Piotr mentioned above too, it will generate a large number of
> > > > > messages,
> > > > > >
> > > > >
> > > > > If there is any sentence in the FLIP that suggests we will emit a
> lot
> > > of
> > > > > RecordAttribute, sorry for that and I would fix it.
> > > > >
> > > > > Currently, the FLIP provides the `Output#emitRecordAttributes()`
> for
> > > > > operators (e.g. source reader) to emit RecordAttributes. The FLIP
> > > leaves
> > > > > the operator to decide the frequency and value of the emitted
> > > > > RecordAttributes.
> > > > >
> > > > > Our plan is to let SourceReader emit RecordAttributes only when its
> > > value
> > > > > (e.g. isBacklog) differs from the value of the RecordAttributes it
> > has
> > > > > emitted earlier. It should avoid resending RecordAttributes with
> the
> > > same
> > > > > value, similar to how Flink currently avoids resending
> > > > > Watermark/WatermarkStatus with the same value.
> > > > >
> > > > > Would it address your concern?
> > > > >
> > > > >
> > > > > > affecting performance. FLIP mentions that it will be applied to
> > > > Operator
> > > > > > > and Sink, I try to understand its role and please correct me if
> > I'm
> > > > > wrong.
> > > > > > > a) It tells the Operator and Sink that most of the data they are
> > > > > > > currently processing is from the snapshot and is "insert" data? Because
> > > > > > > of out-of-order data in Flink, the Operator and Sink may still receive
> > > > > > > "upsert" data from other sources.
> > > > > >
> > > > >
> > > > > The RecordAttributes currently proposed in the FLIP only provides
> the
> > > > > IsBacklog information, which tells the operator (including sink
> > > operator)
> > > > > > whether the records received after this RecordAttributes event are
> > > > > "backlog". Note that snapshot (e.g. MySQL CDC snapshot) is one
> > > particular
> > > > > case which can be classified as backlog. But we might introduce
> more
> > > > > > strategies to classify records as backlog in the future.
> > > > >
> > > > > Currently, RecordAttributes does not specify whether the following
> > > > records
> > > > > > are insert-only or upsert. We might introduce such an attribute if
> > > there
> > > > > is a good use-case for having it.
> > > > >
> > > > >
> > > > > > b) Do Operators and Sink perform any very special operations in
> the
> > > > above
> > > > > > > situations? What are the benefits of these special operations for
> > > "most
> > > > > data
> > > > > > are insert"?
> > > > > >
> > > > >
> > > > > Hmm.. I don't think the FLIP says something like "most data are
> > > insert".
> > > > > Could you clarify which part of the FLIP you are talking about?
> > > > >
> > > > > I was told that Hudi Sink can have much higher throughput if all
> its
> > > > inputs
> > > > > are insert-only. One point in the FLIP is that Hudi Sink can take
> > > > advantage
> > > > > of the features proposed in the FLIP to increase its throughput
> when
> > > > source
> > > > > are reading backlog data (e.g. MySQL CDC snapshot).
> > > > >
> > > > >
> > > > >
> > > > > > c) I think the operator and sink can collect the above
> information
> > > > > locally
> > > > > > when it receives each record without the need for
> `RecordAttribute`
> > > > even
> > > > > > when we need some special operations.
> > > > > >
> > > > >
> > > > > Let's say the job is reading from MySQL CDC, we want to process
> > records
> > > > in
> > > > > snapshot phase with high throughput (and likely high latency), and
> > > > process
> > > > > records in the binlog phase with low processing latency.
> > > > >
> > > > > We can achieve this goal by propagating RecordAttributes from
> source
> > to
> > > > > downstream operators. This allows an operator to know exactly that
> > > those
> > > > > records received before a RecordAttributes(isBacklog=false) can be
> > > > > processed with high latency, and those records received after a
> > > > > RecordAttributes(isBacklog=false) should be processed with low
> > latency.
> > > > >
> > > > > Could you help explain how to achieve this goal without
> > > > `RecordAttribute`?
> > > > >
> > > > >
> > > > > > d) Even if we do need a `RecordAttribute` events in Operator and
> > > Sink,
> > > > I
> > > > > > think broadcast them from JM is a better choice.
> > > > > >
> > > > >
> > > > > Suppose we broadcast it from JM. When MySQL CDC source switches
> from
> > > the
> > > > > snapshot phase to binlog phase, the
> RecordAttributes(IsBacklog=false)
> > > > might
> > > > > arrive at an operator while it is still processing a queue of
> records
> > > > from
> > > > > the snapshot phase, causing the operator to reduce its buffer time
> > (and
> > > > > throughput) earlier than expected. It might not be a big deal when
> > this
> > > > > interval is short. But given that propagating RecordAttributes from
> > > > sources
> > > > > to downstream operators does not have much overhead (no more than
> the
> > > > > periodic Watermark), it seems useful to make the operator behavior
> > more
> > > > > accurate.
> > > > >
> > > > > What do you think?
> > > > >
> > > > >
> > > > > >
> > > > > > 3. For the flush event, I also have some questions. What type of
> > > > > operators
> > > > > > need to buffer data and flush them based on the flush events? In
> > SQL
> > > > > >
> > > > >
> > > > > As you mentioned above, typically those operators that rely heavily
> > on
> > > > > statebackend (e.g. co-group, aggregate, join) can increase
> throughput
> > > by
> > > > > buffering data. Operators which can not benefit from flush, such as
> > > Map,
> > > > do
> > > > > not need to be updated.
> > > > >
> > > > > It is mentioned in the FLIP that we will "update operators in Flink
> > to
> > > > > override processRecordAttributes() if that helps improve job
> > > performance
> > > > > without sacrificing correctness (e.g. processing latency)".
> > > > >
> > > > >
> > > > > > mini-batch mechanism, similar processing will be added for the
> > > > aggregate
> > > > > > and join operators, while for operators such as map, it is not
> > > > necessary.
> > > > > > How can we identify different operator in the runtime layer
> > (`Input`
> > > > and
> > > > > > `TwoInputStreamOperator`)? I think buffer data in
> > Map/FlatMap/Filter
> > > > > >
> > > > >
> > > > > I am not sure I understand this question. Are you asking how to
> > > > > automatically identify the operators that can benefit from the
> flush
> > > > > operation?
> > > > >
> > > > > I think we will need to manually identify and optimize operators
> on a
> > > > > case-by-case basis. There is no good systematic way to
> automatically
> > > > > identify and optimize all such operators. An operator's
> > implementation
> > > > > needs to be manually updated and benchmarked before we commit the
> > > > > corresponding code change.
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Best,
> > > > > Dong
> > > > >
> > > > >
> > > > > > > Operator is not a good idea, because it makes data stop flowing.
> > > > > >
> > > > >
> > > > > >
> > > > > > Best,
> > > > > > Shammon FY
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 6, 2023 at 1:54 AM Piotr Nowojski <
> > > > piotr.nowoj...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Thanks for this proposal, this is a very much needed thing that
> > > > should
> > > > > be
> > > > > > > addressed in Flink.
> > > > > > >
> > > > > > > > I think there is one thing that has been discussed neither
> > here
> > > > nor
> > > > > in
> > > > > > > FLIP-309. Given that we have
> > > > > > > three dimensions:
> > > > > > > - e2e latency/checkpointing interval
> > > > > > > - enabling some kind of batching/buffering on the operator
> level
> > > > > > > - how much resources we want to allocate to the job
> > > > > > >
> > > > > > > How do we want Flink to adjust itself between those three? For
> > > > example:
> > > > > > > a) Should we assume that given Job has a fixed amount of
> assigned
> > > > > > > resources and make it paramount that
> > > > > > >   Flink doesn't exceed those available resources? So in case of
> > > > > > > backpressure, we
> > > > > > >   should extend checkpointing intervals, emit records less
> > > frequently
> > > > > and
> > > > > > > in batches.
> > > > > > > b) Or should we assume that the amount of resources is flexible
> > (up
> > > > to
> > > > > a
> > > > > > > point?), and the desired e2e latency
> > > > > > >   is the paramount aspect? So in case of backpressure, we
> should
> > > > still
> > > > > > > adhere to the configured e2e latency,
> > > > > > >   and wait for the user or autoscaler to scale up the job?
> > > > > > >
> > > > > > > In case of a), I think the concept of "isProcessingBacklog" is
> > not
> > > > > > needed,
> > > > > > > we could steer the behaviour only
> > > > > > > using the backpressure information.
> > > > > > >
> > > > > > > On the other hand, in case of b), "isProcessingBacklog"
> > information
> > > > > might
> > > > > > > be helpful, to let Flink know that
> > > > > > > we can safely decrease the e2e latency/checkpoint interval even
> > if
> > > > > there
> > > > > > > is no backpressure, to use fewer
> > > > > > > resources (and let the autoscaler scale down the job).
> > > > > > >
> > > > > > > Do we want to have both, or only one of those? Do a) and b)
> > > > complement
> > > > > > one
> > > > > > > another? If job is backpressured,
> > > > > > > we should follow a) and expose to autoscaler/users information
> > > "Hey!
> > > > > I'm
> > > > > > > barely keeping up! I need more resources!".
> > > > > > > While, when there is no backpressure and latency doesn't matter
> > > > > > > (isProcessingBacklog=true), we can limit the resource
> > > > > > > usage.
> > > > > > >
> > > > > > > And a couple of more concrete remarks about the current
> proposal.
> > > > > > >
> > > > > > > 1.
> > > > > > >
> > > > > > > > I think the goal is to allow users to specify an end-to-end
> > > latency
> > > > > > > budget for the job.
> > > > > > >
> > > > > > > I fully agree with this, but in that case, why are you
> proposing
> > to
> > > > add
> > > > > > > `execution.flush.interval`? That's
> > > > > > > yet another parameter that would affect e2e latency, without
> > > actually
> > > > > > > defining it. We already have things
> > > > > > > like: execution.checkpointing.interval,
> execution.buffer-timeout.
> > > I'm
> > > > > > > pretty sure very few Flink users would be
> > > > > > > able to configure or understand all of them.
> > > > > > >
> > > > > > > I think we should simplify configuration and try to define
> > > > > > > "execution.end-to-end-latency" so the runtime
> > > > > > > could derive other things from this new configuration.
> > > > > > >
> > > > > > > 2. How do you envision `#flush()` and `#snapshotState()` to be
> > > > > connected?
> > > > > > > So far, `#snapshotState()`
> > > > > > > was considered as a kind of `#flush()` signal. Do we need both?
> > > > > Shouldn't
> > > > > > > `#flush()` be implicitly or
> > > > > > > explicitly attached to the `#snapshotState()` call?
> > > > > > >
> > > > > > > 3. What about unaligned checkpoints if we have separate
> > `#flush()`
> > > > > > > event/signal?
> > > > > > >
> > > > > > > 4. How should this be working in at-least-once mode (especially
> > > > sources
> > > > > > > that are configured to be working
> > > > > > > > in at-least-once mode)?
> > > > > > >
> > > > > > > > 5. How is this FLIP connected with FLIP-327? I think they are
> > trying
> > > > to
> > > > > > > achieve basically the same thing:
> > > > > > > optimise when data should be flushed/committed to balance
> between
> > > > > > > throughput and latency.
> > > > > > >
> > > > > > > 6.
> > > > > > >
> > > > > > > > Add RecordAttributesBuilder and RecordAttributes that extends
> > > > > > > StreamElement to provide operator with essential
> > > > > > > > information about the records they receive, such as whether
> the
> > > > > records
> > > > > > > are already stale due to backlog.
> > > > > > >
> > > > > > > Passing along `RecordAttribute` for every `StreamElement` would
> > be
> > > an
> > > > > > > extremely inefficient solution.
> > > > > > >
> > > > > > > If at all, this should be a marker propagated through the
> > JobGraph
> > > > > via
> > > > > > > Events or sent from JM to TMs via an RPC
> > > > > > > that would mark "backlog processing started/ended". Note that
> > > Events
> > > > > > might
> > > > > > > be costly, as they need to be
> > > > > > > broadcasted. So with a job having 5 keyBy exchanges and
> > parallelism
> > > > of
> > > > > > > 1000, the number of events sent is
> > > > > > > ~4 000 000, while the number of RPCs would be only 5000.
> > > > > > >
> > > > > > > In case we want to only check for the backpressure, we don't
> need
> > > any
> > > > > > > extra signal. Operators/subtasks can
> > > > > > > get that information very easily from the TMs runtime.
> > > > > > >
> > > > > > > Best,
> > > > > > > Piotrek
> > > > > > >
> > > > > > > czw., 29 cze 2023 o 17:19 Dong Lin <lindon...@gmail.com>
> > > napisał(a):
> > > > > > >
> > > > > > >> Hi Shammon,
> > > > > > >>
> > > > > > >> Thanks for your comments. Please see my reply inline.
> > > > > > >>
> > > > > > >> On Thu, Jun 29, 2023 at 6:01 PM Shammon FY <zjur...@gmail.com
> >
> > > > wrote:
> > > > > > >>
> > > > > > >> > Hi Dong and Yunfeng,
> > > > > > >> >
> > > > > > >> > Thanks for bringing up this discussion.
> > > > > > >> >
> > > > > > >> > As described in the FLIP, the differences between
> `end-to-end
> > > > > latency`
> > > > > > >> and
> > > > > > >> > `table.exec.mini-batch.allow-latency` are: "It allows users
> to
> > > > > specify
> > > > > > >> the
> > > > > > >> > end-to-end latency, whereas
> > table.exec.mini-batch.allow-latency
> > > > > > applies
> > > > > > >> to
> > > > > > >> > each operator. If there are N operators on the path from
> > source
> > > to
> > > > > > sink,
> > > > > > >> > the end-to-end latency could be up to
> > > > > > >> table.exec.mini-batch.allow-latency *
> > > > > > >> > N".
> > > > > > >> >
> > > > > > >> > If I understand correctly,
> > `table.exec.mini-batch.allow-latency`
> > > > is
> > > > > > also
> > > > > > >> > applied to the end-to-end latency for a job, maybe @Jack Wu
> > can
> > > > give
> > > > > > >> more
> > > > > > >> > information.
> > > > > > >> >
> > > > > > >>
> > > > > > >> Based on what I can tell from the doc/code and offline
> > > discussion, I
> > > > > > >> believe table.exec.mini-batch.allow-latency is not applied to
> > the
> > > > > > >> end-to-end latency for a job.
> > > > > > >>
> > > > > > > >> It is mentioned here
> > > > > > > >> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/>
> > > > > > > >> that
> > > > > > >> table.exec.mini-batch.allow-latency is "the maximum latency
> can
> > be
> > > > > used
> > > > > > >> for
> > > > > > >> MiniBatch to buffer input records". I think we should have
> > > mentioned
> > > > > > that
> > > > > > >> the config is applied to the end-to-end latency in this doc if
> > it
> > > is
> > > > > > >> indeed
> > > > > > >> the case.
> > > > > > >>
> > > > > > >>
> > > > > > > >> > So, from my perspective, and please correct me if I
> > > > misunderstand,
> > > > > > the
> > > > > > >> > targets of this FLIP may include the following:
> > > > > > >> >
> > > > > > >> > 1. Support a mechanism like  `mini-batch` in SQL for
> > > `DataStream`,
> > > > > > which
> > > > > > >> > will collect data in the operator and flush data when it
> > > receives
> > > > a
> > > > > > >> `flush`
> > > > > > >> > event, in the FLIP it is `FlushEvent`.
> > > > > > >> >
> > > > > > >>
> > > > > > >> I think the goal is to allow users to specify an end-to-end
> > > latency
> > > > > > budget
> > > > > > >> for the job. IMO it is quite different from the `mini-batch`
> in
> > > SQL.
> > > > > > >>
> > > > > > >>
> > > > > > >> >
> > > > > > >> > 2. Support dynamic `latency` according to the progress of
> job,
> > > > such
> > > > > as
> > > > > > >> > snapshot stage and after that.
> > > > > > >> >
> > > > > > >> > To do that, I have some questions:
> > > > > > >> >
> > > > > > >> > 1. I didn't understand the purpose of public interface
> > > > > > >> `RecordAttributes`.
> > > > > > >> > I think `FlushEvent` in the FLIP is enough, and different
> > > > > > >> > `DynamicFlushStrategy` can be added to generate flush events
> > to
> > > > > > address
> > > > > > >> > different needs, such as a static interval similar to
> > mini-batch
> > > > in
> > > > > > SQL
> > > > > > >> or
> > > > > > >> > collect the information `isProcessingBacklog` and metrics to
> > > > > generate
> > > > > > >> > `FlushEvent` which is described in your FLIP? If hudi sink
> > needs
> > > > the
> > > > > > >> > `isBacklog` flag, the hudi `SplitEnumerator` can create an
> > > > operator
> > > > > > >> event
> > > > > > >> > and send it to hudi source reader.
> > > > > > >> >
> > > > > > >>
> > > > > > >> Suppose we only have FlushEvent, then operators (e.g. Hudi
> Sink)
> > > > will
> > > > > > not
> > > > > > >> know they can buffer data in the following scenario:
> > > > > > >>
> > > > > > > >> - execution.allowed-latency is not configured and uses the
> > default
> > > > > value
> > > > > > >> null.
> > > > > > >> - The job is reading from HybridSource and HybridSource says
> > > > > > >> isBacklog=true.
> > > > > > >>
> > > > > > > >> Also note that Hudi Sink might not be the only operator that
> > can
> > > > > > benefit
> > > > > > >> from knowing isBacklog=true. Other sinks and aggregation
> > operators
> > > > > (e.g.
> > > > > > >> CoGroup) can also increase throughput by buffering/sorting
> > records
> > > > > when
> > > > > > >> there is backlog. So it seems simpler to pass RecordAttributes
> > to
> > > > > these
> > > > > > >> operators than asking every operator developer to create
> > operator
> > > > > event
> > > > > > >> etc.
> > > > > > >>
> > > > > > >>
> > > > > > >> >
> > > > > > >> > 2. How is this new mechanism unified with SQL's mini-batch
> > > > > mechanism?
> > > > > > As
> > > > > > >> > far as I am concerned, SQL implements mini-batch mechanism
> > based
> > > > on
> > > > > > >> > watermark, I think it is very unreasonable to have two
> > different
> > > > > > >> > implementation in SQL and DataStream.
> > > > > > >> >
> > > > > > >>
> > > > > > >> I think we can deprecate table.exec.mini-batch.allow-latency
> > later
> > > > > > >> once execution.allowed-latency is ready for production usage.
> > This
> > > > is
> > > > > > >> mentioned in the "Compatibility, Deprecation, and Migration
> > Plan"
> > > > > > section.
> > > > > > >>
> > > > > > >> If there is a config that supports user specifying the e2e
> > > latency,
> > > > it
> > > > > > is
> > > > > > >> probably reasonable for this config to work for both
> DataStream
> > > and
> > > > > SQL.
> > > > > > >>
> > > > > > >>
> > > > > > > >> > 3. I notice that the `CheckpointCoordinator` will generate
> > > > > > > >> > `FlushEvent`, which information about `FlushEvent` will be stored in
> > > > > > >> >
> > > > > > >>
> > > > > > > >> CheckpointCoordinator might need to send FlushEvent before
> > > > > > > >> triggering checkpoint in order to deal with the two-phase commit
> > > > > > > >> sinks. The algorithm is specified in the "Proposed Changes" section.
> > > > > > >>
> > > > > > >>
> > > > > > > >> > `Checkpoint`? What is the alignment strategy for FlushEvent in the
> > > > > > > >> > operator? Will the operator flush the data when it receives all
> > > > > > > >> > `FlushEvent` from upstream with the same ID, or flush for each
> > > > > > > >> > `FlushEvent`? Can you give a more detailed proposal about that?
> > > > > > > >> > We also have a demand for this piece, thanks
> > > > > > >> >
> > > > > > >>
> > > > > > > >> After an operator has received a FlushEvent:
> > > > > > > >> - If the ID of the received FlushEvent is larger than the largest
> > > > > > > >> ID this operator has received, then flush() is triggered for this
> > > > > > > >> operator and the operator should broadcast FlushEvent to
> > > > > > > >> downstream operators.
> > > > > > > >> - Otherwise, this FlushEvent is ignored.
> > > > > > >>
> > > > > > >> This behavior is specified in the Java doc of the FlushEvent.
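
The FlushEvent handling rule quoted above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: FlushDedupOperator, processRecord, onFlushEvent, emitted, and forwardedIds are hypothetical names, not Flink or FLIP APIs; FlushEvent IDs are assumed to be long values; and appending the ID to a list stands in for broadcasting the event downstream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an operator with several inputs; NOT a Flink class.
class FlushDedupOperator {
    // Largest FlushEvent ID received so far (assumption: real IDs are larger
    // than Long.MIN_VALUE).
    private long largestSeenId = Long.MIN_VALUE;
    // Records buffered since the last flush.
    private final List<String> buffer = new ArrayList<>();
    // Records that have been flushed, in flush order.
    private final List<String> emitted = new ArrayList<>();
    // IDs forwarded downstream; models the broadcast of FlushEvent.
    private final List<Long> forwardedIds = new ArrayList<>();

    void processRecord(String record) {
        buffer.add(record);
    }

    // Returns true if the event triggered a flush, false if it was ignored.
    boolean onFlushEvent(long id) {
        if (id <= largestSeenId) {
            // Same or older ID, e.g. the same event arriving on another input.
            return false;
        }
        largestSeenId = id;
        flush();
        forwardedIds.add(id);
        return true;
    }

    private void flush() {
        emitted.addAll(buffer);
        buffer.clear();
    }

    List<String> emitted() {
        return emitted;
    }

    List<Long> forwardedIds() {
        return forwardedIds;
    }
}
```

With this rule, an operator with N inputs flushes on the first copy of a given FlushEvent ID and ignores the remaining N-1 copies, so no alignment across inputs is needed in the sketch.
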
> > > > > > >>
> > > > > > >> Can you see if this answers your questions?
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Dong
> > > > > > >>
> > > > > > >>
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > Best,
> > > > > > >> > Shammon FY
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser <
> > > > > > >> martijnvis...@apache.org>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> >> Hi Dong and Yunfeng,
> > > > > > >> >>
> > > > > > > >> >> Thanks for the FLIP. What's not clear for me is what's the
> > > > > > > >> >> expected behaviour when the allowed latency can't be met, for
> > > > > > > >> >> whatever reason. Given that we're talking about an "allowed
> > > > > > > >> >> latency", it implies that something has gone wrong and should
> > > > > > > >> >> fail? Isn't this more a minimum latency that you're proposing?
> > > > > > >> >>
> > > > > > > >> >> There's also the part about the Hudi Sink processing records
> > > > > > > >> >> immediately upon arrival. Given that the SinkV2 API provides the
> > > > > > > >> >> ability for custom post and pre-commit topologies [1],
> > > > > > > >> >> specifically targeted to avoid generating multiple small files,
> > > > > > > >> >> why isn't that sufficient for the Hudi Sink? It would be great to
> > > > > > > >> >> see that added under Rejected Alternatives if this is indeed not
> > > > > > > >> >> sufficient.
> > > > > > >> >>
> > > > > > >> >> Best regards,
> > > > > > >> >>
> > > > > > >> >> Martijn
> > > > > > >> >>
> > > > > > >> >> [1]
> > > > > > >> >>
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> > > > > > >> >>
> > > > > > >> >> On Sun, Jun 25, 2023 at 4:25 AM Yunfeng Zhou
> > > > > > >> >> <flink.zhouyunf...@gmail.com> wrote:
> > > > > > >> >> >
> > > > > > >> >> > Hi all,
> > > > > > >> >> >
> > > > > > > >> >> > Dong(cc'ed) and I are opening this thread to discuss our
> > > > > > > >> >> > proposal to support configuring end-to-end allowed latency for
> > > > > > > >> >> > Flink jobs, which has been documented in FLIP-325
> > > > > > > >> >> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Support+configuring+end-to-end+allowed+latency>.
> > > > > > >> >> >
> > > > > > > >> >> > By configuring the latency requirement for a Flink job, users
> > > > > > > >> >> > would be able to optimize the throughput and overhead of the
> > > > > > > >> >> > job at the cost of an acceptable increase in latency. This
> > > > > > > >> >> > approach is particularly useful when dealing with records that
> > > > > > > >> >> > do not require immediate processing and emission upon arrival.
> > > > > > >> >> >
> > > > > > > >> >> > Please refer to the FLIP document for more details about the
> > > > > > > >> >> > proposed design and implementation. We welcome any feedback and
> > > > > > > >> >> > opinions on this proposal.
> > > > > > >> >> >
> > > > > > >> >> > Best regards.
> > > > > > >> >> >
> > > > > > >> >> > Dong and Yunfeng
> > > > > > >> >>
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
