Thanks Zakelly for driving the entire discussion and making endless efforts
to improve the proposal.  Thanks everyone for giving invaluable input and
suggestions.

Asycn execution is fundamental to disaggregate CPU utils and IO/network
utils. It is also widely adopted in many modern systems to improve IO utils
without heavily increasing other resources (CPU/MEM for example). I believe
this change will take Flink to the next stage supporting IO-intensive
queries.

 I will start a vote for the FLIP.

Best
Yuan

On Wed, Mar 27, 2024 at 5:40 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Thank Piotrek for your valuable input!
>
> I will prepare the following FLIPs about faster checkpointing in the
> current async execution model and the new APIs. And I have added some brief
> description of this part in FLIP-423/424/425.
>
> Regarding your concern:
>
> >  My main concern here, is to prevent a situation where we
>
> end up with duplicate code base of the operators:
>
> - the current set of operators that are well behaving during checkpointing,
> > but are synchronous
> > - some set of async operators that will be miss-behaving during
> checkpoints
> >
>
> Yes, that's definitely what we should avoid. Let's thoroughly refine the
> checkpointing behavior before the SQL operator rework in milestone 2.
>
>
> Best,
> Zakelly
>
> On Wed, Mar 27, 2024 at 4:30 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
> > Hi!
> >
> > Yes, after some long offline discussions we agreed to proceed as planned
> > here, but we should treat the current API as experimental. The issues are
> > that either we can not checkpoint lambdas as they are currently defined,
> > leading to problems caused by in-flight records draining under
> > backpressure.
> >
> > <side comment>
> > 6000 records that's not far off from the amount of records buffered in
> > network buffers for smaller parallelism back pressured jobs, and before
> > unaligned checkpoints were implemented, even such jobs were often seen
> with
> > checkpointing times exploding to tens of minutes/hours.
> > And special handling of watermarks wouldn't solve the problem - problem
> > might be caused by upstream window operator flushing records on
> watermark,
> > while downstream operator that has those 6000 in-flight records is also
> > backpressured. Then processing of the watermark in the upstream operator
> > will be tied to the downstream operator draining all of those in-flight
> > records.
> > </side comment>
> >
> > On the other hand checkpointing in-flight requests with the current API
> > would require serializing lambdas into the checkpointed state, which has
> > significant limitations on it's own:
> > - unable to upgrade (including bug fix) JRE
> > - problems when updating dependencies of the code inside the lambdas, if
> > the updated dependencies are not binary compatible
> > - no way to fix bugs inside the lambdas - users might get stuck in an
> > unrecoverable state
> >
> > After brainstorming couple of different options, we came up with a couple
> > of solutions to those issues, one that we liked the most looks like:
> >
> > ```
> > public void open() {
> >     // name (bid),  in type, consumer
> >     consumer("GET", Void.class, (v) -> {
> >         return getState("wordcount").get();
> >     });
> >     consumer("UPDATE", Integer.class, (v) -> {
> >         return getState("wordcount").update(v == null ? 1 : v + 1);
> >     });
> >     consumer("OUT", Integer.class, (v) -> {
> >         getContext().collect(v);
> >     });
> > }
> > public void processElement() {
> >     do("GET").then("UPDATE").then("OUT");
> > }
> > ```
> >
> > Where "GET" "UPDATE" "OUT" are some uid's (`block-id`/`bid`). This way
> > users could declare upfront during the operator's startup what
> > method/function should handle a given `bid` in the current execution
> > attempt. When checkpointing in-flight async state requests, Flink would
> > store only the registered `bid`, not the serialised code itself. This
> would
> > avoid problems of serializing lambdas.
> >
> > However, to not postpone this effort we reached a consensus that we can
> > proceed with the current proposal and treat the currently proposed Async
> > API (without declaring code upfront) as experimental/PoC - a test bed for
> > the whole disaggregated state backend. Not intended to be widely used in
> > the code base. And in parallel in a follow up FLIP, we could discuss the
> > exact shape of the declarative async API that would be actually
> > checkpointable. My main concern here, is to prevent a situation where we
> > end up with duplicate code base of the operators:
> > - the current set of operators that are well behaving during
> checkpointing,
> > but are synchronous
> > - some set of async operators that will be miss-behaving during
> checkpoints
> >
> > We should really try to avoid this scenario.
> >
> > Best,
> > Piotrek
> >
> > śr., 27 mar 2024 o 05:06 Zakelly Lan <zakelly....@gmail.com> napisał(a):
> >
> > > Hi devs,
> > >
> > > It seems there is no more concern or suggestion for a while. We'd like
> to
> > > start voting recently.
> > >
> > >
> > > Best,
> > > Zakelly
> > >
> > > On Wed, Mar 27, 2024 at 11:46 AM Zakelly Lan <zakelly....@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > Piotr and I had a long discussion about the checkpoint duration
> issue.
> > We
> > > > think that the lambda serialization approach I proposed in last mail
> > may
> > > > bring in more issues, the most serious one is that users may not be
> > able
> > > to
> > > > modify their code in serialized lambda to perform a bug fix.
> > > >
> > > > But fortunately we found a possible solution. By introducing a set of
> > > > declarative APIs and a `declareProcess()` function that users should
> > > > implement in some newly introduced AbstractStreamOperator, we could
> get
> > > the
> > > > declaration of record processing in runtime, broken down to requests
> > and
> > > > callbacks (lambdas) like FLIP-424 introduced. Thus we avoid the
> problem
> > > of
> > > > lambda (de)serialization and instead we retrieve callbacks every time
> > > > before a task runs. The next step is to provide an API allowing users
> > to
> > > > assign an unique id to each state request and callback, or
> > automatically
> > > > assign one by declaration order. Thus we can find the corresponding
> > > > callback in runtime for each restored state request based on the id,
> > then
> > > > the whole pipeline can be resumed.
> > > >
> > > > Note that all these above are internal and won't expose to average
> > users.
> > > > Exposing this on Stream APIs can be discussed later. I will prepare
> > > another
> > > > FLIP(s) describing the whole optimized checkpointing process, and in
> > the
> > > > meantime, we can proceed on current FLIPs. The new FLIP(s) are built
> on
> > > top
> > > > of current ones and we can achieve this incrementally.
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Thu, Mar 21, 2024 at 12:31 AM Zakelly Lan <zakelly....@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Piotrek,
> > > >>
> > > >> Thanks for your comments!
> > > >>
> > > >> As we discussed off-line, you agreed that we can not checkpoint
> while
> > > some
> > > >>> records are in the middle of being
> > > >>> processed. That we would have to drain the in-progress records
> before
> > > >>> doing
> > > >>> the checkpoint. You also argued
> > > >>> that this is not a problem, because the size of this buffer can be
> > > >>> configured.
> > > >>>
> > > >>> I'm really afraid of such a solution. I've seen in the past plenty
> of
> > > >>> times, that whenever Flink has to drain some
> > > >>> buffered records, eventually that always brakes timely
> checkpointing
> > > (and
> > > >>> hence ability for Flink to rescale in
> > > >>> a timely manner). Even a single record with a `flatMap` like
> operator
> > > >>> currently in Flink causes problems during
> > > >>> back pressure. That's especially true for example for processing
> > > >>> watermarks. At the same time, I don't see how
> > > >>> this value could be configured by even Flink's power users, let
> alone
> > > an
> > > >>> average user. The size of that in-flight
> > > >>> buffer not only depends on a particular query/job, but also the
> > "good"
> > > >>> value changes dynamically over time,
> > > >>> and can change very rapidly. Sudden spikes of records or
> > backpressure,
> > > >>> some
> > > >>> hiccup during emitting watermarks,
> > > >>> all of those could change in an instant the theoretically optimal
> > > buffer
> > > >>> size of let's say "6000" records, down to "1".
> > > >>> And when those changes happen, those are the exact times when
> timely
> > > >>> checkpointing matters the most.
> > > >>> If the load pattern suddenly changes, and checkpointing takes
> > suddenly
> > > >>> tens
> > > >>> of minutes instead of a couple of
> > > >>> seconds, it means you can not use rescaling and you are forced to
> > > >>> overprovision the resources. And there also
> > > >>> other issues if checkpointing takes too long.
> > > >>>
> > > >>
> > > >> I'm gonna clarify some misunderstanding here. First of all, is the
> > sync
> > > >> phase of checkpointing for the current plan longer than the
> > synchronous
> > > >> execution model? The answer is yes, it is a trade-off for parallel
> > > >> execution model. I think the cost is worth the improvement. Now the
> > > >> question is, how much longer are we talking about? The PoC result I
> > > >> provided is that it takes 3 seconds to drain 6000 records of a
> simple
> > > job,
> > > >> and I said it is not a big deal. Even though you would say we may
> > > encounter
> > > >> long watermark/timer processing that make the cp wait, thus I
> provide
> > > >> several ways to optimize this:
> > > >>
> > > >>    1. Instead of only controlling the in-flight records, we could
> > > >>    control the in-flight watermark.
> > > >>    2. Since we have broken down the record processing into several
> > state
> > > >>    requests with at most one subsequent callback for each request,
> the
> > > cp can
> > > >>    be processed after current RUNNING requests (NOT records) and
> their
> > > >>    callbacks finish. Which means, even though we have a lot of
> records
> > > >>    in-flight (I mean in 'processElement' here), once only a small
> > group
> > > of
> > > >>    state requests finishes, the cp can proceed. They will form into
> 1
> > > or 2
> > > >>    multiGets to rocksdb, which takes less time. Moreover, the timer
> > > processing
> > > >>    is also split into several requests, so cp won't wait for the
> whole
> > > timer
> > > >>    to finish. The picture attached describes this idea.
> > > >>
> > > >> And the size of this buffer can be configured. I'm not counting on
> > > >> average users to configure it well, I'm just saying that we'd better
> > not
> > > >> focus on absolute numbers of PoC or specific cases since we can
> always
> > > >> provide a conservative default value to make this acceptable for
> most
> > > >> cases.The adaptive buffer size is also worth a try if we provide a
> > > >> conservative strategy.
> > > >>
> > > >> Besides, I don't understand why the load pattern or spikes or
> > > >> backpressure will affect this. We are controlling the records that
> can
> > > get
> > > >> in the 'processElement' and the state requests that can fire in
> > > parallel,
> > > >> no matter how high the load spikes, they will be blocked outside. It
> > is
> > > >> relatively stable within the proposed execution model itself. The
> > > unaligned
> > > >> barrier will skip those inputs in the queue as before.
> > > >>
> > > >>
> > > >> At the same time, I still don't understand why we can not implement
> > > things
> > > >>> incrementally? First
> > > >>> let's start with the current API, without the need to rewrite all
> of
> > > the
> > > >>> operators, we can asynchronously fetch whole
> > > >>> state for a given record using its key. That should already vastly
> > > >>> improve
> > > >>> many things, and this way we could
> > > >>> perform a checkpoint without a need of draining the
> > > in-progress/in-flight
> > > >>> buffer. We could roll that version out,
> > > >>> test it out in practice, and then we could see if the fine grained
> > > state
> > > >>> access is really needed. Otherwise it sounds
> > > >>> to me like a premature optimization, that requires us to not only
> > > >>> rewrite a
> > > >>> lot of the code, but also to later maintain
> > > >>> it, even if it ultimately proves to be not needed. Which I of
> course
> > > can
> > > >>> not be certain but I have a feeling that it
> > > >>> might be the case.
> > > >>
> > > >>
> > > >> The disaggregated state management we proposed is target at
> including
> > > but
> > > >> not limited to the following challenges:
> > > >>
> > > >>    1. Local disk constraints, including limited I/O and space.
> > > >>    (discussed in FLIP-423)
> > > >>    2. Unbind the I/O resource with pre-allocated CPU resource, to
> make
> > > >>    good use of both (motivation of FLIP-424)
> > > >>    3. Elasticity of scaling I/O or storage capacity.
> > > >>
> > > >> Thus our plan is dependent on DFS, using local disk as an optional
> > cache
> > > >> only. The pre-fetching plan you mentioned is still binding I/O with
> > CPU
> > > >> resources, and will consume even more I/O to load unnecessary state.
> > It
> > > >> makes things worse. Please note that we are not targeting some
> > scenarios
> > > >> where the local state could handle well, and our goal is not to
> > replace
> > > the
> > > >> local state.
> > > >>
> > > >> And If manpower is a big concern of yours, I would say many of my
> > > >> colleagues could help contribute in runtime or SQL operators. It is
> > > >> experimental on a separate code path other than the local state and
> > > will be
> > > >> recommended to use only when we prove it mature.
> > > >>
> > > >>
> > > >> Thanks & Best,
> > > >> Zakelly
> > > >>
> > > >> On Wed, Mar 20, 2024 at 10:04 PM Piotr Nowojski <
> pnowoj...@apache.org
> > >
> > > >> wrote:
> > > >>
> > > >>> Hey Zakelly!
> > > >>>
> > > >>> Sorry for the late reply. I still have concerns about the proposed
> > > >>> solution, with my main concerns coming from
> > > >>> the implications of the asynchronous state access API on the
> > > >>> checkpointing
> > > >>> and responsiveness of Flink.
> > > >>>
> > > >>> >> What also worries me a lot in this fine grained model is the
> > effect
> > > on
> > > >>> the checkpointing times.
> > > >>> >
> > > >>> > Your concerns are very reasonable. Faster checkpointing is
> always a
> > > >>> core
> > > >>> advantage of disaggregated state,
> > > >>> > but only for the async phase. There will be some complexity
> > > introduced
> > > >>> by
> > > >>> in-flight requests, but I'd suggest
> > > >>> > a checkpoint containing those in-flight state requests as part of
> > the
> > > >>> state, to accelerate the sync phase by
> > > >>> > skipping the buffer draining. This makes the buffer size have
> > little
> > > >>> impact on checkpoint time. And all the
> > > >>> > changes keep within the execution model we proposed while the
> > > >>> checkpoint
> > > >>> barrier alignment or handling
> > > >>> > will not be touched in our proposal, so I guess the complexity is
> > > >>> relatively controllable. I have faith in that :)
> > > >>>
> > > >>> As we discussed off-line, you agreed that we can not checkpoint
> while
> > > >>> some
> > > >>> records are in the middle of being
> > > >>> processed. That we would have to drain the in-progress records
> before
> > > >>> doing
> > > >>> the checkpoint. You also argued
> > > >>> that this is not a problem, because the size of this buffer can be
> > > >>> configured.
> > > >>>
> > > >>> I'm really afraid of such a solution. I've seen in the past plenty
> of
> > > >>> times, that whenever Flink has to drain some
> > > >>> buffered records, eventually that always brakes timely
> checkpointing
> > > (and
> > > >>> hence ability for Flink to rescale in
> > > >>> a timely manner). Even a single record with a `flatMap` like
> operator
> > > >>> currently in Flink causes problems during
> > > >>> back pressure. That's especially true for example for processing
> > > >>> watermarks. At the same time, I don't see how
> > > >>> this value could be configured by even Flink's power users, let
> alone
> > > an
> > > >>> average user. The size of that in-flight
> > > >>> buffer not only depends on a particular query/job, but also the
> > "good"
> > > >>> value changes dynamically over time,
> > > >>> and can change very rapidly. Sudden spikes of records or
> > backpressure,
> > > >>> some
> > > >>> hiccup during emitting watermarks,
> > > >>> all of those could change in an instant the theoretically optimal
> > > buffer
> > > >>> size of let's say "6000" records, down to "1".
> > > >>> And when those changes happen, those are the exact times when
> timely
> > > >>> checkpointing matters the most.
> > > >>> If the load pattern suddenly changes, and checkpointing takes
> > suddenly
> > > >>> tens
> > > >>> of minutes instead of a couple of
> > > >>> seconds, it means you can not use rescaling and you are forced to
> > > >>> overprovision the resources. And there also
> > > >>> other issues if checkpointing takes too long.
> > > >>>
> > > >>> At the same time, I still don't understand why we can not implement
> > > >>> things
> > > >>> incrementally? First
> > > >>> let's start with the current API, without the need to rewrite all
> of
> > > the
> > > >>> operators, we can asynchronously fetch whole
> > > >>> state for a given record using its key. That should already vastly
> > > >>> improve
> > > >>> many things, and this way we could
> > > >>> perform a checkpoint without a need of draining the
> > > in-progress/in-flight
> > > >>> buffer. We could roll that version out,
> > > >>> test it out in practice, and then we could see if the fine grained
> > > state
> > > >>> access is really needed. Otherwise it sounds
> > > >>> to me like a premature optimization, that requires us to not only
> > > >>> rewrite a
> > > >>> lot of the code, but also to later maintain
> > > >>> it, even if it ultimately proves to be not needed. Which I of
> course
> > > can
> > > >>> not be certain but I have a feeling that it
> > > >>> might be the case.
> > > >>>
> > > >>> Best,
> > > >>> Piotrek
> > > >>>
> > > >>> wt., 19 mar 2024 o 10:42 Zakelly Lan <zakelly....@gmail.com>
> > > napisał(a):
> > > >>>
> > > >>> > Hi everyone,
> > > >>> >
> > > >>> > Thanks for your valuable feedback!
> > > >>> >
> > > >>> > Our discussions have been going on for a while and are nearing a
> > > >>> > consensus. So I would like to start a vote after 72 hours.
> > > >>> >
> > > >>> > Please let me know if you have any concerns, thanks!
> > > >>> >
> > > >>> >
> > > >>> > Best,
> > > >>> > Zakelly
> > > >>> >
> > > >>> > On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan <
> zakelly....@gmail.com
> > >
> > > >>> wrote:
> > > >>> >
> > > >>> > > Hi Yunfeng,
> > > >>> > >
> > > >>> > > Thanks for the suggestion!
> > > >>> > >
> > > >>> > > I will reorganize the FLIP-425 accordingly.
> > > >>> > >
> > > >>> > >
> > > >>> > > Best,
> > > >>> > > Zakelly
> > > >>> > >
> > > >>> > > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <
> > > >>> > flink.zhouyunf...@gmail.com>
> > > >>> > > wrote:
> > > >>> > >
> > > >>> > >> Hi Xintong and Zakelly,
> > > >>> > >>
> > > >>> > >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
> > > >>> > >> I agree with it that watermarks can use only out-of-order mode
> > for
> > > >>> > >> now, because there is still not a concrete example showing the
> > > >>> > >> correctness risk about it. However, the strictly-ordered mode
> > > should
> > > >>> > >> still be supported as the default option for non-record event
> > > types
> > > >>> > >> other than watermark, at least for checkpoint barriers.
> > > >>> > >>
> > > >>> > >> I noticed that this information has already been documented in
> > > "For
> > > >>> > >> other non-record events, such as RecordAttributes ...", but
> it's
> > > at
> > > >>> > >> the bottom of the "Watermark" section, which might not be very
> > > >>> > >> obvious. Thus it might be better to reorganize the FLIP to
> > better
> > > >>> > >> claim that the two order modes are designed for all non-record
> > > >>> events,
> > > >>> > >> and which mode this FLIP would choose for each type of event.
> > > >>> > >>
> > > >>> > >> Best,
> > > >>> > >> Yunfeng
> > > >>> > >>
> > > >>> > >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <
> > > tonysong...@gmail.com
> > > >>> >
> > > >>> > >> wrote:
> > > >>> > >> >
> > > >>> > >> > Thanks for the quick response. Sounds good to me.
> > > >>> > >> >
> > > >>> > >> > Best,
> > > >>> > >> >
> > > >>> > >> > Xintong
> > > >>> > >> >
> > > >>> > >> >
> > > >>> > >> >
> > > >>> > >> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <
> > > >>> zakelly....@gmail.com>
> > > >>> > >> wrote:
> > > >>> > >> >
> > > >>> > >> > > Hi Xintong,
> > > >>> > >> > >
> > > >>> > >> > > Thanks for sharing your thoughts!
> > > >>> > >> > >
> > > >>> > >> > > 1. Regarding Record-ordered and State-ordered of
> > > processElement.
> > > >>> > >> > > >
> > > >>> > >> > > > I understand that while State-ordered likely provides
> > better
> > > >>> > >> performance,
> > > >>> > >> > > > Record-ordered is sometimes required for correctness.
> The
> > > >>> question
> > > >>> > >> is how
> > > >>> > >> > > > should a user choose between these two modes? My concern
> > is
> > > >>> that
> > > >>> > >> such a
> > > >>> > >> > > > decision may require users to have in-depth knowledge
> > about
> > > >>> the
> > > >>> > >> Flink
> > > >>> > >> > > > internals, and may lead to correctness issues if
> > > >>> State-ordered is
> > > >>> > >> chosen
> > > >>> > >> > > > improperly.
> > > >>> > >> > > >
> > > >>> > >> > > > I'd suggest not to expose such a knob, at least in the
> > first
> > > >>> > >> version.
> > > >>> > >> > > That
> > > >>> > >> > > > means always use Record-ordered for custom operators /
> > UDFs,
> > > >>> and
> > > >>> > >> keep
> > > >>> > >> > > > State-ordered for internal usages (built-in operators)
> > only.
> > > >>> > >> > > >
> > > >>> > >> > >
> > > >>> > >> > > Indeed, users may not be able to choose the mode
> properly. I
> > > >>> agree
> > > >>> > to
> > > >>> > >> keep
> > > >>> > >> > > such options for internal use.
> > > >>> > >> > >
> > > >>> > >> > >
> > > >>> > >> > > 2. Regarding Strictly-ordered and Out-of-order of
> > Watermarks.
> > > >>> > >> > > >
> > > >>> > >> > > > I'm not entirely sure about Strictly-ordered being the
> > > >>> default, or
> > > >>> > >> even
> > > >>> > >> > > > being supported. From my understanding, a Watermark(T)
> > only
> > > >>> > >> suggests that
> > > >>> > >> > > > all records with event time before T has arrived, and it
> > has
> > > >>> > >> nothing to
> > > >>> > >> > > do
> > > >>> > >> > > > with whether records with event time after T has arrived
> > or
> > > >>> not.
> > > >>> > >> From
> > > >>> > >> > > that
> > > >>> > >> > > > perspective, preventing certain records from arriving
> > > before a
> > > >>> > >> Watermark
> > > >>> > >> > > is
> > > >>> > >> > > > never supported. I also cannot come up with any use case
> > > where
> > > >>> > >> > > > Strictly-ordered is necessary. This implies the same
> issue
> > > as
> > > >>> 1):
> > > >>> > >> how
> > > >>> > >> > > does
> > > >>> > >> > > > the user choose between the two modes?
> > > >>> > >> > > >
> > > >>> > >> > > > I'd suggest not expose the knob to users and only
> support
> > > >>> > >> Out-of-order,
> > > >>> > >> > > > until we see a concrete use case that Strictly-ordered
> is
> > > >>> needed.
> > > >>> > >> > > >
> > > >>> > >> > >
> > > >>> > >> > > The semantics of watermarks do not define the sequence
> > > between a
> > > >>> > >> watermark
> > > >>> > >> > > and subsequent records. For the most part, this is
> > > >>> inconsequential,
> > > >>> > >> except
> > > >>> > >> > > it may affect some current users who have previously
> relied
> > on
> > > >>> the
> > > >>> > >> implicit
> > > >>> > >> > > assumption of an ordered execution. I'd be fine with
> > initially
> > > >>> > >> supporting
> > > >>> > >> > > only out-of-order processing. We may consider exposing the
> > > >>> > >> > > 'Strictly-ordered' mode once we encounter a concrete use
> > case
> > > >>> that
> > > >>> > >> > > necessitates it.
> > > >>> > >> > >
> > > >>> > >> > >
> > > >>> > >> > > My philosophies behind not exposing the two config options
> > > are:
> > > >>> > >> > > > - There are already too many options in Flink that
> barely
> > > >>> know how
> > > >>> > >> to use
> > > >>> > >> > > > them. I think Flink should try as much as possible to
> > decide
> > > >>> its
> > > >>> > own
> > > >>> > >> > > > behavior, rather than throwing all the decisions to the
> > > users.
> > > >>> > >> > > > - It's much harder to take back knobs than to introduce
> > > them.
> > > >>> > >> Therefore,
> > > >>> > >> > > > options should be introduced only if concrete use cases
> > are
> > > >>> > >> identified.
> > > >>> > >> > > >
> > > >>> > >> > >
> > > >>> > >> > > I agree to keep minimal configurable items especially for
> > the
> > > >>> MVP.
> > > >>> > >> Given
> > > >>> > >> > > that we have the opportunity to refine the functionality
> > > before
> > > >>> the
> > > >>> > >> > > framework transitions from @Experimental to
> @PublicEvolving,
> > > it
> > > >>> > makes
> > > >>> > >> sense
> > > >>> > >> > > to refrain from presenting user-facing options until we
> have
> > > >>> ensured
> > > >>> > >> > > their necessity.
> > > >>> > >> > >
> > > >>> > >> > >
> > > >>> > >> > > Best,
> > > >>> > >> > > Zakelly
> > > >>> > >> > >
> > > >>> > >> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <
> > > >>> > tonysong...@gmail.com>
> > > >>> > >> > > wrote:
> > > >>> > >> > >
> > > >>> > >> > > > Sorry for joining the discussion late.
> > > >>> > >> > > >
> > > >>> > >> > > > I have two questions about FLIP-425.
> > > >>> > >> > > >
> > > >>> > >> > > > 1. Regarding Record-ordered and State-ordered of
> > > >>> processElement.
> > > >>> > >> > > >
> > > >>> > >> > > > I understand that while State-ordered likely provides
> > better
> > > >>> > >> performance,
> > > >>> > >> > > > Record-ordered is sometimes required for correctness.
> The
> > > >>> question
> > > >>> > >> is how
> > > >>> > >> > > > should a user choose between these two modes? My concern
> > is
> > > >>> that
> > > >>> > >> such a
> > > >>> > >> > > > decision may require users to have in-depth knowledge
> > about
> > > >>> the
> > > >>> > >> Flink
> > > >>> > >> > > > internals, and may lead to correctness issues if
> > > >>> State-ordered is
> > > >>> > >> chosen
> > > >>> > >> > > > improperly.
> > > >>> > >> > > >
> > > >>> > >> > > > I'd suggest not to expose such a knob, at least in the
> > first
> > > >>> > >> version.
> > > >>> > >> > > That
> > > >>> > >> > > > means always use Record-ordered for custom operators /
> > UDFs,
> > > >>> and
> > > >>> > >> keep
> > > >>> > >> > > > State-ordered for internal usages (built-in operators)
> > only.
> > > >>> > >> > > >
> > > >>> > >> > > > 2. Regarding Strictly-ordered and Out-of-order of
> > > Watermarks.
> > > >>> > >> > > >
> > > >>> > >> > > > I'm not entirely sure about Strictly-ordered being the
> > > >>> default, or
> > > >>> > >> even
> > > >>> > >> > > > being supported. From my understanding, a Watermark(T)
> > only
> > > >>> > >> suggests that
> > > >>> > >> > > > all records with event time before T has arrived, and it
> > has
> > > >>> > >> nothing to
> > > >>> > >> > > do
> > > >>> > >> > > > with whether records with event time after T has arrived
> > or
> > > >>> not.
> > > >>> > >> From
> > > >>> > >> > > that
> > > >>> > >> > > > perspective, preventing certain records from arriving
> > > before a
> > > >>> > >> Watermark
> > > >>> > >> > > is
> > > >>> > >> > > > never supported. I also cannot come up with any use case
> > > where
> > > >>> > >> > > > Strictly-ordered is necessary. This implies the same
> issue
> > > as
> > > >>> 1):
> > > >>> > >> how
> > > >>> > >> > > does
> > > >>> > >> > > > the user choose between the two modes?
> > > >>> > >> > > >
> > > >>> > >> > > > I'd suggest not expose the knob to users and only
> support
> > > >>> > >> Out-of-order,
> > > >>> > >> > > > until we see a concrete use case that Strictly-ordered
> is
> > > >>> needed.
> > > >>> > >> > > >
> > > >>> > >> > > >
> > > >>> > >> > > > My philosophies behind not exposing the two config
> options
> > > >>> are:
> > > >>> > >> > > > - There are already too many options in Flink that
> barely
> > > >>> know how
> > > >>> > >> to use
> > > >>> > >> > > > them. I think Flink should try as much as possible to
> > decide
> > > >>> its
> > > >>> > own
> > > >>> > >> > > > behavior, rather than throwing all the decisions to the
> > > users.
> > > >>> > >> > > > - It's much harder to take back knobs than to introduce
> > > them.
> > > >>> > >> Therefore,
> > > >>> > >> > > > options should be introduced only if concrete use cases
> > are
> > > >>> > >> identified.
> > > >>> > >> > > >
> > > >>> > >> > > > WDYT?
> > > >>> > >> > > >
> > > >>> > >> > > > Best,
> > > >>> > >> > > >
> > > >>> > >> > > > Xintong
> > > >>> > >> > > >
> > > >>> > >> > > >
> > > >>> > >> > > >
> > > >>> > >> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge
> > > >>> <j...@ververica.com.invalid
> > > >>> > >
> > > >>> > >> > > wrote:
> > > >>> > >> > > >
> > > >>> > >> > > > > +1 for Gyula's suggestion. I just finished FLIP-423
> > which
> > > >>> > >> introduced
> > > >>> > >> > > the
> > > >>> > >> > > > > intention of the big change and high level
> architecture.
> > > >>> Great
> > > >>> > >> content
> > > >>> > >> > > > btw!
> > > >>> > >> > > > > The only public interface change for this FLIP is one
> > new
> > > >>> config
> > > >>> > >> to use
> > > >>> > >> > > > > ForSt. It makes sense to have one dedicated discussion
> > > >>> thread
> > > >>> > for
> > > >>> > >> each
> > > >>> > >> > > > > concrete system design.
> > > >>> > >> > > > >
> > > >>> > >> > > > > @Zakelly The links in your mail do not work except the
> > > last
> > > >>> one,
> > > >>> > >> > > because
> > > >>> > >> > > > > the FLIP-xxx has been included into the url like
> > > >>> > >> > > > >
> > > >>> > >> > >
> > > >>> > >>
> > > >>> >
> > > >>>
> > >
> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> > > >>> > >> > > > .
> > > >>> > >> > > > >
> > > >>> > >> > > > > NIT fix:
> > > >>> > >> > > > >
> > > >>> > >> > > > > FLIP-424:
> > > >>> > >> > > >
> > > >>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > >>> > >> > > > >
> > > >>> > >> > > > > FLIP-425:
> > > >>> > >> > > >
> > > >>> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > >>> > >> > > > >
> > > >>> > >> > > > > FLIP-426:
> > > >>> > >> > > >
> > > >>> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > >>> > >> > > > >
> > > >>> > >> > > > > FLIP-427:
> > > >>> > >> > > >
> > > >>> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > >>> > >> > > > >
> > > >>> > >> > > > > FLIP-428:
> > > >>> > >> > > >
> > > >>> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > >>> > >> > > > >
> > > >>> > >> > > > > Best regards,
> > > >>> > >> > > > > Jing
> > > >>> > >> > > > >
> > > >>> > >> > > > >
> > > >>> > >> > > > >
> > > >>> > >> > > > >
> > > >>> > >> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <
> > > >>> > >> zakelly....@gmail.com>
> > > >>> > >> > > > wrote:
> > > >>> > >> > > > >
> > > >>> > >> > > > > > Hi everyone,
> > > >>> > >> > > > > >
> > > >>> > >> > > > > > Thank you all for a lively discussion here, and it
> is
> > a
> > > >>> good
> > > >>> > >> time to
> > > >>> > >> > > > move
> > > >>> > >> > > > > > forward to more detailed discussions. Thus we open
> > > several
> > > >>> > >> threads
> > > >>> > >> > > for
> > > >>> > >> > > > > > sub-FLIPs:
> > > >>> > >> > > > > >
> > > >>> > >> > > > > > FLIP-424:
> > > >>> > >> > > > >
> > > >>> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > >>> > >> > > > > > FLIP-425
> > > >>> > >> > > > > > <
> > > >>> > >> > > > >
> > > >>> > >> > >
> > > >>> > >>
> > > >>> >
> > > >>>
> > >
> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> > > >>> > >> > > > >:
> > > >>> > >> > > > > >
> > > >>> > >>
> > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > >>> > >> > > > > > FLIP-426
> > > >>> > >> > > > > > <
> > > >>> > >> > > > >
> > > >>> > >> > >
> > > >>> > >>
> > > >>> >
> > > >>>
> > >
> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426
> > > >>> > >> > > > >:
> > > >>> > >> > > > > >
> > > >>> > >>
> > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > >>> > >> > > > > > FLIP-427
> > > >>> > >> > > > > > <
> > > >>> > >> > > > >
> > > >>> > >> > >
> > > >>> > >>
> > > >>> >
> > > >>>
> > >
> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427
> > > >>> > >> > > > >:
> > > >>> > >> > > > > >
> > > >>> > >>
> > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > >>> > >> > > > > > FLIP-428
> > > >>> > >> > > > > > <
> > > >>> > >> > > > >
> > > >>> > >> > >
> > > >>> > >>
> > > >>> >
> > > >>>
> > >
> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428
> > > >>> > >> > > > >:
> > > >>> > >> > > > > >
> > > >>> > >>
> > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > >>> > >> > > > > >
> > > >>> > >> > > > > > If you want to talk about the overall architecture,
> > > >>> roadmap,
> > > >>> > >> > > milestones
> > > >>> > >> > > > > or
> > > >>> > >> > > > > > something related with multiple FLIPs, please post
> it
> > > >>> here.
> > > >>> > >> Otherwise
> > > >>> > >> > > > you
> > > >>> > >> > > > > > can discuss some details in separate mails. Let's
> try
> > to
> > > >>> avoid
> > > >>> > >> > > repeated
> > > >>> > >> > > > > > discussion in different threads. I will sync
> important
> > > >>> > messages
> > > >>> > >> here
> > > >>> > >> > > if
> > > >>> > >> > > > > > there are any in the above threads.
> > > >>> > >> > > > > >
> > > >>> > >> > > > > > And reply to @Jeyhun: We will ensure the content
> > between
> > > >>> those
> > > >>> > >> FLIPs
> > > >>> > >> > > is
> > > >>> > >> > > > > > consistent.
> > > >>> > >> > > > > >
> > > >>> > >> > > > > >
> > > >>> > >> > > > > > Best,
> > > >>> > >> > > > > > Zakelly
> > > >>> > >> > > > > >
> > > >>> > >> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <
> > > >>> > yuanmei.w...@gmail.com
> > > >>> > >> >
> > > >>> > >> > > > wrote:
> > > >>> > >> > > > > >
> > > >>> > >> > > > > > > I have been a bit busy these few weeks and sorry
> for
> > > >>> > >> responding
> > > >>> > >> > > late.
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > > > The original thinking of keeping discussion within
> > one
> > > >>> > thread
> > > >>> > >> is
> > > >>> > >> > > for
> > > >>> > >> > > > > > easier
> > > >>> > >> > > > > > > tracking and avoid for repeated discussion in
> > > different
> > > >>> > >> threads.
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > > > For details, It might be good to start in
> different
> > > >>> threads
> > > >>> > if
> > > >>> > >> > > > needed.
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > > > We will think of a way to better organize the
> > > >>> discussion.
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > > > Best
> > > >>> > >> > > > > > > Yuan
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <
> > > >>> > >> > > je.kari...@gmail.com>
> > > >>> > >> > > > > > > wrote:
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > > > > Hi,
> > > >>> > >> > > > > > > >
> > > >>> > >> > > > > > > > + 1 for the suggestion.
> > > >>> > >> > > > > > > > Maybe we can the discussion with the FLIPs with
> > > >>> minimum
> > > >>> > >> > > > dependencies
> > > >>> > >> > > > > > > (from
> > > >>> > >> > > > > > > > the other new/proposed FLIPs).
> > > >>> > >> > > > > > > > Based on our discussion on a particular FLIP,
> the
> > > >>> > >> subsequent (or
> > > >>> > >> > > > its
> > > >>> > >> > > > > > > > dependent) FLIP(s) can be updated accordingly?
> > > >>> > >> > > > > > > >
> > > >>> > >> > > > > > > > Regards,
> > > >>> > >> > > > > > > > Jeyhun
> > > >>> > >> > > > > > > >
> > > >>> > >> > > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <
> > > >>> > >> gyula.f...@gmail.com>
> > > >>> > >> > > > > > wrote:
> > > >>> > >> > > > > > > >
> > > >>> > >> > > > > > > > > Hey all!
> > > >>> > >> > > > > > > > >
> > > >>> > >> > > > > > > > > This is a massive improvement / work. I just
> > > started
> > > >>> > going
> > > >>> > >> > > > through
> > > >>> > >> > > > > > the
> > > >>> > >> > > > > > > > > Flips and have a more or less meta comment.
> > > >>> > >> > > > > > > > >
> > > >>> > >> > > > > > > > > While it's good to keep the overall
> architecture
> > > >>> > >> discussion
> > > >>> > >> > > > here, I
> > > >>> > >> > > > > > > think
> > > >>> > >> > > > > > > > > we should still have separate discussions for
> > each
> > > >>> FLIP
> > > >>> > >> where
> > > >>> > >> > > we
> > > >>> > >> > > > > can
> > > >>> > >> > > > > > > > > discuss interface details etc. With so much
> > > content
> > > >>> if
> > > >>> > we
> > > >>> > >> start
> > > >>> > >> > > > > > adding
> > > >>> > >> > > > > > > > > minor comments here that will lead to nowhere
> > but
> > > >>> those
> > > >>> > >> > > > discussions
> > > >>> > >> > > > > > are
> > > >>> > >> > > > > > > > > still important and we should have them in
> > > separate
> > > >>> > >> threads
> > > >>> > >> > > (one
> > > >>> > >> > > > > for
> > > >>> > >> > > > > > > each
> > > >>> > >> > > > > > > > > FLIP)
> > > >>> > >> > > > > > > > >
> > > >>> > >> > > > > > > > > What do you think?
> > > >>> > >> > > > > > > > > Gyula
> > > >>> > >> > > > > > > > >
> > > >>> > >> > > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <
> > > >>> > >> fredia...@gmail.com
> > > >>> > >> > > >
> > > >>> > >> > > > > > wrote:
> > > >>> > >> > > > > > > > >
> > > >>> > >> > > > > > > > > > Hi team,
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > > > Thanks for your discussion. Regarding
> > FLIP-425,
> > > we
> > > >>> > have
> > > >>> > >> > > > > > supplemented
> > > >>> > >> > > > > > > > > > several updates to answer high-frequency
> > > >>> questions:
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > > > 1. We captured a flame graph of the Hashmap
> > > state
> > > >>> > >> backend in
> > > >>> > >> > > > > > > > > > "Synchronous execution with asynchronous
> > > APIs"[1],
> > > >>> > which
> > > >>> > >> > > > reveals
> > > >>> > >> > > > > > that
> > > >>> > >> > > > > > > > > > the framework overhead (including reference
> > > >>> counting,
> > > >>> > >> > > > > > future-related
> > > >>> > >> > > > > > > > > > code and so on) consumes about 9% of the
> keyed
> > > >>> > operator
> > > >>> > >> CPU
> > > >>> > >> > > > time.
> > > >>> > >> > > > > > > > > > 2. We added a set of comparative experiments
> > for
> > > >>> > >> watermark
> > > >>> > >> > > > > > > processing,
> > > >>> > >> > > > > > > > > > the performance of Out-Of-Order mode is 70%
> > > better
> > > >>> > than
> > > >>> > >> > > > > > > > > > strictly-ordered mode under ~140MB state
> size.
> > > >>> > >> Instructions
> > > >>> > >> > > on
> > > >>> > >> > > > > how
> > > >>> > >> > > > > > to
> > > >>> > >> > > > > > > > > > run this test have also been added[2].
> > > >>> > >> > > > > > > > > > 3. Regarding the order of StreamRecord,
> > whether
> > > >>> it has
> > > >>> > >> state
> > > >>> > >> > > > > access
> > > >>> > >> > > > > > > or
> > > >>> > >> > > > > > > > > > not. We supplemented a new *Strict order of
> > > >>> > >> > > > 'processElement'*[3].
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > > > [1]
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > >
> > > >>> > >> > > > > > > >
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > >
> > > >>> > >> > > > >
> > > >>> > >> > > >
> > > >>> > >> > >
> > > >>> > >>
> > > >>> >
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > > >>> > >> > > > > > > > > > [2]
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > >
> > > >>> > >> > > > > > > >
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > >
> > > >>> > >> > > > >
> > > >>> > >> > > >
> > > >>> > >> > >
> > > >>> > >>
> > > >>> >
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > > >>> > >> > > > > > > > > > [3]
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > >
> > > >>> > >> > > > > > > >
> > > >>> > >> > > > > > >
> > > >>> > >> > > > > >
> > > >>> > >> > > > >
> > > >>> > >> > > >
> > > >>> > >> > >
> > > >>> > >>
> > > >>> >
> > > >>>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > > > Best regards,
> > > >>> > >> > > > > > > > > > Yanfei
> > > >>> > >> > > > > > > > > >
> > > >>> > >> > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> > > >>> > 于2024年3月5日周二
> > > >>> > >> > > > 09:25写道:
> > > >>> > >> > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > Hi Zakelly,
> > > >>> > >> > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > 5. I'm not very sure ... revisiting this
> > > later
> > > >>> > >> since it
> > > >>> > >> > > is
> > > >>> > >> > > > > not
> > > >>> > >> > > > > > > > > > important.
> > > >>> > >> > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > It seems that we still have some details
> to
> > > >>> confirm
> > > >>> > >> about
> > > >>> > >> > > > this
> > > >>> > >> > > > > > > > > > > question. Let's postpone this to after the
> > > >>> critical
> > > >>> > >> parts
> > > >>> > >> > > of
> > > >>> > >> > > > > the
> > > >>> > >> > > > > > > > > > > design are settled.
> > > >>> > >> > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > 8. Yes, we had considered ... metrics
> > should
> > > >>> be
> > > >>> > like
> > > >>> > >> > > > > > afterwards.
> > > >>> > >> > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with
> > > >>> discussing
> > > >>> > >> this
> > > >>> > >> > > > topic
> > > >>> > >> > > > > > in
> > > >>> > >> > > > > > > > > > milestone 2.
> > > >>> > >> > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > Looking forward to the detailed design
> about
> > > the
> > > >>> > >> strict
> > > >>> > >> > > mode
> > > >>> > >> > > > > > > between
> > > >>> > >> > > > > > > > > > > same-key records and the benchmark results
> > > >>> about the
> > > >>> > >> epoch
> > > >>> > >> > > > > > > mechanism.
> > > >>> > >> > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > Best regards,
> > > >>> > >> > > > > > > > > > > Yunfeng
> > > >>> > >> > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly
> Lan <
> > > >>> > >> > > > > > zakelly....@gmail.com>
> > > >>> > >> > > > > > > > > > wrote:
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > Hi Yunfeng,
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > For 1:
> > > >>> > >> > > > > > > > > > > > I had a discussion with Lincoln Lee,
> and I
> > > >>> realize
> > > >>> > >> it is
> > > >>> > >> > > a
> > > >>> > >> > > > > > common
> > > >>> > >> > > > > > > > > case
> > > >>> > >> > > > > > > > > > the same-key record should be blocked before
> > the
> > > >>> > >> > > > > `processElement`.
> > > >>> > >> > > > > > It
> > > >>> > >> > > > > > > > is
> > > >>> > >> > > > > > > > > > easier for users to understand. Thus I will
> > > >>> introduce
> > > >>> > a
> > > >>> > >> > > strict
> > > >>> > >> > > > > mode
> > > >>> > >> > > > > > > for
> > > >>> > >> > > > > > > > > > this and make it default. My rough idea is
> > just
> > > >>> like
> > > >>> > >> yours,
> > > >>> > >> > > by
> > > >>> > >> > > > > > > invoking
> > > >>> > >> > > > > > > > > > some method of AEC instance before
> > > >>> `processElement`.
> > > >>> > The
> > > >>> > >> > > > detailed
> > > >>> > >> > > > > > > > design
> > > >>> > >> > > > > > > > > > will be described in FLIP later.
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > For 2:
> > > >>> > >> > > > > > > > > > > > I agree with you. We could throw
> > exceptions
> > > >>> for
> > > >>> > now
> > > >>> > >> and
> > > >>> > >> > > > > > optimize
> > > >>> > >> > > > > > > > this
> > > >>> > >> > > > > > > > > > later.
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > For 5:
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> It might be better to move the default
> > > >>> values to
> > > >>> > >> the
> > > >>> > >> > > > > Proposed
> > > >>> > >> > > > > > > > > Changes
> > > >>> > >> > > > > > > > > > > >> section instead of making them public
> for
> > > >>> now, as
> > > >>> > >> there
> > > >>> > >> > > > will
> > > >>> > >> > > > > > be
> > > >>> > >> > > > > > > > > > > >> compatibility issues once we want to
> > > >>> dynamically
> > > >>> > >> adjust
> > > >>> > >> > > > the
> > > >>> > >> > > > > > > > > thresholds
> > > >>> > >> > > > > > > > > > > >> and timeouts in future.
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > Agreed. The whole framework is under
> > > >>> experiment
> > > >>> > >> until we
> > > >>> > >> > > > > think
> > > >>> > >> > > > > > it
> > > >>> > >> > > > > > > > is
> > > >>> > >> > > > > > > > > > complete in 2.0 or later. The default value
> > > >>> should be
> > > >>> > >> better
> > > >>> > >> > > > > > > determined
> > > >>> > >> > > > > > > > > > with more testing results and production
> > > >>> experience.
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > >> The configuration
> > > >>> execution.async-state.enabled
> > > >>> > >> seems
> > > >>> > >> > > > > > > unnecessary,
> > > >>> > >> > > > > > > > > as
> > > >>> > >> > > > > > > > > > > >> the infrastructure may automatically
> get
> > > this
> > > >>> > >> > > information
> > > >>> > >> > > > > from
> > > >>> > >> > > > > > > the
> > > >>> > >> > > > > > > > > > > >> detailed state backend configurations.
> We
> > > may
> > > >>> > >> revisit
> > > >>> > >> > > this
> > > >>> > >> > > > > > part
> > > >>> > >> > > > > > > > > after
> > > >>> > >> > > > > > > > > > > >> the core designs have reached an
> > agreement.
> > > >>> WDYT?
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > I'm not very sure if there is any use
> case
> > > >>> where
> > > >>> > >> users
> > > >>> > >> > > > write
> > > >>> > >> > > > > > > their
> > > >>> > >> > > > > > > > > > code using async APIs but run their job in a
> > > >>> > >> synchronous way.
> > > >>> > >> > > > The
> > > >>> > >> > > > > > > first
> > > >>> > >> > > > > > > > > two
> > > >>> > >> > > > > > > > > > scenarios that come to me are for
> benchmarking
> > > or
> > > >>> for
> > > >>> > a
> > > >>> > >> small
> > > >>> > >> > > > > > state,
> > > >>> > >> > > > > > > > > while
> > > >>> > >> > > > > > > > > > they don't want to rewrite their code.
> > Actually
> > > >>> it is
> > > >>> > >> easy to
> > > >>> > >> > > > > > > support,
> > > >>> > >> > > > > > > > so
> > > >>> > >> > > > > > > > > > I'd suggest providing it. But I'm fine with
> > > >>> revisiting
> > > >>> > >> this
> > > >>> > >> > > > later
> > > >>> > >> > > > > > > since
> > > >>> > >> > > > > > > > > it
> > > >>> > >> > > > > > > > > > is not important. WDYT?
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > For 8:
> > > >>> > >> > > > > > > > > > > > Yes, we had considered the I/O metrics
> > group
> > > >>> > >> especially
> > > >>> > >> > > the
> > > >>> > >> > > > > > > > > > back-pressure, idle and task busy per
> second.
> > In
> > > >>> the
> > > >>> > >> current
> > > >>> > >> > > > plan
> > > >>> > >> > > > > > we
> > > >>> > >> > > > > > > > can
> > > >>> > >> > > > > > > > > do
> > > >>> > >> > > > > > > > > > state access during back-pressure, meaning
> > that
> > > >>> those
> > > >>> > >> metrics
> > > >>> > >> > > > for
> > > >>> > >> > > > > > > input
> > > >>> > >> > > > > > > > > > would better be redefined. I suggest we
> > discuss
> > > >>> these
> > > >>> > >> > > existing
> > > >>> > >> > > > > > > metrics
> > > >>> > >> > > > > > > > as
> > > >>> > >> > > > > > > > > > well as some new metrics that should be
> > > >>> introduced in
> > > >>> > >> > > FLIP-431
> > > >>> > >> > > > > > later
> > > >>> > >> > > > > > > in
> > > >>> > >> > > > > > > > > > milestone 2, since we have basically
> finished
> > > the
> > > >>> > >> framework
> > > >>> > >> > > > thus
> > > >>> > >> > > > > we
> > > >>> > >> > > > > > > > will
> > > >>> > >> > > > > > > > > > have a better view of what metrics should be
> > > like
> > > >>> > >> afterwards.
> > > >>> > >> > > > > WDYT?
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > Best,
> > > >>> > >> > > > > > > > > > > > Zakelly
> > > >>> > >> > > > > > > > > > > >
> > > >>> > >> > > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng
> > Zhou
> > > <
> > > >>> > >> > > > > > > > > > flink.zhouyunf...@gmail.com> wrote:
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> Hi Zakelly,
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> Thanks for the responses!
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> > 1. I will discuss this with some
> expert
> > > SQL
> > > >>> > >> > > developers.
> > > >>> > >> > > > > ...
> > > >>> > >> > > > > > > mode
> > > >>> > >> > > > > > > > > > for StreamRecord processing.
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> In DataStream API there should also be
> > use
> > > >>> cases
> > > >>> > >> when
> > > >>> > >> > > the
> > > >>> > >> > > > > > order
> > > >>> > >> > > > > > > of
> > > >>> > >> > > > > > > > > > > >> output is strictly required. I agree
> with
> > > it
> > > >>> that
> > > >>> > >> SQL
> > > >>> > >> > > > > experts
> > > >>> > >> > > > > > > may
> > > >>> > >> > > > > > > > > help
> > > >>> > >> > > > > > > > > > > >> provide more concrete use cases that
> can
> > > >>> > >> accelerate our
> > > >>> > >> > > > > > > > discussion,
> > > >>> > >> > > > > > > > > > > >> but please allow me to search for
> > > DataStream
> > > >>> use
> > > >>> > >> cases
> > > >>> > >> > > > that
> > > >>> > >> > > > > > can
> > > >>> > >> > > > > > > > > prove
> > > >>> > >> > > > > > > > > > > >> the necessity of this strict order
> > > >>> preservation
> > > >>> > >> mode, if
> > > >>> > >> > > > > > answers
> > > >>> > >> > > > > > > > > from
> > > >>> > >> > > > > > > > > > > >> SQL experts are shown to be negative.
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> For your convenience, my current rough
> > idea
> > > >>> is
> > > >>> > >> that we
> > > >>> > >> > > can
> > > >>> > >> > > > > > add a
> > > >>> > >> > > > > > > > > > > >> module between the Input(s) and
> > > >>> processElement()
> > > >>> > >> module
> > > >>> > >> > > in
> > > >>> > >> > > > > > Fig 2
> > > >>> > >> > > > > > > > of
> > > >>> > >> > > > > > > > > > > >> FLIP-425. The module will be
> responsible
> > > for
> > > >>> > >> caching
> > > >>> > >> > > > records
> > > >>> > >> > > > > > > whose
> > > >>> > >> > > > > > > > > > > >> keys collide with in-flight records,
> and
> > > AEC
> > > >>> will
> > > >>> > >> only
> > > >>> > >> > > be
> > > >>> > >> > > > > > > > > responsible
> > > >>> > >> > > > > > > > > > > >> for handling async state calls, without
> > > >>> knowing
> > > >>> > the
> > > >>> > >> > > record
> > > >>> > >> > > > > > each
> > > >>> > >> > > > > > > > call
> > > >>> > >> > > > > > > > > > > >> belongs to. We may revisit this topic
> > once
> > > >>> the
> > > >>> > >> necessity
> > > >>> > >> > > > of
> > > >>> > >> > > > > > the
> > > >>> > >> > > > > > > > > strict
> > > >>> > >> > > > > > > > > > > >> order mode is clarified.
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> > 2. The amount of parallel
> StateRequests
> > > ...
> > > >>> > >> instead of
> > > >>> > >> > > > > > > invoking
> > > >>> > >> > > > > > > > > > yield
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> Your suggestions generally appeal to
> me.
> > I
> > > >>> think
> > > >>> > >> we may
> > > >>> > >> > > > let
> > > >>> > >> > > > > > > > > > > >> corresponding Flink jobs fail with OOM
> > for
> > > >>> now,
> > > >>> > >> since
> > > >>> > >> > > the
> > > >>> > >> > > > > > > majority
> > > >>> > >> > > > > > > > > of
> > > >>> > >> > > > > > > > > > > >> a StateRequest should just be
> references
> > to
> > > >>> > >> existing
> > > >>> > >> > > Java
> > > >>> > >> > > > > > > objects,
> > > >>> > >> > > > > > > > > > > >> which only occupies very small memory
> > space
> > > >>> and
> > > >>> > can
> > > >>> > >> > > hardly
> > > >>> > >> > > > > > cause
> > > >>> > >> > > > > > > > OOM
> > > >>> > >> > > > > > > > > > > >> in common cases. We can monitor the
> > pending
> > > >>> > >> > > StateRequests
> > > >>> > >> > > > > and
> > > >>> > >> > > > > > if
> > > >>> > >> > > > > > > > > there
> > > >>> > >> > > > > > > > > > > >> is really a risk of OOM in extreme
> cases,
> > > we
> > > >>> can
> > > >>> > >> throw
> > > >>> > >> > > > > > > Exceptions
> > > >>> > >> > > > > > > > > with
> > > >>> > >> > > > > > > > > > > >> proper messages notifying users what to
> > do,
> > > >>> like
> > > >>> > >> > > > increasing
> > > >>> > >> > > > > > > memory
> > > >>> > >> > > > > > > > > > > >> through configurations.
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> Your suggestions to adjust threshold
> > > >>> adaptively
> > > >>> > or
> > > >>> > >> to
> > > >>> > >> > > use
> > > >>> > >> > > > > the
> > > >>> > >> > > > > > > > > blocking
> > > >>> > >> > > > > > > > > > > >> buffer sounds good, and in my opinion
> we
> > > can
> > > >>> > >> postpone
> > > >>> > >> > > them
> > > >>> > >> > > > > to
> > > >>> > >> > > > > > > > future
> > > >>> > >> > > > > > > > > > > >> FLIPs since they seem to only benefit
> > users
> > > >>> in
> > > >>> > rare
> > > >>> > >> > > cases.
> > > >>> > >> > > > > > Given
> > > >>> > >> > > > > > > > > that
> > > >>> > >> > > > > > > > > > > >> FLIP-423~428 has already been a big
> > enough
> > > >>> > design,
> > > >>> > >> it
> > > >>> > >> > > > might
> > > >>> > >> > > > > be
> > > >>> > >> > > > > > > > > better
> > > >>> > >> > > > > > > > > > > >> to focus on the most critical design
> for
> > > now
> > > >>> and
> > > >>> > >> > > postpone
> > > >>> > >> > > > > > > > > > > >> optimizations like this. WDYT?
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> > 5. Sure, we will introduce new
> configs
> > as
> > > >>> well
> > > >>> > as
> > > >>> > >> > > their
> > > >>> > >> > > > > > > default
> > > >>> > >> > > > > > > > > > value.
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> Thanks for adding the default values
> and
> > > the
> > > >>> > values
> > > >>> > >> > > > > themselves
> > > >>> > >> > > > > > > > LGTM.
> > > >>> > >> > > > > > > > > > > >> It might be better to move the default
> > > >>> values to
> > > >>> > >> the
> > > >>> > >> > > > > Proposed
> > > >>> > >> > > > > > > > > Changes
> > > >>> > >> > > > > > > > > > > >> section instead of making them public
> for
> > > >>> now, as
> > > >>> > >> there
> > > >>> > >> > > > will
> > > >>> > >> > > > > > be
> > > >>> > >> > > > > > > > > > > >> compatibility issues once we want to
> > > >>> dynamically
> > > >>> > >> adjust
> > > >>> > >> > > > the
> > > >>> > >> > > > > > > > > thresholds
> > > >>> > >> > > > > > > > > > > >> and timeouts in future.
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> The configuration
> > > >>> execution.async-state.enabled
> > > >>> > >> seems
> > > >>> > >> > > > > > > unnecessary,
> > > >>> > >> > > > > > > > > as
> > > >>> > >> > > > > > > > > > > >> the infrastructure may automatically
> get
> > > this
> > > >>> > >> > > information
> > > >>> > >> > > > > from
> > > >>> > >> > > > > > > the
> > > >>> > >> > > > > > > > > > > >> detailed state backend configurations.
> We
> > > may
> > > >>> > >> revisit
> > > >>> > >> > > this
> > > >>> > >> > > > > > part
> > > >>> > >> > > > > > > > > after
> > > >>> > >> > > > > > > > > > > >> the core designs have reached an
> > agreement.
> > > >>> WDYT?
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> Besides, inspired by Jeyhun's comments,
> > it
> > > >>> comes
> > > >>> > >> to me
> > > >>> > >> > > > that
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> 8. Should this FLIP introduce metrics
> > that
> > > >>> > measure
> > > >>> > >> the
> > > >>> > >> > > > time
> > > >>> > >> > > > > a
> > > >>> > >> > > > > > > > Flink
> > > >>> > >> > > > > > > > > > > >> job is back-pressured by State IOs?
> Under
> > > the
> > > >>> > >> current
> > > >>> > >> > > > > design,
> > > >>> > >> > > > > > > this
> > > >>> > >> > > > > > > > > > > >> metric could measure the time when the
> > > >>> blocking
> > > >>> > >> buffer
> > > >>> > >> > > is
> > > >>> > >> > > > > full
> > > >>> > >> > > > > > > and
> > > >>> > >> > > > > > > > > > > >> yield() cannot get callbacks to
> process,
> > > >>> which
> > > >>> > >> means the
> > > >>> > >> > > > > > > operator
> > > >>> > >> > > > > > > > is
> > > >>> > >> > > > > > > > > > > >> fully waiting for state responses.
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> Best regards,
> > > >>> > >> > > > > > > > > > > >> Yunfeng
> > > >>> > >> > > > > > > > > > > >>
> > > >>> > >> > > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly
> > > Lan <
> > > >>> > >> > > > > > > > zakelly....@gmail.com>
> > > >>> > >> > > > > > > > > > wrote:
> > > >>> > >> > > > > > > > > > > >> >
> > > >>> > >> > > > > > > > > > > >> > Hi Yunfeng,
> > > >>> > >> > > > > > > > > > > >> >
> > > >>> > >> > > > > > > > > > > >> > Thanks for your detailed comments!
> > > >>> > >> > > > > > > > > > > >> >
> > > >>> > >> > > > > > > > > > > >> >> 1. Why do we need a close() method
> on
> > > >>> > >> StateIterator?
> > > >>> > >> > > > This
> > > >>> > >> > > > > > > > method
> > > >>> > >> > > > > > > > > > seems
> > > >>> > >> > > > > > > > > > > >> >> unused in the usage example codes.
> > > >>> > >> > > > > > > > > > > >> >
> > > >>> > >> > > > > > > > > > > >> >
> > > >>> > >> > > > > > > > > > > >> > The `close()` is introduced to
> release
> > > >>> internal
> > > >>> > >> > > > resources,
> > > >>> > >> > > > > > but
> > > >>> > >> > > > > > > > it
> > > >>> > >> > > > > > > > > > does not seem to require the user to call
> it.
> > I
> > > >>> > removed
> > > >>> > >> this.
> > > >>> > >> > > > > > > > > > > >> >
> > > >>> > >> > > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s
> > > JavaDoc,
> > > >>> it
> > > >>> > is
> > > >>> > >> > > stated
> > > >>
>

Reply via email to