Hi Dong,

Thanks for your clarification.


> Actually, I think it could make sense to toggle isBacklog between true and
> false while the job is running.
>

If isBacklog is toggled back and forth too often (e.g. because of an unexpected
mistake or an unstable system), a large number of RecordAttributes might be
emitted, which would lead to performance issues. That would not be the
right way to use RecordAttributes, right? Devs and users should be aware of
this and know how to monitor, maintain, and fix such issues.
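
To illustrate the "emit only on change" behavior you described, plus a simple counter that could feed a monitoring metric, here is a rough sketch. The class and method names are hypothetical and are not part of Flink or the FLIP; the list only stands in for a call to `Output#emitRecordAttributes()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not Flink or FLIP API. */
class BacklogEmitGuard {
    // Last isBacklog value that was actually emitted; null until the first emit.
    private Boolean lastEmitted = null;
    // Number of true<->false switches, e.g. to expose as a metric for monitoring.
    private long toggleCount = 0;
    // Stand-in for the downstream output (assumption: a real operator would
    // call Output#emitRecordAttributes() here instead).
    private final List<Boolean> emitted = new ArrayList<>();

    /** Emits only if the value differs from the last emitted one. Returns true if emitted. */
    boolean maybeEmit(boolean isBacklog) {
        if (lastEmitted != null && lastEmitted == isBacklog) {
            return false; // same value as before, nothing to send
        }
        if (lastEmitted != null) {
            toggleCount++; // a real switch, not the initial announcement
        }
        lastEmitted = isBacklog;
        emitted.add(isBacklog);
        return true;
    }

    long getToggleCount() {
        return toggleCount;
    }

    List<Boolean> getEmitted() {
        return emitted;
    }
}
```

With something like this, repeated calls with the same value are dropped, and an unusually fast-growing toggle count would be the signal that something is flapping.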

Your reply contains valuable information. It might make sense to add it
to the FLIP:

1. It is up to the operator to decide when to emit RecordAttributes. But
devs and users should be aware that the number of RecordAttributes should
not be so high that it causes performance issues.
2. Although users can decide how to configure them, the end-to-end latency
should be (commonly?) configured lower than the checkpoint interval.
3. The three ways you mentioned for how to derive isBacklog.
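
For point 3, a minimal sketch of how the three options could be written down, assuming plain values as inputs. All names below are hypothetical and only for illustration; none of them are Flink or FLIP API. The second class adds the two-threshold switching from your Kafka example (e.g. 5 minutes to enter backlog, 30 seconds to leave it), which also keeps isBacklog from flapping around a single threshold.

```java
/** Hypothetical sketch of the three ways to derive isBacklog; not Flink or FLIP API. */
final class BacklogDerivation {
    private BacklogDerivation() {}

    // Option 1: based on the source's own state (e.g. a CDC source reading its snapshot).
    static boolean fromSourceState(boolean readingSnapshot) {
        return readingSnapshot;
    }

    // Option 2: system_time - watermark > user_specified_threshold.
    static boolean fromWatermarkLag(long systemTimeMs, long watermarkMs, long thresholdMs) {
        return systemTimeMs - watermarkMs > thresholdMs;
    }

    // Option 3: busyTimeMsPerSecond (or backPressuredTimeMsPerSecond) > user_specified_threshold.
    static boolean fromMetric(double timeMsPerSecond, double thresholdMsPerSecond) {
        return timeMsPerSecond > thresholdMsPerSecond;
    }
}

/** Option 2 with separate enter/exit thresholds (hysteresis). Hypothetical names. */
final class WatermarkLagBacklogDetector {
    private final long enterLagMs; // switch to backlog when lag exceeds this
    private final long exitLagMs;  // switch back when lag drops to this or below
    private boolean backlog = false;

    WatermarkLagBacklogDetector(long enterLagMs, long exitLagMs) {
        if (exitLagMs > enterLagMs) {
            throw new IllegalArgumentException("exitLagMs must not exceed enterLagMs");
        }
        this.enterLagMs = enterLagMs;
        this.exitLagMs = exitLagMs;
    }

    /** Updates and returns the current isBacklog value for the given times. */
    boolean update(long systemTimeMs, long watermarkMs) {
        long lagMs = systemTimeMs - watermarkMs;
        if (!backlog && lagMs > enterLagMs) {
            backlog = true;
        } else if (backlog && lagMs <= exitLagMs) {
            backlog = false;
        }
        return backlog;
    }
}
```

For example, `new WatermarkLagBacklogDetector(300_000, 30_000)` stays in backlog mode while the lag is anywhere between 30 seconds and 5 minutes once it has entered that mode.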

WDYT?

Best regards,
Jing


On Fri, Jul 7, 2023 at 3:13 AM Dong Lin <lindon...@gmail.com> wrote:

> Hi Jing,
>
> Thanks for the comments. Please see my reply inline.
>
> On Fri, Jul 7, 2023 at 5:40 AM Jing Ge <j...@ververica.com.invalid> wrote:
>
> > Hi,
> >
> > Thank you all for the inspiring discussion. Really appreciate it!
> >
> > @Dong I'd like to ask some (stupid) questions to make sure I understand
> > your thoughts correctly.
> >
> > 1. It will make no sense to send the same type of RecordAttributes, right?
> > e.g. if one RecordAttributes(isBacklog=true) has been sent, a new
> > RecordAttributes will only be sent when isBacklog changes to false,
> > and vice versa. In this way, the number of RecordAttributes will be very
> > limited.
> >
>
> Yes, you are right. Actually, this is what we plan to do when we update
> operators to emit RecordAttributes via `Output#emitRecordAttributes()`.
>
> Note that the FLIP does not specify the frequency of how operators should
> invoke `Output#emitRecordAttributes()`. It is up to the operator
> to decide when to emit RecordAttributes.
>
>
> > 2. Since source readers can invoke Output#emitRecordAttributes to emit
> > RecordAttributes(isBacklog=true/false), it might be weird to send
> > RecordAttributes with different isBacklog back and forth too often. Devs
> > and users should pay attention to it. Something is wrong when such a thing
> > happens (metrics for monitoring?). Is this correct?
> >
>
>

> Actually, I think it could make sense to toggle isBacklog between true and
> false while the job is running.
>
>

> Suppose the job is reading from user-action data from Kafka and there is a
> traffic spike for 2 hours. If the job keeps running in pure stream mode,
> the watermark lag might keep increasing during this period because the
> job's processing capability can not catch up with the Kafka input
> throughput. In this case, it can be beneficial to dynamically switch
> isBacklog to true when watermarkLag exceeds a given threshold (e.g. 5
> minutes), and switch isBacklog to false again when the watermarkLag is low
> enough (30 seconds).
>
>
> > 3. Is there any relationship between end-to-end-latency and checkpoint
> > interval that users should pay attention to? In the example described in
> > the FLIP, both have the same value, 2 min. What if end-to-end-latency is
> > configured to be bigger than the checkpoint interval? Could a checkpoint
> > that falls within the end-to-end-latency be skipped?
> >
>
> This FLIP would not enforce any relationship between end-to-end latency and
> checkpoint interval. Users are free to configure end-to-end latency to be
> bigger than checkpoint interval.
>
> I don't think there exists any use-case which requires end-to-end latency
> to be higher than the checkpoint interval. Note that introducing a
> relationship between these two configs would increase code complexity and
> also make the documentation of these configs a bit more complex for users
> to understand.
>
> Since there is no correctness issue when a user sets end-to-end latency to be
> bigger than the checkpointing interval, I think it is simpler to just let
> the user decide how to configure them.
>
>
> > 4. Afaiu, one major discussion point is that isBacklog can be derived from
> > back pressure and there will be no need for RecordAttributes. But suppose a
> > Flink job has enough resources that there is no back pressure (it would be
> > difficult to provision exactly enough resources that everything is fine
> > and back pressure occurs only for backlog) but we still want to improve the
> > throughput. We then need some other way to derive isBacklog. That is the
> > reason why RecordAttributes has been introduced. Did I understand it
> > correctly?
> >
>
> I think there can be multiple ways to derive isBacklog, including:
> 1) Based on the source operator's state. For example, when MySQL CDC source
> is reading snapshot, it can claim isBacklog=true.
> 2) Based on the watermarkLag in the source. For example, when system_time -
> watermark > user_specified_threshold, then isBacklog=true.
> 3) Based on metrics. For example, when busyTimeMsPerSecond (or
> backPressuredTimeMsPerSecond) > user_specified_threshold, then
> isBacklog=true.
>
> Note that there are pros/cons between these choices and none of them can
> best fit all use-cases. For example, since option-1 does not require any
> extra user-specified threshold, it can be the best choice when we want to
> improve user's existing job (e.g. the one in the motivation section)
> without extra user configuration.
>
> For use-cases which want to increase throughput when reading backlog data
> from Kafka, option-2 can be the best choice because a threshold based on
> the watermark lag is easier to understand and configure than configuring
> threshold based on the backPressuredTimeMsPerSecond.
>
> option-3 might be the only choice when option-1 and option-2 are not
> available for the given use-cases. But it might be harder to configure a
> threshold against backPressuredTimeMsPerSecond. This is because the choice
> of the percentage (or ms per second) threshold will mostly be empirical and
> approximate. For example, should the user configure this to be 100%, 99%,
> or 90%? I would prefer not to have users worry about this if option-1 or
> option-2 can be used.
>
> RecordAttributes would be necessary in order to support option-1 and
> option-2 well.
>
>
> > 5. NIT: Just like we talked about in another thread, JavaBean naming
> > convention is recommended, i.e. isBacklog() & setBacklog() instead of
> > getIsBacklog() and setIsBacklog().
> >
>
> Yeah, thanks for the suggestion. I have updated the FLIP as suggested.
>
> Best,
> Dong
>
>
> >
> > Best regards,
> > Jing
> >
> > On Thu, Jul 6, 2023 at 2:38 PM Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi Shammon,
> > >
> > > Thanks for your comments. Please see my reply inline.
> > >
> > >
> > > On Thu, Jul 6, 2023 at 12:47 PM Shammon FY <zjur...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > Thanks for your reply @Dong. I really agree with Piotr's points and I
> > > > would like to share some thoughts from my side.
> > > >
> > > > About the latency for mini-batch mechanism in Flink SQL, I still think the
> > > > description in the FLIP is not right. If there are N operators and the
> > > > whole process time for data in the job is `t`, then the latency in
> > > > mini-batch will be `table.exec.mini-batch.allow-latency`+`t`, not
> > > > `table.exec.mini-batch.allow-latency`*N. I think this is one of the
> > > > foundations of this FLIP, and you may need to confirm it again.
> > > >
> > >
> > > Given that we agree to have a mechanism to support end-to-end latency for
> > > DataStream programs, I think the exact semantics of
> > > table.exec.mini-batch.allow-latency will not affect the motivation or API
> > > design of this FLIP. I have updated the FLIP to remove any mention of
> > > table.exec.mini-batch.allow-latency.
> > >
> > >
> > > >
> > > > I think supporting similar mechanisms in the runtime and balancing latency
> > > > and throughput dynamically for all Flink jobs is a very good idea, and I
> > > > have some questions about that.
> > > >
> > > > 1. We encounter a situation where the workload is high when processing
> > > > snapshot data and we need mini-batch in SQL for performance reasons. But the
> > > > workload is low when processing delta data, so we need to automatically adjust
> > > > the mini-batch SQL for them, or even cancel the mini-batch during delta
> > > > processing. I think this FLIP meets our needs, but I think we need a
> > > > general solution which covers all source types in Flink, and the
> > > > `isBacklog` in the FLIP is only one strategy.
> > > >
> > >
> > > The focus of this FLIP is to allow Flink runtime to adjust the behavior of
> > > operators (e.g. the buffer time) based on the IsBacklog status of sources
> > > and the user-specified execution.end-to-end-latency (effective only when
> > > there is no backlog). The FLIP assumes there is already a strategy for
> > > sources to determine the IsProcessingBacklog status without adding more
> > > strategies.
> > >
> > > I agree it is useful to introduce more strategies to determine the
> > > IsProcessingBacklog status for sources. We can determine the
> > > IsProcessingBacklog status based on the backpressure metrics, the
> > > event-time watermark lag, or anything we find reasonable. I would like to
> > > work on this in follow-up FLIPs so that we don't work on too many things
> > > in the same FLIP.
> > >
> > > Would this be OK with you?
> > >
> > >
> > > > From the FLIP I think there should be two parts: dynamic trigger
> flush
> > > > event in JM and dynamic trigger flush operations in Operator. We need
> > to
> > > > introduce much more general interfaces for them, such as
> > > > `DynamicFlushStrategy` in JM and `DynamicFlushOperation` in TM? As
> > Piotr
> > > > mentioned above, we can collect many information from TM locally such
> > as
> > > > backpressure, queue size and `Operator` can decide whether to buffer
> > data
> > > > or process it immediately.  JM is also the same, it can decide to
> send
> > > > flush events on a regular basis or send them based on the collected
> > > metrics
> > > > information and other information, such as the isBacklog in the FLIP.
> > > >
> > > > 2. I really don't get enough benefits for `RecordAttribute` in the
> FLIP
> > > and
> > > > as Piotr mentioned above too, it will generate a large number of
> > > messages,
> > > >
> > >
> > > If there is any sentence in the FLIP that suggests we will emit a lot of
> > > RecordAttribute, sorry for that and I will fix it.
> > >
> > > Currently, the FLIP provides the `Output#emitRecordAttributes()` for
> > > operators (e.g. source reader) to emit RecordAttributes. The FLIP
> leaves
> > > the operator to decide the frequency and value of the emitted
> > > RecordAttributes.
> > >
> > > Our plan is to let SourceReader emit RecordAttributes only when its
> value
> > > (e.g. isBacklog) differs from the value of the RecordAttributes it has
> > > emitted earlier. It should avoid resending RecordAttributes with the
> same
> > > value, similar to how Flink currently avoids resending
> > > Watermark/WatermarkStatus with the same value.
> > >
> > > Would it address your concern?
> > >
> > >
> > > > affecting performance. FLIP mentions that it will be applied to Operator
> > > > and Sink, I try to understand its role and please correct me if I'm wrong.
> > > > a) It tells the Operator and Sink that most of the data they are currently
> > > > processing are from snapshot and are "insert" data? Because of out-of-order
> > > > data in Flink, the Operator and Sink may receive "upsert" data from other
> > > > sources.
> > > >
> > >
> > > The RecordAttributes currently proposed in the FLIP only provides the
> > > IsBacklog information, which tells the operator (including sink operator)
> > > whether the records received after this RecordAttributes event are
> > > "backlog". Note that snapshot (e.g. MySQL CDC snapshot) is one particular
> > > case which can be classified as backlog. But we might introduce more
> > > strategies to classify records as backlog in the future.
> > >
> > > Currently, RecordAttributes does not specify whether the following records
> > > are insert-only or upsert. We might introduce such an attribute if there
> > > is a good use-case for having it.
> > >
> > >
> > > > b) Do Operators and Sink perform any very special operations in the above
> > > > situations? What are the benefits of these special operations for "most data
> > > > are insert"?
> > > >
> > >
> > > Hmm.. I don't think the FLIP says something like "most data are
> insert".
> > > Could you clarify which part of the FLIP you are talking about?
> > >
> > > I was told that Hudi Sink can have much higher throughput if all its
> > inputs
> > > are insert-only. One point in the FLIP is that Hudi Sink can take
> > advantage
> > > of the features proposed in the FLIP to increase its throughput when
> > source
> > > are reading backlog data (e.g. MySQL CDC snapshot).
> > >
> > >
> > >
> > > > c) I think the operator and sink can collect the above information
> > > locally
> > > > when it receives each record without the need for `RecordAttribute`
> > even
> > > > when we need some special operations.
> > > >
> > >
> > > Let's say the job is reading from MySQL CDC, we want to process records
> > in
> > > snapshot phase with high throughput (and likely high latency), and
> > process
> > > records in the binlog phase with low processing latency.
> > >
> > > We can achieve this goal by propagating RecordAttributes from source to
> > > downstream operators. This allows an operator to know exactly that
> those
> > > records received before a RecordAttributes(isBacklog=false) can be
> > > processed with high latency, and those records received after a
> > > RecordAttributes(isBacklog=false) should be processed with low latency.
> > >
> > > Could you help explain how to achieve this goal without
> > `RecordAttribute`?
> > >
> > >
> > > > d) Even if we do need a `RecordAttribute` events in Operator and
> Sink,
> > I
> > > > think broadcast them from JM is a better choice.
> > > >
> > >
> > > Suppose we broadcast it from JM. When MySQL CDC source switches from
> the
> > > snapshot phase to binlog phase, the RecordAttributes(IsBacklog=false)
> > might
> > > arrive at an operator while it is still processing a queue of records
> > from
> > > the snapshot phase, causing the operator to reduce its buffer time (and
> > > throughput) earlier than expected. It might not be a big deal when this
> > > interval is short. But given that propagating RecordAttributes from
> > sources
> > > to downstream operators does not have much overhead (no more than the
> > > periodic Watermark), it seems useful to make the operator behavior more
> > > accurate.
> > >
> > > What do you think?
> > >
> > >
> > > >
> > > > 3. For the flush event, I also have some questions. What type of
> > > operators
> > > > need to buffer data and flush them based on the flush events? In SQL
> > > >
> > >
> > > As you mentioned above, typically those operators that rely heavily on
> > > statebackend (e.g. co-group, aggregate, join) can increase throughput
> by
> > > buffering data. Operators which can not benefit from flush, such as
> Map,
> > do
> > > not need to be updated.
> > >
> > > It is mentioned in the FLIP that we will "update operators in Flink to
> > > override processRecordAttributes() if that helps improve job
> performance
> > > without sacrificing correctness (e.g. processing latency)".
> > >
> > >
> > > > mini-batch mechanism, similar processing will be added for the
> > aggregate
> > > > and join operators, while for operators such as map, it is not
> > necessary.
> > > > How can we identify different operator in the runtime layer (`Input`
> > and
> > > > `TwoInputStreamOperator`)? I think buffer data in Map/FlatMap/Filter
> > > >
> > >
> > > I am not sure I understand this question. Are you asking how to
> > > automatically identify the operators that can benefit from the flush
> > > operation?
> > >
> > > I think we will need to manually identify and optimize operators on a
> > > case-by-case basis. There is no good systematic way to automatically
> > > identify and optimize all such operators. An operator's implementation
> > > needs to be manually updated and benchmarked before we commit the
> > > corresponding code change.
> > >
> > > What do you think?
> > >
> > > Best,
> > > Dong
> > >
> > >
> > > > Operator is not a good idea which makes data no longer flowing.
> > > >
> > >
> > > >
> > > > Best,
> > > > Shammon FY
> > > >
> > > >
> > > > On Thu, Jul 6, 2023 at 1:54 AM Piotr Nowojski <
> > piotr.nowoj...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Thanks for this proposal, this is a very much needed thing that
> > should
> > > be
> > > > > addressed in Flink.
> > > > >
> > > > > I think there is one thing that has been discussed neither here nor in
> > > > > FLIP-309. Given that we have
> > > > > three dimensions:
> > > > > - e2e latency/checkpointing interval
> > > > > - enabling some kind of batching/buffering on the operator level
> > > > > - how much resources we want to allocate to the job
> > > > >
> > > > > How do we want Flink to adjust itself between those three? For
> > example:
> > > > > a) Should we assume that given Job has a fixed amount of assigned
> > > > > resources and make it paramount that
> > > > >   Flink doesn't exceed those available resources? So in case of
> > > > > backpressure, we
> > > > >   should extend checkpointing intervals, emit records less
> frequently
> > > and
> > > > > in batches.
> > > > > b) Or should we assume that the amount of resources is flexible (up
> > to
> > > a
> > > > > point?), and the desired e2e latency
> > > > >   is the paramount aspect? So in case of backpressure, we should
> > still
> > > > > adhere to the configured e2e latency,
> > > > >   and wait for the user or autoscaler to scale up the job?
> > > > >
> > > > > In case of a), I think the concept of "isProcessingBacklog" is not
> > > > needed,
> > > > > we could steer the behaviour only
> > > > > using the backpressure information.
> > > > >
> > > > > On the other hand, in case of b), "isProcessingBacklog" information
> > > might
> > > > > be helpful, to let Flink know that
> > > > > we can safely decrease the e2e latency/checkpoint interval even if
> > > there
> > > > > is no backpressure, to use fewer
> > > > > resources (and let the autoscaler scale down the job).
> > > > >
> > > > > Do we want to have both, or only one of those? Do a) and b)
> > complement
> > > > one
> > > > > another? If job is backpressured,
> > > > > we should follow a) and expose to autoscaler/users information
> "Hey!
> > > I'm
> > > > > barely keeping up! I need more resources!".
> > > > > While, when there is no backpressure and latency doesn't matter
> > > > > (isProcessingBacklog=true), we can limit the resource
> > > > > usage.
> > > > >
> > > > > And a couple of more concrete remarks about the current proposal.
> > > > >
> > > > > 1.
> > > > >
> > > > > > I think the goal is to allow users to specify an end-to-end
> latency
> > > > > budget for the job.
> > > > >
> > > > > I fully agree with this, but in that case, why are you proposing to
> > add
> > > > > `execution.flush.interval`? That's
> > > > > yet another parameter that would affect e2e latency, without
> actually
> > > > > defining it. We already have things
> > > > > like: execution.checkpointing.interval, execution.buffer-timeout.
> I'm
> > > > > pretty sure very few Flink users would be
> > > > > able to configure or understand all of them.
> > > > >
> > > > > I think we should simplify configuration and try to define
> > > > > "execution.end-to-end-latency" so the runtime
> > > > > could derive other things from this new configuration.
> > > > >
> > > > > 2. How do you envision `#flush()` and `#snapshotState()` to be
> > > connected?
> > > > > So far, `#snapshotState()`
> > > > > was considered as a kind of `#flush()` signal. Do we need both?
> > > Shouldn't
> > > > > `#flush()` be implicitly or
> > > > > explicitly attached to the `#snapshotState()` call?
> > > > >
> > > > > 3. What about unaligned checkpoints if we have separate `#flush()`
> > > > > event/signal?
> > > > >
> > > > > 4. How should this be working in at-least-once mode (especially sources
> > > > > that are configured to be working
> > > > > in at-least-once mode)?
> > > > >
> > > > > 5. How is this FLIP connected with FLIP-327? I think they are trying to
> > > > > achieve basically the same thing:
> > > > > optimise when data should be flushed/committed to balance between
> > > > > throughput and latency.
> > > > >
> > > > > 6.
> > > > >
> > > > > > Add RecordAttributesBuilder and RecordAttributes that extends
> > > > > StreamElement to provide operator with essential
> > > > > > information about the records they receive, such as whether the
> > > records
> > > > > are already stale due to backlog.
> > > > >
> > > > > Passing along `RecordAttribute` for every `StreamElement` would be
> an
> > > > > extremely inefficient solution.
> > > > >
> > > > > If at all, this should be a marker propagated through the JobGraph via
> > > > > Events or sent from JM to TMs via an RPC
> > > > > that would mark "backlog processing started/ended". Note that Events might
> > > > > be costly, as they need to be
> > > > > broadcast. So with a job having 5 keyBy exchanges and parallelism of
> > > > > 1000, the number of events sent is
> > > > > ~4 000 000, while the number of RPCs would be only 5000.
> > > > >
> > > > > In case we want to only check for the backpressure, we don't need
> any
> > > > > extra signal. Operators/subtasks can
> > > > > get that information very easily from the TMs runtime.
> > > > >
> > > > > Best,
> > > > > Piotrek
> > > > >
> > > > > czw., 29 cze 2023 o 17:19 Dong Lin <lindon...@gmail.com>
> napisał(a):
> > > > >
> > > > >> Hi Shammon,
> > > > >>
> > > > >> Thanks for your comments. Please see my reply inline.
> > > > >>
> > > > >> On Thu, Jun 29, 2023 at 6:01 PM Shammon FY <zjur...@gmail.com>
> > wrote:
> > > > >>
> > > > >> > Hi Dong and Yunfeng,
> > > > >> >
> > > > >> > Thanks for bringing up this discussion.
> > > > >> >
> > > > >> > As described in the FLIP, the differences between `end-to-end
> > > latency`
> > > > >> and
> > > > >> > `table.exec.mini-batch.allow-latency` are: "It allows users to
> > > specify
> > > > >> the
> > > > >> > end-to-end latency, whereas table.exec.mini-batch.allow-latency
> > > > applies
> > > > >> to
> > > > >> > each operator. If there are N operators on the path from source
> to
> > > > sink,
> > > > >> > the end-to-end latency could be up to
> > > > >> table.exec.mini-batch.allow-latency *
> > > > >> > N".
> > > > >> >
> > > > >> > If I understand correctly, `table.exec.mini-batch.allow-latency`
> > is
> > > > also
> > > > >> > applied to the end-to-end latency for a job, maybe @Jack Wu can
> > give
> > > > >> more
> > > > >> > information.
> > > > >> >
> > > > >>
> > > > >> Based on what I can tell from the doc/code and offline
> discussion, I
> > > > >> believe table.exec.mini-batch.allow-latency is not applied to the
> > > > >> end-to-end latency for a job.
> > > > >>
> > > > >> It is mentioned here
> > > > >> <https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/config/>
> > > > >> that table.exec.mini-batch.allow-latency is "the maximum latency can be used for
> > > > >> MiniBatch to buffer input records". I think we should have mentioned that
> > > > >> the config is applied to the end-to-end latency in this doc if it is indeed
> > > > >> the case.
> > > > >>
> > > > >>
> > > > >> > So, from my perspective, and please correct me if I misunderstand, the
> > > > >> > targets of this FLIP may include the following:
> > > > >> >
> > > > >> > 1. Support a mechanism like  `mini-batch` in SQL for
> `DataStream`,
> > > > which
> > > > >> > will collect data in the operator and flush data when it
> receives
> > a
> > > > >> `flush`
> > > > >> > event, in the FLIP it is `FlushEvent`.
> > > > >> >
> > > > >>
> > > > >> I think the goal is to allow users to specify an end-to-end
> latency
> > > > budget
> > > > >> for the job. IMO it is quite different from the `mini-batch` in
> SQL.
> > > > >>
> > > > >>
> > > > >> >
> > > > >> > 2. Support dynamic `latency` according to the progress of job,
> > such
> > > as
> > > > >> > snapshot stage and after that.
> > > > >> >
> > > > >> > To do that, I have some questions:
> > > > >> >
> > > > >> > 1. I didn't understand the purpose of public interface
> > > > >> `RecordAttributes`.
> > > > >> > I think `FlushEvent` in the FLIP is enough, and different
> > > > >> > `DynamicFlushStrategy` can be added to generate flush events to
> > > > address
> > > > >> > different needs, such as a static interval similar to mini-batch
> > in
> > > > SQL
> > > > >> or
> > > > >> > collect the information `isProcessingBacklog` and metrics to
> > > generate
> > > > >> > `FlushEvent` which is described in your FLIP? If hudi sink needs
> > the
> > > > >> > `isBacklog` flag, the hudi `SplitEnumerator` can create an
> > operator
> > > > >> event
> > > > >> > and send it to hudi source reader.
> > > > >> >
> > > > >>
> > > > >> Suppose we only have FlushEvent, then operators (e.g. Hudi Sink)
> > will
> > > > not
> > > > >> know they can buffer data in the following scenario:
> > > > >>
> > > > >> - execution.allowed-latency is not configured and use the default
> > > value
> > > > >> null.
> > > > >> - The job is reading from HybridSource and HybridSource says
> > > > >> isBacklog=true.
> > > > >>
> > > > >> Also note that Hudi Sink might not be the only operators that can
> > > > benefit
> > > > >> from knowing isBacklog=true. Other sinks and aggregation operators
> > > (e.g.
> > > > >> CoGroup) can also increase throughput by buffering/sorting records
> > > when
> > > > >> there is backlog. So it seems simpler to pass RecordAttributes to
> > > these
> > > > >> operators than asking every operator developer to create operator
> > > event
> > > > >> etc.
> > > > >>
> > > > >>
> > > > >> >
> > > > >> > 2. How is this new mechanism unified with SQL's mini-batch
> > > mechanism?
> > > > As
> > > > >> > far as I am concerned, SQL implements mini-batch mechanism based
> > on
> > > > >> > watermark, I think it is very unreasonable to have two different
> > > > >> > implementation in SQL and DataStream.
> > > > >> >
> > > > >>
> > > > >> I think we can deprecate table.exec.mini-batch.allow-latency later
> > > > >> once execution.allowed-latency is ready for production usage. This
> > is
> > > > >> mentioned in the "Compatibility, Deprecation, and Migration Plan"
> > > > section.
> > > > >>
> > > > >> If there is a config that supports user specifying the e2e
> latency,
> > it
> > > > is
> > > > >> probably reasonable for this config to work for both DataStream
> and
> > > SQL.
> > > > >>
> > > > >>
> > > > >> > 3. I notice that the `CheckpointCoordinator` will generate
> > > > `FlushEvent`,
> > > > >> > which information about `FlushEvent` will be stored in
> > > > >> >
> > > > >>
> > > > >> CheckpointCoordinator might need to send FlushEvent before
> > triggering
> > > > >> checkpoint in order to deal with the two-phase commit sinks. The
> > > > algorithm
> > > > >> is specified in the "Proposed Changes" section.
> > > > >>
> > > > >>
> > > > >> > `Checkpoint`? What is the alignment strategy for FlushEvent in
> the
> > > > >> > operator? The operator will flush the data when it receives all
> > > > >> > `FlushEvent` from upstream with the same ID or do flush for each
> > > > >> > `FlushEvent`? Can you give more detailed proposal about that? We
> > > also
> > > > >> have
> > > > >> > a demand for this piece, thanks
> > > > >> >
> > > > >>
> > > > >> After an operator has received a FlushEvent:
> > > > >> - If the ID of the received FlushEvent is larger than the largest
> ID
> > > > this
> > > > >> operator has received, then flush() is triggered for this operator
> > and
> > > > the
> > > > >> operator should broadcast FlushEvent to downstream operators.
> > > > >> - Otherwise, this FlushEvent is ignored.
> > > > >>
> > > > >> This behavior is specified in the Java doc of the FlushEvent.
> > > > >>
> > > > >> Can you see if this answers your questions?
> > > > >>
> > > > >> Best,
> > > > >> Dong
> > > > >>
> > > > >>
> > > > >> >
> > > > >> >
> > > > >> > Best,
> > > > >> > Shammon FY
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > On Thu, Jun 29, 2023 at 4:35 PM Martijn Visser <
> > > > >> martijnvis...@apache.org>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> Hi Dong and Yunfeng,
> > > > >> >>
> > > > >> >> Thanks for the FLIP. What's not clear for me is what's the
> > expected
> > > > >> >> behaviour when the allowed latency can't be met, for whatever
> > > reason.
> > > > >> >> Given that we're talking about an "allowed latency", it implies
> > > that
> > > > >> >> something has gone wrong and should fail? Isn't this more a
> > minimum
> > > > >> >> latency that you're proposing?
> > > > >> >>
> > > > >> >> There's also the part about the Hudi Sink processing records
> > > > >> >> immediately upon arrival. Given that the SinkV2 API provides
> the
> > > > >> >> ability for custom post and pre-commit topologies [1],
> > specifically
> > > > >> >> targeted to avoid generating multiple small files, why isn't
> that
> > > > >> >> sufficient for the Hudi Sink? It would be great to see that
> added
> > > > >> >> under Rejected Alternatives if this is indeed not sufficient.
> > > > >> >>
> > > > >> >> Best regards,
> > > > >> >>
> > > > >> >> Martijn
> > > > >> >>
> > > > >> >> [1]
> > > > >> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> > > > >> >>
> > > > >> >> On Sun, Jun 25, 2023 at 4:25 AM Yunfeng Zhou
> > > > >> >> <flink.zhouyunf...@gmail.com> wrote:
> > > > >> >> >
> > > > >> >> > Hi all,
> > > > >> >> >
> > > > >> >> > Dong(cc'ed) and I are opening this thread to discuss our
> > proposal
> > > > to
> > > > >> >> > support configuring end-to-end allowed latency for Flink
> jobs,
> > > > which
> > > > >> >> > has been documented in FLIP-325
> > > > >> >> > <https://cwiki.apache.org/confluence/display/FLINK/FLIP-325%3A+Support+configuring+end-to-end+allowed+latency>.
> > > > >> >> >
> > > > >> >> > By configuring the latency requirement for a Flink job, users
> > > would
> > > > >> be
> > > > >> >> > able to optimize the throughput and overhead of the job while
> > > still
> > > > >> >> > acceptably increasing latency. This approach is particularly
> > > useful
> > > > >> >> > when dealing with records that do not require immediate
> > > processing
> > > > >> and
> > > > >> >> > emission upon arrival.
> > > > >> >> >
> > > > >> >> > Please refer to the FLIP document for more details about the
> > > > proposed
> > > > >> >> > design and implementation. We welcome any feedback and
> opinions
> > > on
> > > > >> >> > this proposal.
> > > > >> >> >
> > > > >> >> > Best regards.
> > > > >> >> >
> > > > >> >> > Dong and Yunfeng
> > > > >> >>
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>
