Hi everyone,

Piotr and I had a long discussion about the checkpoint duration issue. We
think that the lambda serialization approach I proposed in my last mail may
introduce more issues. The most serious one is that users may not be able to
modify the code inside a serialized lambda to fix a bug.

But fortunately we found a possible solution. By introducing a set of
declarative APIs and a `declareProcess()` function that users should
implement in a newly introduced AbstractStreamOperator, we could obtain the
declaration of record processing at runtime, broken down into requests and
callbacks (lambdas) as introduced in FLIP-424. Thus we avoid the problem of
lambda (de)serialization; instead, we retrieve the callbacks every time
before a task runs. The next step is to provide an API allowing users to
assign a unique id to each state request and callback, or to assign one
automatically by declaration order. Thus we can find the corresponding
callback at runtime for each restored state request based on the id, and
then the whole pipeline can be resumed.
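
To make the id-based lookup a bit more concrete, here is a minimal Java
sketch. Everything in it (DeclarationContext, RestoredRequest,
CountOperator, the "auto-N" id scheme) is a hypothetical name made up for
illustration and is not the actual Flink API or the final design; only the
`declareProcess()` name comes from the idea above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical registry: maps a stable id to a callback declared at task start. */
final class DeclarationContext {
    private final Map<String, Consumer<Long>> callbacks = new HashMap<>();
    private int nextAutoId = 0;

    /** Declares a callback under an explicit, user-assigned id. */
    void declare(String id, Consumer<Long> callback) {
        if (callbacks.putIfAbsent(id, callback) != null) {
            throw new IllegalStateException("Duplicate callback id: " + id);
        }
    }

    /** Declares a callback whose id is derived from declaration order. */
    void declare(Consumer<Long> callback) {
        declare("auto-" + nextAutoId++, callback);
    }

    /** Finds the callback for an id; fails if the current code does not declare it. */
    Consumer<Long> lookup(String id) {
        Consumer<Long> callback = callbacks.get(id);
        if (callback == null) {
            throw new IllegalStateException("No callback declared for id: " + id);
        }
        return callback;
    }
}

/** Hypothetical checkpointed form of an in-flight request: plain data, no lambda. */
final class RestoredRequest {
    final String callbackId;
    final long stateValue;

    RestoredRequest(String callbackId, long stateValue) {
        this.callbackId = callbackId;
        this.stateValue = stateValue;
    }
}

/** Hypothetical operator whose processing is declared instead of serialized. */
final class CountOperator {
    final List<Long> output = new ArrayList<>();

    void declareProcess(DeclarationContext ctx) {
        ctx.declare("increment", value -> output.add(value + 1)); // explicit id
        ctx.declare(value -> output.add(value * 2));              // becomes "auto-0"
    }

    /** Re-attaches each restored request to the callback declared by the current code. */
    static void resume(DeclarationContext ctx, List<RestoredRequest> restored) {
        for (RestoredRequest request : restored) {
            ctx.lookup(request.callbackId).accept(request.stateValue);
        }
    }
}
```

In this sketch only the id and the state value are persisted, so a bug fix
inside the "increment" lambda would take effect after restore as long as the
id stays the same. Ids derived from declaration order would shift if the
declarations were reordered, which is one reason explicit ids may be
preferable.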

Note that all of the above is internal and won't be exposed to average users.
Exposing this on Stream APIs can be discussed later. I will prepare another
FLIP(s) describing the whole optimized checkpointing process, and in the
meantime, we can proceed with the current FLIPs. The new FLIP(s) are built on
top of the current ones and we can achieve this incrementally.


Best,
Zakelly

On Thu, Mar 21, 2024 at 12:31 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Piotrek,
>
> Thanks for your comments!
>
> As we discussed off-line, you agreed that we can not checkpoint while some
>> records are in the middle of being
>> processed. That we would have to drain the in-progress records before
>> doing
>> the checkpoint. You also argued
>> that this is not a problem, because the size of this buffer can be
>> configured.
>>
>> I'm really afraid of such a solution. I've seen in the past plenty of
>> times, that whenever Flink has to drain some
>> buffered records, eventually that always breaks timely checkpointing (and
>> hence ability for Flink to rescale in
>> a timely manner). Even a single record with a `flatMap` like operator
>> currently in Flink causes problems during
>> back pressure. That's especially true for example for processing
>> watermarks. At the same time, I don't see how
>> this value could be configured by even Flink's power users, let alone an
>> average user. The size of that in-flight
>> buffer not only depends on a particular query/job, but also the "good"
>> value changes dynamically over time,
>> and can change very rapidly. Sudden spikes of records or backpressure,
>> some
>> hiccup during emitting watermarks,
>> all of those could change in an instant the theoretically optimal buffer
>> size of let's say "6000" records, down to "1".
>> And when those changes happen, those are the exact times when timely
>> checkpointing matters the most.
>> If the load pattern suddenly changes, and checkpointing takes suddenly
>> tens
>> of minutes instead of a couple of
>> seconds, it means you can not use rescaling and you are forced to
>> overprovision the resources. And there are also
>> other issues if checkpointing takes too long.
>>
>
> Let me clarify some misunderstandings here. First of all, is the sync
> phase of checkpointing under the current plan longer than in the synchronous
> execution model? The answer is yes; it is a trade-off for the parallel
> execution model. I think the cost is worth the improvement. Now the
> question is, how much longer are we talking about? The PoC result I
> provided is that it takes 3 seconds to drain 6000 records of a simple job,
> and I said it is not a big deal. You may still argue that we could encounter
> long watermark/timer processing that makes the cp wait, so I propose
> several ways to optimize this:
>
>    1. Instead of only controlling the in-flight records, we could control
>    the in-flight watermark.
>    2. Since we have broken down the record processing into several state
>    requests with at most one subsequent callback for each request, the cp can
>    be processed after the currently RUNNING requests (NOT records) and their
>    callbacks finish. This means that even if we have a lot of records
>    in-flight (I mean in 'processElement' here), the cp can proceed as soon as
>    a small group of state requests finishes. They will form 1 or 2
>    multiGets to RocksDB, which takes less time. Moreover, the timer processing
>    is also split into several requests, so the cp won't wait for the whole
>    timer to finish. The picture attached describes this idea.
>
> And the size of this buffer can be configured. I'm not counting on average
> users to configure it well, I'm just saying that we'd better not focus on
> absolute numbers of PoC or specific cases since we can always provide
> a conservative default value to make this acceptable for most cases. The
> adaptive buffer size is also worth a try if we provide a conservative
> strategy.
>
> Besides, I don't understand why the load pattern, spikes, or backpressure
> would affect this. We control the records that can enter
> 'processElement' and the state requests that can fire in parallel. No
> matter how high the load spikes, the excess records will be blocked outside.
> The behavior is relatively stable within the proposed execution model itself.
> The unaligned barrier will skip those inputs in the queue as before.
>
>
> At the same time, I still don't understand why we can not implement things
>> incrementally? First
>> let's start with the current API, without the need to rewrite all of the
>> operators, we can asynchronously fetch whole
>> state for a given record using its key. That should already vastly improve
>> many things, and this way we could
>> perform a checkpoint without a need of draining the in-progress/in-flight
>> buffer. We could roll that version out,
>> test it out in practice, and then we could see if the fine grained state
>> access is really needed. Otherwise it sounds
>> to me like a premature optimization, that requires us to not only rewrite
>> a
>> lot of the code, but also to later maintain
>> it, even if it ultimately proves to be not needed. Which I of course can
>> not be certain but I have a feeling that it
>> might be the case.
>
>
> The disaggregated state management we proposed targets challenges
> including, but not limited to, the following:
>
>    1. Local disk constraints, including limited I/O and space. (discussed
>    in FLIP-423)
>    2. Decoupling I/O resources from pre-allocated CPU resources, to make
>    good use of both (motivation of FLIP-424)
>    3. Elasticity of scaling I/O or storage capacity.
>
> Thus our plan depends on DFS, using local disk as an optional cache
> only. The pre-fetching plan you mentioned still binds I/O to CPU
> resources, and will consume even more I/O to load unnecessary state. It
> makes things worse. Please note that we are not targeting scenarios
> that the local state already handles well, and our goal is not to replace
> the local state.
>
> And if manpower is a big concern of yours, I would say many of my
> colleagues could help contribute to the runtime or SQL operators. It is
> experimental, on a code path separate from the local state, and will be
> recommended for use only once we have proven it mature.
>
>
> Thanks & Best,
> Zakelly
>
> On Wed, Mar 20, 2024 at 10:04 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hey Zakelly!
>>
>> Sorry for the late reply. I still have concerns about the proposed
>> solution, with my main concerns coming from
>> the implications of the asynchronous state access API on the checkpointing
>> and responsiveness of Flink.
>>
>> >> What also worries me a lot in this fine grained model is the effect on
>> the checkpointing times.
>> >
>> > Your concerns are very reasonable. Faster checkpointing is always a core
>> advantage of disaggregated state,
>> > but only for the async phase. There will be some complexity introduced
>> by
>> in-flight requests, but I'd suggest
>> > a checkpoint containing those in-flight state requests as part of the
>> state, to accelerate the sync phase by
>> > skipping the buffer draining. This makes the buffer size have little
>> impact on checkpoint time. And all the
>> > changes keep within the execution model we proposed while the checkpoint
>> barrier alignment or handling
>> > will not be touched in our proposal, so I guess the complexity is
>> relatively controllable. I have faith in that :)
>>
>> As we discussed off-line, you agreed that we can not checkpoint while some
>> records are in the middle of being
>> processed. That we would have to drain the in-progress records before
>> doing
>> the checkpoint. You also argued
>> that this is not a problem, because the size of this buffer can be
>> configured.
>>
>> I'm really afraid of such a solution. I've seen in the past plenty of
>> times, that whenever Flink has to drain some
>> buffered records, eventually that always breaks timely checkpointing (and
>> hence ability for Flink to rescale in
>> a timely manner). Even a single record with a `flatMap` like operator
>> currently in Flink causes problems during
>> back pressure. That's especially true for example for processing
>> watermarks. At the same time, I don't see how
>> this value could be configured by even Flink's power users, let alone an
>> average user. The size of that in-flight
>> buffer not only depends on a particular query/job, but also the "good"
>> value changes dynamically over time,
>> and can change very rapidly. Sudden spikes of records or backpressure,
>> some
>> hiccup during emitting watermarks,
>> all of those could change in an instant the theoretically optimal buffer
>> size of let's say "6000" records, down to "1".
>> And when those changes happen, those are the exact times when timely
>> checkpointing matters the most.
>> If the load pattern suddenly changes, and checkpointing takes suddenly
>> tens
>> of minutes instead of a couple of
>> seconds, it means you can not use rescaling and you are forced to
>> overprovision the resources. And there are also
>> other issues if checkpointing takes too long.
>>
>> At the same time, I still don't understand why we can not implement things
>> incrementally? First
>> let's start with the current API, without the need to rewrite all of the
>> operators, we can asynchronously fetch whole
>> state for a given record using its key. That should already vastly improve
>> many things, and this way we could
>> perform a checkpoint without a need of draining the in-progress/in-flight
>> buffer. We could roll that version out,
>> test it out in practice, and then we could see if the fine grained state
>> access is really needed. Otherwise it sounds
>> to me like a premature optimization, that requires us to not only rewrite
>> a
>> lot of the code, but also to later maintain
>> it, even if it ultimately proves to be not needed. Which I of course can
>> not be certain but I have a feeling that it
>> might be the case.
>>
>> Best,
>> Piotrek
>>
>> On Tue, 19 Mar 2024 at 10:42, Zakelly Lan <zakelly....@gmail.com> wrote:
>>
>> > Hi everyone,
>> >
>> > Thanks for your valuable feedback!
>> >
>> > Our discussions have been going on for a while and are nearing a
>> > consensus. So I would like to start a vote after 72 hours.
>> >
>> > Please let me know if you have any concerns, thanks!
>> >
>> >
>> > Best,
>> > Zakelly
>> >
>> > On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan <zakelly....@gmail.com>
>> wrote:
>> >
>> > > Hi Yunfeng,
>> > >
>> > > Thanks for the suggestion!
>> > >
>> > > I will reorganize the FLIP-425 accordingly.
>> > >
>> > >
>> > > Best,
>> > > Zakelly
>> > >
>> > > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <
>> > flink.zhouyunf...@gmail.com>
>> > > wrote:
>> > >
>> > >> Hi Xintong and Zakelly,
>> > >>
>> > >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
>> > >> I agree that watermarks can use only out-of-order mode for
>> > >> now, because there is still no concrete example showing a
>> > >> correctness risk with it. However, the strictly-ordered mode should
>> > >> still be supported as the default option for non-record event types
>> > >> other than watermark, at least for checkpoint barriers.
>> > >>
>> > >> I noticed that this information has already been documented in "For
>> > >> other non-record events, such as RecordAttributes ...", but it's at
>> > >> the bottom of the "Watermark" section, which might not be very
>> > >> obvious. Thus it might be better to reorganize the FLIP to state more
>> > >> clearly that the two order modes are designed for all non-record
>> > >> events, and which mode this FLIP would choose for each type of event.
>> > >>
>> > >> Best,
>> > >> Yunfeng
>> > >>
>> > >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <tonysong...@gmail.com>
>> > >> wrote:
>> > >> >
>> > >> > Thanks for the quick response. Sounds good to me.
>> > >> >
>> > >> > Best,
>> > >> >
>> > >> > Xintong
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <zakelly....@gmail.com
>> >
>> > >> wrote:
>> > >> >
>> > >> > > Hi Xintong,
>> > >> > >
>> > >> > > Thanks for sharing your thoughts!
>> > >> > >
>> > >> > > 1. Regarding Record-ordered and State-ordered of processElement.
>> > >> > > >
>> > >> > > > I understand that while State-ordered likely provides better
>> > >> performance,
>> > >> > > > Record-ordered is sometimes required for correctness. The
>> question
>> > >> is how
>> > >> > > > should a user choose between these two modes? My concern is
>> that
>> > >> such a
>> > >> > > > decision may require users to have in-depth knowledge about the
>> > >> Flink
>> > >> > > > internals, and may lead to correctness issues if State-ordered
>> is
>> > >> chosen
>> > >> > > > improperly.
>> > >> > > >
>> > >> > > > I'd suggest not to expose such a knob, at least in the first
>> > >> version.
>> > >> > > That
>> > >> > > > means always use Record-ordered for custom operators / UDFs,
>> and
>> > >> keep
>> > >> > > > State-ordered for internal usages (built-in operators) only.
>> > >> > > >
>> > >> > >
>> > >> > > Indeed, users may not be able to choose the mode properly. I agree to
>> > >> > > keep such options for internal use.
>> > >> > >
>> > >> > >
>> > >> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
>> > >> > > >
>> > >> > > > I'm not entirely sure about Strictly-ordered being the default, or even
>> > >> > > > being supported. From my understanding, a Watermark(T) only suggests that
>> > >> > > > all records with event time before T have arrived, and it has nothing to do
>> > >> > > > with whether records with event time after T have arrived or not. From that
>> > >> > > > perspective, preventing certain records from arriving before a Watermark is
>> > >> > > > never supported. I also cannot come up with any use case where
>> > >> > > > Strictly-ordered is necessary. This implies the same issue as 1): how does
>> > >> > > > the user choose between the two modes?
>> > >> > > >
>> > >> > > > I'd suggest not exposing the knob to users and only supporting Out-of-order,
>> > >> > > > until we see a concrete use case where Strictly-ordered is needed.
>> > >> > > >
>> > >> > >
>> > >> > > The semantics of watermarks do not define the sequence between a watermark
>> > >> > > and subsequent records. For the most part, this is inconsequential, except
>> > >> > > it may affect some current users who have previously relied on the implicit
>> > >> > > assumption of an ordered execution. I'd be fine with initially supporting
>> > >> > > only out-of-order processing. We may consider exposing the
>> > >> > > 'Strictly-ordered' mode once we encounter a concrete use case that
>> > >> > > necessitates it.
>> > >> > >
>> > >> > >
>> > >> > > My philosophies behind not exposing the two config options are:
>> > >> > > > - There are already too many options in Flink that users barely know how
>> > >> > > > to use. I think Flink should try as much as possible to decide its own
>> > >> > > > behavior, rather than throwing all the decisions to the users.
>> > >> > > > - It's much harder to take back knobs than to introduce them. Therefore,
>> > >> > > > options should be introduced only if concrete use cases are identified.
>> > >> > > >
>> > >> > >
>> > >> > > I agree to keep configurable items to a minimum, especially for the MVP.
>> > >> > > Given that we have the opportunity to refine the functionality before the
>> > >> > > framework transitions from @Experimental to @PublicEvolving, it makes sense
>> > >> > > to refrain from presenting user-facing options until we have ensured
>> > >> > > their necessity.
>> > >> > >
>> > >> > >
>> > >> > > Best,
>> > >> > > Zakelly
>> > >> > >
>> > >> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <
>> > tonysong...@gmail.com>
>> > >> > > wrote:
>> > >> > >
>> > >> > > > Sorry for joining the discussion late.
>> > >> > > >
>> > >> > > > I have two questions about FLIP-425.
>> > >> > > >
>> > >> > > > 1. Regarding Record-ordered and State-ordered of
>> processElement.
>> > >> > > >
>> > >> > > > I understand that while State-ordered likely provides better
>> > >> performance,
>> > >> > > > Record-ordered is sometimes required for correctness. The
>> question
>> > >> is how
>> > >> > > > should a user choose between these two modes? My concern is
>> that
>> > >> such a
>> > >> > > > decision may require users to have in-depth knowledge about the
>> > >> Flink
>> > >> > > > internals, and may lead to correctness issues if State-ordered
>> is
>> > >> chosen
>> > >> > > > improperly.
>> > >> > > >
>> > >> > > > I'd suggest not to expose such a knob, at least in the first
>> > >> version.
>> > >> > > That
>> > >> > > > means always use Record-ordered for custom operators / UDFs,
>> and
>> > >> keep
>> > >> > > > State-ordered for internal usages (built-in operators) only.
>> > >> > > >
>> > >> > > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
>> > >> > > >
>> > >> > > > I'm not entirely sure about Strictly-ordered being the default, or even
>> > >> > > > being supported. From my understanding, a Watermark(T) only suggests that
>> > >> > > > all records with event time before T have arrived, and it has nothing to do
>> > >> > > > with whether records with event time after T have arrived or not. From that
>> > >> > > > perspective, preventing certain records from arriving before a Watermark is
>> > >> > > > never supported. I also cannot come up with any use case where
>> > >> > > > Strictly-ordered is necessary. This implies the same issue as 1): how does
>> > >> > > > the user choose between the two modes?
>> > >> > > >
>> > >> > > > I'd suggest not exposing the knob to users and only supporting Out-of-order,
>> > >> > > > until we see a concrete use case where Strictly-ordered is needed.
>> > >> > > >
>> > >> > > >
>> > >> > > > My philosophies behind not exposing the two config options are:
>> > >> > > > - There are already too many options in Flink that users barely know how
>> > >> > > > to use. I think Flink should try as much as possible to decide its own
>> > >> > > > behavior, rather than throwing all the decisions to the users.
>> > >> > > > - It's much harder to take back knobs than to introduce them. Therefore,
>> > >> > > > options should be introduced only if concrete use cases are identified.
>> > >> > > >
>> > >> > > > WDYT?
>> > >> > > >
>> > >> > > > Best,
>> > >> > > >
>> > >> > > > Xintong
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge
>> <j...@ververica.com.invalid
>> > >
>> > >> > > wrote:
>> > >> > > >
>> > >> > > > > +1 for Gyula's suggestion. I just finished FLIP-423 which
>> > >> introduced
>> > >> > > the
>> > >> > > > > intention of the big change and high level architecture.
>> Great
>> > >> content
>> > >> > > > btw!
>> > >> > > > > The only public interface change for this FLIP is one new
>> config
>> > >> to use
>> > >> > > > > ForSt. It makes sense to have one dedicated discussion thread
>> > for
>> > >> each
>> > >> > > > > concrete system design.
>> > >> > > > >
>> > >> > > > > @Zakelly The links in your mail do not work except the last one,
>> > >> > > > > because the FLIP-xxx has been included in the URL, like
>> > >> > > > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
>> > >> > > > > .
>> > >> > > > > NIT fix:
>> > >> > > > >
>> > >> > > > > FLIP-424: https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
>> > >> > > > >
>> > >> > > > > FLIP-425: https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
>> > >> > > > >
>> > >> > > > > FLIP-426: https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
>> > >> > > > >
>> > >> > > > > FLIP-427: https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
>> > >> > > > >
>> > >> > > > > FLIP-428: https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
>> > >> > > > >
>> > >> > > > > Best regards,
>> > >> > > > > Jing
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >
>> > >> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <
>> > >> zakelly....@gmail.com>
>> > >> > > > wrote:
>> > >> > > > >
>> > >> > > > > > Hi everyone,
>> > >> > > > > >
>> > >> > > > > > Thank you all for a lively discussion here, and it is a good time to
>> > >> > > > > > move forward to more detailed discussions. Thus we open several
>> > >> > > > > > threads for sub-FLIPs:
>> > >> > > > > >
>> > >> > > > > > FLIP-424:
>> > >> > > > >
>> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
>> > >> > > > > > FLIP-425
>> > >> > > > > > <
>> > >> > > > >
>> > >> > >
>> > >>
>> >
>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
>> > >> > > > >:
>> > >> > > > > >
>> > >> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
>> > >> > > > > > FLIP-426
>> > >> > > > > > <
>> > >> > > > >
>> > >> > >
>> > >>
>> >
>> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426
>> > >> > > > >:
>> > >> > > > > >
>> > >> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
>> > >> > > > > > FLIP-427
>> > >> > > > > > <
>> > >> > > > >
>> > >> > >
>> > >>
>> >
>> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427
>> > >> > > > >:
>> > >> > > > > >
>> > >> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
>> > >> > > > > > FLIP-428
>> > >> > > > > > <
>> > >> > > > >
>> > >> > >
>> > >>
>> >
>> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428
>> > >> > > > >:
>> > >> > > > > >
>> > >> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
>> > >> > > > > >
>> > >> > > > > > If you want to talk about the overall architecture, roadmap, milestones
>> > >> > > > > > or something related to multiple FLIPs, please post it here. Otherwise
>> > >> > > > > > you can discuss some details in separate mails. Let's try to avoid
>> > >> > > > > > repeated discussion in different threads. I will sync important messages
>> > >> > > > > > here if there are any in the above threads.
>> > >> > > > > >
>> > >> > > > > > And reply to @Jeyhun: We will ensure the content between those FLIPs is
>> > >> > > > > > consistent.
>> > >> > > > > >
>> > >> > > > > >
>> > >> > > > > > Best,
>> > >> > > > > > Zakelly
>> > >> > > > > >
>> > >> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <
>> > yuanmei.w...@gmail.com
>> > >> >
>> > >> > > > wrote:
>> > >> > > > > >
>> > >> > > > > > > I have been a bit busy these few weeks and sorry for
>> > >> responding
>> > >> > > late.
>> > >> > > > > > >
>> > >> > > > > > > The original thinking behind keeping the discussion within one thread
>> > >> > > > > > > was easier tracking and avoiding repeated discussion in different
>> > >> > > > > > > threads.
>> > >> > > > > > >
>> > >> > > > > > > For details, it might be good to start in different threads if
>> > >> > > > > > > needed.
>> > >> > > > > > >
>> > >> > > > > > > We will think of a way to better organize the discussion.
>> > >> > > > > > >
>> > >> > > > > > > Best
>> > >> > > > > > > Yuan
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <
>> > >> > > je.kari...@gmail.com>
>> > >> > > > > > > wrote:
>> > >> > > > > > >
>> > >> > > > > > > > Hi,
>> > >> > > > > > > >
>> > >> > > > > > > > + 1 for the suggestion.
>> > >> > > > > > > > Maybe we can start the discussion with the FLIPs with minimum
>> > >> > > > > > > > dependencies (from the other new/proposed FLIPs).
>> > >> > > > > > > > Based on our discussion on a particular FLIP, the subsequent (or
>> > >> > > > > > > > its dependent) FLIP(s) can be updated accordingly?
>> > >> > > > > > > >
>> > >> > > > > > > > Regards,
>> > >> > > > > > > > Jeyhun
>> > >> > > > > > > >
>> > >> > > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <
>> > >> gyula.f...@gmail.com>
>> > >> > > > > > wrote:
>> > >> > > > > > > >
>> > >> > > > > > > > > Hey all!
>> > >> > > > > > > > >
>> > >> > > > > > > > > This is a massive improvement / work. I just started going
>> > >> > > > > > > > > through the FLIPs and have a more or less meta comment.
>> > >> > > > > > > > >
>> > >> > > > > > > > > While it's good to keep the overall architecture discussion
>> > >> > > > > > > > > here, I think we should still have separate discussions for each
>> > >> > > > > > > > > FLIP where we can discuss interface details etc. With so much
>> > >> > > > > > > > > content, adding minor comments here will lead nowhere, but those
>> > >> > > > > > > > > discussions are still important, and we should have them in
>> > >> > > > > > > > > separate threads (one for each FLIP).
>> > >> > > > > > > > >
>> > >> > > > > > > > > What do you think?
>> > >> > > > > > > > > Gyula
>> > >> > > > > > > > >
>> > >> > > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <
>> > >> fredia...@gmail.com
>> > >> > > >
>> > >> > > > > > wrote:
>> > >> > > > > > > > >
>> > >> > > > > > > > > > Hi team,
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Thanks for your discussion. Regarding FLIP-425, we have
>> > >> > > > > > > > > > added several updates to answer frequently asked questions:
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > 1. We captured a flame graph of the Hashmap state backend in
>> > >> > > > > > > > > > "Synchronous execution with asynchronous APIs"[1], which
>> > >> > > > > > > > > > reveals that the framework overhead (including reference
>> > >> > > > > > > > > > counting, future-related code and so on) consumes about 9% of
>> > >> > > > > > > > > > the keyed operator CPU time.
>> > >> > > > > > > > > > 2. We added a set of comparative experiments for watermark
>> > >> > > > > > > > > > processing; the performance of Out-Of-Order mode is 70% better
>> > >> > > > > > > > > > than strictly-ordered mode under ~140MB state size.
>> > >> > > > > > > > > > Instructions on how to run this test have also been added[2].
>> > >> > > > > > > > > > 3. Regarding the order of StreamRecords, whether or not they
>> > >> > > > > > > > > > access state, we added a new *Strict order of
>> > >> > > > > > > > > > 'processElement'*[3].
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > [1]
>> > >> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
>> > >> > > > > > > > > > [2]
>> > >> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
>> > >> > > > > > > > > > [3]
>> > >> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
>> > >> > > > > > > > > >
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > Best regards,
>> > >> > > > > > > > > > Yanfei
>> > >> > > > > > > > > >
>> > >> > > > > > > > > > On Tue, Mar 5, 2024 at 09:25, Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > Hi Zakelly,
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > > 5. I'm not very sure ... revisiting this later since it is
>> > >> > > > > > > > > > > > not important.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > It seems that we still have some details to confirm about
>> > >> > > > > > > > > > > this question. Let's postpone this to after the critical
>> > >> > > > > > > > > > > parts of the design are settled.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > > 8. Yes, we had considered ... metrics should be like
>> > >> > > > > > > > > > > > afterwards.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with discussing this
>> > >> > > > > > > > > > > topic in milestone 2.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > Looking forward to the detailed design about the strict mode
>> > >> > > > > > > > > > > between same-key records and the benchmark results about the
>> > >> > > > > > > > > > > epoch mechanism.
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > Best regards,
>> > >> > > > > > > > > > > Yunfeng
>> > >> > > > > > > > > > >
>> > >> > > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <
>> > >> > > > > > zakelly....@gmail.com>
>> > >> > > > > > > > > > wrote:
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > Hi Yunfeng,
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > For 1:
>> > >> > > > > > > > > > > > I had a discussion with Lincoln Lee, and I realized that it
>> > >> > > > > > > > > > > > is a common case that same-key records should be blocked
>> > >> > > > > > > > > > > > before `processElement`. It is easier for users to
>> > >> > > > > > > > > > > > understand. Thus I will introduce a strict mode for this and
>> > >> > > > > > > > > > > > make it the default. My rough idea is just like yours:
>> > >> > > > > > > > > > > > invoke some method of the AEC instance before
>> > >> > > > > > > > > > > > `processElement`. The detailed design will be described in
>> > >> > > > > > > > > > > > the FLIP later.
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > For 2:
>> > >> > > > > > > > > > > > I agree with you. We could throw exceptions for
>> > now
>> > >> and
>> > >> > > > > > optimize
>> > >> > > > > > > > this
>> > >> > > > > > > > > > later.
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > For 5:
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> It might be better to move the default values
>> to
>> > >> the
>> > >> > > > > Proposed
>> > >> > > > > > > > > Changes
>> > >> > > > > > > > > > > >> section instead of making them public for
>> now, as
>> > >> there
>> > >> > > > will
>> > >> > > > > > be
>> > >> > > > > > > > > > > >> compatibility issues once we want to
>> dynamically
>> > >> adjust
>> > >> > > > the
>> > >> > > > > > > > > thresholds
>> > >> > > > > > > > > > > >> and timeouts in future.
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > Agreed. The whole framework is under experiment
>> > >> until we
>> > >> > > > > think
>> > >> > > > > > it
>> > >> > > > > > > > is
>> > >> > > > > > > > > > complete in 2.0 or later. The default value should
>> be
>> > >> better
>> > >> > > > > > > determined
>> > >> > > > > > > > > > with more testing results and production
>> experience.
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > >> The configuration
>> execution.async-state.enabled
>> > >> seems
>> > >> > > > > > > unnecessary,
>> > >> > > > > > > > > as
>> > >> > > > > > > > > > > >> the infrastructure may automatically get this
>> > >> > > information
>> > >> > > > > from
>> > >> > > > > > > the
>> > >> > > > > > > > > > > >> detailed state backend configurations. We may
>> > >> revisit
>> > >> > > this
>> > >> > > > > > part
>> > >> > > > > > > > > after
>> > >> > > > > > > > > > > >> the core designs have reached an agreement.
>> WDYT?
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > I'm not very sure if there is any use case
>> where
>> > >> users
>> > >> > > > write
>> > >> > > > > > > their
>> > >> > > > > > > > > > code using async APIs but run their job in a
>> > >> synchronous way.
>> > >> > > > The
>> > >> > > > > > > first
>> > >> > > > > > > > > two
>> > >> > > > > > > > > > scenarios that come to me are for benchmarking or
>> for
>> > a
>> > >> small
>> > >> > > > > > state,
>> > >> > > > > > > > > while
>> > >> > > > > > > > > > they don't want to rewrite their code. Actually it
>> is
>> > >> easy to
>> > >> > > > > > > support,
>> > >> > > > > > > > so
>> > >> > > > > > > > > > I'd suggest providing it. But I'm fine with
>> revisiting
>> > >> this
>> > >> > > > later
>> > >> > > > > > > since
>> > >> > > > > > > > > it
>> > >> > > > > > > > > > is not important. WDYT?
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > For 8:
>> > >> > > > > > > > > > > > Yes, we had considered the I/O metrics group
>> > >> especially
>> > >> > > the
>> > >> > > > > > > > > > back-pressure, idle and task busy per second. In
>> the
>> > >> current
>> > >> > > > plan
>> > >> > > > > > we
>> > >> > > > > > > > can
>> > >> > > > > > > > > do
>> > >> > > > > > > > > > state access during back-pressure, meaning that
>> those
>> > >> metrics
>> > >> > > > for
>> > >> > > > > > > input
>> > >> > > > > > > > > > would better be redefined. I suggest we discuss
>> these
>> > >> > > existing
>> > >> > > > > > > metrics
>> > >> > > > > > > > as
>> > >> > > > > > > > > > well as some new metrics that should be introduced
>> in
>> > >> > > FLIP-431
>> > >> > > > > > later
>> > >> > > > > > > in
>> > >> > > > > > > > > > milestone 2, since we have basically finished the
>> > >> framework
>> > >> > > > thus
>> > >> > > > > we
>> > >> > > > > > > > will
>> > >> > > > > > > > > > have a better view of what metrics should be like
>> > >> afterwards.
>> > >> > > > > WDYT?
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > Best,
>> > >> > > > > > > > > > > > Zakelly
>> > >> > > > > > > > > > > >
>> > >> > > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <
>> > >> > > > > > > > > > flink.zhouyunf...@gmail.com> wrote:
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> Hi Zakelly,
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> Thanks for the responses!
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> > 1. I will discuss this with some expert SQL
>> > >> > > developers.
>> > >> > > > > ...
>> > >> > > > > > > mode
>> > >> > > > > > > > > > for StreamRecord processing.
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> In DataStream API there should also be use
>> cases
>> > >> when
>> > >> > > the
>> > >> > > > > > order
>> > >> > > > > > > of
>> > >> > > > > > > > > > > >> output is strictly required. I agree
>> that
>> > >> SQL
>> > >> > > > > experts
>> > >> > > > > > > may
>> > >> > > > > > > > > help
>> > >> > > > > > > > > > > >> provide more concrete use cases that can
>> > >> accelerate our
>> > >> > > > > > > > discussion,
>> > >> > > > > > > > > > > >> but please allow me to search for DataStream
>> use
>> > >> cases
>> > >> > > > that
>> > >> > > > > > can
>> > >> > > > > > > > > prove
>> > >> > > > > > > > > > > >> the necessity of this strict order
>> preservation
>> > >> mode, if
>> > >> > > > > > answers
>> > >> > > > > > > > > from
>> > >> > > > > > > > > > > >> SQL experts are shown to be negative.
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> For your convenience, my current rough idea is
>> > >> that we
>> > >> > > can
>> > >> > > > > > add a
>> > >> > > > > > > > > > > >> module between the Input(s) and
>> processElement()
>> > >> module
>> > >> > > in
>> > >> > > > > > Fig 2
>> > >> > > > > > > > of
>> > >> > > > > > > > > > > >> FLIP-425. The module will be responsible for
>> > >> caching
>> > >> > > > records
>> > >> > > > > > > whose
>> > >> > > > > > > > > > > >> keys collide with in-flight records, and AEC
>> will
>> > >> only
>> > >> > > be
>> > >> > > > > > > > > responsible
>> > >> > > > > > > > > > > >> for handling async state calls, without
>> knowing
>> > the
>> > >> > > record
>> > >> > > > > > each
>> > >> > > > > > > > call
>> > >> > > > > > > > > > > >> belongs to. We may revisit this topic once the
>> > >> necessity
>> > >> > > > of
>> > >> > > > > > the
>> > >> > > > > > > > > strict
>> > >> > > > > > > > > > > >> order mode is clarified.
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> > 2. The amount of parallel StateRequests ...
>> > >> instead of
>> > >> > > > > > > invoking
>> > >> > > > > > > > > > yield
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> Your suggestions generally appeal to me. I
>> think
>> > >> we may
>> > >> > > > let
>> > >> > > > > > > > > > > >> corresponding Flink jobs fail with OOM for
>> now,
>> > >> since
>> > >> > > the
>> > >> > > > > > > majority
>> > >> > > > > > > > > of
>> > >> > > > > > > > > > > >> a StateRequest should just be references to
>> > >> existing
>> > >> > > Java
>> > >> > > > > > > objects,
>> > >> > > > > > > > > > > >> which only occupy very small memory space
>> and
>> > can
>> > >> > > hardly
>> > >> > > > > > cause
>> > >> > > > > > > > OOM
>> > >> > > > > > > > > > > >> in common cases. We can monitor the pending
>> > >> > > StateRequests
>> > >> > > > > and
>> > >> > > > > > if
>> > >> > > > > > > > > there
>> > >> > > > > > > > > > > >> is really a risk of OOM in extreme cases, we
>> can
>> > >> throw
>> > >> > > > > > > Exceptions
>> > >> > > > > > > > > with
>> > >> > > > > > > > > > > >> proper messages notifying users what to do,
>> like
>> > >> > > > increasing
>> > >> > > > > > > memory
>> > >> > > > > > > > > > > >> through configurations.
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> Your suggestions to adjust threshold
>> adaptively
>> > or
>> > >> to
>> > >> > > use
>> > >> > > > > the
>> > >> > > > > > > > > blocking
>> > >> > > > > > > > > > > >> buffer sound good, and in my opinion we can
>> > >> postpone
>> > >> > > them
>> > >> > > > > to
>> > >> > > > > > > > future
>> > >> > > > > > > > > > > >> FLIPs since they seem to only benefit users in
>> > rare
>> > >> > > cases.
>> > >> > > > > > Given
>> > >> > > > > > > > > that
>> > >> > > > > > > > > > > >> FLIP-423~428 has already been a big enough
>> > design,
>> > >> it
>> > >> > > > might
>> > >> > > > > be
>> > >> > > > > > > > > better
>> > >> > > > > > > > > > > >> to focus on the most critical design for now
>> and
>> > >> > > postpone
>> > >> > > > > > > > > > > >> optimizations like this. WDYT?
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> > 5. Sure, we will introduce new configs as
>> well
>> > as
>> > >> > > their
>> > >> > > > > > > default
>> > >> > > > > > > > > > value.
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> Thanks for adding the default values and the
>> > values
>> > >> > > > > themselves
>> > >> > > > > > > > LGTM.
>> > >> > > > > > > > > > > >> It might be better to move the default values
>> to
>> > >> the
>> > >> > > > > Proposed
>> > >> > > > > > > > > Changes
>> > >> > > > > > > > > > > >> section instead of making them public for
>> now, as
>> > >> there
>> > >> > > > will
>> > >> > > > > > be
>> > >> > > > > > > > > > > >> compatibility issues once we want to
>> dynamically
>> > >> adjust
>> > >> > > > the
>> > >> > > > > > > > > thresholds
>> > >> > > > > > > > > > > >> and timeouts in future.
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> The configuration
>> execution.async-state.enabled
>> > >> seems
>> > >> > > > > > > unnecessary,
>> > >> > > > > > > > > as
>> > >> > > > > > > > > > > >> the infrastructure may automatically get this
>> > >> > > information
>> > >> > > > > from
>> > >> > > > > > > the
>> > >> > > > > > > > > > > >> detailed state backend configurations. We may
>> > >> revisit
>> > >> > > this
>> > >> > > > > > part
>> > >> > > > > > > > > after
>> > >> > > > > > > > > > > >> the core designs have reached an agreement.
>> WDYT?
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> Besides, inspired by Jeyhun's comments, it
>> comes
>> > >> to me
>> > >> > > > that
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> 8. Should this FLIP introduce metrics that
>> > measure
>> > >> the
>> > >> > > > time
>> > >> > > > > a
>> > >> > > > > > > > Flink
>> > >> > > > > > > > > > > >> job is back-pressured by State IOs? Under the
>> > >> current
>> > >> > > > > design,
>> > >> > > > > > > this
>> > >> > > > > > > > > > > >> metric could measure the time when the
>> blocking
>> > >> buffer
>> > >> > > is
>> > >> > > > > full
>> > >> > > > > > > and
>> > >> > > > > > > > > > > >> yield() cannot get callbacks to process, which
>> > >> means the
>> > >> > > > > > > operator
>> > >> > > > > > > > is
>> > >> > > > > > > > > > > >> fully waiting for state responses.
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> Best regards,
>> > >> > > > > > > > > > > >> Yunfeng
>> > >> > > > > > > > > > > >>
>> > >> > > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <
>> > >> > > > > > > > zakelly....@gmail.com>
>> > >> > > > > > > > > > wrote:
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Hi Yunfeng,
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Thanks for your detailed comments!
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 1. Why do we need a close() method on
>> > >> StateIterator?
>> > >> > > > This
>> > >> > > > > > > > method
>> > >> > > > > > > > > > seems
>> > >> > > > > > > > > > > >> >> unused in the usage example codes.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > The `close()` is introduced to release
>> internal
>> > >> > > > resources,
>> > >> > > > > > but
>> > >> > > > > > > > it
>> > >> > > > > > > > > > does not seem to require the user to call it. I
>> > removed
>> > >> this.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc,
>> it
>> > is
>> > >> > > stated
>> > >> > > > > that
>> > >> > > > > > > "No
>> > >> > > > > > > > > > null
>> > >> > > > > > > > > > > >> >> entries are allowed". It might be better to
>> > >> further
>> > >> > > > > explain
>> > >> > > > > > > > what
>> > >> > > > > > > > > > will
>> > >> > > > > > > > > > > >> >> happen if a null value is passed, ignoring
>> the
>> > >> value
>> > >> > > in
>> > >> > > > > the
>> > >> > > > > > > > > > returned
>> > >> > > > > > > > > > > >> >> Collection or throwing exceptions. Given
>> that
>> > >> > > > > > > > > > > >> >> FutureUtils.emptyFuture() can be returned
>> in
>> > the
>> > >> > > > example
>> > >> > > > > > > code,
>> > >> > > > > > > > I
>> > >> > > > > > > > > > > >> >> suppose the former one might be correct.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > The statement "No null entries are allowed"
>> > >> refers to
>> > >> > > > the
>> > >> > > > > > > > > > parameters, it means some arrayList like [null,
>> > >> StateFuture1,
>> > >> > > > > > > > > StateFuture2]
>> > >> > > > > > > > > > passed in are not allowed, and an Exception will be
>> > >> thrown.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... .
>> This
>> > >> > > > situation
>> > >> > > > > > > should
>> > >> > > > > > > > > be
>> > >> > > > > > > > > > > >> >> avoided and the order of same-key records
>> > >> should be
>> > >> > > > > > strictly
>> > >> > > > > > > > > > > >> >> preserved.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > I will discuss this with some expert SQL
>> > >> developers.
>> > >> > > And
>> > >> > > > > if
>> > >> > > > > > it
>> > >> > > > > > > > is
>> > >> > > > > > > > > > valid and common, I suggest a strict order
>> > preservation
>> > >> mode
>> > >> > > > for
>> > >> > > > > > > > > > StreamRecord processing. WDYT?
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 2. The FLIP says that StateRequests
>> submitted
>> > by
>> > >> > > > > Callbacks
>> > >> > > > > > > will
>> > >> > > > > > > > > not
>> > >> > > > > > > > > > > >> >> invoke further yield() methods. Given that
>> > >> yield() is
>> > >> > > > > used
>> > >> > > > > > > when
>> > >> > > > > > > > > > there
>> > >> > > > > > > > > > > >> >> is "too much" in-flight data, does it mean
>> > >> > > > StateRequests
>> > >> > > > > > > > > submitted
>> > >> > > > > > > > > > by
>> > >> > > > > > > > > > > >> >> Callbacks will never be "too much"? What if
>> > the
>> > >> total
>> > >> > > > > > number
>> > >> > > > > > > of
>> > >> > > > > > > > > > > >> >> StateRequests exceeds the capacity of Flink
>> > >> operator's
>> > >> > > > > > memory
>> > >> > > > > > > > > space?
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > The amount of parallel StateRequests for one
>> > >> > > > StreamRecord
>> > >> > > > > > > cannot
>> > >> > > > > > > > > be
>> > >> > > > > > > > > > determined since the code is written by users. So
>> the
>> > >> > > in-flight
>> > >> > > > > > > > requests
>> > >> > > > > > > > > > may be "too much", and may cause OOM. Users should
>> > >> > > re-configure
>> > >> > > > > > their
>> > >> > > > > > > > > job,
>> > >> > > > > > > > > > controlling the amount of on-going StreamRecord.
>> And I
>> > >> > > suggest
>> > >> > > > > two
>> > >> > > > > > > ways
>> > >> > > > > > > > > to
>> > >> > > > > > > > > > avoid this:
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Adaptively adjust the count of on-going
>> > >> StreamRecord
>> > >> > > > > > according
>> > >> > > > > > > > to
>> > >> > > > > > > > > > historical StateRequests amount.
>> > >> > > > > > > > > > > >> > Also control the max StateRequests that can
>> be
>> > >> > > executed
>> > >> > > > in
>> > >> > > > > > > > > parallel
>> > >> > > > > > > > > > for each StreamRecord, and if it exceeds, put the
>> new
>> > >> > > > > StateRequest
>> > >> > > > > > in
>> > >> > > > > > > > the
>> > >> > > > > > > > > > blocking buffer waiting for execution (instead of
>> > >> invoking
>> > >> > > > > > yield()).
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > WDYT?
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order
>> > >> execution
>> > >> > > mode,
>> > >> > > > > > along
>> > >> > > > > > > > > with
>> > >> > > > > > > > > > the
>> > >> > > > > > > > > > > >> >> epoch mechanism, would bring more
>> complexity
>> > to
>> > >> the
>> > >> > > > > > execution
>> > >> > > > > > > > > model
>> > >> > > > > > > > > > > >> >> than the performance improvement it
>> promises.
>> > >> Could
>> > >> > > we
>> > >> > > > > add
>> > >> > > > > > > some
>> > >> > > > > > > > > > > >> >> benchmark results proving the benefit of
>> this
>> > >> mode?
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Agreed, will do.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 3.2 The FLIP might need to add a public API
>> > >> section
>> > >> > > > > > > describing
>> > >> > > > > > > > > how
>> > >> > > > > > > > > > > >> >> users or developers can switch between
>> these
>> > two
>> > >> > > > > execution
>> > >> > > > > > > > modes.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Good point. We will add a Public API
>> section.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 3.3 Apart from the watermark and checkpoint
>> > >> mentioned
>> > >> > > > in
>> > >> > > > > > this
>> > >> > > > > > > > > FLIP,
>> > >> > > > > > > > > > > >> >> there are also more other events that might
>> > >> appear in
>> > >> > > > the
>> > >> > > > > > > > stream
>> > >> > > > > > > > > of
>> > >> > > > > > > > > > > >> >> data records. It might be better to
>> generalize
>> > >> the
>> > >> > > > > > execution
>> > >> > > > > > > > mode
>> > >> > > > > > > > > > > >> >> mechanism to handle all possible events.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Yes, I missed this point. Thanks for the
>> > >> reminder.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 4. It might be better to treat
>> > >> callback-handling as a
>> > >> > > > > > > > > > > >> >> MailboxDefaultAction, instead of Mails, to
>> > >> avoid the
>> > >> > > > > > overhead
>> > >> > > > > > > > of
>> > >> > > > > > > > > > > >> >> repeatedly creating Mail objects.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >  I thought the intermediate wrapper for
>> > >> callback can
>> > >> > > > not
>> > >> > > > > be
>> > >> > > > > > > > > > omitted, since there will be some context switch
>> > before
>> > >> each
>> > >> > > > > > > execution.
>> > >> > > > > > > > > The
>> > >> > > > > > > > > > MailboxDefaultAction in most cases is processInput
>> > >> right?
>> > >> > > While
>> > >> > > > > the
>> > >> > > > > > > > > > callback should be executed with higher priority.
>> I'd
>> > >> suggest
>> > >> > > > not
>> > >> > > > > > > > > changing
>> > >> > > > > > > > > > the basic logic of Mailbox and the default action
>> > since
>> > >> it is
>> > >> > > > > very
>> > >> > > > > > > > > critical
>> > >> > > > > > > > > > for performance. But yes, we will try our best to
>> > avoid
>> > >> > > > creating
>> > >> > > > > > > > > > intermediate objects.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 5. Could this FLIP provide the current
>> default
>> > >> values
>> > >> > > > for
>> > >> > > > > > > > things
>> > >> > > > > > > > > > like
>> > >> > > > > > > > > > > >> >> active buffer size thresholds and timeouts?
>> > >> These
>> > >> > > could
>> > >> > > > > > help
>> > >> > > > > > > > with
>> > >> > > > > > > > > > > >> >> memory consumption and latency analysis.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Sure, we will introduce new configs as well
>> as
>> > >> their
>> > >> > > > > default
>> > >> > > > > > > > > value.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 6. Why do we need to record the hashcode
>> of a
>> > >> record
>> > >> > > in
>> > >> > > > > its
>> > >> > > > > > > > > > > >> >> RecordContext? It seems not used.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > The context switch before each callback
>> > execution
>> > >> > > > involves
>> > >> > > > > > > > > > setCurrentKey, where the hashCode is
>> re-calculated. We
>> > >> cache
>> > >> > > it
>> > >> > > > > for
>> > >> > > > > > > > > > accelerating.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >> 7. In "timers can be stored on the JVM
>> heap or
>> > >> > > > RocksDB",
>> > >> > > > > > the
>> > >> > > > > > > > link
>> > >> > > > > > > > > > > >> >> points to a document in flink-1.15. It
>> might
>> > be
>> > >> > > better
>> > >> > > > to
>> > >> > > > > > > > verify
>> > >> > > > > > > > > > the
>> > >> > > > > > > > > > > >> >> referenced content is still valid in the
>> > latest
>> > >> Flink
>> > >> > > > and
>> > >> > > > > > > > update
>> > >> > > > > > > > > > the
>> > >> > > > > > > > > > > >> >> link accordingly. Same for other
>> references if
>> > >> any.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Thanks for the reminder! Will check.
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > Thanks a lot & Best,
>> > >> > > > > > > > > > > >> > Zakelly
>> > >> > > > > > > > > > > >> >
>> > >> > > > > > > > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun
>> Karimov <
>> > >> > > > > > > > > je.kari...@gmail.com>
>> > >> > > > > > > > > > wrote:
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >> Hi,
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >> Thanks for the great proposals. I have a
>> few
>> > >> comments:
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >> - Backpressure Handling. Flink's original
>> > >> > > backpressure
>> > >> > > > > > > handling
>> > >> > > > > > > > > is
>> > >> > > > > > > > > > quite
>> > >> > > > > > > > > > > >> >> robust and the semantics is quite "simple"
>> > >> (simple is
>> > >> > > > > > > > beautiful).
>> > >> > > > > > > > > > > >> >> This mechanism has proven to perform
>> > >> better/robust
>> > >> > > than
>> > >> > > > > the
>> > >> > > > > > > > other
>> > >> > > > > > > > > > open
>> > >> > > > > > > > > > > >> >> source streaming systems, where they were
>> > >> relying on
>> > >> > > > some
>> > >> > > > > > > > > loopback
>> > >> > > > > > > > > > > >> >> information.
>> > >> > > > > > > > > > > >> >> Now that the proposal also relies on
>> loopback
>> > >> (yield
>> > >> > > in
>> > >> > > > > > this
>> > >> > > > > > > > > > case), it is
>> > >> > > > > > > > > > > >> >> not clear how well the new backpressure
>> > handling
>> > >> > > > proposed
>> > >> > > > > > in
>> > >> > > > > > > > > > FLIP-425 is
>> > >> > > > > > > > > > > >> >> robust and handles fluctuating workloads.
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >> - Watermark/Timer Handling: Similar
>> arguments
>> > >> apply
>> > >> > > for
>> > >> > > > > > > > watermark
>> > >> > > > > > > > > > and timer
>> > >> > > > > > > > > > > >> >> handling. IMHO, we need more benchmarks
>> > showing
>> > >> the
>> > >> > > > > > overhead
>> > >> > > > > > > > > > > >> >> of epoch management with different
>> parameters
>> > >> (e.g.,
>> > >> > > > > window
>> > >> > > > > > > > size,
>> > >> > > > > > > > > > watermark
>> > >> > > > > > > > > > > >> >> strategy, etc)
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >> - DFS consistency guarantees. The proposal
>> in
>> > >> > > FLIP-427
>> > >> > > > is
>> > >> > > > > > > > > > DFS-agnostic.
>> > >> > > > > > > > > > > >> >> However, different cloud providers have
>> > >> different
>> > >> > > > storage
>> > >> > > > > > > > > > consistency
>> > >> > > > > > > > > > > >> >> models.
>> > >> > > > > > > > > > > >> >> How do we want to deal with them?
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >>  Regards,
>> > >> > > > > > > > > > > >> >> Jeyhun
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan
>> <
>> > >> > > > > > > > > zakelly....@gmail.com>
>> > >> > > > > > > > > > wrote:
>> > >> > > > > > > > > > > >> >>
>> > >> > > > > > > > > > > >> >> > Thanks Piotr for sharing your thoughts!
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > I guess it depends how we would like to
>> > treat
>> > >> the
>> > >> > > > local
>> > >> > > > > > > > disks.
>> > >> > > > > > > > > > I've always
>> > >> > > > > > > > > > > >> >> > > thought about them that almost always
>> > >> eventually
>> > >> > > > all
>> > >> > > > > > > state
>> > >> > > > > > > > > > from the DFS
>> > >> > > > > > > > > > > >> >> > > should end up cached in the local
>> disks.
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > OK I got it. In our proposal we treat
>> local
>> > >> disk as
>> > >> > > > an
>> > >> > > > > > > > optional
>> > >> > > > > > > > > > cache, so
>> > >> > > > > > > > > > > >> >> > the basic design will handle the case
>> with
>> > >> state
>> > >> > > > > residing
>> > >> > > > > > > in
>> > >> > > > > > > > > DFS
>> > >> > > > > > > > > > only. It
>> > >> > > > > > > > > > > >> >> > is a more 'cloud-native' approach that
>> does
>> > >> not
>> > >> > > rely
>> > >> > > > on
>> > >> > > > > > any
>> > >> > > > > > > > > > local storage
>> > >> > > > > > > > > > > >> >> > assumptions, which allow users to
>> > dynamically
>> > >> > > adjust
>> > >> > > > > the
>> > >> > > > > > > > > > capacity or I/O
>> > >> > > > > > > > > > > >> >> > bound of remote storage to gain
>> performance
>> > >> or save
>> > >> > > > the
>> > >> > > > > > > cost,
>> > >> > > > > > > > > > even without
>> > >> > > > > > > > > > > >> >> > a job restart.
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > In
>> > >> > > > > > > > > > > >> >> > > the currently proposed more fine
>> grained
>> > >> > > solution,
>> > >> > > > > you
>> > >> > > > > > > > make a
>> > >> > > > > > > > > > single
>> > >> > > > > > > > > > > >> >> > > request to DFS per each state access.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > Ah that's not accurate. Actually we
>> buffer
>> > the
>> > >> > > state
>> > >> > > > > > > requests
>> > >> > > > > > > > > > and process
>> > >> > > > > > > > > > > >> >> > them in batch, multiple requests will
>> > >> correspond to
>> > >> > > > one
>> > >> > > > > > DFS
>> > >> > > > > > > > > > access (One
>> > >> > > > > > > > > > > >> >> > block access for multiple keys performed
>> by
>> > >> > > RocksDB).
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > In that benchmark you mentioned, are you
>> > >> requesting
>> > >> > > > the
>> > >> > > > > > > state
>> > >> > > > > > > > > > > >> >> > > asynchronously from local disks into
>> > >> memory? If
>> > >> > > the
>> > >> > > > > > > benefit
>> > >> > > > > > > > > > comes from
>> > >> > > > > > > > > > > >> >> > > parallel I/O, then I would expect the
>> > >> benefit to
>> > >> > > > > > > > > > disappear/shrink when
>> > >> > > > > > > > > > > >> >> > > running multiple subtasks on the same
>> > >> machine, as
>> > >> > > > > they
>> > >> > > > > > > > would
>> > >> > > > > > > > > > be making
>> > >> > > > > > > > > > > >> >> > > their own parallel requests, right?
>> Also
>> > >> enabling
>> > >> > > > > > > > > > checkpointing would
>> > >> > > > > > > > > > > >> >> > > further cut into the available I/O
>> budget.
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > That's an interesting topic. Our
>> proposal is
>> > >> > > > > specifically
>> > >> > > > > > > > aimed
>> > >> > > > > > > > > > at the
>> > >> > > > > > > > > > > >> >> > scenario where the machine I/O is not
>> fully
>> > >> loaded
>> > >> > > > but
>> > >> > > > > > the
>> > >> > > > > > > > I/O
>> > >> > > > > > > > > > latency has
>> > >> > > > > > > > > > > >> >> > indeed become a bottleneck for each
>> subtask.
>> > >> While
>> > >> > > > the
>> > >> > > > > > > > > > distributed file
>> > >> > > > > > > > > > > >> >> > system is a prime example of a scenario
>> > >> > > characterized
>> > >> > > > > by
>> > >> > > > > > > > > > abundant and
>> > >> > > > > > > > > > > >> >> > easily scalable I/O bandwidth coupled
>> with
>> > >> higher
>> > >> > > I/O
>> > >> > > > > > > > latency.
>> > >> > > > > > > > > > You may
>> > >> > > > > > > > > > > >> >> > expect to increase the parallelism of a
>> job
>> > to
>> > >> > > > enhance
>> > >> > > > > > the
>> > >> > > > > > > > > > performance as
>> > >> > > > > > > > > > > >> >> > well, but that also brings in more waste
>> of
>> > >> CPUs
>> > >> > > and
>> > >> > > > > > > memory
>> > >> > > > > > > > > for
>> > >> > > > > > > > > > building
>> > >> > > > > > > > > > > >> >> > up more subtasks. This is one drawback
>> for
>> > the
>> > >> > > > > > > > > > computation-storage tightly
>> > >> > > > > > > > > > > >> >> > coupled nodes. While in our proposal, the
>> > >> parallel
>> > >> > > > I/O
>> > >> > > > > > with
>> > >> > > > > > > > all
>> > >> > > > > > > > > > the
>> > >> > > > > > > > > > > >> >> > callbacks still running in one task,
>> > >> pre-allocated
>> > >> > > > > > > > > computational
>> > >> > > > > > > > > > resources
>> > >> > > > > > > > > > > >> >> > are better utilized. It is a much more
>> > >> lightweight
>> > >> > > > way
>> > >> > > > > to
>> > >> > > > > > > > > > perform parallel
>> > >> > > > > > > > > > > >> >> > I/O.
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > Just with what granularity those async
>> > >> requests
>> > >> > > > should
>> > >> > > > > be
>> > >> > > > > > > > made.
>> > >> > > > > > > > > > > >> >> > > Making state access asynchronous is
>> > >> definitely
>> > >> > > the
>> > >> > > > > > right
>> > >> > > > > > > > way
>> > >> > > > > > > > > > to go!
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > I think the current proposal is based on
>> > such
>> > >> core
>> > >> > > > > ideas:
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >    - A pure cloud-native disaggregated
>> > state.
>> > >> > > > > > > > > > > >> >> >    - Fully utilize the given resources
>> and
>> > >> try not
>> > >> > > to
>> > >> > > > > > waste
>> > >> > > > > > > > > them
>> > >> > > > > > > > > > (including
>> > >> > > > > > > > > > > >> >> >    I/O).
>> > >> > > > > > > > > > > >> >> >    - The ability to scale isolated
>> resources
>> > >> (I/O
>> > >> > > or
>> > >> > > > > CPU
>> > >> > > > > > or
>> > >> > > > > > > > > > memory)
>> > >> > > > > > > > > > > >> >> >    independently.
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > We think a fine-grained granularity is more in line with those ideas,
>> > >> > > > > > > > > > > >> >> > especially without local disk assumptions and without any waste of
>> > >> > > > > > > > > > > >> >> > I/O by prefetching. Please note that it is not a replacement for the
>> > >> > > > > > > > > > > >> >> > original local state with synchronous execution. Instead, this is a
>> > >> > > > > > > > > > > >> >> > solution embracing the cloud-native era, providing much more
>> > >> > > > > > > > > > > >> >> > scalability and resource efficiency when handling a *huge state*.
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > What also worries me a lot in this fine grained model is the effect on the
>> > >> > > > > > > > > > > >> >> > > checkpointing times.
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > Your concerns are very reasonable. Faster checkpointing is always a
>> > >> > > > > > > > > > > >> >> > core advantage of disaggregated state, but only for the async phase.
>> > >> > > > > > > > > > > >> >> > There will be some complexity introduced by in-flight requests, but
>> > >> > > > > > > > > > > >> >> > I'd suggest a checkpoint containing those in-flight state requests as
>> > >> > > > > > > > > > > >> >> > part of the state, to accelerate the sync phase by skipping the buffer
>> > >> > > > > > > > > > > >> >> > draining. This makes the buffer size have little impact on checkpoint
>> > >> > > > > > > > > > > >> >> > time. And all the changes stay within the execution model we proposed,
>> > >> > > > > > > > > > > >> >> > while the checkpoint barrier alignment or handling will not be touched
>> > >> > > > > > > > > > > >> >> > in our proposal, so I guess the complexity is relatively controllable.
>> > >> > > > > > > > > > > >> >> > I have faith in that :)
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > Also regarding the overheads, it would be great if you could provide
>> > >> > > > > > > > > > > >> >> > > profiling results of the benchmarks that you conducted to verify the
>> > >> > > > > > > > > > > >> >> > > results. Or maybe if you could describe the steps to reproduce the
>> > >> > > > > > > > > > > >> >> > > results?
>> > >> > > > > > > > > > > >> >> > > Especially "Hashmap (sync)" vs "Hashmap with async API".
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > Yes, we could profile the benchmarks. And for the comparison of
>> > >> > > > > > > > > > > >> >> > "Hashmap (sync)" vs "Hashmap with async API", we run a Wordcount job
>> > >> > > > > > > > > > > >> >> > written with async APIs but with the async execution disabled by
>> > >> > > > > > > > > > > >> >> > directly completing the future using sync state access. This evaluates
>> > >> > > > > > > > > > > >> >> > the overhead of newly introduced modules like 'AEC' in sync execution
>> > >> > > > > > > > > > > >> >> > (even though they are not designed for it). The code will be provided
>> > >> > > > > > > > > > > >> >> > later. For other results of our PoC[1], you can follow the
>> > >> > > > > > > > > > > >> >> > instructions here[2] to reproduce. Since the compilation may take some
>> > >> > > > > > > > > > > >> >> > effort, we will directly provide the jar for testing next week.
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > And @Yunfeng Zhou, I have noticed your mail but it is a bit late in my
>> > >> > > > > > > > > > > >> >> > local time and the next few days are weekends. So I will reply to you
>> > >> > > > > > > > > > > >> >> > later. Thanks for your response!
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > [1]
>> > >> > > > > > > > > > > >> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
>> > >> > > > > > > > > > > >> >> > [2]
>> > >> > > > > > > > > > > >> >> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > Best,
>> > >> > > > > > > > > > > >> >> > Zakelly
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com>
>> > >> > > > > > > > > > > >> >> > wrote:
>> > >> > > > > > > > > > > >> >> >
>> > >> > > > > > > > > > > >> >> > > Hi,
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > Thanks for proposing this design! I just read FLIP-424 and FLIP-425
>> > >> > > > > > > > > > > >> >> > > and have some questions about the proposed changes.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > For Async API (FLIP-424)
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 1. Why do we need a close() method on StateIterator? This method
>> > >> > > > > > > > > > > >> >> > > seems unused in the usage example code.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
>> > >> > > > > > > > > > > >> >> > > entries are allowed". It might be better to further explain what
>> > >> > > > > > > > > > > >> >> > > will happen if a null value is passed: ignoring the value in the
>> > >> > > > > > > > > > > >> >> > > returned Collection or throwing exceptions. Given that
>> > >> > > > > > > > > > > >> >> > > FutureUtils.emptyFuture() can be returned in the example code, I
>> > >> > > > > > > > > > > >> >> > > suppose the former one might be correct.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > For Async Execution (FLIP-425)
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 1. According to Fig 2 of this FLIP, if a recordB has its key collide
>> > >> > > > > > > > > > > >> >> > > with an ongoing recordA, its processElement() method can still be
>> > >> > > > > > > > > > > >> >> > > triggered immediately, and then it might be moved to the blocking
>> > >> > > > > > > > > > > >> >> > > buffer in AEC if it involves state operations. This means that
>> > >> > > > > > > > > > > >> >> > > recordB's output will precede recordA's output in downstream
>> > >> > > > > > > > > > > >> >> > > operators, if recordA involves state operations while recordB does
>> > >> > > > > > > > > > > >> >> > > not. This will harm the correctness of Flink jobs in some use cases.
>> > >> > > > > > > > > > > >> >> > > For example, in dim table join cases, recordA could be a delete
>> > >> > > > > > > > > > > >> >> > > operation that involves state access, while recordB could be an
>> > >> > > > > > > > > > > >> >> > > insert operation that needs to visit external storage without state
>> > >> > > > > > > > > > > >> >> > > access. If recordB's output precedes recordA's, then an entry that
>> > >> > > > > > > > > > > >> >> > > is supposed to finally exist with recordB's value in the sink table
>> > >> > > > > > > > > > > >> >> > > might actually be deleted according to recordA's command. This
>> > >> > > > > > > > > > > >> >> > > situation should be avoided and the order of same-key records should
>> > >> > > > > > > > > > > >> >> > > be strictly preserved.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 2. The FLIP says that StateRequests submitted by Callbacks will not
>> > >> > > > > > > > > > > >> >> > > invoke further yield() methods. Given that yield() is used when
>> > >> > > > > > > > > > > >> >> > > there is "too much" in-flight data, does it mean StateRequests
>> > >> > > > > > > > > > > >> >> > > submitted by Callbacks will never be "too much"? What if the total
>> > >> > > > > > > > > > > >> >> > > number of StateRequests exceeds the capacity of Flink operator's
>> > >> > > > > > > > > > > >> >> > > memory space?
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 3. In the "Watermark" section, this FLIP provides an out-of-order
>> > >> > > > > > > > > > > >> >> > > execution mode apart from the default strictly-ordered mode, which
>> > >> > > > > > > > > > > >> >> > > can optimize performance by allowing more concurrent executions.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 3.1 I'm concerned that the out-of-order execution mode, along with
>> > >> > > > > > > > > > > >> >> > > the epoch mechanism, would bring more complexity to the execution
>> > >> > > > > > > > > > > >> >> > > model than the performance improvement it promises. Could we add
>> > >> > > > > > > > > > > >> >> > > some benchmark results proving the benefit of this mode?
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 3.2 The FLIP might need to add a public API section describing how
>> > >> > > > > > > > > > > >> >> > > users or developers can switch between these two execution modes.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
>> > >> > > > > > > > > > > >> >> > > there are also other events that might appear in the stream of data
>> > >> > > > > > > > > > > >> >> > > records. It might be better to generalize the execution mode
>> > >> > > > > > > > > > > >> >> > > mechanism to handle all possible events.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 4. It might be better to treat callback-handling as a
>> > >> > > > > > > > > > > >> >> > > MailboxDefaultAction, instead of Mails, to avoid the overhead of
>> > >> > > > > > > > > > > >> >> > > repeatedly creating Mail objects.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 5. Could this FLIP provide the current default values for things
>> > >> > > > > > > > > > > >> >> > > like active buffer size thresholds and timeouts? These could help
>> > >> > > > > > > > > > > >> >> > > with memory consumption and latency analysis.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 6. Why do we need to record the hashcode of a record in its
>> > >> > > > > > > > > > > >> >> > > RecordContext? It seems to be unused.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > 7. In "timers can be stored on the JVM heap or RocksDB", the link
>> > >> > > > > > > > > > > >> >> > > points to a document in flink-1.15. It might be better to verify
>> > >> > > > > > > > > > > >> >> > > the referenced content is still valid in the latest Flink and
>> > >> > > > > > > > > > > >> >> > > update the link accordingly. Same for other references if any.
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > Best,
>> > >> > > > > > > > > > > >> >> > > Yunfeng Zhou
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > Hi Devs,
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li,
>> > >> > > > > > > > > > > >> >> > > > Hangxiang Yu, Yanfei Lei and Feng Wang. We'd like to start a
>> > >> > > > > > > > > > > >> >> > > > discussion about introducing Disaggregated State Storage and
>> > >> > > > > > > > > > > >> >> > > > Management in Flink 2.0.
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > The past decade has witnessed a dramatic shift in Flink's
>> > >> > > > > > > > > > > >> >> > > > deployment mode, workload patterns, and hardware improvements.
>> > >> > > > > > > > > > > >> >> > > > We've moved from the map-reduce era where workers are
>> > >> > > > > > > > > > > >> >> > > > computation-storage tightly coupled nodes to a cloud-native world
>> > >> > > > > > > > > > > >> >> > > > where containerized deployments on Kubernetes become standard. To
>> > >> > > > > > > > > > > >> >> > > > enable Flink's Cloud-Native future, we introduce Disaggregated
>> > >> > > > > > > > > > > >> >> > > > State Storage and Management that uses DFS as primary storage in
>> > >> > > > > > > > > > > >> >> > > > Flink 2.0, as promised in the Flink 2.0 Roadmap.
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > Design Details can be found in FLIP-423[1].
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > This new architecture aims to solve the following challenges
>> > >> > > > > > > > > > > >> >> > > > brought by the cloud-native era for Flink.
>> > >> > > > > > > > > > > >> >> > > > 1. Local Disk Constraints in containerization
>> > >> > > > > > > > > > > >> >> > > > 2. Spiky Resource Usage caused by compaction in the current state
>> > >> > > > > > > > > > > >> >> > > > model
>> > >> > > > > > > > > > > >> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of
>> > >> > > > > > > > > > > >> >> > > > Terabytes)
>> > >> > > > > > > > > > > >> >> > > > 4. Light and Fast Checkpoint in a native way
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > More specifically, we want to reach a consensus on the following
>> > >> > > > > > > > > > > >> >> > > > issues in this discussion:
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > 1. Overall design
>> > >> > > > > > > > > > > >> >> > > > 2. Proposed Changes
>> > >> > > > > > > > > > > >> >> > > > 3. Design details to achieve Milestone1
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > In M1, we aim to achieve an end-to-end baseline version using DFS
>> > >> > > > > > > > > > > >> >> > > > as primary storage and complete core functionalities, including:
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
>> > >> > > > > > > > > > > >> >> > > > asynchronous state access.
>> > >> > > > > > > > > > > >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a
>> > >> > > > > > > > > > > >> >> > > > non-blocking execution model leveraging the asynchronous APIs
>> > >> > > > > > > > > > > >> >> > > > introduced in FLIP-424.
>> > >> > > > > > > > > > > >> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of
>> > >> > > > > > > > > > > >> >> > > > remote state data in batches to avoid unnecessary round-trip costs
>> > >> > > > > > > > > > > >> >> > > > for remote access
>> > >> > > > > > > > > > > >> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial
>> > >> > > > > > > > > > > >> >> > > > version of the ForSt disaggregated state store.
>> > >> > > > > > > > > > > >> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
>> > >> > > > > > > > > > > >> >> > > > checkpointing mechanisms with the disaggregated state store for
>> > >> > > > > > > > > > > >> >> > > > fault tolerance and fast rescaling.
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > We will vote on each FLIP in separate threads to make sure each
>> > >> > > > > > > > > > > >> >> > > > FLIP reaches a consensus. But we want to keep the discussion
>> > >> > > > > > > > > > > >> >> > > > within a focused thread (this thread) for easier tracking of
>> > >> > > > > > > > > > > >> >> > > > contexts to avoid duplicated questions/discussions and also to
>> > >> > > > > > > > > > > >> >> > > > think of the problem/solution in a full picture.
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > Looking forward to your feedback
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > Best,
>> > >> > > > > > > > > > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
>> > >> > > > > > > > > > > >> >> > > >
>> > >> > > > > > > > > > > >> >> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
>> > >> > > > > > > > > > > >> >> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
>> > >> > > > > > > > > > > >> >> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
>> > >> > > > > > > > > > > >> >> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
>> > >> > > > > > > > > > > >> >> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
>> > >> > > > > > > > > > > >> >> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
>> > >> > > > > > > > > > > >> >> > >
>> > >> > > > > > > > > > > >> >> >
