Hi devs,

It seems there is no more concern or suggestion for a while. We'd like to
start voting recently.


Best,
Zakelly

On Wed, Mar 27, 2024 at 11:46 AM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi everyone,
>
> Piotr and I had a long discussion about the checkpoint duration issue. We
> think that the lambda serialization approach I proposed in last mail may
> bring in more issues, the most serious one is that users may not be able to
> modify their code in serialized lambda to perform a bug fix.
>
> But fortunately we found a possible solution. By introducing a set of
> declarative APIs and a `declareProcess()` function that users should
> implement in some newly introduced AbstractStreamOperator, we could get the
> declaration of record processing in runtime, broken down to requests and
> callbacks (lambdas) like FLIP-424 introduced. Thus we avoid the problem of
> lambda (de)serialization and instead we retrieve callbacks every time
> before a task runs. The next step is to provide an API allowing users to
> assign an unique id to each state request and callback, or automatically
> assign one by declaration order. Thus we can find the corresponding
> callback in runtime for each restored state request based on the id, then
> the whole pipeline can be resumed.
>
> Note that all these above are internal and won't expose to average users.
> Exposing this on Stream APIs can be discussed later. I will prepare another
> FLIP(s) describing the whole optimized checkpointing process, and in the
> meantime, we can proceed on current FLIPs. The new FLIP(s) are built on top
> of current ones and we can achieve this incrementally.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 21, 2024 at 12:31 AM Zakelly Lan <zakelly....@gmail.com>
> wrote:
>
>> Hi Piotrek,
>>
>> Thanks for your comments!
>>
>> As we discussed off-line, you agreed that we can not checkpoint while some
>>> records are in the middle of being
>>> processed. That we would have to drain the in-progress records before
>>> doing
>>> the checkpoint. You also argued
>>> that this is not a problem, because the size of this buffer can be
>>> configured.
>>>
>>> I'm really afraid of such a solution. I've seen in the past plenty of
>>> times, that whenever Flink has to drain some
>>> buffered records, eventually that always brakes timely checkpointing (and
>>> hence ability for Flink to rescale in
>>> a timely manner). Even a single record with a `flatMap` like operator
>>> currently in Flink causes problems during
>>> back pressure. That's especially true for example for processing
>>> watermarks. At the same time, I don't see how
>>> this value could be configured by even Flink's power users, let alone an
>>> average user. The size of that in-flight
>>> buffer not only depends on a particular query/job, but also the "good"
>>> value changes dynamically over time,
>>> and can change very rapidly. Sudden spikes of records or backpressure,
>>> some
>>> hiccup during emitting watermarks,
>>> all of those could change in an instant the theoretically optimal buffer
>>> size of let's say "6000" records, down to "1".
>>> And when those changes happen, those are the exact times when timely
>>> checkpointing matters the most.
>>> If the load pattern suddenly changes, and checkpointing takes suddenly
>>> tens
>>> of minutes instead of a couple of
>>> seconds, it means you can not use rescaling and you are forced to
>>> overprovision the resources. And there also
>>> other issues if checkpointing takes too long.
>>>
>>
>> I'm gonna clarify some misunderstanding here. First of all, is the sync
>> phase of checkpointing for the current plan longer than the synchronous
>> execution model? The answer is yes, it is a trade-off for parallel
>> execution model. I think the cost is worth the improvement. Now the
>> question is, how much longer are we talking about? The PoC result I
>> provided is that it takes 3 seconds to drain 6000 records of a simple job,
>> and I said it is not a big deal. Even though you would say we may encounter
>> long watermark/timer processing that make the cp wait, thus I provide
>> several ways to optimize this:
>>
>>    1. Instead of only controlling the in-flight records, we could
>>    control the in-flight watermark.
>>    2. Since we have broken down the record processing into several state
>>    requests with at most one subsequent callback for each request, the cp can
>>    be processed after current RUNNING requests (NOT records) and their
>>    callbacks finish. Which means, even though we have a lot of records
>>    in-flight (I mean in 'processElement' here), once only a small group of
>>    state requests finishes, the cp can proceed. They will form into 1 or 2
>>    multiGets to rocksdb, which takes less time. Moreover, the timer 
>> processing
>>    is also split into several requests, so cp won't wait for the whole timer
>>    to finish. The picture attached describes this idea.
>>
>> And the size of this buffer can be configured. I'm not counting on
>> average users to configure it well, I'm just saying that we'd better not
>> focus on absolute numbers of PoC or specific cases since we can always
>> provide a conservative default value to make this acceptable for most
>> cases.The adaptive buffer size is also worth a try if we provide a
>> conservative strategy.
>>
>> Besides, I don't understand why the load pattern or spikes or
>> backpressure will affect this. We are controlling the records that can get
>> in the 'processElement' and the state requests that can fire in parallel,
>> no matter how high the load spikes, they will be blocked outside. It is
>> relatively stable within the proposed execution model itself. The unaligned
>> barrier will skip those inputs in the queue as before.
>>
>>
>> At the same time, I still don't understand why we can not implement things
>>> incrementally? First
>>> let's start with the current API, without the need to rewrite all of the
>>> operators, we can asynchronously fetch whole
>>> state for a given record using its key. That should already vastly
>>> improve
>>> many things, and this way we could
>>> perform a checkpoint without a need of draining the in-progress/in-flight
>>> buffer. We could roll that version out,
>>> test it out in practice, and then we could see if the fine grained state
>>> access is really needed. Otherwise it sounds
>>> to me like a premature optimization, that requires us to not only
>>> rewrite a
>>> lot of the code, but also to later maintain
>>> it, even if it ultimately proves to be not needed. Which I of course can
>>> not be certain but I have a feeling that it
>>> might be the case.
>>
>>
>> The disaggregated state management we proposed is target at including but
>> not limited to the following challenges:
>>
>>    1. Local disk constraints, including limited I/O and space.
>>    (discussed in FLIP-423)
>>    2. Unbind the I/O resource with pre-allocated CPU resource, to make
>>    good use of both (motivation of FLIP-424)
>>    3. Elasticity of scaling I/O or storage capacity.
>>
>> Thus our plan is dependent on DFS, using local disk as an optional cache
>> only. The pre-fetching plan you mentioned is still binding I/O with CPU
>> resources, and will consume even more I/O to load unnecessary state. It
>> makes things worse. Please note that we are not targeting some scenarios
>> where the local state could handle well, and our goal is not to replace the
>> local state.
>>
>> And If manpower is a big concern of yours, I would say many of my
>> colleagues could help contribute in runtime or SQL operators. It is
>> experimental on a separate code path other than the local state and will be
>> recommended to use only when we prove it mature.
>>
>>
>> Thanks & Best,
>> Zakelly
>>
>> On Wed, Mar 20, 2024 at 10:04 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hey Zakelly!
>>>
>>> Sorry for the late reply. I still have concerns about the proposed
>>> solution, with my main concerns coming from
>>> the implications of the asynchronous state access API on the
>>> checkpointing
>>> and responsiveness of Flink.
>>>
>>> >> What also worries me a lot in this fine grained model is the effect on
>>> the checkpointing times.
>>> >
>>> > Your concerns are very reasonable. Faster checkpointing is always a
>>> core
>>> advantage of disaggregated state,
>>> > but only for the async phase. There will be some complexity introduced
>>> by
>>> in-flight requests, but I'd suggest
>>> > a checkpoint containing those in-flight state requests as part of the
>>> state, to accelerate the sync phase by
>>> > skipping the buffer draining. This makes the buffer size have little
>>> impact on checkpoint time. And all the
>>> > changes keep within the execution model we proposed while the
>>> checkpoint
>>> barrier alignment or handling
>>> > will not be touched in our proposal, so I guess the complexity is
>>> relatively controllable. I have faith in that :)
>>>
>>> As we discussed off-line, you agreed that we can not checkpoint while
>>> some
>>> records are in the middle of being
>>> processed. That we would have to drain the in-progress records before
>>> doing
>>> the checkpoint. You also argued
>>> that this is not a problem, because the size of this buffer can be
>>> configured.
>>>
>>> I'm really afraid of such a solution. I've seen in the past plenty of
>>> times, that whenever Flink has to drain some
>>> buffered records, eventually that always brakes timely checkpointing (and
>>> hence ability for Flink to rescale in
>>> a timely manner). Even a single record with a `flatMap` like operator
>>> currently in Flink causes problems during
>>> back pressure. That's especially true for example for processing
>>> watermarks. At the same time, I don't see how
>>> this value could be configured by even Flink's power users, let alone an
>>> average user. The size of that in-flight
>>> buffer not only depends on a particular query/job, but also the "good"
>>> value changes dynamically over time,
>>> and can change very rapidly. Sudden spikes of records or backpressure,
>>> some
>>> hiccup during emitting watermarks,
>>> all of those could change in an instant the theoretically optimal buffer
>>> size of let's say "6000" records, down to "1".
>>> And when those changes happen, those are the exact times when timely
>>> checkpointing matters the most.
>>> If the load pattern suddenly changes, and checkpointing takes suddenly
>>> tens
>>> of minutes instead of a couple of
>>> seconds, it means you can not use rescaling and you are forced to
>>> overprovision the resources. And there also
>>> other issues if checkpointing takes too long.
>>>
>>> At the same time, I still don't understand why we can not implement
>>> things
>>> incrementally? First
>>> let's start with the current API, without the need to rewrite all of the
>>> operators, we can asynchronously fetch whole
>>> state for a given record using its key. That should already vastly
>>> improve
>>> many things, and this way we could
>>> perform a checkpoint without a need of draining the in-progress/in-flight
>>> buffer. We could roll that version out,
>>> test it out in practice, and then we could see if the fine grained state
>>> access is really needed. Otherwise it sounds
>>> to me like a premature optimization, that requires us to not only
>>> rewrite a
>>> lot of the code, but also to later maintain
>>> it, even if it ultimately proves to be not needed. Which I of course can
>>> not be certain but I have a feeling that it
>>> might be the case.
>>>
>>> Best,
>>> Piotrek
>>>
>>> wt., 19 mar 2024 o 10:42 Zakelly Lan <zakelly....@gmail.com> napisał(a):
>>>
>>> > Hi everyone,
>>> >
>>> > Thanks for your valuable feedback!
>>> >
>>> > Our discussions have been going on for a while and are nearing a
>>> > consensus. So I would like to start a vote after 72 hours.
>>> >
>>> > Please let me know if you have any concerns, thanks!
>>> >
>>> >
>>> > Best,
>>> > Zakelly
>>> >
>>> > On Tue, Mar 19, 2024 at 3:37 PM Zakelly Lan <zakelly....@gmail.com>
>>> wrote:
>>> >
>>> > > Hi Yunfeng,
>>> > >
>>> > > Thanks for the suggestion!
>>> > >
>>> > > I will reorganize the FLIP-425 accordingly.
>>> > >
>>> > >
>>> > > Best,
>>> > > Zakelly
>>> > >
>>> > > On Tue, Mar 19, 2024 at 3:20 PM Yunfeng Zhou <
>>> > flink.zhouyunf...@gmail.com>
>>> > > wrote:
>>> > >
>>> > >> Hi Xintong and Zakelly,
>>> > >>
>>> > >> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks
>>> > >> I agree with it that watermarks can use only out-of-order mode for
>>> > >> now, because there is still not a concrete example showing the
>>> > >> correctness risk about it. However, the strictly-ordered mode should
>>> > >> still be supported as the default option for non-record event types
>>> > >> other than watermark, at least for checkpoint barriers.
>>> > >>
>>> > >> I noticed that this information has already been documented in "For
>>> > >> other non-record events, such as RecordAttributes ...", but it's at
>>> > >> the bottom of the "Watermark" section, which might not be very
>>> > >> obvious. Thus it might be better to reorganize the FLIP to better
>>> > >> claim that the two order modes are designed for all non-record
>>> events,
>>> > >> and which mode this FLIP would choose for each type of event.
>>> > >>
>>> > >> Best,
>>> > >> Yunfeng
>>> > >>
>>> > >> On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <tonysong...@gmail.com
>>> >
>>> > >> wrote:
>>> > >> >
>>> > >> > Thanks for the quick response. Sounds good to me.
>>> > >> >
>>> > >> > Best,
>>> > >> >
>>> > >> > Xintong
>>> > >> >
>>> > >> >
>>> > >> >
>>> > >> > On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <
>>> zakelly....@gmail.com>
>>> > >> wrote:
>>> > >> >
>>> > >> > > Hi Xintong,
>>> > >> > >
>>> > >> > > Thanks for sharing your thoughts!
>>> > >> > >
>>> > >> > > 1. Regarding Record-ordered and State-ordered of processElement.
>>> > >> > > >
>>> > >> > > > I understand that while State-ordered likely provides better
>>> > >> performance,
>>> > >> > > > Record-ordered is sometimes required for correctness. The
>>> question
>>> > >> is how
>>> > >> > > > should a user choose between these two modes? My concern is
>>> that
>>> > >> such a
>>> > >> > > > decision may require users to have in-depth knowledge about
>>> the
>>> > >> Flink
>>> > >> > > > internals, and may lead to correctness issues if
>>> State-ordered is
>>> > >> chosen
>>> > >> > > > improperly.
>>> > >> > > >
>>> > >> > > > I'd suggest not to expose such a knob, at least in the first
>>> > >> version.
>>> > >> > > That
>>> > >> > > > means always use Record-ordered for custom operators / UDFs,
>>> and
>>> > >> keep
>>> > >> > > > State-ordered for internal usages (built-in operators) only.
>>> > >> > > >
>>> > >> > >
>>> > >> > > Indeed, users may not be able to choose the mode properly. I
>>> agree
>>> > to
>>> > >> keep
>>> > >> > > such options for internal use.
>>> > >> > >
>>> > >> > >
>>> > >> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
>>> > >> > > >
>>> > >> > > > I'm not entirely sure about Strictly-ordered being the
>>> default, or
>>> > >> even
>>> > >> > > > being supported. From my understanding, a Watermark(T) only
>>> > >> suggests that
>>> > >> > > > all records with event time before T has arrived, and it has
>>> > >> nothing to
>>> > >> > > do
>>> > >> > > > with whether records with event time after T has arrived or
>>> not.
>>> > >> From
>>> > >> > > that
>>> > >> > > > perspective, preventing certain records from arriving before a
>>> > >> Watermark
>>> > >> > > is
>>> > >> > > > never supported. I also cannot come up with any use case where
>>> > >> > > > Strictly-ordered is necessary. This implies the same issue as
>>> 1):
>>> > >> how
>>> > >> > > does
>>> > >> > > > the user choose between the two modes?
>>> > >> > > >
>>> > >> > > > I'd suggest not expose the knob to users and only support
>>> > >> Out-of-order,
>>> > >> > > > until we see a concrete use case that Strictly-ordered is
>>> needed.
>>> > >> > > >
>>> > >> > >
>>> > >> > > The semantics of watermarks do not define the sequence between a
>>> > >> watermark
>>> > >> > > and subsequent records. For the most part, this is
>>> inconsequential,
>>> > >> except
>>> > >> > > it may affect some current users who have previously relied on
>>> the
>>> > >> implicit
>>> > >> > > assumption of an ordered execution. I'd be fine with initially
>>> > >> supporting
>>> > >> > > only out-of-order processing. We may consider exposing the
>>> > >> > > 'Strictly-ordered' mode once we encounter a concrete use case
>>> that
>>> > >> > > necessitates it.
>>> > >> > >
>>> > >> > >
>>> > >> > > My philosophies behind not exposing the two config options are:
>>> > >> > > > - There are already too many options in Flink that barely
>>> know how
>>> > >> to use
>>> > >> > > > them. I think Flink should try as much as possible to decide
>>> its
>>> > own
>>> > >> > > > behavior, rather than throwing all the decisions to the users.
>>> > >> > > > - It's much harder to take back knobs than to introduce them.
>>> > >> Therefore,
>>> > >> > > > options should be introduced only if concrete use cases are
>>> > >> identified.
>>> > >> > > >
>>> > >> > >
>>> > >> > > I agree to keep minimal configurable items especially for the
>>> MVP.
>>> > >> Given
>>> > >> > > that we have the opportunity to refine the functionality before
>>> the
>>> > >> > > framework transitions from @Experimental to @PublicEvolving, it
>>> > makes
>>> > >> sense
>>> > >> > > to refrain from presenting user-facing options until we have
>>> ensured
>>> > >> > > their necessity.
>>> > >> > >
>>> > >> > >
>>> > >> > > Best,
>>> > >> > > Zakelly
>>> > >> > >
>>> > >> > > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <
>>> > tonysong...@gmail.com>
>>> > >> > > wrote:
>>> > >> > >
>>> > >> > > > Sorry for joining the discussion late.
>>> > >> > > >
>>> > >> > > > I have two questions about FLIP-425.
>>> > >> > > >
>>> > >> > > > 1. Regarding Record-ordered and State-ordered of
>>> processElement.
>>> > >> > > >
>>> > >> > > > I understand that while State-ordered likely provides better
>>> > >> performance,
>>> > >> > > > Record-ordered is sometimes required for correctness. The
>>> question
>>> > >> is how
>>> > >> > > > should a user choose between these two modes? My concern is
>>> that
>>> > >> such a
>>> > >> > > > decision may require users to have in-depth knowledge about
>>> the
>>> > >> Flink
>>> > >> > > > internals, and may lead to correctness issues if
>>> State-ordered is
>>> > >> chosen
>>> > >> > > > improperly.
>>> > >> > > >
>>> > >> > > > I'd suggest not to expose such a knob, at least in the first
>>> > >> version.
>>> > >> > > That
>>> > >> > > > means always use Record-ordered for custom operators / UDFs,
>>> and
>>> > >> keep
>>> > >> > > > State-ordered for internal usages (built-in operators) only.
>>> > >> > > >
>>> > >> > > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
>>> > >> > > >
>>> > >> > > > I'm not entirely sure about Strictly-ordered being the
>>> default, or
>>> > >> even
>>> > >> > > > being supported. From my understanding, a Watermark(T) only
>>> > >> suggests that
>>> > >> > > > all records with event time before T has arrived, and it has
>>> > >> nothing to
>>> > >> > > do
>>> > >> > > > with whether records with event time after T has arrived or
>>> not.
>>> > >> From
>>> > >> > > that
>>> > >> > > > perspective, preventing certain records from arriving before a
>>> > >> Watermark
>>> > >> > > is
>>> > >> > > > never supported. I also cannot come up with any use case where
>>> > >> > > > Strictly-ordered is necessary. This implies the same issue as
>>> 1):
>>> > >> how
>>> > >> > > does
>>> > >> > > > the user choose between the two modes?
>>> > >> > > >
>>> > >> > > > I'd suggest not expose the knob to users and only support
>>> > >> Out-of-order,
>>> > >> > > > until we see a concrete use case that Strictly-ordered is
>>> needed.
>>> > >> > > >
>>> > >> > > >
>>> > >> > > > My philosophies behind not exposing the two config options
>>> are:
>>> > >> > > > - There are already too many options in Flink that barely
>>> know how
>>> > >> to use
>>> > >> > > > them. I think Flink should try as much as possible to decide
>>> its
>>> > own
>>> > >> > > > behavior, rather than throwing all the decisions to the users.
>>> > >> > > > - It's much harder to take back knobs than to introduce them.
>>> > >> Therefore,
>>> > >> > > > options should be introduced only if concrete use cases are
>>> > >> identified.
>>> > >> > > >
>>> > >> > > > WDYT?
>>> > >> > > >
>>> > >> > > > Best,
>>> > >> > > >
>>> > >> > > > Xintong
>>> > >> > > >
>>> > >> > > >
>>> > >> > > >
>>> > >> > > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge
>>> <j...@ververica.com.invalid
>>> > >
>>> > >> > > wrote:
>>> > >> > > >
>>> > >> > > > > +1 for Gyula's suggestion. I just finished FLIP-423 which
>>> > >> introduced
>>> > >> > > the
>>> > >> > > > > intention of the big change and high level architecture.
>>> Great
>>> > >> content
>>> > >> > > > btw!
>>> > >> > > > > The only public interface change for this FLIP is one new
>>> config
>>> > >> to use
>>> > >> > > > > ForSt. It makes sense to have one dedicated discussion
>>> thread
>>> > for
>>> > >> each
>>> > >> > > > > concrete system design.
>>> > >> > > > >
>>> > >> > > > > @Zakelly The links in your mail do not work except the last
>>> one,
>>> > >> > > because
>>> > >> > > > > the FLIP-xxx has been included into the url like
>>> > >> > > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
>>> > >> > > > .
>>> > >> > > > >
>>> > >> > > > > NIT fix:
>>> > >> > > > >
>>> > >> > > > > FLIP-424:
>>> > >> > > >
>>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
>>> > >> > > > >
>>> > >> > > > > FLIP-425:
>>> > >> > > >
>>> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
>>> > >> > > > >
>>> > >> > > > > FLIP-426:
>>> > >> > > >
>>> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
>>> > >> > > > >
>>> > >> > > > > FLIP-427:
>>> > >> > > >
>>> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
>>> > >> > > > >
>>> > >> > > > > FLIP-428:
>>> > >> > > >
>>> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
>>> > >> > > > >
>>> > >> > > > > Best regards,
>>> > >> > > > > Jing
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > >
>>> > >> > > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <
>>> > >> zakelly....@gmail.com>
>>> > >> > > > wrote:
>>> > >> > > > >
>>> > >> > > > > > Hi everyone,
>>> > >> > > > > >
>>> > >> > > > > > Thank you all for a lively discussion here, and it is a
>>> good
>>> > >> time to
>>> > >> > > > move
>>> > >> > > > > > forward to more detailed discussions. Thus we open several
>>> > >> threads
>>> > >> > > for
>>> > >> > > > > > sub-FLIPs:
>>> > >> > > > > >
>>> > >> > > > > > FLIP-424:
>>> > >> > > > >
>>> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
>>> > >> > > > > > FLIP-425
>>> > >> > > > > > <
>>> > >> > > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
>>> > >> > > > >:
>>> > >> > > > > >
>>> > >> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
>>> > >> > > > > > FLIP-426
>>> > >> > > > > > <
>>> > >> > > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426
>>> > >> > > > >:
>>> > >> > > > > >
>>> > >> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
>>> > >> > > > > > FLIP-427
>>> > >> > > > > > <
>>> > >> > > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427
>>> > >> > > > >:
>>> > >> > > > > >
>>> > >> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
>>> > >> > > > > > FLIP-428
>>> > >> > > > > > <
>>> > >> > > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428
>>> > >> > > > >:
>>> > >> > > > > >
>>> > >> https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
>>> > >> > > > > >
>>> > >> > > > > > If you want to talk about the overall architecture,
>>> roadmap,
>>> > >> > > milestones
>>> > >> > > > > or
>>> > >> > > > > > something related with multiple FLIPs, please post it
>>> here.
>>> > >> Otherwise
>>> > >> > > > you
>>> > >> > > > > > can discuss some details in separate mails. Let's try to
>>> avoid
>>> > >> > > repeated
>>> > >> > > > > > discussion in different threads. I will sync important
>>> > messages
>>> > >> here
>>> > >> > > if
>>> > >> > > > > > there are any in the above threads.
>>> > >> > > > > >
>>> > >> > > > > > And reply to @Jeyhun: We will ensure the content between
>>> those
>>> > >> FLIPs
>>> > >> > > is
>>> > >> > > > > > consistent.
>>> > >> > > > > >
>>> > >> > > > > >
>>> > >> > > > > > Best,
>>> > >> > > > > > Zakelly
>>> > >> > > > > >
>>> > >> > > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <
>>> > yuanmei.w...@gmail.com
>>> > >> >
>>> > >> > > > wrote:
>>> > >> > > > > >
>>> > >> > > > > > > I have been a bit busy these few weeks and sorry for
>>> > >> responding
>>> > >> > > late.
>>> > >> > > > > > >
>>> > >> > > > > > > The original thinking of keeping discussion within one
>>> > thread
>>> > >> is
>>> > >> > > for
>>> > >> > > > > > easier
>>> > >> > > > > > > tracking and avoid for repeated discussion in different
>>> > >> threads.
>>> > >> > > > > > >
>>> > >> > > > > > > For details, It might be good to start in different
>>> threads
>>> > if
>>> > >> > > > needed.
>>> > >> > > > > > >
>>> > >> > > > > > > We will think of a way to better organize the
>>> discussion.
>>> > >> > > > > > >
>>> > >> > > > > > > Best
>>> > >> > > > > > > Yuan
>>> > >> > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <
>>> > >> > > je.kari...@gmail.com>
>>> > >> > > > > > > wrote:
>>> > >> > > > > > >
>>> > >> > > > > > > > Hi,
>>> > >> > > > > > > >
>>> > >> > > > > > > > + 1 for the suggestion.
>>> > >> > > > > > > > Maybe we can the discussion with the FLIPs with
>>> minimum
>>> > >> > > > dependencies
>>> > >> > > > > > > (from
>>> > >> > > > > > > > the other new/proposed FLIPs).
>>> > >> > > > > > > > Based on our discussion on a particular FLIP, the
>>> > >> subsequent (or
>>> > >> > > > its
>>> > >> > > > > > > > dependent) FLIP(s) can be updated accordingly?
>>> > >> > > > > > > >
>>> > >> > > > > > > > Regards,
>>> > >> > > > > > > > Jeyhun
>>> > >> > > > > > > >
>>> > >> > > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <
>>> > >> gyula.f...@gmail.com>
>>> > >> > > > > > wrote:
>>> > >> > > > > > > >
>>> > >> > > > > > > > > Hey all!
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > This is a massive improvement / work. I just started
>>> > going
>>> > >> > > > through
>>> > >> > > > > > the
>>> > >> > > > > > > > > Flips and have a more or less meta comment.
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > While it's good to keep the overall architecture
>>> > >> discussion
>>> > >> > > > here, I
>>> > >> > > > > > > think
>>> > >> > > > > > > > > we should still have separate discussions for each
>>> FLIP
>>> > >> where
>>> > >> > > we
>>> > >> > > > > can
>>> > >> > > > > > > > > discuss interface details etc. With so much content
>>> if
>>> > we
>>> > >> start
>>> > >> > > > > > adding
>>> > >> > > > > > > > > minor comments here that will lead to nowhere but
>>> those
>>> > >> > > > discussions
>>> > >> > > > > > are
>>> > >> > > > > > > > > still important and we should have them in separate
>>> > >> threads
>>> > >> > > (one
>>> > >> > > > > for
>>> > >> > > > > > > each
>>> > >> > > > > > > > > FLIP)
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > What do you think?
>>> > >> > > > > > > > > Gyula
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <
>>> > >> fredia...@gmail.com
>>> > >> > > >
>>> > >> > > > > > wrote:
>>> > >> > > > > > > > >
>>> > >> > > > > > > > > > Hi team,
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > > > Thanks for your discussion. Regarding FLIP-425, we
>>> > have
>>> > >> > > > > > supplemented
>>> > >> > > > > > > > > > several updates to answer high-frequency
>>> questions:
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > > > 1. We captured a flame graph of the Hashmap state
>>> > >> backend in
>>> > >> > > > > > > > > > "Synchronous execution with asynchronous APIs"[1],
>>> > which
>>> > >> > > > reveals
>>> > >> > > > > > that
>>> > >> > > > > > > > > > the framework overhead (including reference
>>> counting,
>>> > >> > > > > > future-related
>>> > >> > > > > > > > > > code and so on) consumes about 9% of the keyed
>>> > operator
>>> > >> CPU
>>> > >> > > > time.
>>> > >> > > > > > > > > > 2. We added a set of comparative experiments for
>>> > >> watermark
>>> > >> > > > > > > processing,
>>> > >> > > > > > > > > > the performance of Out-Of-Order mode is 70% better
>>> > than
>>> > >> > > > > > > > > > strictly-ordered mode under ~140MB state size.
>>> > >> Instructions
>>> > >> > > on
>>> > >> > > > > how
>>> > >> > > > > > to
>>> > >> > > > > > > > > > run this test have also been added[2].
>>> > >> > > > > > > > > > 3. Regarding the order of StreamRecord, whether
>>> it has
>>> > >> state
>>> > >> > > > > access
>>> > >> > > > > > > or
>>> > >> > > > > > > > > > not. We supplemented a new *Strict order of
>>> > >> > > > 'processElement'*[3].
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > > > [1]
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
>>> > >> > > > > > > > > > [2]
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
>>> > >> > > > > > > > > > [3]
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > > > Best regards,
>>> > >> > > > > > > > > > Yanfei
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com>
>>> > 于2024年3月5日周二
>>> > >> > > > 09:25写道:
>>> > >> > > > > > > > > > >
>>> > >> > > > > > > > > > > Hi Zakelly,
>>> > >> > > > > > > > > > >
>>> > >> > > > > > > > > > > > 5. I'm not very sure ... revisiting this later
>>> > >> since it
>>> > >> > > is
>>> > >> > > > > not
>>> > >> > > > > > > > > > important.
>>> > >> > > > > > > > > > >
>>> > >> > > > > > > > > > > It seems that we still have some details to
>>> confirm
>>> > >> about
>>> > >> > > > this
>>> > >> > > > > > > > > > > question. Let's postpone this to after the
>>> critical
>>> > >> parts
>>> > >> > > of
>>> > >> > > > > the
>>> > >> > > > > > > > > > > design are settled.
>>> > >> > > > > > > > > > >
>>> > >> > > > > > > > > > > > 8. Yes, we had considered ... metrics should
>>> be
>>> > like
>>> > >> > > > > > afterwards.
>>> > >> > > > > > > > > > >
>>> > >> > > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with
>>> discussing
>>> > >> this
>>> > >> > > > topic
>>> > >> > > > > > in
>>> > >> > > > > > > > > > milestone 2.
>>> > >> > > > > > > > > > >
>>> > >> > > > > > > > > > > Looking forward to the detailed design about the
>>> > >> strict
>>> > >> > > mode
>>> > >> > > > > > > between
>>> > >> > > > > > > > > > > same-key records and the benchmark results
>>> about the
>>> > >> epoch
>>> > >> > > > > > > mechanism.
>>> > >> > > > > > > > > > >
>>> > >> > > > > > > > > > > Best regards,
>>> > >> > > > > > > > > > > Yunfeng
>>> > >> > > > > > > > > > >
>>> > >> > > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <
>>> > >> > > > > > zakelly....@gmail.com>
>>> > >> > > > > > > > > > wrote:
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > Hi Yunfeng,
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > For 1:
>>> > >> > > > > > > > > > > > I had a discussion with Lincoln Lee, and I
>>> realize
>>> > >> it is
>>> > >> > > a
>>> > >> > > > > > common
>>> > >> > > > > > > > > case
>>> > >> > > > > > > > > > the same-key record should be blocked before the
>>> > >> > > > > `processElement`.
>>> > >> > > > > > It
>>> > >> > > > > > > > is
>>> > >> > > > > > > > > > easier for users to understand. Thus I will
>>> introduce
>>> > a
>>> > >> > > strict
>>> > >> > > > > mode
>>> > >> > > > > > > for
>>> > >> > > > > > > > > > this and make it default. My rough idea is just
>>> like
>>> > >> yours,
>>> > >> > > by
>>> > >> > > > > > > invoking
>>> > >> > > > > > > > > > some method of AEC instance before
>>> `processElement`.
>>> > The
>>> > >> > > > detailed
>>> > >> > > > > > > > design
>>> > >> > > > > > > > > > will be described in FLIP later.
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > For 2:
>>> > >> > > > > > > > > > > > I agree with you. We could throw exceptions
>>> for
>>> > now
>>> > >> and
>>> > >> > > > > > optimize
>>> > >> > > > > > > > this
>>> > >> > > > > > > > > > later.
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > For 5:
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> It might be better to move the default
>>> values to
>>> > >> the
>>> > >> > > > > Proposed
>>> > >> > > > > > > > > Changes
>>> > >> > > > > > > > > > > >> section instead of making them public for
>>> now, as
>>> > >> there
>>> > >> > > > will
>>> > >> > > > > > be
>>> > >> > > > > > > > > > > >> compatibility issues once we want to
>>> dynamically
>>> > >> adjust
>>> > >> > > > the
>>> > >> > > > > > > > > thresholds
>>> > >> > > > > > > > > > > >> and timeouts in future.
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > Agreed. The whole framework is under
>>> experiment
>>> > >> until we
>>> > >> > > > > think
>>> > >> > > > > > it
>>> > >> > > > > > > > is
>>> > >> > > > > > > > > > complete in 2.0 or later. The default value
>>> should be
>>> > >> better
>>> > >> > > > > > > determined
>>> > >> > > > > > > > > > with more testing results and production
>>> experience.
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > >> The configuration
>>> execution.async-state.enabled
>>> > >> seems
>>> > >> > > > > > > unnecessary,
>>> > >> > > > > > > > > as
>>> > >> > > > > > > > > > > >> the infrastructure may automatically get this
>>> > >> > > information
>>> > >> > > > > from
>>> > >> > > > > > > the
>>> > >> > > > > > > > > > > >> detailed state backend configurations. We may
>>> > >> revisit
>>> > >> > > this
>>> > >> > > > > > part
>>> > >> > > > > > > > > after
>>> > >> > > > > > > > > > > >> the core designs have reached an agreement.
>>> WDYT?
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > I'm not very sure if there is any use case
>>> where
>>> > >> users
>>> > >> > > > write
>>> > >> > > > > > > their
>>> > >> > > > > > > > > > code using async APIs but run their job in a
>>> > >> synchronous way.
>>> > >> > > > The
>>> > >> > > > > > > first
>>> > >> > > > > > > > > two
>>> > >> > > > > > > > > > scenarios that come to me are for benchmarking or
>>> for
>>> > a
>>> > >> small
>>> > >> > > > > > state,
>>> > >> > > > > > > > > while
>>> > >> > > > > > > > > > they don't want to rewrite their code. Actually
>>> it is
>>> > >> easy to
>>> > >> > > > > > > support,
>>> > >> > > > > > > > so
>>> > >> > > > > > > > > > I'd suggest providing it. But I'm fine with
>>> revisiting
>>> > >> this
>>> > >> > > > later
>>> > >> > > > > > > since
>>> > >> > > > > > > > > it
>>> > >> > > > > > > > > > is not important. WDYT?
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > For 8:
>>> > >> > > > > > > > > > > > Yes, we had considered the I/O metrics group
>>> > >> especially
>>> > >> > > the
>>> > >> > > > > > > > > > back-pressure, idle and task busy per second. In
>>> the
>>> > >> current
>>> > >> > > > plan
>>> > >> > > > > > we
>>> > >> > > > > > > > can
>>> > >> > > > > > > > > do
>>> > >> > > > > > > > > > state access during back-pressure, meaning that
>>> those
>>> > >> metrics
>>> > >> > > > for
>>> > >> > > > > > > input
>>> > >> > > > > > > > > > would better be redefined. I suggest we discuss
>>> these
>>> > >> > > existing
>>> > >> > > > > > > metrics
>>> > >> > > > > > > > as
>>> > >> > > > > > > > > > well as some new metrics that should be
>>> introduced in
>>> > >> > > FLIP-431
>>> > >> > > > > > later
>>> > >> > > > > > > in
>>> > >> > > > > > > > > > milestone 2, since we have basically finished the
>>> > >> framework
>>> > >> > > > thus
>>> > >> > > > > we
>>> > >> > > > > > > > will
>>> > >> > > > > > > > > > have a better view of what metrics should be like
>>> > >> afterwards.
>>> > >> > > > > WDYT?
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > Best,
>>> > >> > > > > > > > > > > > Zakelly
>>> > >> > > > > > > > > > > >
>>> > >> > > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <
>>> > >> > > > > > > > > > flink.zhouyunf...@gmail.com> wrote:
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> Hi Zakelly,
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> Thanks for the responses!
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> > 1. I will discuss this with some expert SQL
>>> > >> > > developers.
>>> > >> > > > > ...
>>> > >> > > > > > > mode
>>> > >> > > > > > > > > > for StreamRecord processing.
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> In DataStream API there should also be use
>>> cases
>>> > >> when
>>> > >> > > the
>>> > >> > > > > > order
>>> > >> > > > > > > of
>>> > >> > > > > > > > > > > >> output is strictly required. I agree with it
>>> that
>>> > >> SQL
>>> > >> > > > > experts
>>> > >> > > > > > > may
>>> > >> > > > > > > > > help
>>> > >> > > > > > > > > > > >> provide more concrete use cases that can
>>> > >> accelerate our
>>> > >> > > > > > > > discussion,
>>> > >> > > > > > > > > > > >> but please allow me to search for DataStream
>>> use
>>> > >> cases
>>> > >> > > > that
>>> > >> > > > > > can
>>> > >> > > > > > > > > prove
>>> > >> > > > > > > > > > > >> the necessity of this strict order
>>> preservation
>>> > >> mode, if
>>> > >> > > > > > answers
>>> > >> > > > > > > > > from
>>> > >> > > > > > > > > > > >> SQL experts are shown to be negative.
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> For your convenience, my current rough idea
>>> is
>>> > >> that we
>>> > >> > > can
>>> > >> > > > > > add a
>>> > >> > > > > > > > > > > >> module between the Input(s) and
>>> processElement()
>>> > >> module
>>> > >> > > in
>>> > >> > > > > > Fig 2
>>> > >> > > > > > > > of
>>> > >> > > > > > > > > > > >> FLIP-425. The module will be responsible for
>>> > >> caching
>>> > >> > > > records
>>> > >> > > > > > > whose
>>> > >> > > > > > > > > > > >> keys collide with in-flight records, and AEC
>>> will
>>> > >> only
>>> > >> > > be
>>> > >> > > > > > > > > responsible
>>> > >> > > > > > > > > > > >> for handling async state calls, without
>>> knowing
>>> > the
>>> > >> > > record
>>> > >> > > > > > each
>>> > >> > > > > > > > call
>>> > >> > > > > > > > > > > >> belongs to. We may revisit this topic once
>>> the
>>> > >> necessity
>>> > >> > > > of
>>> > >> > > > > > the
>>> > >> > > > > > > > > strict
>>> > >> > > > > > > > > > > >> order mode is clarified.
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> > 2. The amount of parallel StateRequests ...
>>> > >> instead of
>>> > >> > > > > > > invoking
>>> > >> > > > > > > > > > yield
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> Your suggestions generally appeal to me. I
>>> think
>>> > >> we may
>>> > >> > > > let
>>> > >> > > > > > > > > > > >> corresponding Flink jobs fail with OOM for
>>> now,
>>> > >> since
>>> > >> > > the
>>> > >> > > > > > > majority
>>> > >> > > > > > > > > of
>>> > >> > > > > > > > > > > >> a StateRequest should just be references to
>>> > >> existing
>>> > >> > > Java
>>> > >> > > > > > > objects,
>>> > >> > > > > > > > > > > >> which only occupies very small memory space
>>> and
>>> > can
>>> > >> > > hardly
>>> > >> > > > > > cause
>>> > >> > > > > > > > OOM
>>> > >> > > > > > > > > > > >> in common cases. We can monitor the pending
>>> > >> > > StateRequests
>>> > >> > > > > and
>>> > >> > > > > > if
>>> > >> > > > > > > > > there
>>> > >> > > > > > > > > > > >> is really a risk of OOM in extreme cases, we
>>> can
>>> > >> throw
>>> > >> > > > > > > Exceptions
>>> > >> > > > > > > > > with
>>> > >> > > > > > > > > > > >> proper messages notifying users what to do,
>>> like
>>> > >> > > > increasing
>>> > >> > > > > > > memory
>>> > >> > > > > > > > > > > >> through configurations.
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> Your suggestions to adjust threshold
>>> adaptively
>>> > or
>>> > >> to
>>> > >> > > use
>>> > >> > > > > the
>>> > >> > > > > > > > > blocking
>>> > >> > > > > > > > > > > >> buffer sounds good, and in my opinion we can
>>> > >> postpone
>>> > >> > > them
>>> > >> > > > > to
>>> > >> > > > > > > > future
>>> > >> > > > > > > > > > > >> FLIPs since they seem to only benefit users
>>> in
>>> > rare
>>> > >> > > cases.
>>> > >> > > > > > Given
>>> > >> > > > > > > > > that
>>> > >> > > > > > > > > > > >> FLIP-423~428 has already been a big enough
>>> > design,
>>> > >> it
>>> > >> > > > might
>>> > >> > > > > be
>>> > >> > > > > > > > > better
>>> > >> > > > > > > > > > > >> to focus on the most critical design for now
>>> and
>>> > >> > > postpone
>>> > >> > > > > > > > > > > >> optimizations like this. WDYT?
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> > 5. Sure, we will introduce new configs as
>>> well
>>> > as
>>> > >> > > their
>>> > >> > > > > > > default
>>> > >> > > > > > > > > > value.
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> Thanks for adding the default values and the
>>> > values
>>> > >> > > > > themselves
>>> > >> > > > > > > > LGTM.
>>> > >> > > > > > > > > > > >> It might be better to move the default
>>> values to
>>> > >> the
>>> > >> > > > > Proposed
>>> > >> > > > > > > > > Changes
>>> > >> > > > > > > > > > > >> section instead of making them public for
>>> now, as
>>> > >> there
>>> > >> > > > will
>>> > >> > > > > > be
>>> > >> > > > > > > > > > > >> compatibility issues once we want to
>>> dynamically
>>> > >> adjust
>>> > >> > > > the
>>> > >> > > > > > > > > thresholds
>>> > >> > > > > > > > > > > >> and timeouts in future.
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> The configuration
>>> execution.async-state.enabled
>>> > >> seems
>>> > >> > > > > > > unnecessary,
>>> > >> > > > > > > > > as
>>> > >> > > > > > > > > > > >> the infrastructure may automatically get this
>>> > >> > > information
>>> > >> > > > > from
>>> > >> > > > > > > the
>>> > >> > > > > > > > > > > >> detailed state backend configurations. We may
>>> > >> revisit
>>> > >> > > this
>>> > >> > > > > > part
>>> > >> > > > > > > > > after
>>> > >> > > > > > > > > > > >> the core designs have reached an agreement.
>>> WDYT?
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> Besides, inspired by Jeyhun's comments, it
>>> comes
>>> > >> to me
>>> > >> > > > that
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> 8. Should this FLIP introduce metrics that
>>> > measure
>>> > >> the
>>> > >> > > > time
>>> > >> > > > > a
>>> > >> > > > > > > > Flink
>>> > >> > > > > > > > > > > >> job is back-pressured by State IOs? Under the
>>> > >> current
>>> > >> > > > > design,
>>> > >> > > > > > > this
>>> > >> > > > > > > > > > > >> metric could measure the time when the
>>> blocking
>>> > >> buffer
>>> > >> > > is
>>> > >> > > > > full
>>> > >> > > > > > > and
>>> > >> > > > > > > > > > > >> yield() cannot get callbacks to process,
>>> which
>>> > >> means the
>>> > >> > > > > > > operator
>>> > >> > > > > > > > is
>>> > >> > > > > > > > > > > >> fully waiting for state responses.
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> Best regards,
>>> > >> > > > > > > > > > > >> Yunfeng
>>> > >> > > > > > > > > > > >>
>>> > >> > > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <
>>> > >> > > > > > > > zakelly....@gmail.com>
>>> > >> > > > > > > > > > wrote:
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Hi Yunfeng,
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Thanks for your detailed comments!
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 1. Why do we need a close() method on
>>> > >> StateIterator?
>>> > >> > > > This
>>> > >> > > > > > > > method
>>> > >> > > > > > > > > > seems
>>> > >> > > > > > > > > > > >> >> unused in the usage example codes.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > The `close()` is introduced to release
>>> internal
>>> > >> > > > resources,
>>> > >> > > > > > but
>>> > >> > > > > > > > it
>>> > >> > > > > > > > > > does not seem to require the user to call it. I
>>> > removed
>>> > >> this.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc,
>>> it
>>> > is
>>> > >> > > stated
>>> > >> > > > > that
>>> > >> > > > > > > "No
>>> > >> > > > > > > > > > null
>>> > >> > > > > > > > > > > >> >> entries are allowed". It might be better
>>> to
>>> > >> further
>>> > >> > > > > explain
>>> > >> > > > > > > > what
>>> > >> > > > > > > > > > will
>>> > >> > > > > > > > > > > >> >> happen if a null value is passed,
>>> ignoring the
>>> > >> value
>>> > >> > > in
>>> > >> > > > > the
>>> > >> > > > > > > > > > returned
>>> > >> > > > > > > > > > > >> >> Collection or throwing exceptions. Given
>>> that
>>> > >> > > > > > > > > > > >> >> FutureUtils.emptyFuture() can be returned
>>> in
>>> > the
>>> > >> > > > example
>>> > >> > > > > > > code,
>>> > >> > > > > > > > I
>>> > >> > > > > > > > > > > >> >> suppose the former one might be correct.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > The statement "No null entries are allowed"
>>> > >> refers to
>>> > >> > > > the
>>> > >> > > > > > > > > > parameters, it means some arrayList like [null,
>>> > >> StateFuture1,
>>> > >> > > > > > > > > StateFuture2]
>>> > >> > > > > > > > > > passed in are not allowed, and an Exception will
>>> be
>>> > >> thrown.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... .
>>> This
>>> > >> > > > situation
>>> > >> > > > > > > should
>>> > >> > > > > > > > > be
>>> > >> > > > > > > > > > > >> >> avoided and the order of same-key records
>>> > >> should be
>>> > >> > > > > > strictly
>>> > >> > > > > > > > > > > >> >> preserved.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > I will discuss this with some expert SQL
>>> > >> developers.
>>> > >> > > And
>>> > >> > > > > if
>>> > >> > > > > > it
>>> > >> > > > > > > > is
>>> > >> > > > > > > > > > valid and common, I suggest a strict order
>>> > preservation
>>> > >> mode
>>> > >> > > > for
>>> > >> > > > > > > > > > StreamRecord processing. WDYT?
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 2. The FLIP says that StateRequests
>>> submitted
>>> > by
>>> > >> > > > > Callbacks
>>> > >> > > > > > > will
>>> > >> > > > > > > > > not
>>> > >> > > > > > > > > > > >> >> invoke further yield() methods. Given that
>>> > >> yield() is
>>> > >> > > > > used
>>> > >> > > > > > > when
>>> > >> > > > > > > > > > there
>>> > >> > > > > > > > > > > >> >> is "too much" in-flight data, does it mean
>>> > >> > > > StateRequests
>>> > >> > > > > > > > > submitted
>>> > >> > > > > > > > > > by
>>> > >> > > > > > > > > > > >> >> Callbacks will never be "too much"? What
>>> if
>>> > the
>>> > >> total
>>> > >> > > > > > number
>>> > >> > > > > > > of
>>> > >> > > > > > > > > > > >> >> StateRequests exceed the capacity of Flink
>>> > >> operator's
>>> > >> > > > > > memory
>>> > >> > > > > > > > > space?
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > The amount of parallel StateRequests for
>>> one
>>> > >> > > > StreamRecord
>>> > >> > > > > > > cannot
>>> > >> > > > > > > > > be
>>> > >> > > > > > > > > > determined since the code is written by users. So
>>> the
>>> > >> > > in-flight
>>> > >> > > > > > > > requests
>>> > >> > > > > > > > > > may be "too much", and may cause OOM. Users should
>>> > >> > > re-configure
>>> > >> > > > > > their
>>> > >> > > > > > > > > job,
>>> > >> > > > > > > > > > controlling the amount of on-going StreamRecord.
>>> And I
>>> > >> > > suggest
>>> > >> > > > > two
>>> > >> > > > > > > ways
>>> > >> > > > > > > > > to
>>> > >> > > > > > > > > > avoid this:
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Adaptively adjust the count of on-going
>>> > >> StreamRecord
>>> > >> > > > > > according
>>> > >> > > > > > > > to
>>> > >> > > > > > > > > > historical StateRequests amount.
>>> > >> > > > > > > > > > > >> > Also control the max StateRequests that
>>> can be
>>> > >> > > executed
>>> > >> > > > in
>>> > >> > > > > > > > > parallel
>>> > >> > > > > > > > > > for each StreamRecord, and if it exceeds, put the
>>> new
>>> > >> > > > > StateRequest
>>> > >> > > > > > in
>>> > >> > > > > > > > the
>>> > >> > > > > > > > > > blocking buffer waiting for execution (instead of
>>> > >> invoking
>>> > >> > > > > > yield()).
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > WDYT?
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order
>>> > >> execution
>>> > >> > > mode,
>>> > >> > > > > > along
>>> > >> > > > > > > > > with
>>> > >> > > > > > > > > > the
>>> > >> > > > > > > > > > > >> >> epoch mechanism, would bring more
>>> complexity
>>> > to
>>> > >> the
>>> > >> > > > > > execution
>>> > >> > > > > > > > > model
>>> > >> > > > > > > > > > > >> >> than the performance improvement it
>>> promises.
>>> > >> Could
>>> > >> > > we
>>> > >> > > > > add
>>> > >> > > > > > > some
>>> > >> > > > > > > > > > > >> >> benchmark results proving the benefit of
>>> this
>>> > >> mode?
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Agreed, will do.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 3.2 The FLIP might need to add a public
>>> API
>>> > >> section
>>> > >> > > > > > > describing
>>> > >> > > > > > > > > how
>>> > >> > > > > > > > > > > >> >> users or developers can switch between
>>> these
>>> > two
>>> > >> > > > > execution
>>> > >> > > > > > > > modes.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Good point. We will add a Public API
>>> section.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 3.3 Apart from the watermark and
>>> checkpoint
>>> > >> mentioned
>>> > >> > > > in
>>> > >> > > > > > this
>>> > >> > > > > > > > > FLIP,
>>> > >> > > > > > > > > > > >> >> there are also more other events that
>>> might
>>> > >> appear in
>>> > >> > > > the
>>> > >> > > > > > > > stream
>>> > >> > > > > > > > > of
>>> > >> > > > > > > > > > > >> >> data records. It might be better to
>>> generalize
>>> > >> the
>>> > >> > > > > > execution
>>> > >> > > > > > > > mode
>>> > >> > > > > > > > > > > >> >> mechanism to handle all possible events.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Yes, I missed this point. Thanks for the
>>> > >> reminder.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 4. It might be better to treat
>>> > >> callback-handling as a
>>> > >> > > > > > > > > > > >> >> MailboxDefaultAction, instead of Mails, to
>>> > >> avoid the
>>> > >> > > > > > overhead
>>> > >> > > > > > > > of
>>> > >> > > > > > > > > > > >> >> repeatedly creating Mail objects.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >  I thought the intermediated wrapper for
>>> > >> callback can
>>> > >> > > > not
>>> > >> > > > > be
>>> > >> > > > > > > > > > omitted, since there will be some context switch
>>> > before
>>> > >> each
>>> > >> > > > > > > execution.
>>> > >> > > > > > > > > The
>>> > >> > > > > > > > > > MailboxDefaultAction in most cases is processInput
>>> > >> right?
>>> > >> > > While
>>> > >> > > > > the
>>> > >> > > > > > > > > > callback should be executed with higher priority.
>>> I'd
>>> > >> suggest
>>> > >> > > > not
>>> > >> > > > > > > > > changing
>>> > >> > > > > > > > > > the basic logic of Mailbox and the default action
>>> > since
>>> > >> it is
>>> > >> > > > > very
>>> > >> > > > > > > > > critical
>>> > >> > > > > > > > > > for performance. But yes, we will try our best to
>>> > avoid
>>> > >> > > > creating
>>> > >> > > > > > > > > > intermediated objects.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 5. Could this FLIP provide the current
>>> default
>>> > >> values
>>> > >> > > > for
>>> > >> > > > > > > > things
>>> > >> > > > > > > > > > like
>>> > >> > > > > > > > > > > >> >> active buffer size thresholds and
>>> timeouts?
>>> > >> These
>>> > >> > > could
>>> > >> > > > > > help
>>> > >> > > > > > > > with
>>> > >> > > > > > > > > > > >> >> memory consumption and latency analysis.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Sure, we will introduce new configs as
>>> well as
>>> > >> their
>>> > >> > > > > default
>>> > >> > > > > > > > > value.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 6. Why do we need to record the hashcode
>>> of a
>>> > >> record
>>> > >> > > in
>>> > >> > > > > its
>>> > >> > > > > > > > > > > >> >> RecordContext? It seems not used.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > The context switch before each callback
>>> > execution
>>> > >> > > > involves
>>> > >> > > > > > > > > > setCurrentKey, where the hashCode is
>>> re-calculated. We
>>> > >> cache
>>> > >> > > it
>>> > >> > > > > for
>>> > >> > > > > > > > > > accelerating.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >> 7. In "timers can be stored on the JVM
>>> heap or
>>> > >> > > > RocksDB",
>>> > >> > > > > > the
>>> > >> > > > > > > > link
>>> > >> > > > > > > > > > > >> >> points to a document in flink-1.15. It
>>> might
>>> > be
>>> > >> > > better
>>> > >> > > > to
>>> > >> > > > > > > > verify
>>> > >> > > > > > > > > > the
>>> > >> > > > > > > > > > > >> >> referenced content is still valid in the
>>> > latest
>>> > >> Flink
>>> > >> > > > and
>>> > >> > > > > > > > update
>>> > >> > > > > > > > > > the
>>> > >> > > > > > > > > > > >> >> link accordingly. Same for other
>>> references if
>>> > >> any.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Thanks for the reminder! Will check.
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > Thanks a lot & Best,
>>> > >> > > > > > > > > > > >> > Zakelly
>>> > >> > > > > > > > > > > >> >
>>> > >> > > > > > > > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun
>>> Karimov <
>>> > >> > > > > > > > > je.kari...@gmail.com>
>>> > >> > > > > > > > > > wrote:
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >> Hi,
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >> Thanks for the great proposals. I have a
>>> few
>>> > >> comments
>>> > >> > > > > > > comments:
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >> - Backpressure Handling. Flink's original
>>> > >> > > backpressure
>>> > >> > > > > > > handling
>>> > >> > > > > > > > > is
>>> > >> > > > > > > > > > quite
>>> > >> > > > > > > > > > > >> >> robust and the semantics is quite "simple"
>>> > >> (simple is
>>> > >> > > > > > > > beautiful).
>>> > >> > > > > > > > > > > >> >> This mechanism has proven to perform
>>> > >> better/robust
>>> > >> > > than
>>> > >> > > > > the
>>> > >> > > > > > > > other
>>> > >> > > > > > > > > > open
>>> > >> > > > > > > > > > > >> >> source streaming systems, where they were
>>> > >> relying on
>>> > >> > > > some
>>> > >> > > > > > > > > loopback
>>> > >> > > > > > > > > > > >> >> information.
>>> > >> > > > > > > > > > > >> >> Now that the proposal also relies on
>>> loopback
>>> > >> (yield
>>> > >> > > in
>>> > >> > > > > > this
>>> > >> > > > > > > > > > case), it is
>>> > >> > > > > > > > > > > >> >> not clear how well the new backpressure
>>> > handling
>>> > >> > > > proposed
>>> > >> > > > > > in
>>> > >> > > > > > > > > > FLIP-425 is
>>> > >> > > > > > > > > > > >> >> robust and handle fluctuating workloads.
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >> - Watermark/Timer Handling: Similar
>>> arguments
>>> > >> apply
>>> > >> > > for
>>> > >> > > > > > > > watermark
>>> > >> > > > > > > > > > and timer
>>> > >> > > > > > > > > > > >> >> handling. IMHO, we need more benchmarks
>>> > showing
>>> > >> the
>>> > >> > > > > > overhead
>>> > >> > > > > > > > > > > >> >> of epoch management with different
>>> parameters
>>> > >> (e.g.,
>>> > >> > > > > window
>>> > >> > > > > > > > size,
>>> > >> > > > > > > > > > watermark
>>> > >> > > > > > > > > > > >> >> strategy, etc)
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >> - DFS consistency guarantees. The
>>> proposal in
>>> > >> > > FLIP-427
>>> > >> > > > is
>>> > >> > > > > > > > > > DFS-agnostic.
>>> > >> > > > > > > > > > > >> >> However, different cloud providers have
>>> > >> different
>>> > >> > > > storage
>>> > >> > > > > > > > > > consistency
>>> > >> > > > > > > > > > > >> >> models.
>>> > >> > > > > > > > > > > >> >> How do we want to deal with them?
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >>  Regards,
>>> > >> > > > > > > > > > > >> >> Jeyhun
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly
>>> Lan <
>>> > >> > > > > > > > > zakelly....@gmail.com>
>>> > >> > > > > > > > > > wrote:
>>> > >> > > > > > > > > > > >> >>
>>> > >> > > > > > > > > > > >> >> > Thanks Piotr for sharing your thoughts!
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > I guess it depends how we would like to
>>> > treat
>>> > >> the
>>> > >> > > > local
>>> > >> > > > > > > > disks.
>>> > >> > > > > > > > > > I've always
>>> > >> > > > > > > > > > > >> >> > > thought about them that almost always
>>> > >> eventually
>>> > >> > > > all
>>> > >> > > > > > > state
>>> > >> > > > > > > > > > from the DFS
>>> > >> > > > > > > > > > > >> >> > > should end up cached in the local
>>> disks.
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > OK I got it. In our proposal we treat
>>> local
>>> > >> disk as
>>> > >> > > > an
>>> > >> > > > > > > > optional
>>> > >> > > > > > > > > > cache, so
>>> > >> > > > > > > > > > > >> >> > the basic design will handle the case
>>> with
>>> > >> state
>>> > >> > > > > residing
>>> > >> > > > > > > in
>>> > >> > > > > > > > > DFS
>>> > >> > > > > > > > > > only. It
>>> > >> > > > > > > > > > > >> >> > is a more 'cloud-native' approach that
>>> does
>>> > >> not
>>> > >> > > rely
>>> > >> > > > on
>>> > >> > > > > > any
>>> > >> > > > > > > > > > local storage
>>> > >> > > > > > > > > > > >> >> > assumptions, which allow users to
>>> > dynamically
>>> > >> > > adjust
>>> > >> > > > > the
>>> > >> > > > > > > > > > capacity or I/O
>>> > >> > > > > > > > > > > >> >> > bound of remote storage to gain
>>> performance
>>> > >> or save
>>> > >> > > > the
>>> > >> > > > > > > cost,
>>> > >> > > > > > > > > > even without
>>> > >> > > > > > > > > > > >> >> > a job restart.
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > In
>>> > >> > > > > > > > > > > >> >> > > the currently proposed more fine
>>> grained
>>> > >> > > solution,
>>> > >> > > > > you
>>> > >> > > > > > > > make a
>>> > >> > > > > > > > > > single
>>> > >> > > > > > > > > > > >> >> > > request to DFS per each state access.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > Ah that's not accurate. Actually we
>>> buffer
>>> > the
>>> > >> > > state
>>> > >> > > > > > > requests
>>> > >> > > > > > > > > > and process
>>> > >> > > > > > > > > > > >> >> > them in batch, multiple requests will
>>> > >> correspond to
>>> > >> > > > one
>>> > >> > > > > > DFS
>>> > >> > > > > > > > > > access (One
>>> > >> > > > > > > > > > > >> >> > block access for multiple keys
>>> performed by
>>> > >> > > RocksDB).
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > In that benchmark you mentioned, are you
>>> > >> requesting
>>> > >> > > > the
>>> > >> > > > > > > state
>>> > >> > > > > > > > > > > >> >> > > asynchronously from local disks into
>>> > >> memory? If
>>> > >> > > the
>>> > >> > > > > > > benefit
>>> > >> > > > > > > > > > comes from
>>> > >> > > > > > > > > > > >> >> > > parallel I/O, then I would expect the
>>> > >> benefit to
>>> > >> > > > > > > > > > disappear/shrink when
>>> > >> > > > > > > > > > > >> >> > > running multiple subtasks on the same
>>> > >> machine, as
>>> > >> > > > > they
>>> > >> > > > > > > > would
>>> > >> > > > > > > > > > be making
>>> > >> > > > > > > > > > > >> >> > > their own parallel requests, right?
>>> Also
>>> > >> enabling
>>> > >> > > > > > > > > > checkpointing would
>>> > >> > > > > > > > > > > >> >> > > further cut into the available I/O
>>> budget.
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > That's an interesting topic. Our
>>> proposal is
>>> > >> > > > > specifically
>>> > >> > > > > > > > aimed
>>> > >> > > > > > > > > > at the
>>> > >> > > > > > > > > > > >> >> > scenario where the machine I/O is not
>>> fully
>>> > >> loaded
>>> > >> > > > but
>>> > >> > > > > > the
>>> > >> > > > > > > > I/O
>>> > >> > > > > > > > > > latency has
>>> > >> > > > > > > > > > > >> >> > indeed become a bottleneck for each
>>> subtask.
>>> > >> While
>>> > >> > > > the
>>> > >> > > > > > > > > > distributed file
>>> > >> > > > > > > > > > > >> >> > system is a prime example of a scenario
>>> > >> > > characterized
>>> > >> > > > > by
>>> > >> > > > > > > > > > abundant and
>>> > >> > > > > > > > > > > >> >> > easily scalable I/O bandwidth coupled
>>> with
>>> > >> higher
>>> > >> > > I/O
>>> > >> > > > > > > > latency.
>>> > >> > > > > > > > > > You may
>>> > >> > > > > > > > > > > >> >> > expect to increase the parallelism of a
>>> job
>>> > to
>>> > >> > > > enhance
>>> > >> > > > > > the
>>> > >> > > > > > > > > > performance as
>>> > >> > > > > > > > > > > >> >> > well, but that also brings in more
>>> waste of
>>> > >> CPU's
>>> > >> > > and
>>> > >> > > > > > > memory
>>> > >> > > > > > > > > for
>>> > >> > > > > > > > > > building
>>> > >> > > > > > > > > > > >> >> > up more subtasks. This is one drawback
>>> for
>>> > the
>>> > >> > > > > > > > > > computation-storage tightly
>>> > >> > > > > > > > > > > >> >> > coupled nodes. While in our proposal,
>>> the
>>> > >> parallel
>>> > >> > > > I/O
>>> > >> > > > > > with
>>> > >> > > > > > > > all
>>> > >> > > > > > > > > > the
>>> > >> > > > > > > > > > > >> >> > callbacks still running in one task,
>>> > >> pre-allocated
>>> > >> > > > > > > > > computational
>>> > >> > > > > > > > > > resources
>>> > >> > > > > > > > > > > >> >> > are better utilized. It is a much more
>>> > >> lightweight
>>> > >> > > > way
>>> > >> > > > > to
>>> > >> > > > > > > > > > perform parallel
>>> > >> > > > > > > > > > > >> >> > I/O.
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > Just with what granularity those async
>>> > >> requests
>>> > >> > > > should
>>> > >> > > > > be
>>> > >> > > > > > > > made.
>>> > >> > > > > > > > > > > >> >> > > Making state access asynchronous is
>>> > >> definitely
>>> > >> > > the
>>> > >> > > > > > right
>>> > >> > > > > > > > way
>>> > >> > > > > > > > > > to go!
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > I think the current proposal is based on
>>> > such
>>> > >> core
>>> > >> > > > > ideas:
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >    - A pure cloud-native disaggregated
>>> > state.
>>> > >> > > > > > > > > > > >> >> >    - Fully utilize the given resources
>>> and
>>> > >> try not
>>> > >> > > to
>>> > >> > > > > > waste
>>> > >> > > > > > > > > them
>>> > >> > > > > > > > > > (including
>>> > >> > > > > > > > > > > >> >> >    I/O).
>>> > >> > > > > > > > > > > >> >> >    - The ability to scale isolated
>>> resources
>>> > >> (I/O
>>> > >> > > or
>>> > >> > > > > CPU
>>> > >> > > > > > or
>>> > >> > > > > > > > > > memory)
>>> > >> > > > > > > > > > > >> >> >    independently.
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > We think a fine-grained granularity is
>>> more
>>> > >> inline
>>> > >> > > > with
>>> > >> > > > > > > those
>>> > >> > > > > > > > > > ideas,
>>> > >> > > > > > > > > > > >> >> > especially without local disk
>>> assumptions
>>> > and
>>> > >> > > without
>>> > >> > > > > any
>>> > >> > > > > > > > waste
>>> > >> > > > > > > > > > of I/O by
>>> > >> > > > > > > > > > > >> >> > prefetching. Please note that it is not
>>> a
>>> > >> > > replacement
>>> > >> > > > > of
>>> > >> > > > > > > the
>>> > >> > > > > > > > > > original local
>>> > >> > > > > > > > > > > >> >> > state with synchronous execution.
>>> Instead
>>> > >> this is a
>>> > >> > > > > > > solution
>>> > >> > > > > > > > > > embracing the
>>> > >> > > > > > > > > > > >> >> > cloud-native era, providing much more
>>> > >> scalability
>>> > >> > > and
>>> > >> > > > > > > > resource
>>> > >> > > > > > > > > > efficiency
>>> > >> > > > > > > > > > > >> >> > when handling a *huge state*.
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > What also worries me a lot in this fine
>>> > >> grained
>>> > >> > > model
>>> > >> > > > > is
>>> > >> > > > > > > the
>>> > >> > > > > > > > > > effect on the
>>> > >> > > > > > > > > > > >> >> > > checkpointing times.
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > Your concerns are very reasonable.
>>> Faster
>>> > >> > > > checkpointing
>>> > >> > > > > > is
>>> > >> > > > > > > > > > always a core
>>> > >> > > > > > > > > > > >> >> > advantage of disaggregated state, but
>>> only
>>> > >> for the
>>> > >> > > > > async
>>> > >> > > > > > > > phase.
>>> > >> > > > > > > > > > There will
>>> > >> > > > > > > > > > > >> >> > be some complexity introduced by
>>> in-flight
>>> > >> > > requests,
>>> > >> > > > > but
>>> > >> > > > > > > I'd
>>> > >> > > > > > > > > > suggest a
>>> > >> > > > > > > > > > > >> >> > checkpoint containing those in-flight
>>> state
>>> > >> > > requests
>>> > >> > > > as
>>> > >> > > > > > > part
>>> > >> > > > > > > > of
>>> > >> > > > > > > > > > the state,
>>> > >> > > > > > > > > > > >> >> > to accelerate the sync phase by
>>> skipping the
>>> > >> buffer
>>> > >> > > > > > > draining.
>>> > >> > > > > > > > > > This makes
>>> > >> > > > > > > > > > > >> >> > the buffer size have little impact on
>>> > >> checkpoint
>>> > >> > > > time.
>>> > >> > > > > > And
>>> > >> > > > > > > > all
>>> > >> > > > > > > > > > the changes
>>> > >> > > > > > > > > > > >> >> > keep within the execution model we
>>> proposed
>>> > >> while
>>> > >> > > the
>>> > >> > > > > > > > > checkpoint
>>> > >> > > > > > > > > > barrier
>>> > >> > > > > > > > > > > >> >> > alignment or handling will not be
>>> touched in
>>> > >> our
>>> > >> > > > > > proposal,
>>> > >> > > > > > > > so I
>>> > >> > > > > > > > > > guess
>>> > >> > > > > > > > > > > >> >> > the complexity is relatively
>>> controllable. I
>>> > >> have
>>> > >> > > > faith
>>> > >> > > > > > in
>>> > >> > > > > > > > that
>>> > >> > > > > > > > > > :)
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > Also regarding the overheads, it would
>>> be
>>> > >> great if
>>> > >> > > > you
>>> > >> > > > > > > could
>>> > >> > > > > > > > > > provide
>>> > >> > > > > > > > > > > >> >> > > profiling results of the benchmarks
>>> that
>>> > you
>>> > >> > > > > conducted
>>> > >> > > > > > to
>>> > >> > > > > > > > > > verify the
>>> > >> > > > > > > > > > > >> >> > > results. Or maybe if you could
>>> describe
>>> > the
>>> > >> steps
>>> > >> > > > to
>>> > >> > > > > > > > > reproduce
>>> > >> > > > > > > > > > the
>>> > >> > > > > > > > > > > >> >> > results?
>>> > >> > > > > > > > > > > >> >> > > Especially "Hashmap (sync)" vs
>>> "Hashmap
>>> > with
>>> > >> > > async
>>> > >> > > > > > API".
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > Yes we could profile the benchmarks.
>>> And for
>>> > >> the
>>> > >> > > > > > comparison
>>> > >> > > > > > > > of
>>> > >> > > > > > > > > > "Hashmap
>>> > >> > > > > > > > > > > >> >> > (sync)" vs "Hashmap with async API", we
>>> > >> conduct a
>>> > >> > > > > > Wordcount
>>> > >> > > > > > > > job
>>> > >> > > > > > > > > > written
>>> > >> > > > > > > > > > > >> >> > with async APIs but disabling the async
>>> > >> execution
>>> > >> > > by
>>> > >> > > > > > > directly
>>> > >> > > > > > > > > > completing
>>> > >> > > > > > > > > > > >> >> > the future using sync state access. This
>>> > >> evaluates
>>> > >> > > > the
>>> > >> > > > > > > > overhead
>>> > >> > > > > > > > > > of newly
>>> > >> > > > > > > > > > > >> >> > introduced modules like 'AEC' in sync
>>> > >> execution
>>> > >> > > (even
>>> > >> > > > > > > though
>>> > >> > > > > > > > > > they are not
>>> > >> > > > > > > > > > > >> >> > designed for it). The code will be
>>> provided
>>> > >> later.
>>> > >> > > > For
>>> > >> > > > > > > other
>>> > >> > > > > > > > > > results of our
>>> > >> > > > > > > > > > > >> >> > PoC[1], you can follow the instructions
>>> > >> here[2] to
>>> > >> > > > > > > reproduce.
>>> > >> > > > > > > > > > Since the
>>> > >> > > > > > > > > > > >> >> > compilation may take some effort, we
>>> will
>>> > >> directly
>>> > >> > > > > > provide
>>> > >> > > > > > > > the
>>> > >> > > > > > > > > > jar for
>>> > >> > > > > > > > > > > >> >> > testing next week.
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > And @Yunfeng Zhou, I have noticed your
>>> mail
>>> > >> but it
>>> > >> > > > is a
>>> > >> > > > > > bit
>>> > >> > > > > > > > > late
>>> > >> > > > > > > > > > in my
>>> > >> > > > > > > > > > > >> >> > local time and the next few days are
>>> > >> weekends. So I
>>> > >> > > > > will
>>> > >> > > > > > > > reply
>>> > >> > > > > > > > > > to you
>>> > >> > > > > > > > > > > >> >> > later. Thanks for your response!
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > [1]
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
>>> > >> > > > > > > > > > > >> >> > [2]
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >>
>>> >
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > Best,
>>> > >> > > > > > > > > > > >> >> > Zakelly
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng
>>> Zhou
>>> > <
>>> > >> > > > > > > > > > flink.zhouyunf...@gmail.com>
>>> > >> > > > > > > > > > > >> >> > wrote:
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > > > >> >> > > Hi,
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > Thanks for proposing this design! I
>>> just
>>> > >> read
>>> > >> > > > > FLIP-424
>>> > >> > > > > > > and
>>> > >> > > > > > > > > > FLIP-425
>>> > >> > > > > > > > > > > >> >> > > and have some questions about the
>>> proposed
>>> > >> > > changes.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > For Async API (FLIP-424)
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 1. Why do we need a close() method on
>>> > >> > > > StateIterator?
>>> > >> > > > > > This
>>> > >> > > > > > > > > > method seems
>>> > >> > > > > > > > > > > >> >> > > unused in the usage example codes.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 2. In FutureUtils.combineAll()'s
>>> JavaDoc,
>>> > >> it is
>>> > >> > > > > stated
>>> > >> > > > > > > that
>>> > >> > > > > > > > > > "No null
>>> > >> > > > > > > > > > > >> >> > > entries are allowed". It might be
>>> better
>>> > to
>>> > >> > > further
>>> > >> > > > > > > explain
>>> > >> > > > > > > > > > what will
>>> > >> > > > > > > > > > > >> >> > > happen if a null value is passed,
>>> ignoring
>>> > >> the
>>> > >> > > > value
>>> > >> > > > > in
>>> > >> > > > > > > the
>>> > >> > > > > > > > > > returned
>>> > >> > > > > > > > > > > >> >> > > Collection or throwing exceptions.
>>> Given
>>> > >> that
>>> > >> > > > > > > > > > > >> >> > > FutureUtils.emptyFuture() can be
>>> returned
>>> > >> in the
>>> > >> > > > > > example
>>> > >> > > > > > > > > code,
>>> > >> > > > > > > > > > I
>>> > >> > > > > > > > > > > >> >> > > suppose the former one might be
>>> correct.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > For Async Execution (FLIP-425)
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 1. According to Fig 2 of this FLIP,
>>> if a
>>> > >> recordB
>>> > >> > > > has
>>> > >> > > > > > its
>>> > >> > > > > > > > key
>>> > >> > > > > > > > > > collide
>>> > >> > > > > > > > > > > >> >> > > with an ongoing recordA, its
>>> > >> processElement()
>>> > >> > > > method
>>> > >> > > > > > can
>>> > >> > > > > > > > > still
>>> > >> > > > > > > > > > be
>>> > >> > > > > > > > > > > >> >> > > triggered immediately, and then it
>>> might
>>> > be
>>> > >> moved
>>> > >> > > > to
>>> > >> > > > > > the
>>> > >> > > > > > > > > > blocking
>>> > >> > > > > > > > > > > >> >> > > buffer in AEC if it involves state
>>> > >> operations.
>>> > >> > > This
>>> > >> > > > > > means
>>> > >> > > > > > > > > that
>>> > >> > > > > > > > > > > >> >> > > recordB's output will precede
>>> recordA's
>>> > >> output in
>>> > >> > > > > > > > downstream
>>> > >> > > > > > > > > > > >> >> > > operators, if recordA involves state
>>> > >> operations
>>> > >> > > > while
>>> > >> > > > > > > > recordB
>>> > >> > > > > > > > > > does
>>> > >> > > > > > > > > > > >> >> > > not. This will harm the correctness of
>>> > >> Flink jobs
>>> > >> > > > in
>>> > >> > > > > > some
>>> > >> > > > > > > > use
>>> > >> > > > > > > > > > cases.
>>> > >> > > > > > > > > > > >> >> > > For example, in dim table join cases,
>>> > >> recordA
>>> > >> > > could
>>> > >> > > > > be
>>> > >> > > > > > a
>>> > >> > > > > > > > > delete
>>> > >> > > > > > > > > > > >> >> > > operation that involves state access,
>>> > while
>>> > >> > > recordB
>>> > >> > > > > > could
>>> > >> > > > > > > > be
>>> > >> > > > > > > > > > an insert
>>> > >> > > > > > > > > > > >> >> > > operation that needs to visit external
>>> > >> storage
>>> > >> > > > > without
>>> > >> > > > > > > > state
>>> > >> > > > > > > > > > access.
>>> > >> > > > > > > > > > > >> >> > > If recordB's output precedes
>>> recordA's,
>>> > >> then an
>>> > >> > > > entry
>>> > >> > > > > > > that
>>> > >> > > > > > > > is
>>> > >> > > > > > > > > > supposed
>>> > >> > > > > > > > > > > >> >> > > to finally exist with recordB's value
>>> in
>>> > >> the sink
>>> > >> > > > > table
>>> > >> > > > > > > > might
>>> > >> > > > > > > > > > actually
>>> > >> > > > > > > > > > > >> >> > > be deleted according to recordA's
>>> command.
>>> > >> This
>>> > >> > > > > > situation
>>> > >> > > > > > > > > > should be
>>> > >> > > > > > > > > > > >> >> > > avoided and the order of same-key
>>> records
>>> > >> should
>>> > >> > > be
>>> > >> > > > > > > > strictly
>>> > >> > > > > > > > > > > >> >> > > preserved.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 2. The FLIP says that StateRequests
>>> > >> submitted by
>>> > >> > > > > > > Callbacks
>>> > >> > > > > > > > > > will not
>>> > >> > > > > > > > > > > >> >> > > invoke further yield() methods. Given
>>> that
>>> > >> > > yield()
>>> > >> > > > is
>>> > >> > > > > > > used
>>> > >> > > > > > > > > > when there
>>> > >> > > > > > > > > > > >> >> > > is "too much" in-flight data, does it
>>> mean
>>> > >> > > > > > StateRequests
>>> > >> > > > > > > > > > submitted by
>>> > >> > > > > > > > > > > >> >> > > Callbacks will never be "too much"?
>>> What
>>> > if
>>> > >> the
>>> > >> > > > total
>>> > >> > > > > > > > number
>>> > >> > > > > > > > > of
>>> > >> > > > > > > > > > > >> >> > > StateRequests exceed the capacity of
>>> Flink
>>> > >> > > > operator's
>>> > >> > > > > > > > memory
>>> > >> > > > > > > > > > space?
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 3. In the "Watermark" section, this
>>> FLIP
>>> > >> provided
>>> > >> > > > an
>>> > >> > > > > > > > > > out-of-order
>>> > >> > > > > > > > > > > >> >> > > execution mode apart from the default
>>> > >> > > > > strictly-ordered
>>> > >> > > > > > > > mode,
>>> > >> > > > > > > > > > which can
>>> > >> > > > > > > > > > > >> >> > > optimize performance by allowing more
>>> > >> concurrent
>>> > >> > > > > > > > executions.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 3.1 I'm concerned that the
>>> out-of-order
>>> > >> execution
>>> > >> > > > > mode,
>>> > >> > > > > > > > along
>>> > >> > > > > > > > > > with the
>>> > >> > > > > > > > > > > >> >> > > epoch mechanism, would bring more
>>> > >> complexity to
>>> > >> > > the
>>> > >> > > > > > > > execution
>>> > >> > > > > > > > > > model
>>> > >> > > > > > > > > > > >> >> > > than the performance improvement it
>>> > >> promises.
>>> > >> > > Could
>>> > >> > > > > we
>>> > >> > > > > > > add
>>> > >> > > > > > > > > some
>>> > >> > > > > > > > > > > >> >> > > benchmark results proving the benefit
>>> of
>>> > >> this
>>> > >> > > mode?
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 3.2 The FLIP might need to add a
>>> public
>>> > API
>>> > >> > > section
>>> > >> > > > > > > > > describing
>>> > >> > > > > > > > > > how
>>> > >> > > > > > > > > > > >> >> > > users or developers can switch between
>>> > >> these two
>>> > >> > > > > > > execution
>>> > >> > > > > > > > > > modes.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 3.3 Apart from the watermark and
>>> > checkpoint
>>> > >> > > > mentioned
>>> > >> > > > > > in
>>> > >> > > > > > > > this
>>> > >> > > > > > > > > > FLIP,
>>> > >> > > > > > > > > > > >> >> > > there are also more other events that
>>> > might
>>> > >> > > appear
>>> > >> > > > in
>>> > >> > > > > > the
>>> > >> > > > > > > > > > stream of
>>> > >> > > > > > > > > > > >> >> > > data records. It might be better to
>>> > >> generalize
>>> > >> > > the
>>> > >> > > > > > > > execution
>>> > >> > > > > > > > > > mode
>>> > >> > > > > > > > > > > >> >> > > mechanism to handle all possible
>>> events.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 4. It might be better to treat
>>> > >> callback-handling
>>> > >> > > > as a
>>> > >> > > > > > > > > > > >> >> > > MailboxDefaultAction, instead of
>>> Mails, to
>>> > >> avoid
>>> > >> > > > the
>>> > >> > > > > > > > overhead
>>> > >> > > > > > > > > > of
>>> > >> > > > > > > > > > > >> >> > > repeatedly creating Mail objects.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 5. Could this FLIP provide the current
>>> > >> default
>>> > >> > > > values
>>> > >> > > > > > for
>>> > >> > > > > > > > > > things like
>>> > >> > > > > > > > > > > >> >> > > active buffer size thresholds and
>>> > timeouts?
>>> > >> These
>>> > >> > > > > could
>>> > >> > > > > > > > help
>>> > >> > > > > > > > > > with
>>> > >> > > > > > > > > > > >> >> > > memory consumption and latency
>>> analysis.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 6. Why do we need to record the
>>> hashcode
>>> > of
>>> > >> a
>>> > >> > > > record
>>> > >> > > > > in
>>> > >> > > > > > > its
>>> > >> > > > > > > > > > > >> >> > > RecordContext? It seems not used.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > 7. In "timers can be stored on the JVM
>>> > heap
>>> > >> or
>>> > >> > > > > > RocksDB",
>>> > >> > > > > > > > the
>>> > >> > > > > > > > > > link
>>> > >> > > > > > > > > > > >> >> > > points to a document in flink-1.15. It
>>> > >> might be
>>> > >> > > > > better
>>> > >> > > > > > to
>>> > >> > > > > > > > > > verify the
>>> > >> > > > > > > > > > > >> >> > > referenced content is still valid in
>>> the
>>> > >> latest
>>> > >> > > > Flink
>>> > >> > > > > > and
>>> > >> > > > > > > > > > update the
>>> > >> > > > > > > > > > > >> >> > > link accordingly. Same for other
>>> > references
>>> > >> if
>>> > >> > > any.
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > Best,
>>> > >> > > > > > > > > > > >> >> > > Yunfeng Zhou
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan
>>> Mei <
>>> > >> > > > > > > > > > yuanmei.w...@gmail.com> wrote:
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > Hi Devs,
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > This is a joint work of Yuan Mei,
>>> > Zakelly
>>> > >> Lan,
>>> > >> > > > > > Jinzhong
>>> > >> > > > > > > > Li,
>>> > >> > > > > > > > > > Hangxiang
>>> > >> > > > > > > > > > > >> >> > Yu,
>>> > >> > > > > > > > > > > >> >> > > > Yanfei Lei and Feng Wang. We'd like
>>> to
>>> > >> start a
>>> > >> > > > > > > discussion
>>> > >> > > > > > > > > > about
>>> > >> > > > > > > > > > > >> >> > > introducing
>>> > >> > > > > > > > > > > >> >> > > > Disaggregated State Storage and
>>> > >> Management in
>>> > >> > > > Flink
>>> > >> > > > > > > 2.0.
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > The past decade has witnessed a
>>> dramatic
>>> > >> shift
>>> > >> > > in
>>> > >> > > > > > > Flink's
>>> > >> > > > > > > > > > deployment
>>> > >> > > > > > > > > > > >> >> > > mode,
>>> > >> > > > > > > > > > > >> >> > > > workload patterns, and hardware
>>> > >> improvements.
>>> > >> > > > We've
>>> > >> > > > > > > moved
>>> > >> > > > > > > > > > from the
>>> > >> > > > > > > > > > > >> >> > > > map-reduce era where workers are
>>> > >> > > > > computation-storage
>>> > >> > > > > > > > > tightly
>>> > >> > > > > > > > > > coupled
>>> > >> > > > > > > > > > > >> >> > > nodes
>>> > >> > > > > > > > > > > >> >> > > > to a cloud-native world where
>>> > >> containerized
>>> > >> > > > > > deployments
>>> > >> > > > > > > > on
>>> > >> > > > > > > > > > Kubernetes
>>> > >> > > > > > > > > > > >> >> > > > become standard. To enable Flink's
>>> > >> Cloud-Native
>>> > >> > > > > > future,
>>> > >> > > > > > > > we
>>> > >> > > > > > > > > > introduce
>>> > >> > > > > > > > > > > >> >> > > > Disaggregated State Storage and
>>> > >> Management that
>>> > >> > > > > uses
>>> > >> > > > > > > DFS
>>> > >> > > > > > > > as
>>> > >> > > > > > > > > > primary
>>> > >> > > > > > > > > > > >> >> > > storage
>>> > >> > > > > > > > > > > >> >> > > > in Flink 2.0, as promised in the
>>> Flink
>>> > 2.0
>>> > >> > > > Roadmap.
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > Design Details can be found in
>>> > >> FLIP-423[1].
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > This new architecture is aimed to
>>> solve
>>> > >> the
>>> > >> > > > > following
>>> > >> > > > > > > > > > challenges
>>> > >> > > > > > > > > > > >> >> > brought
>>> > >> > > > > > > > > > > >> >> > > in
>>> > >> > > > > > > > > > > >> >> > > > the cloud-native era for Flink.
>>> > >> > > > > > > > > > > >> >> > > > 1. Local Disk Constraints in
>>> > >> containerization
>>> > >> > > > > > > > > > > >> >> > > > 2. Spiky Resource Usage caused by
>>> > >> compaction in
>>> > >> > > > the
>>> > >> > > > > > > > current
>>> > >> > > > > > > > > > state model
>>> > >> > > > > > > > > > > >> >> > > > 3. Fast Rescaling for jobs with
>>> large
>>> > >> states
>>> > >> > > > > > (hundreds
>>> > >> > > > > > > of
>>> > >> > > > > > > > > > Terabytes)
>>> > >> > > > > > > > > > > >> >> > > > 4. Light and Fast Checkpoint in a
>>> native
>>> > >> way
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > More specifically, we want to reach
>>> a
>>> > >> consensus
>>> > >> > > > on
>>> > >> > > > > > the
>>> > >> > > > > > > > > > following issues
>>> > >> > > > > > > > > > > >> >> > > in
>>> > >> > > > > > > > > > > >> >> > > > this discussion:
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > 1. Overall design
>>> > >> > > > > > > > > > > >> >> > > > 2. Proposed Changes
>>> > >> > > > > > > > > > > >> >> > > > 3. Design details to achieve
>>> Milestone1
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > In M1, we aim to achieve an
>>> end-to-end
>>> > >> baseline
>>> > >> > > > > > version
>>> > >> > > > > > > > > > using DFS as
>>> > >> > > > > > > > > > > >> >> > > > primary storage and complete core
>>> > >> > > > functionalities,
>>> > >> > > > > > > > > including:
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > - Asynchronous State APIs
>>> (FLIP-424)[2]:
>>> > >> > > > Introduce
>>> > >> > > > > > new
>>> > >> > > > > > > > APIs
>>> > >> > > > > > > > > > for
>>> > >> > > > > > > > > > > >> >> > > > asynchronous state access.
>>> > >> > > > > > > > > > > >> >> > > > - Asynchronous Execution Model
>>> > >> (FLIP-425)[3]:
>>> > >> > > > > > > Implement a
>>> > >> > > > > > > > > > non-blocking
>>> > >> > > > > > > > > > > >> >> > > > execution model leveraging the
>>> > >> asynchronous
>>> > >> > > APIs
>>> > >> > > > > > > > introduced
>>> > >> > > > > > > > > > in
>>> > >> > > > > > > > > > > >> >> > FLIP-424.
>>> > >> > > > > > > > > > > >> >> > > > - Grouping Remote State Access
>>> > >> (FLIP-426)[4]:
>>> > >> > > > > Enable
>>> > >> > > > > > > > > > retrieval of
>>> > >> > > > > > > > > > > >> >> > remote
>>> > >> > > > > > > > > > > >> >> > > > state data in batches to avoid
>>> > unnecessary
>>> > >> > > > > round-trip
>>> > >> > > > > > > > costs
>>> > >> > > > > > > > > > for remote
>>> > >> > > > > > > > > > > >> >> > > > access
>>> > >> > > > > > > > > > > >> >> > > > - Disaggregated State Store
>>> > (FLIP-427)[5]:
>>> > >> > > > > Introduce
>>> > >> > > > > > > the
>>> > >> > > > > > > > > > initial
>>> > >> > > > > > > > > > > >> >> > version
>>> > >> > > > > > > > > > > >> >> > > of
>>> > >> > > > > > > > > > > >> >> > > > the ForSt disaggregated state store.
>>> > >> > > > > > > > > > > >> >> > > > - Fault Tolerance/Rescale
>>> Integration
>>> > >> > > > > (FLIP-428)[6]:
>>> > >> > > > > > > > > > Integrate
>>> > >> > > > > > > > > > > >> >> > > > checkpointing mechanisms with the
>>> > >> disaggregated
>>> > >> > > > > state
>>> > >> > > > > > > > store
>>> > >> > > > > > > > > > for fault
>>> > >> > > > > > > > > > > >> >> > > > tolerance and fast rescaling.
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > We will vote on each FLIP in
>>> separate
>>> > >> threads
>>> > >> > > to
>>> > >> > > > > make
>>> > >> > > > > > > > sure
>>> > >> > > > > > > > > > each FLIP
>>> > >> > > > > > > > > > > >> >> > > > reaches a consensus. But we want to
>>> keep
>>> > >> the
>>> > >> > > > > > discussion
>>> > >> > > > > > > > > > within a
>>> > >> > > > > > > > > > > >> >> > focused
>>> > >> > > > > > > > > > > >> >> > > > thread (this thread) for easier
>>> tracking
>>> > >> of
>>> > >> > > > > contexts
>>> > >> > > > > > to
>>> > >> > > > > > > > > avoid
>>> > >> > > > > > > > > > > >> >> > duplicated
>>> > >> > > > > > > > > > > >> >> > > > questions/discussions and also to
>>> think
>>> > >> of the
>>> > >> > > > > > > > > > problem/solution in a
>>> > >> > > > > > > > > > > >> >> > full
>>> > >> > > > > > > > > > > >> >> > > > picture.
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > Looking forward to your feedback
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > Best,
>>> > >> > > > > > > > > > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang,
>>> > >> Yanfei and
>>> > >> > > > Feng
>>> > >> > > > > > > > > > > >> >> > > >
>>> > >> > > > > > > > > > > >> >> > > > [1]
>>> > >> > > https://cwiki.apache.org/confluence/x/R4p3EQ
>>> > >> > > > > > > > > > > >> >> > > > [2]
>>> > >> > > https://cwiki.apache.org/confluence/x/SYp3EQ
>>> > >> > > > > > > > > > > >> >> > > > [3]
>>> > >> > > https://cwiki.apache.org/confluence/x/S4p3EQ
>>> > >> > > > > > > > > > > >> >> > > > [4]
>>> > >> > > https://cwiki.apache.org/confluence/x/TYp3EQ
>>> > >> > > > > > > > > > > >> >> > > > [5]
>>> > >> > > https://cwiki.apache.org/confluence/x/T4p3EQ
>>> > >> > > > > > > > > > > >> >> > > > [6]
>>> > >> > > https://cwiki.apache.org/confluence/x/UYp3EQ
>>> > >> > > > > > > > > > > >> >> > >
>>> > >> > > > > > > > > > > >> >> >
>>> > >> > > > > > > > > >
>>> > >> > > > > > > > >
>>> > >> > > > > > > >
>>> > >> > > > > > >
>>> > >> > > > > >
>>> > >> > > > >
>>> > >> > > >
>>> > >> > >
>>> > >>
>>> > >
>>> >
>>>
>>

Reply via email to