Thanks Piotrek for your valuable input!

I will prepare follow-up FLIPs about faster checkpointing in the
current async execution model and the new APIs, and I have added a brief
description of this part to FLIP-423/424/425.

Regarding your concern:

> My main concern here is to prevent a situation where we
> end up with duplicate code bases of the operators:
> - the current set of operators that are well-behaved during checkpointing,
> but are synchronous
> - some set of async operators that will be misbehaving during checkpoints

Yes, that's definitely what we should avoid. Let's thoroughly refine the
checkpointing behavior before the SQL operator rework in milestone 2.
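To make the direction concrete before milestone 2, here is a minimal, self-contained illustration of the bid-based declaration idea from Piotr's mail below (all class and method names here are hypothetical sketches, not from any FLIP or Flink API): handlers are registered under stable string ids at startup, so a checkpoint stores only the id and input of each in-flight request instead of a serialized lambda, and restore simply re-registers fresh lambdas under the same ids.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch (not a FLIP API): handlers are declared under stable
// string ids ("bids") in open(). A checkpoint then only needs to persist
// the bid and input of each in-flight request -- never a serialized lambda --
// and open() re-registers fresh handlers after a restore.
class BidRegistrySketch {
    // One in-flight async state request: which handler to run, on what input.
    record InFlight(String bid, Integer input) {}

    private final Map<String, Function<Integer, Integer>> handlers = new LinkedHashMap<>();
    private Integer wordCount = null; // stand-in for a keyed ValueState

    // Called at operator startup, and called again after a restore.
    void open() {
        handlers.put("GET", v -> wordCount);
        handlers.put("UPDATE", v -> wordCount = (v == null ? 1 : v + 1));
        handlers.put("OUT", v -> v); // stand-in for collecting output
    }

    // Resume a (possibly restored) request: only its bid and input are needed.
    Integer invoke(InFlight req) {
        Function<Integer, Integer> handler = handlers.get(req.bid());
        if (handler == null) {
            throw new IllegalStateException("No handler declared for bid " + req.bid());
        }
        return handler.apply(req.input());
    }

    public static void main(String[] args) {
        // "Checkpointed" in-flight requests: plain data, trivially serializable.
        Deque<InFlight> restored = new ArrayDeque<>();
        restored.add(new InFlight("UPDATE", null)); // first record: no prior count
        restored.add(new InFlight("UPDATE", 1));    // second record saw count 1

        // After restore, open() re-creates the lambdas under the same bids,
        // so the pipeline resumes without deserializing any code.
        BidRegistrySketch op = new BidRegistrySketch();
        op.open();
        Integer last = null;
        while (!restored.isEmpty()) {
            last = op.invoke(restored.poll());
        }
        System.out.println(op.invoke(new InFlight("OUT", last)));
    }
}
```

The key property is that `InFlight` is plain data; the lambdas themselves never enter the checkpoint, which sidesteps the JRE-upgrade and bug-fix problems listed further down in the thread.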


Best,
Zakelly

On Wed, Mar 27, 2024 at 4:30 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi!
>
> Yes, after some long offline discussions we agreed to proceed as planned
> here, but we should treat the current API as experimental. The issue is
> that we can not checkpoint lambdas as they are currently defined, which
> leads to problems caused by draining in-flight records under
> backpressure.
>
> <side comment>
> 6000 records is not far off from the number of records buffered in
> network buffers for smaller-parallelism back-pressured jobs, and before
> unaligned checkpoints were implemented, even such jobs were often seen with
> checkpointing times exploding to tens of minutes or hours.
> And special handling of watermarks wouldn't solve the problem - the problem
> might be caused by an upstream window operator flushing records on a
> watermark, while the downstream operator that has those 6000 in-flight
> records is also backpressured. Then processing of the watermark in the
> upstream operator will be tied to the downstream operator draining all of
> those in-flight records.
> </side comment>
>
> On the other hand, checkpointing in-flight requests with the current API
> would require serializing lambdas into the checkpointed state, which has
> significant limitations on its own:
> - unable to upgrade the JRE (including bug-fix releases)
> - problems when updating dependencies of the code inside the lambdas, if
> the updated dependencies are not binary compatible
> - no way to fix bugs inside the lambdas - users might get stuck in an
> unrecoverable state
>
> After brainstorming a couple of different options, we came up with several
> solutions to those issues; the one we liked the most looks like:
>
> ```
> public void open() {
>     // name (bid),  in type, consumer
>     consumer("GET", Void.class, (v) -> {
>         return getState("wordcount").get();
>     });
>     consumer("UPDATE", Integer.class, (v) -> {
>         return getState("wordcount").update(v == null ? 1 : v + 1);
>     });
>     consumer("OUT", Integer.class, (v) -> {
>         getContext().collect(v);
>     });
> }
> public void processElement() {
>     do("GET").then("UPDATE").then("OUT");
> }
> ```
>
> Where "GET", "UPDATE" and "OUT" are some uids (`block-id`/`bid`). This way
> users could declare upfront, during the operator's startup, what
> method/function should handle a given `bid` in the current execution
> attempt. When checkpointing in-flight async state requests, Flink would
> store only the registered `bid`, not the serialised code itself. This would
> avoid the problems of serializing lambdas.
>
> However, to not postpone this effort, we reached a consensus that we can
> proceed with the current proposal and treat the currently proposed Async
> API (without declaring code upfront) as experimental/PoC - a test bed for
> the whole disaggregated state backend, not intended to be widely used in
> the code base. In parallel, in a follow-up FLIP, we could discuss the
> exact shape of the declarative async API that would actually be
> checkpointable. My main concern here is to prevent a situation where we
> end up with duplicate code bases of the operators:
> - the current set of operators that are well-behaved during checkpointing,
> but are synchronous
> - some set of async operators that will be misbehaving during checkpoints
>
> We should really try to avoid this scenario.
>
> Best,
> Piotrek
>
> śr., 27 mar 2024 o 05:06 Zakelly Lan <zakelly....@gmail.com> napisał(a):
>
> > Hi devs,
> >
> > It seems there have been no further concerns or suggestions for a while.
> > We'd like to start voting soon.
> >
> >
> > Best,
> > Zakelly
> >
> > On Wed, Mar 27, 2024 at 11:46 AM Zakelly Lan <zakelly....@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > Piotr and I had a long discussion about the checkpoint duration issue.
> > > We think that the lambda serialization approach I proposed in the last
> > > mail may bring in more issues; the most serious one is that users may
> > > not be able to modify their code in a serialized lambda to perform a
> > > bug fix.
> > >
> > > But fortunately we found a possible solution. By introducing a set of
> > > declarative APIs and a `declareProcess()` function that users should
> > > implement in a newly introduced AbstractStreamOperator, we could get the
> > > declaration of record processing at runtime, broken down into requests
> > > and callbacks (lambdas) as FLIP-424 introduced. Thus we avoid the
> > > problem of lambda (de)serialization; instead, we retrieve the callbacks
> > > every time before a task runs. The next step is to provide an API
> > > allowing users to assign a unique id to each state request and callback,
> > > or to assign one automatically by declaration order. Thus we can find
> > > the corresponding callback at runtime for each restored state request
> > > based on its id, and then the whole pipeline can be resumed.
> > >
> > > Note that all of the above is internal and won't be exposed to average
> > > users. Exposing this in the Stream APIs can be discussed later. I will
> > > prepare another FLIP (or FLIPs) describing the whole optimized
> > > checkpointing process, and in the meantime we can proceed with the
> > > current FLIPs. The new FLIP(s) build on top of the current ones, so we
> > > can achieve this incrementally.
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Thu, Mar 21, 2024 at 12:31 AM Zakelly Lan <zakelly....@gmail.com>
> > > wrote:
> > >
> > >> Hi Piotrek,
> > >>
> > >> Thanks for your comments!
> > >>
> > >>> As we discussed off-line, you agreed that we can not checkpoint while
> > >>> some records are in the middle of being processed, and that we would
> > >>> have to drain the in-progress records before doing the checkpoint. You
> > >>> also argued that this is not a problem, because the size of this
> > >>> buffer can be configured.
> > >>>
> > >>> I'm really afraid of such a solution. I've seen plenty of times in the
> > >>> past that whenever Flink has to drain some buffered records,
> > >>> eventually that always breaks timely checkpointing (and hence the
> > >>> ability for Flink to rescale in a timely manner). Even a single record
> > >>> with a `flatMap`-like operator currently in Flink causes problems
> > >>> during back pressure. That's especially true, for example, for
> > >>> processing watermarks. At the same time, I don't see how this value
> > >>> could be configured by even Flink's power users, let alone an average
> > >>> user. The size of that in-flight buffer not only depends on a
> > >>> particular query/job; the "good" value also changes dynamically over
> > >>> time, and can change very rapidly. Sudden spikes of records or
> > >>> backpressure, or some hiccup during emitting watermarks - any of those
> > >>> could in an instant change the theoretically optimal buffer size from,
> > >>> let's say, "6000" records down to "1". And when those changes happen,
> > >>> those are the exact times when timely checkpointing matters the most.
> > >>> If the load pattern suddenly changes, and checkpointing suddenly takes
> > >>> tens of minutes instead of a couple of seconds, it means you can not
> > >>> use rescaling and you are forced to overprovision the resources. And
> > >>> there are also other issues if checkpointing takes too long.
> > >>>
> > >>
> > >> I'm gonna clarify some misunderstanding here. First of all, is the
> > >> sync phase of checkpointing in the current plan longer than in the
> > >> synchronous execution model? The answer is yes; it is a trade-off for
> > >> the parallel execution model, and I think the cost is worth the
> > >> improvement. Now the question is, how much longer are we talking about?
> > >> The PoC result I provided is that it takes 3 seconds to drain 6000
> > >> records of a simple job, and I said it is not a big deal. Even though
> > >> you would say we may encounter long watermark/timer processing that
> > >> makes the cp wait, I provide several ways to optimize this:
> > >>
> > >>    1. Instead of only controlling the in-flight records, we could
> > >>    control the in-flight watermark.
> > >>    2. Since we have broken down the record processing into several
> > >>    state requests with at most one subsequent callback for each
> > >>    request, the cp can be processed after the current RUNNING requests
> > >>    (NOT records) and their callbacks finish. This means that even
> > >>    though we have a lot of records in-flight (I mean in
> > >>    'processElement' here), once only a small group of state requests
> > >>    finishes, the cp can proceed. They will form into 1 or 2 multiGets
> > >>    to rocksdb, which takes less time. Moreover, the timer processing is
> > >>    also split into several requests, so the cp won't wait for the whole
> > >>    timer to finish. The picture attached describes this idea.
> > >>
> > >> And the size of this buffer can be configured. I'm not counting on
> > >> average users to configure it well; I'm just saying that we'd better
> > >> not focus on the absolute numbers of the PoC or on specific cases,
> > >> since we can always provide a conservative default value to make this
> > >> acceptable for most cases. An adaptive buffer size is also worth a try
> > >> if we provide a conservative strategy.
> > >>
> > >> Besides, I don't understand why the load pattern or spikes or
> > >> backpressure would affect this. We are controlling the records that can
> > >> get into 'processElement' and the state requests that can fire in
> > >> parallel; no matter how high the load spikes, they will be blocked
> > >> outside. It is relatively stable within the proposed execution model
> > >> itself. The unaligned barrier will skip those inputs in the queue as
> > >> before.
> > >>
> > >>
> > >>> At the same time, I still don't understand why we can not implement
> > >>> things incrementally? First let's start with the current API: without
> > >>> the need to rewrite all of the operators, we can asynchronously fetch
> > >>> the whole state for a given record using its key. That should already
> > >>> vastly improve many things, and this way we could perform a checkpoint
> > >>> without the need to drain the in-progress/in-flight buffer. We could
> > >>> roll that version out, test it out in practice, and then we could see
> > >>> if the fine-grained state access is really needed. Otherwise it sounds
> > >>> to me like a premature optimization, one that requires us not only to
> > >>> rewrite a lot of the code, but also to maintain it later, even if it
> > >>> ultimately proves to be not needed. Of course I can not be certain,
> > >>> but I have a feeling that it might be the case.
> > >>
> > >>
> > >> The disaggregated state management we proposed is targeted at, but not
> > >> limited to, the following challenges:
> > >>
> > >>    1. Local disk constraints, including limited I/O and space
> > >>    (discussed in FLIP-423).
> > >>    2. Unbinding the I/O resource from the pre-allocated CPU resource,
> > >>    to make good use of both (motivation of FLIP-424).
> > >>    3. Elasticity of scaling I/O or storage capacity.
> > >>
> > >> Thus our plan depends on DFS, using the local disk only as an optional
> > >> cache. The pre-fetching plan you mentioned still binds I/O to CPU
> > >> resources, and will consume even more I/O to load unnecessary state,
> > >> which makes things worse. Please note that we are not targeting the
> > >> scenarios that local state handles well; our goal is not to replace
> > >> local state.
> > >>
> > >> And if manpower is a big concern of yours, I would say many of my
> > >> colleagues could help contribute to the runtime or SQL operators. It is
> > >> experimental, on a separate code path from the local state, and will be
> > >> recommended for use only once we prove it mature.
> > >>
> > >>
> > >> Thanks & Best,
> > >> Zakelly
> > >>
> > >> On Wed, Mar 20, 2024 at 10:04 PM Piotr Nowojski <pnowoj...@apache.org
> >
> > >> wrote:
> > >>
> > >>> Hey Zakelly!
> > >>>
> > >>> Sorry for the late reply. I still have concerns about the proposed
> > >>> solution, with my main concerns coming from
> > >>> the implications of the asynchronous state access API on the
> > >>> checkpointing
> > >>> and responsiveness of Flink.
> > >>>
> > >>> >> What also worries me a lot in this fine grained model is the
> > >>> >> effect on the checkpointing times.
> > >>> >
> > >>> > Your concerns are very reasonable. Faster checkpointing is always a
> > >>> > core advantage of disaggregated state, but only for the async phase.
> > >>> > There will be some complexity introduced by in-flight requests, but
> > >>> > I'd suggest a checkpoint containing those in-flight state requests
> > >>> > as part of the state, to accelerate the sync phase by skipping the
> > >>> > buffer draining. This makes the buffer size have little impact on
> > >>> > checkpoint time. And all the changes stay within the execution model
> > >>> > we proposed, while the checkpoint barrier alignment or handling will
> > >>> > not be touched in our proposal, so I guess the complexity is
> > >>> > relatively controllable. I have faith in that :)
> > >>>
> > >>> As we discussed off-line, you agreed that we can not checkpoint while
> > >>> some records are in the middle of being processed, and that we would
> > >>> have to drain the in-progress records before doing the checkpoint. You
> > >>> also argued that this is not a problem, because the size of this
> > >>> buffer can be configured.
> > >>>
> > >>> I'm really afraid of such a solution. I've seen plenty of times in the
> > >>> past that whenever Flink has to drain some buffered records,
> > >>> eventually that always breaks timely checkpointing (and hence the
> > >>> ability for Flink to rescale in a timely manner). Even a single record
> > >>> with a `flatMap`-like operator currently in Flink causes problems
> > >>> during back pressure. That's especially true, for example, for
> > >>> processing watermarks. At the same time, I don't see how this value
> > >>> could be configured by even Flink's power users, let alone an average
> > >>> user. The size of that in-flight buffer not only depends on a
> > >>> particular query/job; the "good" value also changes dynamically over
> > >>> time, and can change very rapidly. Sudden spikes of records or
> > >>> backpressure, or some hiccup during emitting watermarks - any of those
> > >>> could in an instant change the theoretically optimal buffer size from,
> > >>> let's say, "6000" records down to "1". And when those changes happen,
> > >>> those are the exact times when timely checkpointing matters the most.
> > >>> If the load pattern suddenly changes, and checkpointing suddenly takes
> > >>> tens of minutes instead of a couple of seconds, it means you can not
> > >>> use rescaling and you are forced to overprovision the resources. And
> > >>> there are also other issues if checkpointing takes too long.
> > >>>
> > >>> At the same time, I still don't understand why we can not implement
> > >>> things incrementally? First let's start with the current API: without
> > >>> the need to rewrite all of the operators, we can asynchronously fetch
> > >>> the whole state for a given record using its key. That should already
> > >>> vastly improve many things, and this way we could perform a checkpoint
> > >>> without the need to drain the in-progress/in-flight buffer. We could
> > >>> roll that version out, test it out in practice, and then we could see
> > >>> if the fine-grained state access is really needed. Otherwise it sounds
> > >>> to me like a premature optimization, one that requires us not only to
> > >>> rewrite a lot of the code, but also to maintain it later, even if it
> > >>> ultimately proves to be not needed. Of course I can not be certain,
> > >>> but I have a feeling that it might be the case.
> > >>>
> > >>> Best,
> > >>> Piotrek
> > >>>
> > >>> wt., 19 mar 2024 o 10:42 Zakelly Lan <zakelly....@gmail.com>
> > napisał(a):
> > >>>
> > >>> > Hi everyone,
> > >>> >
> > >>> > Thanks for your valuable feedback!
> > >>> >
> > >>> > Our discussions have been going on for a while and are nearing a
> > >>> > consensus. So I would like to start a vote after 72 hours.
> > >>> >
> > >>> > Please let me know if you have any concerns, thanks!
> > >>> >
> > >>> >
> > >>> > Best,
> > >>> > Zakelly
> > >>> >
> > >>> > On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan <zakelly....@gmail.com
> >
> > >>> wrote:
> > >>> >
> > >>> > > Hi Yunfeng,
> > >>> > >
> > >>> > > Thanks for the suggestion!
> > >>> > >
> > >>> > > I will reorganize the FLIP-425 accordingly.
> > >>> > >
> > >>> > >
> > >>> > > Best,
> > >>> > > Zakelly
> > >>> > >
> > >>> > > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <
> > >>> > flink.zhouyunf...@gmail.com>
> > >>> > > wrote:
> > >>> > >
> > >>> > >> Hi Xintong and Zakelly,
> > >>> > >>
> > >>> > >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
> > >>> > >> I agree that watermarks can use only the out-of-order mode for
> > >>> > >> now, because there is still no concrete example showing a
> > >>> > >> correctness risk. However, the strictly-ordered mode should still
> > >>> > >> be supported as the default option for non-record event types
> > >>> > >> other than watermarks, at least for checkpoint barriers.
> > >>> > >>
> > >>> > >> I noticed that this information has already been documented in
> > >>> > >> "For other non-record events, such as RecordAttributes ...", but
> > >>> > >> it's at the bottom of the "Watermark" section, which might not be
> > >>> > >> very obvious. Thus it might be better to reorganize the FLIP to
> > >>> > >> better state that the two order modes are designed for all
> > >>> > >> non-record events, and which mode this FLIP would choose for each
> > >>> > >> type of event.
> > >>> > >>
> > >>> > >> Best,
> > >>> > >> Yunfeng
> > >>> > >>
> > >>> > >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <
> > tonysong...@gmail.com
> > >>> >
> > >>> > >> wrote:
> > >>> > >> >
> > >>> > >> > Thanks for the quick response. Sounds good to me.
> > >>> > >> >
> > >>> > >> > Best,
> > >>> > >> >
> > >>> > >> > Xintong
> > >>> > >> >
> > >>> > >> >
> > >>> > >> >
> > >>> > >> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <
> > >>> zakelly....@gmail.com>
> > >>> > >> wrote:
> > >>> > >> >
> > >>> > >> > > Hi Xintong,
> > >>> > >> > >
> > >>> > >> > > Thanks for sharing your thoughts!
> > >>> > >> > >
> > >>> > >> > > > 1. Regarding Record-ordered and State-ordered of
> > >>> > >> > > > processElement.
> > >>> > >> > > >
> > >>> > >> > > > I understand that while State-ordered likely provides better
> > >>> > >> > > > performance, Record-ordered is sometimes required for
> > >>> > >> > > > correctness. The question is how should a user choose
> > >>> > >> > > > between these two modes? My concern is that such a decision
> > >>> > >> > > > may require users to have in-depth knowledge about the Flink
> > >>> > >> > > > internals, and may lead to correctness issues if
> > >>> > >> > > > State-ordered is chosen improperly.
> > >>> > >> > > >
> > >>> > >> > > > I'd suggest not to expose such a knob, at least in the first
> > >>> > >> > > > version. That means always use Record-ordered for custom
> > >>> > >> > > > operators / UDFs, and keep State-ordered for internal usages
> > >>> > >> > > > (built-in operators) only.
> > >>> > >> > > >
> > >>> > >> > >
> > >>> > >> > > Indeed, users may not be able to choose the mode properly. I
> > >>> > >> > > agree to keep such options for internal use.
> > >>> > >> > >
> > >>> > >> > >
> > >>> > >> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> > >>> > >> > > >
> > >>> > >> > > > I'm not entirely sure about Strictly-ordered being the
> > >>> > >> > > > default, or even being supported. From my understanding, a
> > >>> > >> > > > Watermark(T) only suggests that all records with event time
> > >>> > >> > > > before T have arrived, and it has nothing to do with whether
> > >>> > >> > > > records with event time after T have arrived or not. From
> > >>> > >> > > > that perspective, preventing certain records from arriving
> > >>> > >> > > > before a Watermark is never supported. I also cannot come up
> > >>> > >> > > > with any use case where Strictly-ordered is necessary. This
> > >>> > >> > > > implies the same issue as 1): how does the user choose
> > >>> > >> > > > between the two modes?
> > >>> > >> > > >
> > >>> > >> > > > I'd suggest not exposing the knob to users and only
> > >>> > >> > > > supporting Out-of-order, until we see a concrete use case
> > >>> > >> > > > where Strictly-ordered is needed.
> > >>> > >> > > >
> > >>> > >> > >
> > >>> > >> > > The semantics of watermarks do not define the sequence
> > >>> > >> > > between a watermark and subsequent records. For the most
> > >>> > >> > > part, this is inconsequential, except that it may affect some
> > >>> > >> > > current users who have previously relied on the implicit
> > >>> > >> > > assumption of an ordered execution. I'd be fine with initially
> > >>> > >> > > supporting only out-of-order processing. We may consider
> > >>> > >> > > exposing the 'Strictly-ordered' mode once we encounter a
> > >>> > >> > > concrete use case that necessitates it.
> > >>> > >> > >
> > >>> > >> > >
> > >>> > >> > > > My philosophies behind not exposing the two config options
> > >>> > >> > > > are:
> > >>> > >> > > > - There are already too many options in Flink that users
> > >>> > >> > > > barely know how to use. I think Flink should try as much as
> > >>> > >> > > > possible to decide its own behavior, rather than throwing
> > >>> > >> > > > all the decisions to the users.
> > >>> > >> > > > - It's much harder to take back knobs than to introduce
> > >>> > >> > > > them. Therefore, options should be introduced only if
> > >>> > >> > > > concrete use cases are identified.
> > >>> > >> > > >
> > >>> > >> > >
> > >>> > >> > > I agree to keep minimal configurable items, especially for
> > >>> > >> > > the MVP. Given that we have the opportunity to refine the
> > >>> > >> > > functionality before the framework transitions from
> > >>> > >> > > @Experimental to @PublicEvolving, it makes sense to refrain
> > >>> > >> > > from presenting user-facing options until we have ensured
> > >>> > >> > > their necessity.
> > >>> > >> > >
> > >>> > >> > >
> > >>> > >> > > Best,
> > >>> > >> > > Zakelly
> > >>> > >> > >
> > >>> > >> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <
> > >>> > tonysong...@gmail.com>
> > >>> > >> > > wrote:
> > >>> > >> > >
> > >>> > >> > > > Sorry for joining the discussion late.
> > >>> > >> > > >
> > >>> > >> > > > I have two questions about FLIP-425.
> > >>> > >> > > >
> > >>> > >> > > > 1. Regarding Record-ordered and State-ordered of
> > >>> > >> > > > processElement.
> > >>> > >> > > >
> > >>> > >> > > > I understand that while State-ordered likely provides better
> > >>> > >> > > > performance, Record-ordered is sometimes required for
> > >>> > >> > > > correctness. The question is how should a user choose
> > >>> > >> > > > between these two modes? My concern is that such a decision
> > >>> > >> > > > may require users to have in-depth knowledge about the Flink
> > >>> > >> > > > internals, and may lead to correctness issues if
> > >>> > >> > > > State-ordered is chosen improperly.
> > >>> > >> > > >
> > >>> > >> > > > I'd suggest not to expose such a knob, at least in the first
> > >>> > >> > > > version. That means always use Record-ordered for custom
> > >>> > >> > > > operators / UDFs, and keep State-ordered for internal usages
> > >>> > >> > > > (built-in operators) only.
> > >>> > >> > > >
> > >>> > >> > > > 2. Regarding Strictly-ordered and Out-of-order of
> > >>> > >> > > > Watermarks.
> > >>> > >> > > >
> > >>> > >> > > > I'm not entirely sure about Strictly-ordered being the
> > >>> > >> > > > default, or even being supported. From my understanding, a
> > >>> > >> > > > Watermark(T) only suggests that all records with event time
> > >>> > >> > > > before T have arrived, and it has nothing to do with whether
> > >>> > >> > > > records with event time after T have arrived or not. From
> > >>> > >> > > > that perspective, preventing certain records from arriving
> > >>> > >> > > > before a Watermark is never supported. I also cannot come up
> > >>> > >> > > > with any use case where Strictly-ordered is necessary. This
> > >>> > >> > > > implies the same issue as 1): how does the user choose
> > >>> > >> > > > between the two modes?
> > >>> > >> > > >
> > >>> > >> > > > I'd suggest not exposing the knob to users and only
> > >>> > >> > > > supporting Out-of-order, until we see a concrete use case
> > >>> > >> > > > where Strictly-ordered is needed.
> > >>> > >> > > >
> > >>> > >> > > >
> > >>> > >> > > > My philosophies behind not exposing the two config options
> > >>> > >> > > > are:
> > >>> > >> > > > - There are already too many options in Flink that users
> > >>> > >> > > > barely know how to use. I think Flink should try as much as
> > >>> > >> > > > possible to decide its own behavior, rather than throwing
> > >>> > >> > > > all the decisions to the users.
> > >>> > >> > > > - It's much harder to take back knobs than to introduce
> > >>> > >> > > > them. Therefore, options should be introduced only if
> > >>> > >> > > > concrete use cases are identified.
> > >>> > >> > > >
> > >>> > >> > > > WDYT?
> > >>> > >> > > >
> > >>> > >> > > > Best,
> > >>> > >> > > >
> > >>> > >> > > > Xintong
> > >>> > >> > > >
> > >>> > >> > > >
> > >>> > >> > > >
> > >>> > >> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge
> > >>> <j...@ververica.com.invalid
> > >>> > >
> > >>> > >> > > wrote:
> > >>> > >> > > >
> > >>> > >> > > > > +1 for Gyula's suggestion. I just finished FLIP-423, which
> > >>> > >> > > > > introduced the intention of the big change and the
> > >>> > >> > > > > high-level architecture. Great content btw! The only
> > >>> > >> > > > > public interface change for this FLIP is one new config to
> > >>> > >> > > > > use ForSt. It makes sense to have one dedicated discussion
> > >>> > >> > > > > thread for each concrete system design.
> > >>> > >> > > > >
> > >>> > >> > > > > @Zakelly The links in your mail do not work except the
> > >>> > >> > > > > last one, because the FLIP-xxx has been included into the
> > >>> > >> > > > > url, like
> > >>> > >> > > > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> > >>> > >> > > > > .
> > >>> > >> > > > >
> > >>> > >> > > > > NIT fix:
> > >>> > >> > > > >
> > >>> > >> > > > > FLIP-424:
> > >>> > >> > > >
> > >>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > >>> > >> > > > >
> > >>> > >> > > > > FLIP-425:
> > >>> > >> > > >
> > >>> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > >>> > >> > > > >
> > >>> > >> > > > > FLIP-426:
> > >>> > >> > > >
> > >>> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > >>> > >> > > > >
> > >>> > >> > > > > FLIP-427:
> > >>> > >> > > >
> > >>> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > >>> > >> > > > >
> > >>> > >> > > > > FLIP-428:
> > >>> > >> > > >
> > >>> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > >>> > >> > > > >
> > >>> > >> > > > > Best regards,
> > >>> > >> > > > > Jing
> > >>> > >> > > > >
> > >>> > >> > > > >
> > >>> > >> > > > >
> > >>> > >> > > > >
> > >>> > >> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <
> > >>> > >> zakelly....@gmail.com>
> > >>> > >> > > > wrote:
> > >>> > >> > > > >
> > >>> > >> > > > > > Hi everyone,
> > >>> > >> > > > > >
> > >>> > >> > > > > > Thank you all for a lively discussion here, and it is a
> > >>> > >> > > > > > good time to move forward to more detailed discussions.
> > >>> > >> > > > > > Thus we have opened several threads for the sub-FLIPs:
> > >>> > >> > > > > >
> > >>> > >> > > > > > FLIP-424:
> > >>> > >> > > > >
> > >>> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > >>> > >> > > > > > FLIP-425
> > >>> > >> > > > > > <
> > >>> > >> > > > >
> > >>> > >> > >
> > >>> > >>
> > >>> >
> > >>>
> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> > >>> > >> > > > >:
> > >>> > >> > > > > >
> > >>> > >>
> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > >>> > >> > > > > > FLIP-426
> > >>> > >> > > > > > <
> > >>> > >> > > > >
> > >>> > >> > >
> > >>> > >>
> > >>> >
> > >>>
> > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426
> > >>> > >> > > > >:
> > >>> > >> > > > > >
> > >>> > >>
> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > >>> > >> > > > > > FLIP-427
> > >>> > >> > > > > > <
> > >>> > >> > > > >
> > >>> > >> > >
> > >>> > >>
> > >>> >
> > >>>
> > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427
> > >>> > >> > > > >:
> > >>> > >> > > > > >
> > >>> > >>
> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > >>> > >> > > > > > FLIP-428
> > >>> > >> > > > > > <
> > >>> > >> > > > >
> > >>> > >> > >
> > >>> > >>
> > >>> >
> > >>>
> > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428
> > >>> > >> > > > >:
> > >>> > >> > > > > >
> > >>> > >>
> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > >>> > >> > > > > >
> > >>> > >> > > > > > If you want to talk about the overall architecture,
> > >>> > >> > > > > > roadmap, milestones or something related to multiple
> > >>> > >> > > > > > FLIPs, please post it here. Otherwise you can discuss
> > >>> > >> > > > > > the details in separate mails. Let's try to avoid
> > >>> > >> > > > > > repeated discussion in different threads. I will sync
> > >>> > >> > > > > > important messages here if there are any in the above
> > >>> > >> > > > > > threads.
> > >>> > >> > > > > >
> > >>> > >> > > > > > And in reply to @Jeyhun: we will ensure the content
> > >>> > >> > > > > > across those FLIPs is consistent.
> > >>> > >> > > > > >
> > >>> > >> > > > > >
> > >>> > >> > > > > > Best,
> > >>> > >> > > > > > Zakelly
> > >>> > >> > > > > >
On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:

I have been a bit busy these few weeks, so sorry for responding late.

The original thinking behind keeping the discussion within one thread was easier tracking and avoiding repeated discussion in different threads.

For details, it might be good to start different threads if needed.

We will think of a way to better organize the discussion.

Best,
Yuan

On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:

Hi,

+1 for the suggestion.
Maybe we can start the discussion with the FLIPs that have the fewest dependencies (on the other new/proposed FLIPs). Based on our discussion of a particular FLIP, the subsequent (or dependent) FLIP(s) can be updated accordingly?

Regards,
Jeyhun

On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

Hey all!

This is a massive improvement / piece of work. I have just started going through the FLIPs and have a more or less meta comment.

While it's good to keep the overall architecture discussion here, I think we should still have separate discussions for each FLIP where we can discuss interface details etc. With so much content, if we start adding minor comments here it will lead nowhere, but those discussions are still important and we should have them in separate threads (one for each FLIP).

What do you think?
Gyula

On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote:

Hi team,

Thanks for your discussion. Regarding FLIP-425, we have supplemented several updates to answer high-frequency questions:

1. We captured a flame graph of the Hashmap state backend in "Synchronous execution with asynchronous APIs"[1], which reveals that the framework overhead (including reference counting, future-related code and so on) consumes about 9% of the keyed operator's CPU time.
2. We added a set of comparative experiments for watermark processing; the performance of out-of-order mode is 70% better than strictly-ordered mode under ~140MB state size. Instructions on how to run this test have also been added[2].
3. Regarding the order of StreamRecords, whether they have state accesses or not: we supplemented a new *Strict order of 'processElement'* section[3].

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
[3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder

Best regards,
Yanfei

On Tue, Mar 5, 2024 at 09:25, Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

Hi Zakelly,

> 5. I'm not very sure ... revisiting this later since it is not important.

It seems that we still have some details to confirm about this question. Let's postpone it to after the critical parts of the design are settled.

> 8. Yes, we had considered ... metrics should be like afterwards.

Oh sorry, I missed FLIP-431. I'm fine with discussing this topic in milestone 2.

Looking forward to the detailed design of the strict mode for same-key records and the benchmark results for the epoch mechanism.

Best regards,
Yunfeng

On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Yunfeng,

For 1:
I had a discussion with Lincoln Lee, and I realized it is a common expectation that a same-key record should be blocked before `processElement`; that is easier for users to understand. Thus I will introduce a strict mode for this and make it the default. My rough idea is just like yours: invoke some method of the AEC instance before `processElement`. The detailed design will be described in the FLIP later.

For 2:
I agree with you. We could throw exceptions for now and optimize this later.

For 5:
> It might be better to move the default values to the Proposed Changes section instead of making them public for now, as there will be compatibility issues once we want to dynamically adjust the thresholds and timeouts in future.

Agreed. The whole framework is experimental until we consider it complete, in 2.0 or later. The default values would be better determined with more testing results and production experience.

> The configuration execution.async-state.enabled seems unnecessary, as the infrastructure may automatically get this information from the detailed state backend configurations. We may revisit this part after the core designs have reached an agreement. WDYT?

I'm not very sure whether there is any use case where users write their code against the async APIs but run their job in a synchronous way. The first two scenarios that come to mind are benchmarking, or a small state, where they don't want to rewrite their code. It is actually easy to support, so I'd suggest providing it. But I'm fine with revisiting this later since it is not important. WDYT?

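As a toy illustration of why such a switch could still be useful, the same user code written against a future-based state API can be dispatched either to a real asynchronous executor or to a trivially synchronous one. This is a hypothetical sketch: `StateExecutorFactory`, its interface, and the dispatch logic are invented for illustration and are not the actual classes from these FLIPs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Toy model of an execution.async-state.enabled style switch: the same
// future-based user code runs on either path. All names are invented
// for illustration; the real FLIP infrastructure differs.
class StateExecutorFactory {

    interface StateExecutor {
        <T> CompletableFuture<T> submit(Supplier<T> stateAccess);
    }

    static StateExecutor create(boolean asyncStateEnabled) {
        if (asyncStateEnabled) {
            // Asynchronous path: hand the state access off to another thread.
            return new StateExecutor() {
                @Override
                public <T> CompletableFuture<T> submit(Supplier<T> stateAccess) {
                    return CompletableFuture.supplyAsync(stateAccess);
                }
            };
        }
        // Synchronous fallback (e.g. benchmarking, small state): the future
        // is already completed when it is returned.
        return new StateExecutor() {
            @Override
            public <T> CompletableFuture<T> submit(Supplier<T> stateAccess) {
                return CompletableFuture.completedFuture(stateAccess.get());
            }
        };
    }

    public static void main(String[] args) {
        StateExecutor executor = create(false);
        System.out.println(executor.submit(() -> "hello").join());
    }
}
```

With a switch like this, the user code never changes; only the executor behind the returned futures does.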
For 8:
Yes, we had considered the I/O metrics group, especially back-pressure, idle and task busy time per second. In the current plan we can do state accesses during back-pressure, meaning those input metrics would better be redefined. I suggest we discuss these existing metrics, as well as the new metrics to be introduced in FLIP-431, later in milestone 2: since we will have basically finished the framework by then, we will have a better view of what the metrics should look like. WDYT?

Best,
Zakelly

On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:

Hi Zakelly,

Thanks for the responses!

> 1. I will discuss this with some expert SQL developers. ... mode for StreamRecord processing.

In the DataStream API there should also be use cases where the order of output is strictly required. I agree that SQL experts may help provide more concrete use cases that can accelerate our discussion, but please allow me to search for DataStream use cases that can prove the necessity of this strict order-preservation mode, in case the answers from the SQL experts turn out to be negative.

For your convenience, my current rough idea is that we add a module between the Input(s) and the processElement() module in Fig 2 of FLIP-425. The module would be responsible for caching records whose keys collide with in-flight records, while the AEC would only be responsible for handling async state calls, without knowing which record each call belongs to. We may revisit this topic once the necessity of the strict order mode is clarified.

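A simplified, self-contained model of the buffering module sketched above (class and method names are hypothetical, not the actual FLIP-425 interfaces): a record whose key collides with an in-flight record is held back until the in-flight state calls for that key complete.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Simplified model of a module sitting between the Input and
// processElement(): records whose key collides with an in-flight
// record are buffered until that key's async state calls complete.
// Names are illustrative only.
class KeyCollisionBuffer<K, R> {
    private final Map<K, Queue<R>> blocked = new HashMap<>();

    /** Returns true if the record may be processed now; otherwise buffers it. */
    boolean tryAcquire(K key, R record) {
        Queue<R> queue = blocked.get(key);
        if (queue == null) {
            // No in-flight record for this key: mark the key in-flight.
            blocked.put(key, new ArrayDeque<>());
            return true;
        }
        queue.add(record); // Key collision: hold the record back.
        return false;
    }

    /** Called when all async state calls for `key` have completed. */
    R release(K key) {
        Queue<R> queue = blocked.get(key);
        if (queue == null || queue.isEmpty()) {
            blocked.remove(key); // Key is no longer in-flight.
            return null;
        }
        return queue.poll(); // Next same-key record becomes in-flight.
    }

    public static void main(String[] args) {
        KeyCollisionBuffer<String, Integer> buffer = new KeyCollisionBuffer<>();
        System.out.println(buffer.tryAcquire("key-a", 1)); // no collision
        System.out.println(buffer.tryAcquire("key-a", 2)); // buffered
    }
}
```

Under this model the AEC only ever sees state requests; ordering between same-key records is enforced entirely by the buffer in front of `processElement`.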
> 2. The amount of parallel StateRequests ... instead of invoking yield

Your suggestions generally appeal to me. I think we may let the corresponding Flink jobs fail with OOM for now, since the majority of a StateRequest is just references to existing Java objects, which occupy very little memory and can hardly cause OOM in common cases. We can monitor the pending StateRequests, and if there really is a risk of OOM in extreme cases, we can throw exceptions with proper messages notifying users what to do, such as increasing memory through configuration.

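The "monitor pending StateRequests and fail with an actionable message" idea might look roughly like this (a hypothetical sketch; the real AEC bookkeeping, limits, and wording would differ):

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified guard around in-flight StateRequests: counts pending
// requests and fails fast with an actionable message instead of
// letting the job drift toward an opaque OOM. Names are illustrative.
class StateRequestGuard {
    private final AtomicLong pending = new AtomicLong();
    private final long limit;

    StateRequestGuard(long limit) {
        this.limit = limit;
    }

    void onRequestSubmitted() {
        if (pending.incrementAndGet() > limit) {
            pending.decrementAndGet();
            throw new IllegalStateException(
                "Too many pending state requests (> " + limit + "). "
                    + "Consider increasing task manager memory or lowering "
                    + "the in-flight record threshold.");
        }
    }

    void onRequestCompleted() {
        pending.decrementAndGet();
    }

    long pendingCount() {
        return pending.get();
    }

    public static void main(String[] args) {
        StateRequestGuard guard = new StateRequestGuard(1000);
        guard.onRequestSubmitted();
        System.out.println(guard.pendingCount()); // prints 1
    }
}
```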
Your suggestions to adjust the threshold adaptively or to use the blocking buffer sound good, and in my opinion we can postpone them to future FLIPs since they seem to benefit users only in rare cases. Given that FLIP-423~428 is already a big enough design, it might be better to focus on the most critical parts for now and postpone optimizations like these. WDYT?

> 5. Sure, we will introduce new configs as well as their default values.

Thanks for adding the default values, and the values themselves LGTM. It might be better to move the default values to the Proposed Changes section instead of making them public for now, as there will be compatibility issues once we want to dynamically adjust the thresholds and timeouts in future.

The configuration execution.async-state.enabled seems unnecessary, as the infrastructure may automatically get this information from the detailed state backend configurations. We may revisit this part after the core designs have reached an agreement. WDYT?

Besides, inspired by Jeyhun's comments, it occurs to me that:

8. Should this FLIP introduce metrics that measure the time a Flink job is back-pressured by state I/O? Under the current design, this metric could measure the time during which the blocking buffer is full and yield() cannot get callbacks to process, which means the operator is fully waiting for state responses.

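Such a back-pressured-by-state metric could be backed by a simple accumulating timer (an illustrative sketch only; FLIP-431 would define the actual metric names and wiring):

```java
// Simplified timer that accumulates the time the operator spends fully
// blocked waiting for state responses, i.e. when the blocking buffer is
// full and yield() has no callbacks to run. Illustrative only.
class StateBackPressureTimer {
    private long blockedNanos;
    private long blockedSince = -1;

    /** Called when the operator starts waiting on state responses. */
    void markBlocked(long nowNanos) {
        if (blockedSince < 0) {
            blockedSince = nowNanos;
        }
    }

    /** Called when a state response arrives and processing can resume. */
    void markUnblocked(long nowNanos) {
        if (blockedSince >= 0) {
            blockedNanos += nowNanos - blockedSince;
            blockedSince = -1;
        }
    }

    /** Total time spent back-pressured by state I/O, in nanoseconds. */
    long getBlockedNanos() {
        return blockedNanos;
    }

    public static void main(String[] args) {
        StateBackPressureTimer timer = new StateBackPressureTimer();
        timer.markBlocked(0);
        timer.markUnblocked(1_000_000); // 1 ms blocked
        System.out.println(timer.getBlockedNanos()); // prints 1000000
    }
}
```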

Best regards,
Yunfeng

On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Yunfeng,

Thanks for your detailed comments!

> 1. Why do we need a close() method on StateIterator? This method seems unused in the usage example codes.

The `close()` was introduced to release internal resources, but it does not seem necessary to require the user to call it, so I removed it.

> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated