Hey Zakelly!

Sorry for the late reply. I still have concerns about the proposed
solution, with my main concerns coming from
the implications of the asynchronous state access API on the checkpointing
and responsiveness of Flink.

>> What also worries me a lot in this fine grained model is the effect on
the checkpointing times.
>
> Your concerns are very reasonable. Faster checkpointing is always a core
advantage of disaggregated state,
> but only for the async phase. There will be some complexity introduced by
in-flight requests, but I'd suggest
> a checkpoint containing those in-flight state requests as part of the
state, to accelerate the sync phase by
> skipping the buffer draining. This makes the buffer size have little
impact on checkpoint time. And all the
> changes keep within the execution model we proposed while the checkpoint
barrier alignment or handling
> will not be touched in our proposal, so I guess the complexity is
relatively controllable. I have faith in that :)

As we discussed off-line, you agreed that we can not checkpoint while some
records are in the middle of being
processed. That we would have to drain the in-progress records before doing
the checkpoint. You also argued
that this is not a problem, because the size of this buffer can be
configured.

I'm really afraid of such a solution. I've seen in the past plenty of
times, that whenever Flink has to drain some
buffered records, eventually that always brakes timely checkpointing (and
hence ability for Flink to rescale in
a timely manner). Even a single record with a `flatMap` like operator
currently in Flink causes problems during
back pressure. That's especially true for example for processing
watermarks. At the same time, I don't see how
this value could be configured by even Flink's power users, let alone an
average user. The size of that in-flight
buffer not only depends on a particular query/job, but also the "good"
value changes dynamically over time,
and can change very rapidly. Sudden spikes of records or backpressure, some
hiccup during emitting watermarks,
all of those could change in an instant the theoretically optimal buffer
size of let's say "6000" records, down to "1".
And when those changes happen, those are the exact times when timely
checkpointing matters the most.
If the load pattern suddenly changes, and checkpointing takes suddenly tens
of minutes instead of a couple of
seconds, it means you can not use rescaling and you are forced to
overprovision the resources. And there also
other issues if checkpointing takes too long.

At the same time, I still don't understand why we can not implement things
incrementally? First
let's start with the current API, without the need to rewrite all of the
operators, we can asynchronously fetch whole
state for a given record using its key. That should already vastly improve
many things, and this way we could
perform a checkpoint without a need of draining the in-progress/in-flight
buffer. We could roll that version out,
test it out in practice, and then we could see if the fine grained state
access is really needed. Otherwise it sounds
to me like a premature optimization, that requires us to not only rewrite a
lot of the code, but also to later maintain
it, even if it ultimately proves to be not needed. Which I of course can
not be certain but I have a feeling that it
might be the case.

Best,
Piotrek

wt., 19 mar 2024 o 10:42 Zakelly Lan <zakelly....@gmail.com> napisał(a):

> Hi everyone,
>
> Thanks for your valuable feedback!
>
> Our discussions have been going on for a while and are nearing a
> consensus. So I would like to start a vote after 72 hours.
>
> Please let me know if you have any concerns, thanks!
>
>
> Best,
> Zakelly
>
> On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi Yunfeng,
> >
> > Thanks for the suggestion!
> >
> > I will reorganize the FLIP-425 accordingly.
> >
> >
> > Best,
> > Zakelly
> >
> > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <
> flink.zhouyunf...@gmail.com>
> > wrote:
> >
> >> Hi Xintong and Zakelly,
> >>
> >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
> >> I agree with it that watermarks can use only out-of-order mode for
> >> now, because there is still not a concrete example showing the
> >> correctness risk about it. However, the strictly-ordered mode should
> >> still be supported as the default option for non-record event types
> >> other than watermark, at least for checkpoint barriers.
> >>
> >> I noticed that this information has already been documented in "For
> >> other non-record events, such as RecordAttributes ...", but it's at
> >> the bottom of the "Watermark" section, which might not be very
> >> obvious. Thus it might be better to reorganize the FLIP to better
> >> claim that the two order modes are designed for all non-record events,
> >> and which mode this FLIP would choose for each type of event.
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <tonysong...@gmail.com>
> >> wrote:
> >> >
> >> > Thanks for the quick response. Sounds good to me.
> >> >
> >> > Best,
> >> >
> >> > Xintong
> >> >
> >> >
> >> >
> >> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <zakelly....@gmail.com>
> >> wrote:
> >> >
> >> > > Hi Xintong,
> >> > >
> >> > > Thanks for sharing your thoughts!
> >> > >
> >> > > 1. Regarding Record-ordered and State-ordered of processElement.
> >> > > >
> >> > > > I understand that while State-ordered likely provides better
> >> performance,
> >> > > > Record-ordered is sometimes required for correctness. The question
> >> is how
> >> > > > should a user choose between these two modes? My concern is that
> >> such a
> >> > > > decision may require users to have in-depth knowledge about the
> >> Flink
> >> > > > internals, and may lead to correctness issues if State-ordered is
> >> chosen
> >> > > > improperly.
> >> > > >
> >> > > > I'd suggest not to expose such a knob, at least in the first
> >> version.
> >> > > That
> >> > > > means always use Record-ordered for custom operators / UDFs, and
> >> keep
> >> > > > State-ordered for internal usages (built-in operators) only.
> >> > > >
> >> > >
> >> > > Indeed, users may not be able to choose the mode properly. I agree
> to
> >> keep
> >> > > such options for internal use.
> >> > >
> >> > >
> >> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> >> > > >
> >> > > > I'm not entirely sure about Strictly-ordered being the default, or
> >> even
> >> > > > being supported. From my understanding, a Watermark(T) only
> >> suggests that
> >> > > > all records with event time before T has arrived, and it has
> >> nothing to
> >> > > do
> >> > > > with whether records with event time after T has arrived or not.
> >> From
> >> > > that
> >> > > > perspective, preventing certain records from arriving before a
> >> Watermark
> >> > > is
> >> > > > never supported. I also cannot come up with any use case where
> >> > > > Strictly-ordered is necessary. This implies the same issue as 1):
> >> how
> >> > > does
> >> > > > the user choose between the two modes?
> >> > > >
> >> > > > I'd suggest not expose the knob to users and only support
> >> Out-of-order,
> >> > > > until we see a concrete use case that Strictly-ordered is needed.
> >> > > >
> >> > >
> >> > > The semantics of watermarks do not define the sequence between a
> >> watermark
> >> > > and subsequent records. For the most part, this is inconsequential,
> >> except
> >> > > it may affect some current users who have previously relied on the
> >> implicit
> >> > > assumption of an ordered execution. I'd be fine with initially
> >> supporting
> >> > > only out-of-order processing. We may consider exposing the
> >> > > 'Strictly-ordered' mode once we encounter a concrete use case that
> >> > > necessitates it.
> >> > >
> >> > >
> >> > > My philosophies behind not exposing the two config options are:
> >> > > > - There are already too many options in Flink that barely know how
> >> to use
> >> > > > them. I think Flink should try as much as possible to decide its
> own
> >> > > > behavior, rather than throwing all the decisions to the users.
> >> > > > - It's much harder to take back knobs than to introduce them.
> >> Therefore,
> >> > > > options should be introduced only if concrete use cases are
> >> identified.
> >> > > >
> >> > >
> >> > > I agree to keep minimal configurable items especially for the MVP.
> >> Given
> >> > > that we have the opportunity to refine the functionality before the
> >> > > framework transitions from @Experimental to @PublicEvolving, it
> makes
> >> sense
> >> > > to refrain from presenting user-facing options until we have ensured
> >> > > their necessity.
> >> > >
> >> > >
> >> > > Best,
> >> > > Zakelly
> >> > >
> >> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <
> tonysong...@gmail.com>
> >> > > wrote:
> >> > >
> >> > > > Sorry for joining the discussion late.
> >> > > >
> >> > > > I have two questions about FLIP-425.
> >> > > >
> >> > > > 1. Regarding Record-ordered and State-ordered of processElement.
> >> > > >
> >> > > > I understand that while State-ordered likely provides better
> >> performance,
> >> > > > Record-ordered is sometimes required for correctness. The question
> >> is how
> >> > > > should a user choose between these two modes? My concern is that
> >> such a
> >> > > > decision may require users to have in-depth knowledge about the
> >> Flink
> >> > > > internals, and may lead to correctness issues if State-ordered is
> >> chosen
> >> > > > improperly.
> >> > > >
> >> > > > I'd suggest not to expose such a knob, at least in the first
> >> version.
> >> > > That
> >> > > > means always use Record-ordered for custom operators / UDFs, and
> >> keep
> >> > > > State-ordered for internal usages (built-in operators) only.
> >> > > >
> >> > > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> >> > > >
> >> > > > I'm not entirely sure about Strictly-ordered being the default, or
> >> even
> >> > > > being supported. From my understanding, a Watermark(T) only
> >> suggests that
> >> > > > all records with event time before T has arrived, and it has
> >> nothing to
> >> > > do
> >> > > > with whether records with event time after T has arrived or not.
> >> From
> >> > > that
> >> > > > perspective, preventing certain records from arriving before a
> >> Watermark
> >> > > is
> >> > > > never supported. I also cannot come up with any use case where
> >> > > > Strictly-ordered is necessary. This implies the same issue as 1):
> >> how
> >> > > does
> >> > > > the user choose between the two modes?
> >> > > >
> >> > > > I'd suggest not expose the knob to users and only support
> >> Out-of-order,
> >> > > > until we see a concrete use case that Strictly-ordered is needed.
> >> > > >
> >> > > >
> >> > > > My philosophies behind not exposing the two config options are:
> >> > > > - There are already too many options in Flink that barely know how
> >> to use
> >> > > > them. I think Flink should try as much as possible to decide its
> own
> >> > > > behavior, rather than throwing all the decisions to the users.
> >> > > > - It's much harder to take back knobs than to introduce them.
> >> Therefore,
> >> > > > options should be introduced only if concrete use cases are
> >> identified.
> >> > > >
> >> > > > WDYT?
> >> > > >
> >> > > > Best,
> >> > > >
> >> > > > Xintong
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge <j...@ververica.com.invalid
> >
> >> > > wrote:
> >> > > >
> >> > > > > +1 for Gyula's suggestion. I just finished FLIP-423 which
> >> introduced
> >> > > the
> >> > > > > intention of the big change and high level architecture. Great
> >> content
> >> > > > btw!
> >> > > > > The only public interface change for this FLIP is one new config
> >> to use
> >> > > > > ForSt. It makes sense to have one dedicated discussion thread
> for
> >> each
> >> > > > > concrete system design.
> >> > > > >
> >> > > > > @Zakelly The links in your mail do not work except the last one,
> >> > > because
> >> > > > > the FLIP-xxx has been included into the url like
> >> > > > >
> >> > >
> >>
> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> >> > > > .
> >> > > > >
> >> > > > > NIT fix:
> >> > > > >
> >> > > > > FLIP-424:
> >> > > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> >> > > > >
> >> > > > > FLIP-425:
> >> > > > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> >> > > > >
> >> > > > > FLIP-426:
> >> > > > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> >> > > > >
> >> > > > > FLIP-427:
> >> > > > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> >> > > > >
> >> > > > > FLIP-428:
> >> > > > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> >> > > > >
> >> > > > > Best regards,
> >> > > > > Jing
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > >
> >> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <
> >> zakelly....@gmail.com>
> >> > > > wrote:
> >> > > > >
> >> > > > > > Hi everyone,
> >> > > > > >
> >> > > > > > Thank you all for a lively discussion here, and it is a good
> >> time to
> >> > > > move
> >> > > > > > forward to more detailed discussions. Thus we open several
> >> threads
> >> > > for
> >> > > > > > sub-FLIPs:
> >> > > > > >
> >> > > > > > FLIP-424:
> >> > > > >
> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> >> > > > > > FLIP-425
> >> > > > > > <
> >> > > > >
> >> > >
> >>
> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> >> > > > >:
> >> > > > > >
> >> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> >> > > > > > FLIP-426
> >> > > > > > <
> >> > > > >
> >> > >
> >>
> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426
> >> > > > >:
> >> > > > > >
> >> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> >> > > > > > FLIP-427
> >> > > > > > <
> >> > > > >
> >> > >
> >>
> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427
> >> > > > >:
> >> > > > > >
> >> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> >> > > > > > FLIP-428
> >> > > > > > <
> >> > > > >
> >> > >
> >>
> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428
> >> > > > >:
> >> > > > > >
> >> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> >> > > > > >
> >> > > > > > If you want to talk about the overall architecture, roadmap,
> >> > > milestones
> >> > > > > or
> >> > > > > > something related with multiple FLIPs, please post it here.
> >> Otherwise
> >> > > > you
> >> > > > > > can discuss some details in separate mails. Let's try to avoid
> >> > > repeated
> >> > > > > > discussion in different threads. I will sync important
> messages
> >> here
> >> > > if
> >> > > > > > there are any in the above threads.
> >> > > > > >
> >> > > > > > And reply to @Jeyhun: We will ensure the content between those
> >> FLIPs
> >> > > is
> >> > > > > > consistent.
> >> > > > > >
> >> > > > > >
> >> > > > > > Best,
> >> > > > > > Zakelly
> >> > > > > >
> >> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <
> yuanmei.w...@gmail.com
> >> >
> >> > > > wrote:
> >> > > > > >
> >> > > > > > > I have been a bit busy these few weeks and sorry for
> >> responding
> >> > > late.
> >> > > > > > >
> >> > > > > > > The original thinking of keeping discussion within one
> thread
> >> is
> >> > > for
> >> > > > > > easier
> >> > > > > > > tracking and avoid for repeated discussion in different
> >> threads.
> >> > > > > > >
> >> > > > > > > For details, It might be good to start in different threads
> if
> >> > > > needed.
> >> > > > > > >
> >> > > > > > > We will think of a way to better organize the discussion.
> >> > > > > > >
> >> > > > > > > Best
> >> > > > > > > Yuan
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <
> >> > > je.kari...@gmail.com>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hi,
> >> > > > > > > >
> >> > > > > > > > + 1 for the suggestion.
> >> > > > > > > > Maybe we can the discussion with the FLIPs with minimum
> >> > > > dependencies
> >> > > > > > > (from
> >> > > > > > > > the other new/proposed FLIPs).
> >> > > > > > > > Based on our discussion on a particular FLIP, the
> >> subsequent (or
> >> > > > its
> >> > > > > > > > dependent) FLIP(s) can be updated accordingly?
> >> > > > > > > >
> >> > > > > > > > Regards,
> >> > > > > > > > Jeyhun
> >> > > > > > > >
> >> > > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <
> >> gyula.f...@gmail.com>
> >> > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Hey all!
> >> > > > > > > > >
> >> > > > > > > > > This is a massive improvement / work. I just started
> going
> >> > > > through
> >> > > > > > the
> >> > > > > > > > > Flips and have a more or less meta comment.
> >> > > > > > > > >
> >> > > > > > > > > While it's good to keep the overall architecture
> >> discussion
> >> > > > here, I
> >> > > > > > > think
> >> > > > > > > > > we should still have separate discussions for each FLIP
> >> where
> >> > > we
> >> > > > > can
> >> > > > > > > > > discuss interface details etc. With so much content if
> we
> >> start
> >> > > > > > adding
> >> > > > > > > > > minor comments here that will lead to nowhere but those
> >> > > > discussions
> >> > > > > > are
> >> > > > > > > > > still important and we should have them in separate
> >> threads
> >> > > (one
> >> > > > > for
> >> > > > > > > each
> >> > > > > > > > > FLIP)
> >> > > > > > > > >
> >> > > > > > > > > What do you think?
> >> > > > > > > > > Gyula
> >> > > > > > > > >
> >> > > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <
> >> fredia...@gmail.com
> >> > > >
> >> > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hi team,
> >> > > > > > > > > >
> >> > > > > > > > > > Thanks for your discussion. Regarding FLIP-425, we
> have
> >> > > > > > supplemented
> >> > > > > > > > > > several updates to answer high-frequency questions:
> >> > > > > > > > > >
> >> > > > > > > > > > 1. We captured a flame graph of the Hashmap state
> >> backend in
> >> > > > > > > > > > "Synchronous execution with asynchronous APIs"[1],
> which
> >> > > > reveals
> >> > > > > > that
> >> > > > > > > > > > the framework overhead (including reference counting,
> >> > > > > > future-related
> >> > > > > > > > > > code and so on) consumes about 9% of the keyed
> operator
> >> CPU
> >> > > > time.
> >> > > > > > > > > > 2. We added a set of comparative experiments for
> >> watermark
> >> > > > > > > processing,
> >> > > > > > > > > > the performance of Out-Of-Order mode is 70% better
> than
> >> > > > > > > > > > strictly-ordered mode under ~140MB state size.
> >> Instructions
> >> > > on
> >> > > > > how
> >> > > > > > to
> >> > > > > > > > > > run this test have also been added[2].
> >> > > > > > > > > > 3. Regarding the order of StreamRecord, whether it has
> >> state
> >> > > > > access
> >> > > > > > > or
> >> > > > > > > > > > not. We supplemented a new *Strict order of
> >> > > > 'processElement'*[3].
> >> > > > > > > > > >
> >> > > > > > > > > > [1]
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> >> > > > > > > > > > [2]
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> >> > > > > > > > > > [3]
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > Best regards,
> >> > > > > > > > > > Yanfei
> >> > > > > > > > > >
> >> > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> 于2024年3月5日周二
> >> > > > 09:25写道:
> >> > > > > > > > > > >
> >> > > > > > > > > > > Hi Zakelly,
> >> > > > > > > > > > >
> >> > > > > > > > > > > > 5. I'm not very sure ... revisiting this later
> >> since it
> >> > > is
> >> > > > > not
> >> > > > > > > > > > important.
> >> > > > > > > > > > >
> >> > > > > > > > > > > It seems that we still have some details to confirm
> >> about
> >> > > > this
> >> > > > > > > > > > > question. Let's postpone this to after the critical
> >> parts
> >> > > of
> >> > > > > the
> >> > > > > > > > > > > design are settled.
> >> > > > > > > > > > >
> >> > > > > > > > > > > > 8. Yes, we had considered ... metrics should be
> like
> >> > > > > > afterwards.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with discussing
> >> this
> >> > > > topic
> >> > > > > > in
> >> > > > > > > > > > milestone 2.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Looking forward to the detailed design about the
> >> strict
> >> > > mode
> >> > > > > > > between
> >> > > > > > > > > > > same-key records and the benchmark results about the
> >> epoch
> >> > > > > > > mechanism.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Best regards,
> >> > > > > > > > > > > Yunfeng
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <
> >> > > > > > zakelly....@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Hi Yunfeng,
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > For 1:
> >> > > > > > > > > > > > I had a discussion with Lincoln Lee, and I realize
> >> it is
> >> > > a
> >> > > > > > common
> >> > > > > > > > > case
> >> > > > > > > > > > the same-key record should be blocked before the
> >> > > > > `processElement`.
> >> > > > > > It
> >> > > > > > > > is
> >> > > > > > > > > > easier for users to understand. Thus I will introduce
> a
> >> > > strict
> >> > > > > mode
> >> > > > > > > for
> >> > > > > > > > > > this and make it default. My rough idea is just like
> >> yours,
> >> > > by
> >> > > > > > > invoking
> >> > > > > > > > > > some method of AEC instance before `processElement`.
> The
> >> > > > detailed
> >> > > > > > > > design
> >> > > > > > > > > > will be described in FLIP later.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > For 2:
> >> > > > > > > > > > > > I agree with you. We could throw exceptions for
> now
> >> and
> >> > > > > > optimize
> >> > > > > > > > this
> >> > > > > > > > > > later.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > For 5:
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> It might be better to move the default values to
> >> the
> >> > > > > Proposed
> >> > > > > > > > > Changes
> >> > > > > > > > > > > >> section instead of making them public for now, as
> >> there
> >> > > > will
> >> > > > > > be
> >> > > > > > > > > > > >> compatibility issues once we want to dynamically
> >> adjust
> >> > > > the
> >> > > > > > > > > thresholds
> >> > > > > > > > > > > >> and timeouts in future.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Agreed. The whole framework is under experiment
> >> until we
> >> > > > > think
> >> > > > > > it
> >> > > > > > > > is
> >> > > > > > > > > > complete in 2.0 or later. The default value should be
> >> better
> >> > > > > > > determined
> >> > > > > > > > > > with more testing results and production experience.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >> The configuration execution.async-state.enabled
> >> seems
> >> > > > > > > unnecessary,
> >> > > > > > > > > as
> >> > > > > > > > > > > >> the infrastructure may automatically get this
> >> > > information
> >> > > > > from
> >> > > > > > > the
> >> > > > > > > > > > > >> detailed state backend configurations. We may
> >> revisit
> >> > > this
> >> > > > > > part
> >> > > > > > > > > after
> >> > > > > > > > > > > >> the core designs have reached an agreement. WDYT?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I'm not very sure if there is any use case where
> >> users
> >> > > > write
> >> > > > > > > their
> >> > > > > > > > > > code using async APIs but run their job in a
> >> synchronous way.
> >> > > > The
> >> > > > > > > first
> >> > > > > > > > > two
> >> > > > > > > > > > scenarios that come to me are for benchmarking or for
> a
> >> small
> >> > > > > > state,
> >> > > > > > > > > while
> >> > > > > > > > > > they don't want to rewrite their code. Actually it is
> >> easy to
> >> > > > > > > support,
> >> > > > > > > > so
> >> > > > > > > > > > I'd suggest providing it. But I'm fine with revisiting
> >> this
> >> > > > later
> >> > > > > > > since
> >> > > > > > > > > it
> >> > > > > > > > > > is not important. WDYT?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > For 8:
> >> > > > > > > > > > > > Yes, we had considered the I/O metrics group
> >> especially
> >> > > the
> >> > > > > > > > > > back-pressure, idle and task busy per second. In the
> >> current
> >> > > > plan
> >> > > > > > we
> >> > > > > > > > can
> >> > > > > > > > > do
> >> > > > > > > > > > state access during back-pressure, meaning that those
> >> metrics
> >> > > > for
> >> > > > > > > input
> >> > > > > > > > > > would better be redefined. I suggest we discuss these
> >> > > existing
> >> > > > > > > metrics
> >> > > > > > > > as
> >> > > > > > > > > > well as some new metrics that should be introduced in
> >> > > FLIP-431
> >> > > > > > later
> >> > > > > > > in
> >> > > > > > > > > > milestone 2, since we have basically finished the
> >> framework
> >> > > > thus
> >> > > > > we
> >> > > > > > > > will
> >> > > > > > > > > > have a better view of what metrics should be like
> >> afterwards.
> >> > > > > WDYT?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Best,
> >> > > > > > > > > > > > Zakelly
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <
> >> > > > > > > > > > flink.zhouyunf...@gmail.com> wrote:
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Hi Zakelly,
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Thanks for the responses!
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> > 1. I will discuss this with some expert SQL
> >> > > developers.
> >> > > > > ...
> >> > > > > > > mode
> >> > > > > > > > > > for StreamRecord processing.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> In DataStream API there should also be use cases
> >> when
> >> > > the
> >> > > > > > order
> >> > > > > > > of
> >> > > > > > > > > > > >> output is strictly required. I agree with it that
> >> SQL
> >> > > > > experts
> >> > > > > > > may
> >> > > > > > > > > help
> >> > > > > > > > > > > >> provide more concrete use cases that can
> >> accelerate our
> >> > > > > > > > discussion,
> >> > > > > > > > > > > >> but please allow me to search for DataStream use
> >> cases
> >> > > > that
> >> > > > > > can
> >> > > > > > > > > prove
> >> > > > > > > > > > > >> the necessity of this strict order preservation
> >> mode, if
> >> > > > > > answers
> >> > > > > > > > > from
> >> > > > > > > > > > > >> SQL experts are shown to be negative.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> For your convenience, my current rough idea is
> >> that we
> >> > > can
> >> > > > > > add a
> >> > > > > > > > > > > >> module between the Input(s) and processElement()
> >> module
> >> > > in
> >> > > > > > Fig 2
> >> > > > > > > > of
> >> > > > > > > > > > > >> FLIP-425. The module will be responsible for
> >> caching
> >> > > > records
> >> > > > > > > whose
> >> > > > > > > > > > > >> keys collide with in-flight records, and AEC will
> >> only
> >> > > be
> >> > > > > > > > > responsible
> >> > > > > > > > > > > >> for handling async state calls, without knowing
> the
> >> > > record
> >> > > > > > each
> >> > > > > > > > call
> >> > > > > > > > > > > >> belongs to. We may revisit this topic once the
> >> necessity
> >> > > > of
> >> > > > > > the
> >> > > > > > > > > strict
> >> > > > > > > > > > > >> order mode is clarified.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> > 2. The amount of parallel StateRequests ...
> >> instead of
> >> > > > > > > invoking
> >> > > > > > > > > > yield
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Your suggestions generally appeal to me. I think
> >> we may
> >> > > > let
> >> > > > > > > > > > > >> corresponding Flink jobs fail with OOM for now,
> >> since
> >> > > the
> >> > > > > > > majority
> >> > > > > > > > > of
> >> > > > > > > > > > > >> a StateRequest should just be references to
> >> existing
> >> > > Java
> >> > > > > > > objects,
> >> > > > > > > > > > > >> which only occupies very small memory space and
> can
> >> > > hardly
> >> > > > > > cause
> >> > > > > > > > OOM
> >> > > > > > > > > > > >> in common cases. We can monitor the pending
> >> > > StateRequests
> >> > > > > and
> >> > > > > > if
> >> > > > > > > > > there
> >> > > > > > > > > > > >> is really a risk of OOM in extreme cases, we can
> >> throw
> >> > > > > > > Exceptions
> >> > > > > > > > > with
> >> > > > > > > > > > > >> proper messages notifying users what to do, like
> >> > > > increasing
> >> > > > > > > memory
> >> > > > > > > > > > > >> through configurations.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Your suggestions to adjust threshold adaptively
> or
> >> to
> >> > > use
> >> > > > > the
> >> > > > > > > > > blocking
> >> > > > > > > > > > > >> buffer sounds good, and in my opinion we can
> >> postpone
> >> > > them
> >> > > > > to
> >> > > > > > > > future
> >> > > > > > > > > > > >> FLIPs since they seem to only benefit users in
> rare
> >> > > cases.
> >> > > > > > Given
> >> > > > > > > > > that
> >> > > > > > > > > > > >> FLIP-423~428 has already been a big enough
> design,
> >> it
> >> > > > might
> >> > > > > be
> >> > > > > > > > > better
> >> > > > > > > > > > > >> to focus on the most critical design for now and
> >> > > postpone
> >> > > > > > > > > > > >> optimizations like this. WDYT?
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> > 5. Sure, we will introduce new configs as well
> as
> >> > > their
> >> > > > > > > default
> >> > > > > > > > > > value.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Thanks for adding the default values and the
> values
> >> > > > > themselves
> >> > > > > > > > LGTM.
> >> > > > > > > > > > > >> It might be better to move the default values to
> >> the
> >> > > > > Proposed
> >> > > > > > > > > Changes
> >> > > > > > > > > > > >> section instead of making them public for now, as
> >> there
> >> > > > will
> >> > > > > > be
> >> > > > > > > > > > > >> compatibility issues once we want to dynamically
> >> adjust
> >> > > > the
> >> > > > > > > > > thresholds
> >> > > > > > > > > > > >> and timeouts in future.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> The configuration execution.async-state.enabled
> >> seems
> >> > > > > > > unnecessary,
> >> > > > > > > > > as
> >> > > > > > > > > > > >> the infrastructure may automatically get this
> >> > > information
> >> > > > > from
> >> > > > > > > the
> >> > > > > > > > > > > >> detailed state backend configurations. We may
> >> revisit
> >> > > this
> >> > > > > > part
> >> > > > > > > > > after
> >> > > > > > > > > > > >> the core designs have reached an agreement. WDYT?
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Besides, inspired by Jeyhun's comments, it comes
> >> to me
> >> > > > that
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> 8. Should this FLIP introduce metrics that
> measure
> >> the
> >> > > > time
> >> > > > > a
> >> > > > > > > > Flink
> >> > > > > > > > > > > >> job is back-pressured by State IOs? Under the
> >> current
> >> > > > > design,
> >> > > > > > > this
> >> > > > > > > > > > > >> metric could measure the time when the blocking
> >> buffer
> >> > > is
> >> > > > > full
> >> > > > > > > and
> >> > > > > > > > > > > >> yield() cannot get callbacks to process, which
> >> means the
> >> > > > > > > operator
> >> > > > > > > > is
> >> > > > > > > > > > > >> fully waiting for state responses.
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> Best regards,
> >> > > > > > > > > > > >> Yunfeng
> >> > > > > > > > > > > >>
> >> > > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <
> >> > > > > > > > zakelly....@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Hi Yunfeng,
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Thanks for your detailed comments!
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 1. Why do we need a close() method on
> >> StateIterator?
> >> > > > This
> >> > > > > > > > method
> >> > > > > > > > > > seems
> >> > > > > > > > > > > >> >> unused in the usage example codes.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > The `close()` is introduced to release internal
> >> > > > resources,
> >> > > > > > but
> >> > > > > > > > it
> >> > > > > > > > > > does not seem to require the user to call it. I
> removed
> >> this.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it
> is
> >> > > stated
> >> > > > > that
> >> > > > > > > "No
> >> > > > > > > > > > null
> >> > > > > > > > > > > >> >> entries are allowed". It might be better to
> >> further
> >> > > > > explain
> >> > > > > > > > what
> >> > > > > > > > > > will
> >> > > > > > > > > > > >> >> happen if a null value is passed, ignoring the
> >> value
> >> > > in
> >> > > > > the
> >> > > > > > > > > > returned
> >> > > > > > > > > > > >> >> Collection or throwing exceptions. Given that
> >> > > > > > > > > > > >> >> FutureUtils.emptyFuture() can be returned in
> the
> >> > > > example
> >> > > > > > > code,
> >> > > > > > > > I
> >> > > > > > > > > > > >> >> suppose the former one might be correct.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > The statement "No null entries are allowed"
> >> refers to
> >> > > > the
> >> > > > > > > > > > parameters, it means some arrayList like [null,
> >> StateFuture1,
> >> > > > > > > > > StateFuture2]
> >> > > > > > > > > > passed in are not allowed, and an Exception will be
> >> thrown.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... . This
> >> > > > situation
> >> > > > > > > should
> >> > > > > > > > > be
> >> > > > > > > > > > > >> >> avoided and the order of same-key records
> >> should be
> >> > > > > > strictly
> >> > > > > > > > > > > >> >> preserved.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > I will discuss this with some expert SQL
> >> developers.
> >> > > And
> >> > > > > if
> >> > > > > > it
> >> > > > > > > > is
> >> > > > > > > > > > valid and common, I suggest a strict order
> preservation
> >> mode
> >> > > > for
> >> > > > > > > > > > StreamRecord processing. WDYT?
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 2. The FLIP says that StateRequests submitted
> by
> >> > > > > Callbacks
> >> > > > > > > will
> >> > > > > > > > > not
> >> > > > > > > > > > > >> >> invoke further yield() methods. Given that
> >> yield() is
> >> > > > > used
> >> > > > > > > when
> >> > > > > > > > > > there
> >> > > > > > > > > > > >> >> is "too much" in-flight data, does it mean
> >> > > > StateRequests
> >> > > > > > > > > submitted
> >> > > > > > > > > > by
> >> > > > > > > > > > > >> >> Callbacks will never be "too much"? What if
> the
> >> total
> >> > > > > > number
> >> > > > > > > of
> >> > > > > > > > > > > >> >> StateRequests exceed the capacity of Flink
> >> operator's
> >> > > > > > memory
> >> > > > > > > > > space?
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > The amount of parallel StateRequests for one
> >> > > > StreamRecord
> >> > > > > > > cannot
> >> > > > > > > > > be
> >> > > > > > > > > > determined since the code is written by users. So the
> >> > > in-flight
> >> > > > > > > > requests
> >> > > > > > > > > > may be "too much", and may cause OOM. Users should
> >> > > re-configure
> >> > > > > > their
> >> > > > > > > > > job,
> >> > > > > > > > > > controlling the amount of on-going StreamRecord. And I
> >> > > suggest
> >> > > > > two
> >> > > > > > > ways
> >> > > > > > > > > to
> >> > > > > > > > > > avoid this:
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Adaptively adjust the count of on-going
> >> StreamRecord
> >> > > > > > according
> >> > > > > > > > to
> >> > > > > > > > > > historical StateRequests amount.
> >> > > > > > > > > > > >> > Also control the max StateRequests that can be
> >> > > executed
> >> > > > in
> >> > > > > > > > > parallel
> >> > > > > > > > > > for each StreamRecord, and if it exceeds, put the new
> >> > > > > StateRequest
> >> > > > > > in
> >> > > > > > > > the
> >> > > > > > > > > > blocking buffer waiting for execution (instead of
> >> invoking
> >> > > > > > yield()).
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > WDYT?
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order
> >> execution
> >> > > mode,
> >> > > > > > along
> >> > > > > > > > > with
> >> > > > > > > > > > the
> >> > > > > > > > > > > >> >> epoch mechanism, would bring more complexity
> to
> >> the
> >> > > > > > execution
> >> > > > > > > > > model
> >> > > > > > > > > > > >> >> than the performance improvement it promises.
> >> Could
> >> > > we
> >> > > > > add
> >> > > > > > > some
> >> > > > > > > > > > > >> >> benchmark results proving the benefit of this
> >> mode?
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Agreed, will do.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 3.2 The FLIP might need to add a public API
> >> section
> >> > > > > > > describing
> >> > > > > > > > > how
> >> > > > > > > > > > > >> >> users or developers can switch between these
> two
> >> > > > > execution
> >> > > > > > > > modes.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Good point. We will add a Public API section.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 3.3 Apart from the watermark and checkpoint
> >> mentioned
> >> > > > in
> >> > > > > > this
> >> > > > > > > > > FLIP,
> >> > > > > > > > > > > >> >> there are also more other events that might
> >> appear in
> >> > > > the
> >> > > > > > > > stream
> >> > > > > > > > > of
> >> > > > > > > > > > > >> >> data records. It might be better to generalize
> >> the
> >> > > > > > execution
> >> > > > > > > > mode
> >> > > > > > > > > > > >> >> mechanism to handle all possible events.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Yes, I missed this point. Thanks for the
> >> reminder.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 4. It might be better to treat
> >> callback-handling as a
> >> > > > > > > > > > > >> >> MailboxDefaultAction, instead of Mails, to
> >> avoid the
> >> > > > > > overhead
> >> > > > > > > > of
> >> > > > > > > > > > > >> >> repeatedly creating Mail objects.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >  I thought the intermediated wrapper for
> >> callback can
> >> > > > not
> >> > > > > be
> >> > > > > > > > > > omitted, since there will be some context switch
> before
> >> each
> >> > > > > > > execution.
> >> > > > > > > > > The
> >> > > > > > > > > > MailboxDefaultAction in most cases is processInput
> >> right?
> >> > > While
> >> > > > > the
> >> > > > > > > > > > callback should be executed with higher priority. I'd
> >> suggest
> >> > > > not
> >> > > > > > > > > changing
> >> > > > > > > > > > the basic logic of Mailbox and the default action
> since
> >> it is
> >> > > > > very
> >> > > > > > > > > critical
> >> > > > > > > > > > for performance. But yes, we will try our best to
> avoid
> >> > > > creating
> >> > > > > > > > > > intermediated objects.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 5. Could this FLIP provide the current default
> >> values
> >> > > > for
> >> > > > > > > > things
> >> > > > > > > > > > like
> >> > > > > > > > > > > >> >> active buffer size thresholds and timeouts?
> >> These
> >> > > could
> >> > > > > > help
> >> > > > > > > > with
> >> > > > > > > > > > > >> >> memory consumption and latency analysis.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Sure, we will introduce new configs as well as
> >> their
> >> > > > > default
> >> > > > > > > > > value.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 6. Why do we need to record the hashcode of a
> >> record
> >> > > in
> >> > > > > its
> >> > > > > > > > > > > >> >> RecordContext? It seems not used.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > The context switch before each callback
> execution
> >> > > > involves
> >> > > > > > > > > > setCurrentKey, where the hashCode is re-calculated. We
> >> cache
> >> > > it
> >> > > > > for
> >> > > > > > > > > > accelerating.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >> 7. In "timers can be stored on the JVM heap or
> >> > > > RocksDB",
> >> > > > > > the
> >> > > > > > > > link
> >> > > > > > > > > > > >> >> points to a document in flink-1.15. It might
> be
> >> > > better
> >> > > > to
> >> > > > > > > > verify
> >> > > > > > > > > > the
> >> > > > > > > > > > > >> >> referenced content is still valid in the
> latest
> >> Flink
> >> > > > and
> >> > > > > > > > update
> >> > > > > > > > > > the
> >> > > > > > > > > > > >> >> link accordingly. Same for other references if
> >> any.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Thanks for the reminder! Will check.
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > Thanks a lot & Best,
> >> > > > > > > > > > > >> > Zakelly
> >> > > > > > > > > > > >> >
> >> > > > > > > > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <
> >> > > > > > > > > je.kari...@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >> Hi,
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >> Thanks for the great proposals. I have a few
> >> comments
> >> > > > > > > comments:
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >> - Backpressure Handling. Flink's original
> >> > > backpressure
> >> > > > > > > handling
> >> > > > > > > > > is
> >> > > > > > > > > > quite
> >> > > > > > > > > > > >> >> robust and the semantics is quite "simple"
> >> (simple is
> >> > > > > > > > beautiful).
> >> > > > > > > > > > > >> >> This mechanism has proven to perform
> >> better/robust
> >> > > than
> >> > > > > the
> >> > > > > > > > other
> >> > > > > > > > > > open
> >> > > > > > > > > > > >> >> source streaming systems, where they were
> >> relying on
> >> > > > some
> >> > > > > > > > > loopback
> >> > > > > > > > > > > >> >> information.
> >> > > > > > > > > > > >> >> Now that the proposal also relies on loopback
> >> (yield
> >> > > in
> >> > > > > > this
> >> > > > > > > > > > case), it is
> >> > > > > > > > > > > >> >> not clear how well the new backpressure
> handling
> >> > > > proposed
> >> > > > > > in
> >> > > > > > > > > > FLIP-425 is
> >> > > > > > > > > > > >> >> robust and handle fluctuating workloads.
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >> - Watermark/Timer Handling: Similar arguments
> >> apply
> >> > > for
> >> > > > > > > > watermark
> >> > > > > > > > > > and timer
> >> > > > > > > > > > > >> >> handling. IMHO, we need more benchmarks
> showing
> >> the
> >> > > > > > overhead
> >> > > > > > > > > > > >> >> of epoch management with different parameters
> >> (e.g.,
> >> > > > > window
> >> > > > > > > > size,
> >> > > > > > > > > > watermark
> >> > > > > > > > > > > >> >> strategy, etc)
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >> - DFS consistency guarantees. The proposal in
> >> > > FLIP-427
> >> > > > is
> >> > > > > > > > > > DFS-agnostic.
> >> > > > > > > > > > > >> >> However, different cloud providers have
> >> different
> >> > > > storage
> >> > > > > > > > > > consistency
> >> > > > > > > > > > > >> >> models.
> >> > > > > > > > > > > >> >> How do we want to deal with them?
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >>  Regards,
> >> > > > > > > > > > > >> >> Jeyhun
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <
> >> > > > > > > > > zakelly....@gmail.com>
> >> > > > > > > > > > wrote:
> >> > > > > > > > > > > >> >>
> >> > > > > > > > > > > >> >> > Thanks Piotr for sharing your thoughts!
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > I guess it depends how we would like to
> treat
> >> the
> >> > > > local
> >> > > > > > > > disks.
> >> > > > > > > > > > I've always
> >> > > > > > > > > > > >> >> > > thought about them that almost always
> >> eventually
> >> > > > all
> >> > > > > > > state
> >> > > > > > > > > > from the DFS
> >> > > > > > > > > > > >> >> > > should end up cached in the local disks.
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > OK I got it. In our proposal we treat local
> >> disk as
> >> > > > an
> >> > > > > > > > optional
> >> > > > > > > > > > cache, so
> >> > > > > > > > > > > >> >> > the basic design will handle the case with
> >> state
> >> > > > > residing
> >> > > > > > > in
> >> > > > > > > > > DFS
> >> > > > > > > > > > only. It
> >> > > > > > > > > > > >> >> > is a more 'cloud-native' approach that does
> >> not
> >> > > rely
> >> > > > on
> >> > > > > > any
> >> > > > > > > > > > local storage
> >> > > > > > > > > > > >> >> > assumptions, which allow users to
> dynamically
> >> > > adjust
> >> > > > > the
> >> > > > > > > > > > capacity or I/O
> >> > > > > > > > > > > >> >> > bound of remote storage to gain performance
> >> or save
> >> > > > the
> >> > > > > > > cost,
> >> > > > > > > > > > even without
> >> > > > > > > > > > > >> >> > a job restart.
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > In
> >> > > > > > > > > > > >> >> > > the currently proposed more fine grained
> >> > > solution,
> >> > > > > you
> >> > > > > > > > make a
> >> > > > > > > > > > single
> >> > > > > > > > > > > >> >> > > request to DFS per each state access.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > Ah that's not accurate. Actually we buffer
> the
> >> > > state
> >> > > > > > > requests
> >> > > > > > > > > > and process
> >> > > > > > > > > > > >> >> > them in batch, multiple requests will
> >> correspond to
> >> > > > one
> >> > > > > > DFS
> >> > > > > > > > > > access (One
> >> > > > > > > > > > > >> >> > block access for multiple keys performed by
> >> > > RocksDB).
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > In that benchmark you mentioned, are you
> >> requesting
> >> > > > the
> >> > > > > > > state
> >> > > > > > > > > > > >> >> > > asynchronously from local disks into
> >> memory? If
> >> > > the
> >> > > > > > > benefit
> >> > > > > > > > > > comes from
> >> > > > > > > > > > > >> >> > > parallel I/O, then I would expect the
> >> benefit to
> >> > > > > > > > > > disappear/shrink when
> >> > > > > > > > > > > >> >> > > running multiple subtasks on the same
> >> machine, as
> >> > > > > they
> >> > > > > > > > would
> >> > > > > > > > > > be making
> >> > > > > > > > > > > >> >> > > their own parallel requests, right? Also
> >> enabling
> >> > > > > > > > > > checkpointing would
> >> > > > > > > > > > > >> >> > > further cut into the available I/O budget.
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > That's an interesting topic. Our proposal is
> >> > > > > specifically
> >> > > > > > > > aimed
> >> > > > > > > > > > at the
> >> > > > > > > > > > > >> >> > scenario where the machine I/O is not fully
> >> loaded
> >> > > > but
> >> > > > > > the
> >> > > > > > > > I/O
> >> > > > > > > > > > latency has
> >> > > > > > > > > > > >> >> > indeed become a bottleneck for each subtask.
> >> While
> >> > > > the
> >> > > > > > > > > > distributed file
> >> > > > > > > > > > > >> >> > system is a prime example of a scenario
> >> > > characterized
> >> > > > > by
> >> > > > > > > > > > abundant and
> >> > > > > > > > > > > >> >> > easily scalable I/O bandwidth coupled with
> >> higher
> >> > > I/O
> >> > > > > > > > latency.
> >> > > > > > > > > > You may
> >> > > > > > > > > > > >> >> > expect to increase the parallelism of a job
> to
> >> > > > enhance
> >> > > > > > the
> >> > > > > > > > > > performance as
> >> > > > > > > > > > > >> >> > well, but that also brings in more waste of
> >> CPU's
> >> > > and
> >> > > > > > > memory
> >> > > > > > > > > for
> >> > > > > > > > > > building
> >> > > > > > > > > > > >> >> > up more subtasks. This is one drawback for
> the
> >> > > > > > > > > > computation-storage tightly
> >> > > > > > > > > > > >> >> > coupled nodes. While in our proposal, the
> >> parallel
> >> > > > I/O
> >> > > > > > with
> >> > > > > > > > all
> >> > > > > > > > > > the
> >> > > > > > > > > > > >> >> > callbacks still running in one task,
> >> pre-allocated
> >> > > > > > > > > computational
> >> > > > > > > > > > resources
> >> > > > > > > > > > > >> >> > are better utilized. It is a much more
> >> lightweight
> >> > > > way
> >> > > > > to
> >> > > > > > > > > > perform parallel
> >> > > > > > > > > > > >> >> > I/O.
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > Just with what granularity those async
> >> requests
> >> > > > should
> >> > > > > be
> >> > > > > > > > made.
> >> > > > > > > > > > > >> >> > > Making state access asynchronous is
> >> definitely
> >> > > the
> >> > > > > > right
> >> > > > > > > > way
> >> > > > > > > > > > to go!
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > I think the current proposal is based on
> such
> >> core
> >> > > > > ideas:
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >    - A pure cloud-native disaggregated
> state.
> >> > > > > > > > > > > >> >> >    - Fully utilize the given resources and
> >> try not
> >> > > to
> >> > > > > > waste
> >> > > > > > > > > them
> >> > > > > > > > > > (including
> >> > > > > > > > > > > >> >> >    I/O).
> >> > > > > > > > > > > >> >> >    - The ability to scale isolated resources
> >> (I/O
> >> > > or
> >> > > > > CPU
> >> > > > > > or
> >> > > > > > > > > > memory)
> >> > > > > > > > > > > >> >> >    independently.
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > We think a fine-grained granularity is more
> >> inline
> >> > > > with
> >> > > > > > > those
> >> > > > > > > > > > ideas,
> >> > > > > > > > > > > >> >> > especially without local disk assumptions
> and
> >> > > without
> >> > > > > any
> >> > > > > > > > waste
> >> > > > > > > > > > of I/O by
> >> > > > > > > > > > > >> >> > prefetching. Please note that it is not a
> >> > > replacement
> >> > > > > of
> >> > > > > > > the
> >> > > > > > > > > > original local
> >> > > > > > > > > > > >> >> > state with synchronous execution. Instead
> >> this is a
> >> > > > > > > solution
> >> > > > > > > > > > embracing the
> >> > > > > > > > > > > >> >> > cloud-native era, providing much more
> >> scalability
> >> > > and
> >> > > > > > > > resource
> >> > > > > > > > > > efficiency
> >> > > > > > > > > > > >> >> > when handling a *huge state*.
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > What also worries me a lot in this fine
> >> grained
> >> > > model
> >> > > > > is
> >> > > > > > > the
> >> > > > > > > > > > effect on the
> >> > > > > > > > > > > >> >> > > checkpointing times.
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > Your concerns are very reasonable. Faster
> >> > > > checkpointing
> >> > > > > > is
> >> > > > > > > > > > always a core
> >> > > > > > > > > > > >> >> > advantage of disaggregated state, but only
> >> for the
> >> > > > > async
> >> > > > > > > > phase.
> >> > > > > > > > > > There will
> >> > > > > > > > > > > >> >> > be some complexity introduced by in-flight
> >> > > requests,
> >> > > > > but
> >> > > > > > > I'd
> >> > > > > > > > > > suggest a
> >> > > > > > > > > > > >> >> > checkpoint containing those in-flight state
> >> > > requests
> >> > > > as
> >> > > > > > > part
> >> > > > > > > > of
> >> > > > > > > > > > the state,
> >> > > > > > > > > > > >> >> > to accelerate the sync phase by skipping the
> >> buffer
> >> > > > > > > draining.
> >> > > > > > > > > > This makes
> >> > > > > > > > > > > >> >> > the buffer size have little impact on
> >> checkpoint
> >> > > > time.
> >> > > > > > And
> >> > > > > > > > all
> >> > > > > > > > > > the changes
> >> > > > > > > > > > > >> >> > keep within the execution model we proposed
> >> while
> >> > > the
> >> > > > > > > > > checkpoint
> >> > > > > > > > > > barrier
> >> > > > > > > > > > > >> >> > alignment or handling will not be touched in
> >> our
> >> > > > > > proposal,
> >> > > > > > > > so I
> >> > > > > > > > > > guess
> >> > > > > > > > > > > >> >> > the complexity is relatively controllable. I
> >> have
> >> > > > faith
> >> > > > > > in
> >> > > > > > > > that
> >> > > > > > > > > > :)
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > Also regarding the overheads, it would be
> >> great if
> >> > > > you
> >> > > > > > > could
> >> > > > > > > > > > provide
> >> > > > > > > > > > > >> >> > > profiling results of the benchmarks that
> you
> >> > > > > conducted
> >> > > > > > to
> >> > > > > > > > > > verify the
> >> > > > > > > > > > > >> >> > > results. Or maybe if you could describe
> the
> >> steps
> >> > > > to
> >> > > > > > > > > reproduce
> >> > > > > > > > > > the
> >> > > > > > > > > > > >> >> > results?
> >> > > > > > > > > > > >> >> > > Especially "Hashmap (sync)" vs "Hashmap
> with
> >> > > async
> >> > > > > > API".
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > Yes we could profile the benchmarks. And for
> >> the
> >> > > > > > comparison
> >> > > > > > > > of
> >> > > > > > > > > > "Hashmap
> >> > > > > > > > > > > >> >> > (sync)" vs "Hashmap with async API", we
> >> conduct a
> >> > > > > > Wordcount
> >> > > > > > > > job
> >> > > > > > > > > > written
> >> > > > > > > > > > > >> >> > with async APIs but disabling the async
> >> execution
> >> > > by
> >> > > > > > > directly
> >> > > > > > > > > > completing
> >> > > > > > > > > > > >> >> > the future using sync state access. This
> >> evaluates
> >> > > > the
> >> > > > > > > > overhead
> >> > > > > > > > > > of newly
> >> > > > > > > > > > > >> >> > introduced modules like 'AEC' in sync
> >> execution
> >> > > (even
> >> > > > > > > though
> >> > > > > > > > > > they are not
> >> > > > > > > > > > > >> >> > designed for it). The code will be provided
> >> later.
> >> > > > For
> >> > > > > > > other
> >> > > > > > > > > > results of our
> >> > > > > > > > > > > >> >> > PoC[1], you can follow the instructions
> >> here[2] to
> >> > > > > > > reproduce.
> >> > > > > > > > > > Since the
> >> > > > > > > > > > > >> >> > compilation may take some effort, we will
> >> directly
> >> > > > > > provide
> >> > > > > > > > the
> >> > > > > > > > > > jar for
> >> > > > > > > > > > > >> >> > testing next week.
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > And @Yunfeng Zhou, I have noticed your mail
> >> but it
> >> > > > is a
> >> > > > > > bit
> >> > > > > > > > > late
> >> > > > > > > > > > in my
> >> > > > > > > > > > > >> >> > local time and the next few days are
> >> weekends. So I
> >> > > > > will
> >> > > > > > > > reply
> >> > > > > > > > > > to you
> >> > > > > > > > > > > >> >> > later. Thanks for your response!
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > [1]
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> >> > > > > > > > > > > >> >> > [2]
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > Best,
> >> > > > > > > > > > > >> >> > Zakelly
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou
> <
> >> > > > > > > > > > flink.zhouyunf...@gmail.com>
> >> > > > > > > > > > > >> >> > wrote:
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > > > >> >> > > Hi,
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > Thanks for proposing this design! I just
> >> read
> >> > > > > FLIP-424
> >> > > > > > > and
> >> > > > > > > > > > FLIP-425
> >> > > > > > > > > > > >> >> > > and have some questions about the proposed
> >> > > changes.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > For Async API (FLIP-424)
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 1. Why do we need a close() method on
> >> > > > StateIterator?
> >> > > > > > This
> >> > > > > > > > > > method seems
> >> > > > > > > > > > > >> >> > > unused in the usage example codes.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc,
> >> it is
> >> > > > > stated
> >> > > > > > > that
> >> > > > > > > > > > "No null
> >> > > > > > > > > > > >> >> > > entries are allowed". It might be better
> to
> >> > > further
> >> > > > > > > explain
> >> > > > > > > > > > what will
> >> > > > > > > > > > > >> >> > > happen if a null value is passed, ignoring
> >> the
> >> > > > value
> >> > > > > in
> >> > > > > > > the
> >> > > > > > > > > > returned
> >> > > > > > > > > > > >> >> > > Collection or throwing exceptions. Given
> >> that
> >> > > > > > > > > > > >> >> > > FutureUtils.emptyFuture() can be returned
> >> in the
> >> > > > > > example
> >> > > > > > > > > code,
> >> > > > > > > > > > I
> >> > > > > > > > > > > >> >> > > suppose the former one might be correct.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > For Async Execution (FLIP-425)
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 1. According to Fig 2 of this FLIP, if a
> >> recordB
> >> > > > has
> >> > > > > > its
> >> > > > > > > > key
> >> > > > > > > > > > collide
> >> > > > > > > > > > > >> >> > > with an ongoing recordA, its
> >> processElement()
> >> > > > method
> >> > > > > > can
> >> > > > > > > > > still
> >> > > > > > > > > > be
> >> > > > > > > > > > > >> >> > > triggered immediately, and then it might
> be
> >> moved
> >> > > > to
> >> > > > > > the
> >> > > > > > > > > > blocking
> >> > > > > > > > > > > >> >> > > buffer in AEC if it involves state
> >> operations.
> >> > > This
> >> > > > > > means
> >> > > > > > > > > that
> >> > > > > > > > > > > >> >> > > recordB's output will precede recordA's
> >> output in
> >> > > > > > > > downstream
> >> > > > > > > > > > > >> >> > > operators, if recordA involves state
> >> operations
> >> > > > while
> >> > > > > > > > recordB
> >> > > > > > > > > > does
> >> > > > > > > > > > > >> >> > > not. This will harm the correctness of
> >> Flink jobs
> >> > > > in
> >> > > > > > some
> >> > > > > > > > use
> >> > > > > > > > > > cases.
> >> > > > > > > > > > > >> >> > > For example, in dim table join cases,
> >> recordA
> >> > > could
> >> > > > > be
> >> > > > > > a
> >> > > > > > > > > delete
> >> > > > > > > > > > > >> >> > > operation that involves state access,
> while
> >> > > recordB
> >> > > > > > could
> >> > > > > > > > be
> >> > > > > > > > > > an insert
> >> > > > > > > > > > > >> >> > > operation that needs to visit external
> >> storage
> >> > > > > without
> >> > > > > > > > state
> >> > > > > > > > > > access.
> >> > > > > > > > > > > >> >> > > If recordB's output precedes recordA's,
> >> then an
> >> > > > entry
> >> > > > > > > that
> >> > > > > > > > is
> >> > > > > > > > > > supposed
> >> > > > > > > > > > > >> >> > > to finally exist with recordB's value in
> >> the sink
> >> > > > > table
> >> > > > > > > > might
> >> > > > > > > > > > actually
> >> > > > > > > > > > > >> >> > > be deleted according to recordA's command.
> >> This
> >> > > > > > situation
> >> > > > > > > > > > should be
> >> > > > > > > > > > > >> >> > > avoided and the order of same-key records
> >> should
> >> > > be
> >> > > > > > > > strictly
> >> > > > > > > > > > > >> >> > > preserved.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 2. The FLIP says that StateRequests
> >> submitted by
> >> > > > > > > Callbacks
> >> > > > > > > > > > will not
> >> > > > > > > > > > > >> >> > > invoke further yield() methods. Given that
> >> > > yield()
> >> > > > is
> >> > > > > > > used
> >> > > > > > > > > > when there
> >> > > > > > > > > > > >> >> > > is "too much" in-flight data, does it mean
> >> > > > > > StateRequests
> >> > > > > > > > > > submitted by
> >> > > > > > > > > > > >> >> > > Callbacks will never be "too much"? What
> if
> >> the
> >> > > > total
> >> > > > > > > > number
> >> > > > > > > > > of
> >> > > > > > > > > > > >> >> > > StateRequests exceed the capacity of Flink
> >> > > > operator's
> >> > > > > > > > memory
> >> > > > > > > > > > space?
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 3. In the "Watermark" section, this FLIP
> >> provided
> >> > > > an
> >> > > > > > > > > > out-of-order
> >> > > > > > > > > > > >> >> > > execution mode apart from the default
> >> > > > > strictly-ordered
> >> > > > > > > > mode,
> >> > > > > > > > > > which can
> >> > > > > > > > > > > >> >> > > optimize performance by allowing more
> >> concurrent
> >> > > > > > > > executions.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 3.1 I'm concerned that the out-of-order
> >> execution
> >> > > > > mode,
> >> > > > > > > > along
> >> > > > > > > > > > with the
> >> > > > > > > > > > > >> >> > > epoch mechanism, would bring more
> >> complexity to
> >> > > the
> >> > > > > > > > execution
> >> > > > > > > > > > model
> >> > > > > > > > > > > >> >> > > than the performance improvement it
> >> promises.
> >> > > Could
> >> > > > > we
> >> > > > > > > add
> >> > > > > > > > > some
> >> > > > > > > > > > > >> >> > > benchmark results proving the benefit of
> >> this
> >> > > mode?
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 3.2 The FLIP might need to add a public
> API
> >> > > section
> >> > > > > > > > > describing
> >> > > > > > > > > > how
> >> > > > > > > > > > > >> >> > > users or developers can switch between
> >> these two
> >> > > > > > > execution
> >> > > > > > > > > > modes.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 3.3 Apart from the watermark and
> checkpoint
> >> > > > mentioned
> >> > > > > > in
> >> > > > > > > > this
> >> > > > > > > > > > FLIP,
> >> > > > > > > > > > > >> >> > > there are also more other events that
> might
> >> > > appear
> >> > > > in
> >> > > > > > the
> >> > > > > > > > > > stream of
> >> > > > > > > > > > > >> >> > > data records. It might be better to
> >> generalize
> >> > > the
> >> > > > > > > > execution
> >> > > > > > > > > > mode
> >> > > > > > > > > > > >> >> > > mechanism to handle all possible events.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 4. It might be better to treat
> >> callback-handling
> >> > > > as a
> >> > > > > > > > > > > >> >> > > MailboxDefaultAction, instead of Mails, to
> >> avoid
> >> > > > the
> >> > > > > > > > overhead
> >> > > > > > > > > > of
> >> > > > > > > > > > > >> >> > > repeatedly creating Mail objects.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 5. Could this FLIP provide the current
> >> default
> >> > > > values
> >> > > > > > for
> >> > > > > > > > > > things like
> >> > > > > > > > > > > >> >> > > active buffer size thresholds and
> timeouts?
> >> These
> >> > > > > could
> >> > > > > > > > help
> >> > > > > > > > > > with
> >> > > > > > > > > > > >> >> > > memory consumption and latency analysis.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 6. Why do we need to record the hashcode
> of
> >> a
> >> > > > record
> >> > > > > in
> >> > > > > > > its
> >> > > > > > > > > > > >> >> > > RecordContext? It seems not used.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > 7. In "timers can be stored on the JVM
> heap
> >> or
> >> > > > > > RocksDB",
> >> > > > > > > > the
> >> > > > > > > > > > link
> >> > > > > > > > > > > >> >> > > points to a document in flink-1.15. It
> >> might be
> >> > > > > better
> >> > > > > > to
> >> > > > > > > > > > verify the
> >> > > > > > > > > > > >> >> > > referenced content is still valid in the
> >> latest
> >> > > > Flink
> >> > > > > > and
> >> > > > > > > > > > update the
> >> > > > > > > > > > > >> >> > > link accordingly. Same for other
> references
> >> if
> >> > > any.
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > Best,
> >> > > > > > > > > > > >> >> > > Yunfeng Zhou
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <
> >> > > > > > > > > > yuanmei.w...@gmail.com> wrote:
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > Hi Devs,
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > This is a joint work of Yuan Mei,
> Zakelly
> >> Lan,
> >> > > > > > Jinzhong
> >> > > > > > > > Li,
> >> > > > > > > > > > Hangxiang
> >> > > > > > > > > > > >> >> > Yu,
> >> > > > > > > > > > > >> >> > > > Yanfei Lei and Feng Wang. We'd like to
> >> start a
> >> > > > > > > discussion
> >> > > > > > > > > > about
> >> > > > > > > > > > > >> >> > > introducing
> >> > > > > > > > > > > >> >> > > > Disaggregated State Storage and
> >> Management in
> >> > > > Flink
> >> > > > > > > 2.0.
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > The past decade has witnessed a dramatic
> >> shift
> >> > > in
> >> > > > > > > Flink's
> >> > > > > > > > > > deployment
> >> > > > > > > > > > > >> >> > > mode,
> >> > > > > > > > > > > >> >> > > > workload patterns, and hardware
> >> improvements.
> >> > > > We've
> >> > > > > > > moved
> >> > > > > > > > > > from the
> >> > > > > > > > > > > >> >> > > > map-reduce era where workers are
> >> > > > > computation-storage
> >> > > > > > > > > tightly
> >> > > > > > > > > > coupled
> >> > > > > > > > > > > >> >> > > nodes
> >> > > > > > > > > > > >> >> > > > to a cloud-native world where
> >> containerized
> >> > > > > > deployments
> >> > > > > > > > on
> >> > > > > > > > > > Kubernetes
> >> > > > > > > > > > > >> >> > > > become standard. To enable Flink's
> >> Cloud-Native
> >> > > > > > future,
> >> > > > > > > > we
> >> > > > > > > > > > introduce
> >> > > > > > > > > > > >> >> > > > Disaggregated State Storage and
> >> Management that
> >> > > > > uses
> >> > > > > > > DFS
> >> > > > > > > > as
> >> > > > > > > > > > primary
> >> > > > > > > > > > > >> >> > > storage
> >> > > > > > > > > > > >> >> > > > in Flink 2.0, as promised in the Flink
> 2.0
> >> > > > Roadmap.
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > Design Details can be found in
> >> FLIP-423[1].
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > This new architecture is aimed to solve
> >> the
> >> > > > > following
> >> > > > > > > > > > challenges
> >> > > > > > > > > > > >> >> > brought
> >> > > > > > > > > > > >> >> > > in
> >> > > > > > > > > > > >> >> > > > the cloud-native era for Flink.
> >> > > > > > > > > > > >> >> > > > 1. Local Disk Constraints in
> >> containerization
> >> > > > > > > > > > > >> >> > > > 2. Spiky Resource Usage caused by
> >> compaction in
> >> > > > the
> >> > > > > > > > current
> >> > > > > > > > > > state model
> >> > > > > > > > > > > >> >> > > > 3. Fast Rescaling for jobs with large
> >> states
> >> > > > > > (hundreds
> >> > > > > > > of
> >> > > > > > > > > > Terabytes)
> >> > > > > > > > > > > >> >> > > > 4. Light and Fast Checkpoint in a native
> >> way
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > More specifically, we want to reach a
> >> consensus
> >> > > > on
> >> > > > > > the
> >> > > > > > > > > > following issues
> >> > > > > > > > > > > >> >> > > in
> >> > > > > > > > > > > >> >> > > > this discussion:
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > 1. Overall design
> >> > > > > > > > > > > >> >> > > > 2. Proposed Changes
> >> > > > > > > > > > > >> >> > > > 3. Design details to achieve Milestone1
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > In M1, we aim to achieve an end-to-end
> >> baseline
> >> > > > > > version
> >> > > > > > > > > > using DFS as
> >> > > > > > > > > > > >> >> > > > primary storage and complete core
> >> > > > functionalities,
> >> > > > > > > > > including:
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]:
> >> > > > Introduce
> >> > > > > > new
> >> > > > > > > > APIs
> >> > > > > > > > > > for
> >> > > > > > > > > > > >> >> > > > asynchronous state access.
> >> > > > > > > > > > > >> >> > > > - Asynchronous Execution Model
> >> (FLIP-425)[3]:
> >> > > > > > > Implement a
> >> > > > > > > > > > non-blocking
> >> > > > > > > > > > > >> >> > > > execution model leveraging the
> >> asynchronous
> >> > > APIs
> >> > > > > > > > introduced
> >> > > > > > > > > > in
> >> > > > > > > > > > > >> >> > FLIP-424.
> >> > > > > > > > > > > >> >> > > > - Grouping Remote State Access
> >> (FLIP-426)[4]:
> >> > > > > Enable
> >> > > > > > > > > > retrieval of
> >> > > > > > > > > > > >> >> > remote
> >> > > > > > > > > > > >> >> > > > state data in batches to avoid
> unnecessary
> >> > > > > round-trip
> >> > > > > > > > costs
> >> > > > > > > > > > for remote
> >> > > > > > > > > > > >> >> > > > access
> >> > > > > > > > > > > >> >> > > > - Disaggregated State Store
> (FLIP-427)[5]:
> >> > > > > Introduce
> >> > > > > > > the
> >> > > > > > > > > > initial
> >> > > > > > > > > > > >> >> > version
> >> > > > > > > > > > > >> >> > > of
> >> > > > > > > > > > > >> >> > > > the ForSt disaggregated state store.
> >> > > > > > > > > > > >> >> > > > - Fault Tolerance/Rescale Integration
> >> > > > > (FLIP-428)[6]:
> >> > > > > > > > > > Integrate
> >> > > > > > > > > > > >> >> > > > checkpointing mechanisms with the
> >> disaggregated
> >> > > > > state
> >> > > > > > > > store
> >> > > > > > > > > > for fault
> >> > > > > > > > > > > >> >> > > > tolerance and fast rescaling.
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > We will vote on each FLIP in separate
> >> threads
> >> > > to
> >> > > > > make
> >> > > > > > > > sure
> >> > > > > > > > > > each FLIP
> >> > > > > > > > > > > >> >> > > > reaches a consensus. But we want to keep
> >> the
> >> > > > > > discussion
> >> > > > > > > > > > within a
> >> > > > > > > > > > > >> >> > focused
> >> > > > > > > > > > > >> >> > > > thread (this thread) for easier tracking
> >> of
> >> > > > > contexts
> >> > > > > > to
> >> > > > > > > > > avoid
> >> > > > > > > > > > > >> >> > duplicated
> >> > > > > > > > > > > >> >> > > > questions/discussions and also to think
> >> of the
> >> > > > > > > > > > problem/solution in a
> >> > > > > > > > > > > >> >> > full
> >> > > > > > > > > > > >> >> > > > picture.
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > Looking forward to your feedback
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > Best,
> >> > > > > > > > > > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang,
> >> Yanfei and
> >> > > > Feng
> >> > > > > > > > > > > >> >> > > >
> >> > > > > > > > > > > >> >> > > > [1]
> >> > > https://cwiki.apache.org/confluence/x/R4p3EQ
> >> > > > > > > > > > > >> >> > > > [2]
> >> > > https://cwiki.apache.org/confluence/x/SYp3EQ
> >> > > > > > > > > > > >> >> > > > [3]
> >> > > https://cwiki.apache.org/confluence/x/S4p3EQ
> >> > > > > > > > > > > >> >> > > > [4]
> >> > > https://cwiki.apache.org/confluence/x/TYp3EQ
> >> > > > > > > > > > > >> >> > > > [5]
> >> > > https://cwiki.apache.org/confluence/x/T4p3EQ
> >> > > > > > > > > > > >> >> > > > [6]
> >> > > https://cwiki.apache.org/confluence/x/UYp3EQ
> >> > > > > > > > > > > >> >> > >
> >> > > > > > > > > > > >> >> >
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >>
> >
>

Reply via email to