Hi Xintong and Zakelly,

> 2. Regarding Strictly-ordered and Out-of-order of Watermarks
I agree that watermarks can use only the out-of-order mode for now,
since there is still no concrete example showing a correctness risk
with it. However, the strictly-ordered mode should still be supported
as the default option for non-record event types other than
watermarks, at least for checkpoint barriers.

I noticed that this information is already documented in "For other
non-record events, such as RecordAttributes ...", but it sits at the
bottom of the "Watermark" section, which might not be very obvious.
It might be better to reorganize the FLIP to state clearly that the
two order modes apply to all non-record events, and which mode this
FLIP chooses for each type of event.
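
To make this concrete, a rough sketch of the kind of per-event-type mapping I
have in mind (all names below are hypothetical and for illustration only, not
FLIP-425's actual API):

    // Illustrative sketch only; none of these names come from FLIP-425.
    public class OrderModeSketch {

        enum ElementOrderMode {
            STRICTLY_ORDERED, // drain all in-flight records before handling the event
            OUT_OF_ORDER      // the event may be handled while records are in flight
        }

        enum NonRecordEvent {
            WATERMARK, CHECKPOINT_BARRIER, RECORD_ATTRIBUTES
        }

        static ElementOrderMode orderModeFor(NonRecordEvent event) {
            switch (event) {
                case WATERMARK:
                    return ElementOrderMode.OUT_OF_ORDER;     // out-of-order only, for now
                default:
                    return ElementOrderMode.STRICTLY_ORDERED; // e.g. checkpoint barriers
            }
        }
    }

A small table like this in the FLIP would make the per-event-type choice explicit.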

Best,
Yunfeng

On Tue, Mar 19, 2024 at 1:09 PM Xintong Song <tonysong...@gmail.com> wrote:
>
> Thanks for the quick response. Sounds good to me.
>
> Best,
>
> Xintong
>
>
>
> On Tue, Mar 19, 2024 at 1:03 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> > Hi Xintong,
> >
> > Thanks for sharing your thoughts!
> >
> > 1. Regarding Record-ordered and State-ordered of processElement.
> > >
> > > I understand that while State-ordered likely provides better performance,
> > > Record-ordered is sometimes required for correctness. The question is how
> > > should a user choose between these two modes? My concern is that such a
> > > decision may require users to have in-depth knowledge about the Flink
> > > internals, and may lead to correctness issues if State-ordered is chosen
> > > improperly.
> > >
> > > I'd suggest not exposing such a knob, at least in the first version. That
> > > means always using Record-ordered for custom operators / UDFs, and keeping
> > > State-ordered for internal usages (built-in operators) only.
> > >
> >
> > Indeed, users may not be able to choose the mode properly. I agree to keep
> > such options for internal use.
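> >
> > To make the risk concrete, here is a rough, self-contained sketch of the
> > lost-update hazard when two same-key records interleave their state access
> > (a plain Java map stands in for keyed state; nothing below is FLIP API):
> >
> >     import java.util.HashMap;
> >     import java.util.Map;
> >
> >     public class OrderingSketch {
> >         public static void main(String[] args) {
> >             Map<String, Integer> state = new HashMap<>();
> >
> >             // Record-ordered: recordB starts only after recordA's write is visible.
> >             state.put("k", 0);
> >             int a = state.get("k");   // recordA reads 0
> >             state.put("k", a + 1);    // recordA writes 1
> >             int b = state.get("k");   // recordB reads 1
> >             state.put("k", b + 1);    // recordB writes 2 -> correct
> >
> >             // State-ordered, chosen improperly: recordB's read overtakes recordA's write.
> >             state.put("k", 0);
> >             int a2 = state.get("k");  // recordA reads 0
> >             int b2 = state.get("k");  // recordB reads a stale 0
> >             state.put("k", a2 + 1);   // recordA writes 1
> >             state.put("k", b2 + 1);   // recordB writes 1 -> one update is lost
> >             System.out.println(state.get("k"));
> >         }
> >     }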
> >
> >
> > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> > >
> > > I'm not entirely sure about Strictly-ordered being the default, or even
> > > being supported. From my understanding, a Watermark(T) only suggests that
> > > all records with event time before T have arrived, and it has nothing to
> > > do with whether records with event time after T have arrived or not. From
> > > that perspective, preventing certain records from arriving before a
> > > Watermark is never supported. I also cannot come up with any use case
> > > where Strictly-ordered is necessary. This implies the same issue as 1):
> > > how does the user choose between the two modes?
> > >
> > > I'd suggest not exposing the knob to users and only supporting
> > > Out-of-order, until we see a concrete use case where Strictly-ordered is
> > > needed.
> > >
> >
> > The semantics of watermarks do not define the sequence between a watermark
> > and subsequent records. For the most part, this is inconsequential, except
> > it may affect some current users who have previously relied on the implicit
> > assumption of an ordered execution. I'd be fine with initially supporting
> > only out-of-order processing. We may consider exposing the
> > 'Strictly-ordered' mode once we encounter a concrete use case that
> > necessitates it.
> >
> >
> > My philosophies behind not exposing the two config options are:
> > > - There are already too many options in Flink that users barely know how
> > > to use. I think Flink should try as much as possible to decide its own
> > > behavior, rather than throwing all the decisions to the users.
> > > - It's much harder to take back knobs than to introduce them. Therefore,
> > > options should be introduced only if concrete use cases are identified.
> > >
> >
> > I agree to keep configurable items minimal, especially for the MVP. Given
> > that we have the opportunity to refine the functionality before the
> > framework transitions from @Experimental to @PublicEvolving, it makes sense
> > to refrain from presenting user-facing options until we have ensured
> > their necessity.
> >
> >
> > Best,
> > Zakelly
> >
> > On Tue, Mar 19, 2024 at 12:06 PM Xintong Song <tonysong...@gmail.com>
> > wrote:
> >
> > > Sorry for joining the discussion late.
> > >
> > > I have two questions about FLIP-425.
> > >
> > > 1. Regarding Record-ordered and State-ordered of processElement.
> > >
> > > I understand that while State-ordered likely provides better performance,
> > > Record-ordered is sometimes required for correctness. The question is how
> > > should a user choose between these two modes? My concern is that such a
> > > decision may require users to have in-depth knowledge about the Flink
> > > internals, and may lead to correctness issues if State-ordered is chosen
> > > improperly.
> > >
> > > I'd suggest not exposing such a knob, at least in the first version. That
> > > means always using Record-ordered for custom operators / UDFs, and keeping
> > > State-ordered for internal usages (built-in operators) only.
> > >
> > > 2. Regarding Strictly-ordered and Out-of-order of Watermarks.
> > >
> > > I'm not entirely sure about Strictly-ordered being the default, or even
> > > being supported. From my understanding, a Watermark(T) only suggests that
> > > all records with event time before T have arrived, and it has nothing to
> > > do with whether records with event time after T have arrived or not. From
> > > that perspective, preventing certain records from arriving before a
> > > Watermark is never supported. I also cannot come up with any use case
> > > where Strictly-ordered is necessary. This implies the same issue as 1):
> > > how does the user choose between the two modes?
> > >
> > > I'd suggest not exposing the knob to users and only supporting
> > > Out-of-order, until we see a concrete use case where Strictly-ordered is
> > > needed.
> > >
> > >
> > > My philosophies behind not exposing the two config options are:
> > > - There are already too many options in Flink that users barely know how
> > > to use. I think Flink should try as much as possible to decide its own
> > > behavior, rather than throwing all the decisions to the users.
> > > - It's much harder to take back knobs than to introduce them. Therefore,
> > > options should be introduced only if concrete use cases are identified.
> > >
> > > WDYT?
> > >
> > > Best,
> > >
> > > Xintong
> > >
> > >
> > >
> > > On Fri, Mar 8, 2024 at 2:45 AM Jing Ge <j...@ververica.com.invalid>
> > wrote:
> > >
> > > > +1 for Gyula's suggestion. I just finished reading FLIP-423, which
> > > > introduces the intention of the big change and the high-level
> > > > architecture. Great content btw!
> > > > The only public interface change for this FLIP is one new config to use
> > > > ForSt. It makes sense to have one dedicated discussion thread for each
> > > > concrete system design.
> > > >
> > > > @Zakelly The links in your mail do not work except the last one,
> > because
> > > > the FLIP-xxx has been included into the url like
> > > >
> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> > > .
> > > >
> > > > NIT fix:
> > > >
> > > > FLIP-424:
> > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > >
> > > > FLIP-425:
> > > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > >
> > > > FLIP-426:
> > > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > >
> > > > FLIP-427:
> > > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > >
> > > > FLIP-428:
> > > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > >
> > > > Best regards,
> > > > Jing
> > > >
> > > >
> > > >
> > > >
> > > > On Thu, Mar 7, 2024 at 10:14 AM Zakelly Lan <zakelly....@gmail.com>
> > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > Thank you all for a lively discussion here, and it is a good time to
> > > move
> > > > > forward to more detailed discussions. Thus we open several threads
> > for
> > > > > sub-FLIPs:
> > > > >
> > > > > FLIP-424:
> > > > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864
> > > > > FLIP-425
> > > > > <
> > > >
> > https://lists.apache.org/thread/nmd9qd0k8l94ygcfgllxms49wmtz1864FLIP-425
> > > >:
> > > > > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0h
> > > > > FLIP-426
> > > > > <
> > > >
> > https://lists.apache.org/thread/wxn1j848fnfkqsnrs947wh1wmj8n8z0hFLIP-426
> > > >:
> > > > > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrf
> > > > > FLIP-427
> > > > > <
> > > >
> > https://lists.apache.org/thread/bt931focfl9971cwq194trmf3pkdsxrfFLIP-427
> > > >:
> > > > > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ft
> > > > > FLIP-428
> > > > > <
> > > >
> > https://lists.apache.org/thread/vktfzqvb7t4rltg7fdlsyd9sfdmrc4ftFLIP-428
> > > >:
> > > > > https://lists.apache.org/thread/vr8f91p715ct4lop6b3nr0fh4z5p312b
> > > > >
> > > > > If you want to talk about the overall architecture, roadmap,
> > milestones
> > > > or
> > > > > something related with multiple FLIPs, please post it here. Otherwise
> > > you
> > > > > can discuss some details in separate mails. Let's try to avoid
> > repeated
> > > > > discussion in different threads. I will sync important messages here
> > if
> > > > > there are any in the above threads.
> > > > >
> > > > > And reply to @Jeyhun: We will ensure the content between those FLIPs
> > is
> > > > > consistent.
> > > > >
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > > > On Thu, Mar 7, 2024 at 2:16 PM Yuan Mei <yuanmei.w...@gmail.com>
> > > wrote:
> > > > >
> > > > > > I have been a bit busy these few weeks and sorry for responding
> > late.
> > > > > >
> > > > > > The original thinking of keeping the discussion within one thread is
> > > > > > for easier tracking and to avoid repeated discussion in different
> > > > > > threads.
> > > > > >
> > > > > > For details, it might be good to start discussions in different
> > > > > > threads if needed.
> > > > > >
> > > > > > We will think of a way to better organize the discussion.
> > > > > >
> > > > > > Best
> > > > > > Yuan
> > > > > >
> > > > > >
> > > > > > On Thu, Mar 7, 2024 at 4:38 AM Jeyhun Karimov <
> > je.kari...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > + 1 for the suggestion.
> > > > > > > Maybe we can start the discussion with the FLIPs with minimum
> > > > > > > dependencies (from the other new/proposed FLIPs).
> > > > > > > Based on our discussion on a particular FLIP, the subsequent (or its
> > > > > > > dependent) FLIP(s) can be updated accordingly?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Jeyhun
> > > > > > >
> > > > > > > On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > > > Hey all!
> > > > > > > >
> > > > > > > > This is a massive improvement / work. I just started going through
> > > > > > > > the FLIPs and have a more or less meta comment.
> > > > > > > >
> > > > > > > > While it's good to keep the overall architecture discussion here, I
> > > > > > > > think we should still have separate discussions for each FLIP where
> > > > > > > > we can discuss interface details etc. With so much content, adding
> > > > > > > > minor comments here will lead nowhere, but those discussions are
> > > > > > > > still important and we should have them in separate threads (one for
> > > > > > > > each FLIP).
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > > Gyula
> > > > > > > >
> > > > > > > > On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi team,
> > > > > > > > >
> > > > > > > > > Thanks for your discussion. Regarding FLIP-425, we have
> > > > > supplemented
> > > > > > > > > several updates to answer high-frequency questions:
> > > > > > > > >
> > > > > > > > > 1. We captured a flame graph of the Hashmap state backend in
> > > > > > > > > "Synchronous execution with asynchronous APIs"[1], which
> > > reveals
> > > > > that
> > > > > > > > > the framework overhead (including reference counting,
> > > > > future-related
> > > > > > > > > code and so on) consumes about 9% of the keyed operator CPU
> > > time.
> > > > > > > > > 2. We added a set of comparative experiments for watermark
> > > > > > > > > processing: the performance of Out-Of-Order mode is 70% better
> > > > > > > > > than strictly-ordered mode with ~140MB of state. Instructions on
> > > > > > > > > how to run this test have also been added[2].
> > > > > > > > > 3. Regarding the order of StreamRecords, whether they access
> > > > > > > > > state or not: we added a new section, *Strict order of
> > > > > > > > > 'processElement'*[3].
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > > > > > > > > [2]
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > > > > > > > > [3]
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Yanfei
> > > > > > > > >
> > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2024年3月5日周二
> > > 09:25写道:
> > > > > > > > > >
> > > > > > > > > > Hi Zakelly,
> > > > > > > > > >
> > > > > > > > > > > 5. I'm not very sure ... revisiting this later since it
> > is
> > > > not
> > > > > > > > > important.
> > > > > > > > > >
> > > > > > > > > > It seems that we still have some details to confirm about
> > > this
> > > > > > > > > > question. Let's postpone this to after the critical parts
> > of
> > > > the
> > > > > > > > > > design are settled.
> > > > > > > > > >
> > > > > > > > > > > 8. Yes, we had considered ... metrics should be like
> > > > > afterwards.
> > > > > > > > > >
> > > > > > > > > > Oh sorry I missed FLIP-431. I'm fine with discussing this
> > > topic
> > > > > in
> > > > > > > > > milestone 2.
> > > > > > > > > >
> > > > > > > > > > Looking forward to the detailed design about the strict
> > mode
> > > > > > between
> > > > > > > > > > same-key records and the benchmark results about the epoch
> > > > > > mechanism.
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Yunfeng
> > > > > > > > > >
> > > > > > > > > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <
> > > > > zakelly....@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > Hi Yunfeng,
> > > > > > > > > > >
> > > > > > > > > > > For 1:
> > > > > > > > > > > I had a discussion with Lincoln Lee, and I realized it is a
> > > > > > > > > > > common case that the same-key record should be blocked before
> > > > > > > > > > > `processElement`. It is easier for users to understand. Thus I
> > > > > > > > > > > will introduce a strict mode for this and make it the default.
> > > > > > > > > > > My rough idea is just like yours: invoking some method of the
> > > > > > > > > > > AEC instance before `processElement`. The detailed design will
> > > > > > > > > > > be described in the FLIP later.
> > > > > > > > > > >
> > > > > > > > > > > For 2:
> > > > > > > > > > > I agree with you. We could throw exceptions for now and
> > > > > optimize
> > > > > > > this
> > > > > > > > > later.
> > > > > > > > > > >
> > > > > > > > > > > For 5:
> > > > > > > > > > >>
> > > > > > > > > > >> It might be better to move the default values to the
> > > > Proposed
> > > > > > > > Changes
> > > > > > > > > > >> section instead of making them public for now, as there
> > > will
> > > > > be
> > > > > > > > > > >> compatibility issues once we want to dynamically adjust
> > > the
> > > > > > > > thresholds
> > > > > > > > > > >> and timeouts in future.
> > > > > > > > > > >
> > > > > > > > > > > Agreed. The whole framework is under experiment until we
> > > > think
> > > > > it
> > > > > > > is
> > > > > > > > > complete in 2.0 or later. The default value should be better
> > > > > > determined
> > > > > > > > > with more testing results and production experience.
> > > > > > > > > > >
> > > > > > > > > > >> The configuration execution.async-state.enabled seems
> > > > > > unnecessary,
> > > > > > > > as
> > > > > > > > > > >> the infrastructure may automatically get this
> > information
> > > > from
> > > > > > the
> > > > > > > > > > >> detailed state backend configurations. We may revisit
> > this
> > > > > part
> > > > > > > > after
> > > > > > > > > > >> the core designs have reached an agreement. WDYT?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > I'm not very sure if there is any use case where users
> > > write
> > > > > > their
> > > > > > > > > code using async APIs but run their job in a synchronous way.
> > > The
> > > > > > first
> > > > > > > > two
> > > > > > > > > scenarios that come to me are for benchmarking or for a small
> > > > > state,
> > > > > > > > while
> > > > > > > > > they don't want to rewrite their code. Actually it is easy to
> > > > > > support,
> > > > > > > so
> > > > > > > > > I'd suggest providing it. But I'm fine with revisiting this
> > > later
> > > > > > since
> > > > > > > > it
> > > > > > > > > is not important. WDYT?
> > > > > > > > > > >
> > > > > > > > > > > For 8:
> > > > > > > > > > > Yes, we had considered the I/O metrics group especially
> > the
> > > > > > > > > back-pressure, idle and task busy per second. In the current
> > > plan
> > > > > we
> > > > > > > can
> > > > > > > > do
> > > > > > > > > state access during back-pressure, meaning that those metrics
> > > for
> > > > > > input
> > > > > > > > > would better be redefined. I suggest we discuss these
> > existing
> > > > > > metrics
> > > > > > > as
> > > > > > > > > well as some new metrics that should be introduced in
> > FLIP-431
> > > > > later
> > > > > > in
> > > > > > > > > milestone 2, since we have basically finished the framework
> > > thus
> > > > we
> > > > > > > will
> > > > > > > > > have a better view of what metrics should be like afterwards.
> > > > WDYT?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > > Zakelly
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <
> > > > > > > > > flink.zhouyunf...@gmail.com> wrote:
> > > > > > > > > > >>
> > > > > > > > > > >> Hi Zakelly,
> > > > > > > > > > >>
> > > > > > > > > > >> Thanks for the responses!
> > > > > > > > > > >>
> > > > > > > > > > >> > 1. I will discuss this with some expert SQL
> > developers.
> > > > ...
> > > > > > mode
> > > > > > > > > for StreamRecord processing.
> > > > > > > > > > >>
> > > > > > > > > > >> In DataStream API there should also be use cases when
> > the
> > > > > order
> > > > > > of
> > > > > > > > > > >> output is strictly required. I agree with it that SQL
> > > > experts
> > > > > > may
> > > > > > > > help
> > > > > > > > > > >> provide more concrete use cases that can accelerate our
> > > > > > > discussion,
> > > > > > > > > > >> but please allow me to search for DataStream use cases
> > > that
> > > > > can
> > > > > > > > prove
> > > > > > > > > > >> the necessity of this strict order preservation mode, if
> > > > > answers
> > > > > > > > from
> > > > > > > > > > >> SQL experts are shown to be negative.
> > > > > > > > > > >>
> > > > > > > > > > >> For your convenience, my current rough idea is that we
> > can
> > > > > add a
> > > > > > > > > > >> module between the Input(s) and processElement() module
> > in
> > > > > Fig 2
> > > > > > > of
> > > > > > > > > > >> FLIP-425. The module will be responsible for caching
> > > records
> > > > > > whose
> > > > > > > > > > >> keys collide with in-flight records, and AEC will only
> > be
> > > > > > > > responsible
> > > > > > > > > > >> for handling async state calls, without knowing the
> > record
> > > > > each
> > > > > > > call
> > > > > > > > > > >> belongs to. We may revisit this topic once the necessity
> > > of
> > > > > the
> > > > > > > > strict
> > > > > > > > > > >> order mode is clarified.
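> > > > > > > > > > >>
> > > > > > > > > > >> A tiny, purely illustrative sketch of the gating idea above
> > > > > > > > > > >> (none of these names exist in Flink; they only show the intent):
> > > > > > > > > > >>
> > > > > > > > > > >>     import java.util.ArrayDeque;
> > > > > > > > > > >>     import java.util.HashMap;
> > > > > > > > > > >>     import java.util.HashSet;
> > > > > > > > > > >>     import java.util.Map;
> > > > > > > > > > >>     import java.util.Queue;
> > > > > > > > > > >>     import java.util.Set;
> > > > > > > > > > >>
> > > > > > > > > > >>     // Hypothetical gate sitting between the Input(s) and processElement().
> > > > > > > > > > >>     class KeyCollisionGate<K, R> {
> > > > > > > > > > >>         private final Set<K> inFlightKeys = new HashSet<>();
> > > > > > > > > > >>         private final Map<K, Queue<R>> blockedRecords = new HashMap<>();
> > > > > > > > > > >>
> > > > > > > > > > >>         /** True if the record may go to processElement() now, false if buffered. */
> > > > > > > > > > >>         boolean tryAdmit(K key, R record) {
> > > > > > > > > > >>             if (inFlightKeys.add(key)) {
> > > > > > > > > > >>                 return true;   // no in-flight record holds this key
> > > > > > > > > > >>             }
> > > > > > > > > > >>             blockedRecords.computeIfAbsent(key, k -> new ArrayDeque<>()).add(record);
> > > > > > > > > > >>             return false;      // key collides with an in-flight record
> > > > > > > > > > >>         }
> > > > > > > > > > >>
> > > > > > > > > > >>         /** Called once all async state calls of the current record for this key finished. */
> > > > > > > > > > >>         R releaseNext(K key) {
> > > > > > > > > > >>             Queue<R> queue = blockedRecords.get(key);
> > > > > > > > > > >>             if (queue == null || queue.isEmpty()) {
> > > > > > > > > > >>                 inFlightKeys.remove(key);
> > > > > > > > > > >>                 return null;   // nothing waiting, the key becomes free
> > > > > > > > > > >>             }
> > > > > > > > > > >>             return queue.poll(); // the next same-key record may be processed
> > > > > > > > > > >>         }
> > > > > > > > > > >>     }
> > > > > > > > > > >>
> > > > > > > > > > >> This keeps the AEC unaware of which record each state call belongs
> > > > > > > > > > >> to, as described above.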
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> > 2. The amount of parallel StateRequests ... instead of
> > > > > > invoking
> > > > > > > > > yield
> > > > > > > > > > >>
> > > > > > > > > > >> Your suggestions generally appeal to me. I think we may
> > > let
> > > > > > > > > > >> corresponding Flink jobs fail with OOM for now, since
> > the
> > > > > > majority
> > > > > > > > of
> > > > > > > > > > >> a StateRequest should just be references to existing
> > Java
> > > > > > objects,
> > > > > > > > > > >> which only occupies very small memory space and can
> > hardly
> > > > > cause
> > > > > > > OOM
> > > > > > > > > > >> in common cases. We can monitor the pending
> > StateRequests
> > > > and
> > > > > if
> > > > > > > > there
> > > > > > > > > > >> is really a risk of OOM in extreme cases, we can throw
> > > > > > Exceptions
> > > > > > > > with
> > > > > > > > > > >> proper messages notifying users what to do, like
> > > increasing
> > > > > > memory
> > > > > > > > > > >> through configurations.
> > > > > > > > > > >>
> > > > > > > > > > >> Your suggestions to adjust threshold adaptively or to
> > use
> > > > the
> > > > > > > > blocking
> > > > > > > > > > >> buffer sounds good, and in my opinion we can postpone
> > them
> > > > to
> > > > > > > future
> > > > > > > > > > >> FLIPs since they seem to only benefit users in rare
> > cases.
> > > > > Given
> > > > > > > > that
> > > > > > > > > > >> FLIP-423~428 has already been a big enough design, it
> > > might
> > > > be
> > > > > > > > better
> > > > > > > > > > >> to focus on the most critical design for now and
> > postpone
> > > > > > > > > > >> optimizations like this. WDYT?
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> > 5. Sure, we will introduce new configs as well as
> > their
> > > > > > default
> > > > > > > > > value.
> > > > > > > > > > >>
> > > > > > > > > > >> Thanks for adding the default values and the values
> > > > themselves
> > > > > > > LGTM.
> > > > > > > > > > >> It might be better to move the default values to the
> > > > Proposed
> > > > > > > > Changes
> > > > > > > > > > >> section instead of making them public for now, as there
> > > will
> > > > > be
> > > > > > > > > > >> compatibility issues once we want to dynamically adjust
> > > the
> > > > > > > > thresholds
> > > > > > > > > > >> and timeouts in future.
> > > > > > > > > > >>
> > > > > > > > > > >> The configuration execution.async-state.enabled seems
> > > > > > unnecessary,
> > > > > > > > as
> > > > > > > > > > >> the infrastructure may automatically get this
> > information
> > > > from
> > > > > > the
> > > > > > > > > > >> detailed state backend configurations. We may revisit
> > this
> > > > > part
> > > > > > > > after
> > > > > > > > > > >> the core designs have reached an agreement. WDYT?
> > > > > > > > > > >>
> > > > > > > > > > >>
> > > > > > > > > > >> Besides, inspired by Jeyhun's comments, it comes to me
> > > that
> > > > > > > > > > >>
> > > > > > > > > > >> 8. Should this FLIP introduce metrics that measure the
> > > time
> > > > a
> > > > > > > Flink
> > > > > > > > > > >> job is back-pressured by State IOs? Under the current
> > > > design,
> > > > > > this
> > > > > > > > > > >> metric could measure the time when the blocking buffer
> > is
> > > > full
> > > > > > and
> > > > > > > > > > >> yield() cannot get callbacks to process, which means the
> > > > > > operator
> > > > > > > is
> > > > > > > > > > >> fully waiting for state responses.
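> > > > > > > > > > >>
> > > > > > > > > > >> A rough sketch of what such a metric could count (this is not an
> > > > > > > > > > >> existing Flink metric; the names are made up for illustration):
> > > > > > > > > > >>
> > > > > > > > > > >>     // Accumulates the time the operator spends fully blocked on state I/O.
> > > > > > > > > > >>     class StateIoBackPressureTimer {
> > > > > > > > > > >>         private long blockedSinceNanos = -1L;   // -1 means "not currently blocked"
> > > > > > > > > > >>         private long accumulatedBlockedNanos = 0L;
> > > > > > > > > > >>
> > > > > > > > > > >>         /** Called when the blocking buffer is full and yield() found no callback. */
> > > > > > > > > > >>         void markBlocked() {
> > > > > > > > > > >>             if (blockedSinceNanos < 0) {
> > > > > > > > > > >>                 blockedSinceNanos = System.nanoTime();
> > > > > > > > > > >>             }
> > > > > > > > > > >>         }
> > > > > > > > > > >>
> > > > > > > > > > >>         /** Called as soon as a state response arrives and processing continues. */
> > > > > > > > > > >>         void markUnblocked() {
> > > > > > > > > > >>             if (blockedSinceNanos >= 0) {
> > > > > > > > > > >>                 accumulatedBlockedNanos += System.nanoTime() - blockedSinceNanos;
> > > > > > > > > > >>                 blockedSinceNanos = -1L;
> > > > > > > > > > >>             }
> > > > > > > > > > >>         }
> > > > > > > > > > >>
> > > > > > > > > > >>         /** Value a gauge like "stateIoBackPressuredTimeMs" could expose. */
> > > > > > > > > > >>         long blockedMillis() {
> > > > > > > > > > >>             return accumulatedBlockedNanos / 1_000_000;
> > > > > > > > > > >>         }
> > > > > > > > > > >>     }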
> > > > > > > > > > >>
> > > > > > > > > > >> Best regards,
> > > > > > > > > > >> Yunfeng
> > > > > > > > > > >>
> > > > > > > > > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <
> > > > > > > zakelly....@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > >> >
> > > > > > > > > > >> > Hi Yunfeng,
> > > > > > > > > > >> >
> > > > > > > > > > >> > Thanks for your detailed comments!
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 1. Why do we need a close() method on StateIterator?
> > > This
> > > > > > > method
> > > > > > > > > seems
> > > > > > > > > > >> >> unused in the usage example codes.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > The `close()` is introduced to release internal
> > > resources,
> > > > > but
> > > > > > > it
> > > > > > > > > does not seem to require the user to call it. I removed this.
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is
> > stated
> > > > that
> > > > > > "No
> > > > > > > > > null
> > > > > > > > > > >> >> entries are allowed". It might be better to further
> > > > explain
> > > > > > > what
> > > > > > > > > will
> > > > > > > > > > >> >> happen if a null value is passed, ignoring the value
> > in
> > > > the
> > > > > > > > > returned
> > > > > > > > > > >> >> Collection or throwing exceptions. Given that
> > > > > > > > > > >> >> FutureUtils.emptyFuture() can be returned in the
> > > example
> > > > > > code,
> > > > > > > I
> > > > > > > > > > >> >> suppose the former one might be correct.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > The statement "No null entries are allowed" refers to the
> > > > > > > > > > >> > parameters: it means that an ArrayList like [null,
> > > > > > > > > > >> > StateFuture1, StateFuture2] must not be passed in, and an
> > > > > > > > > > >> > Exception will be thrown if it is.
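> > > > > > > > > > >> >
> > > > > > > > > > >> > A stand-in sketch of that contract, using CompletableFuture in place
> > > > > > > > > > >> > of StateFuture and allOf() in place of FutureUtils.combineAll(), just
> > > > > > > > > > >> > to illustrate the difference between a null entry and an empty future
> > > > > > > > > > >> > (this is not the FLIP-424 API itself):
> > > > > > > > > > >> >
> > > > > > > > > > >> >     import java.util.ArrayList;
> > > > > > > > > > >> >     import java.util.List;
> > > > > > > > > > >> >     import java.util.concurrent.CompletableFuture;
> > > > > > > > > > >> >
> > > > > > > > > > >> >     public class CombineAllSketch {
> > > > > > > > > > >> >         public static void main(String[] args) {
> > > > > > > > > > >> >             List<CompletableFuture<Integer>> futures = new ArrayList<>();
> > > > > > > > > > >> >             futures.add(CompletableFuture.completedFuture(1));    // a normal async result
> > > > > > > > > > >> >             futures.add(CompletableFuture.completedFuture(null)); // an "empty" future is fine
> > > > > > > > > > >> >             // futures.add(null);  // a null *entry* is what the JavaDoc forbids -> exception
> > > > > > > > > > >> >
> > > > > > > > > > >> >             CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
> > > > > > > > > > >> >                     .thenRun(() -> System.out.println("all state responses arrived"));
> > > > > > > > > > >> >         }
> > > > > > > > > > >> >     }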
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 1. According to Fig 2 of this FLIP, ... . This
> > > situation
> > > > > > should
> > > > > > > > be
> > > > > > > > > > >> >> avoided and the order of same-key records should be
> > > > > strictly
> > > > > > > > > > >> >> preserved.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > I will discuss this with some expert SQL developers.
> > And
> > > > if
> > > > > it
> > > > > > > is
> > > > > > > > > valid and common, I suggest a strict order preservation mode
> > > for
> > > > > > > > > StreamRecord processing. WDYT?
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 2. The FLIP says that StateRequests submitted by
> > > > Callbacks
> > > > > > will
> > > > > > > > not
> > > > > > > > > > >> >> invoke further yield() methods. Given that yield() is
> > > > used
> > > > > > when
> > > > > > > > > there
> > > > > > > > > > >> >> is "too much" in-flight data, does it mean
> > > StateRequests
> > > > > > > > submitted
> > > > > > > > > by
> > > > > > > > > > >> >> Callbacks will never be "too much"? What if the total
> > > > > number
> > > > > > of
> > > > > > > > > > >> >> StateRequests exceed the capacity of Flink operator's
> > > > > memory
> > > > > > > > space?
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > The amount of parallel StateRequests for one
> > > StreamRecord
> > > > > > cannot
> > > > > > > > be
> > > > > > > > > determined since the code is written by users. So the
> > in-flight
> > > > > > > requests
> > > > > > > > > may be "too much", and may cause OOM. Users should
> > re-configure
> > > > > their
> > > > > > > > job,
> > > > > > > > > controlling the amount of on-going StreamRecord. And I
> > suggest
> > > > two
> > > > > > ways
> > > > > > > > to
> > > > > > > > > avoid this:
> > > > > > > > > > >> >
> > > > > > > > > > >> > Adaptively adjust the count of on-going StreamRecord
> > > > > according
> > > > > > > to
> > > > > > > > > historical StateRequests amount.
> > > > > > > > > > >> > Also control the max StateRequests that can be
> > executed
> > > in
> > > > > > > > parallel
> > > > > > > > > for each StreamRecord, and if it exceeds, put the new
> > > > StateRequest
> > > > > in
> > > > > > > the
> > > > > > > > > blocking buffer waiting for execution (instead of invoking
> > > > > yield()).
> > > > > > > > > > >> >
> > > > > > > > > > >> > WDYT?
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 3.1 I'm concerned that the out-of-order execution
> > mode,
> > > > > along
> > > > > > > > with
> > > > > > > > > the
> > > > > > > > > > >> >> epoch mechanism, would bring more complexity to the
> > > > > execution
> > > > > > > > model
> > > > > > > > > > >> >> than the performance improvement it promises. Could
> > we
> > > > add
> > > > > > some
> > > > > > > > > > >> >> benchmark results proving the benefit of this mode?
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > Agreed, will do.
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 3.2 The FLIP might need to add a public API section
> > > > > > describing
> > > > > > > > how
> > > > > > > > > > >> >> users or developers can switch between these two
> > > > execution
> > > > > > > modes.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > Good point. We will add a Public API section.
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 3.3 Apart from the watermark and checkpoint mentioned
> > > in
> > > > > this
> > > > > > > > FLIP,
> > > > > > > > > > >> >> there are also more other events that might appear in
> > > the
> > > > > > > stream
> > > > > > > > of
> > > > > > > > > > >> >> data records. It might be better to generalize the
> > > > > execution
> > > > > > > mode
> > > > > > > > > > >> >> mechanism to handle all possible events.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > Yes, I missed this point. Thanks for the reminder.
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 4. It might be better to treat callback-handling as a
> > > > > > > > > > >> >> MailboxDefaultAction, instead of Mails, to avoid the
> > > > > overhead
> > > > > > > of
> > > > > > > > > > >> >> repeatedly creating Mail objects.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > I thought the intermediate wrapper for the callback cannot
> > > > > > > > > > >> > be omitted, since there will be some context switch before
> > > > > > > > > > >> > each execution. The MailboxDefaultAction in most cases is
> > > > > > > > > > >> > processInput, right? While the callback should be executed
> > > > > > > > > > >> > with higher priority. I'd suggest not changing the basic
> > > > > > > > > > >> > logic of the Mailbox and the default action since it is very
> > > > > > > > > > >> > critical for performance. But yes, we will try our best to
> > > > > > > > > > >> > avoid creating intermediate objects.
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 5. Could this FLIP provide the current default values
> > > for
> > > > > > > things
> > > > > > > > > like
> > > > > > > > > > >> >> active buffer size thresholds and timeouts? These
> > could
> > > > > help
> > > > > > > with
> > > > > > > > > > >> >> memory consumption and latency analysis.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > Sure, we will introduce new configs as well as their
> > > > default
> > > > > > > > value.
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 6. Why do we need to record the hashcode of a record
> > in
> > > > its
> > > > > > > > > > >> >> RecordContext? It seems not used.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > The context switch before each callback execution involves
> > > > > > > > > > >> > setCurrentKey, where the hashCode is re-calculated. We cache
> > > > > > > > > > >> > it to accelerate this.
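> > > > > > > > > > >> >
> > > > > > > > > > >> > Roughly like this (illustrative only, not the actual RecordContext class):
> > > > > > > > > > >> >
> > > > > > > > > > >> >     // Sketch: cache the key's hash when the record enters the operator,
> > > > > > > > > > >> >     // so restoring the key context before a callback skips re-hashing.
> > > > > > > > > > >> >     final class RecordContextSketch<K> {
> > > > > > > > > > >> >         private final K key;
> > > > > > > > > > >> >         private final int cachedKeyHash;
> > > > > > > > > > >> >
> > > > > > > > > > >> >         RecordContextSketch(K key) {
> > > > > > > > > > >> >             this.key = key;
> > > > > > > > > > >> >             this.cachedKeyHash = key.hashCode();
> > > > > > > > > > >> >         }
> > > > > > > > > > >> >
> > > > > > > > > > >> >         K key() {
> > > > > > > > > > >> >             return key;
> > > > > > > > > > >> >         }
> > > > > > > > > > >> >
> > > > > > > > > > >> >         /** Lets setCurrentKey-style logic reuse the hash instead of recomputing it. */
> > > > > > > > > > >> >         int keyHash() {
> > > > > > > > > > >> >             return cachedKeyHash;
> > > > > > > > > > >> >         }
> > > > > > > > > > >> >     }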
> > > > > > > > > > >> >
> > > > > > > > > > >> >> 7. In "timers can be stored on the JVM heap or
> > > RocksDB",
> > > > > the
> > > > > > > link
> > > > > > > > > > >> >> points to a document in flink-1.15. It might be
> > better
> > > to
> > > > > > > verify
> > > > > > > > > the
> > > > > > > > > > >> >> referenced content is still valid in the latest Flink
> > > and
> > > > > > > update
> > > > > > > > > the
> > > > > > > > > > >> >> link accordingly. Same for other references if any.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > Thanks for the reminder! Will check.
> > > > > > > > > > >> >
> > > > > > > > > > >> >
> > > > > > > > > > >> > Thanks a lot & Best,
> > > > > > > > > > >> > Zakelly
> > > > > > > > > > >> >
> > > > > > > > > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <
> > > > > > > > je.kari...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> Hi,
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> Thanks for the great proposals. I have a few comments:
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> - Backpressure Handling. Flink's original
> > backpressure
> > > > > > handling
> > > > > > > > is
> > > > > > > > > quite
> > > > > > > > > > >> >> robust and the semantics is quite "simple" (simple is
> > > > > > > beautiful).
> > > > > > > > > > >> >> This mechanism has proven to perform better/robust
> > than
> > > > the
> > > > > > > other
> > > > > > > > > open
> > > > > > > > > > >> >> source streaming systems, where they were relying on
> > > some
> > > > > > > > loopback
> > > > > > > > > > >> >> information.
> > > > > > > > > > >> >> Now that the proposal also relies on loopback (yield
> > in
> > > > > this
> > > > > > > > > case), it is
> > > > > > > > > > >> >> not clear how well the new backpressure handling
> > > proposed
> > > > > in
> > > > > > > > > FLIP-425 is
> > > > > > > > > > >> >> robust and handle fluctuating workloads.
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> - Watermark/Timer Handling: Similar arguments apply
> > for
> > > > > > > watermark
> > > > > > > > > and timer
> > > > > > > > > > >> >> handling. IMHO, we need more benchmarks showing the
> > > > > overhead
> > > > > > > > > > >> >> of epoch management with different parameters (e.g.,
> > > > window
> > > > > > > size,
> > > > > > > > > watermark
> > > > > > > > > > >> >> strategy, etc)
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> - DFS consistency guarantees. The proposal in
> > FLIP-427
> > > is
> > > > > > > > > DFS-agnostic.
> > > > > > > > > > >> >> However, different cloud providers have different
> > > storage
> > > > > > > > > consistency
> > > > > > > > > > >> >> models.
> > > > > > > > > > >> >> How do we want to deal with them?
> > > > > > > > > > >> >>
> > > > > > > > > > >> >>  Regards,
> > > > > > > > > > >> >> Jeyhun
> > > > > > > > > > >> >>
> > > > > > > > > > >> >>
> > > > > > > > > > >> >>
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <
> > > > > > > > zakelly....@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > >> >>
> > > > > > > > > > >> >> > Thanks Piotr for sharing your thoughts!
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > I guess it depends how we would like to treat the
> > > local
> > > > > > > disks.
> > > > > > > > > I've always
> > > > > > > > > > >> >> > > thought about them that almost always eventually
> > > all
> > > > > > state
> > > > > > > > > from the DFS
> > > > > > > > > > >> >> > > should end up cached in the local disks.
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > OK I got it. In our proposal we treat local disk as
> > > an
> > > > > > > optional
> > > > > > > > > cache, so
> > > > > > > > > > >> >> > the basic design will handle the case with state
> > > > residing
> > > > > > in
> > > > > > > > DFS
> > > > > > > > > only. It
> > > > > > > > > > >> >> > is a more 'cloud-native' approach that does not
> > rely
> > > on
> > > > > any
> > > > > > > > > local storage
> > > > > > > > > > >> >> > assumptions, which allow users to dynamically
> > adjust
> > > > the
> > > > > > > > > capacity or I/O
> > > > > > > > > > >> >> > bound of remote storage to gain performance or save
> > > the
> > > > > > cost,
> > > > > > > > > even without
> > > > > > > > > > >> >> > a job restart.
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > In
> > > > > > > > > > >> >> > > the currently proposed more fine grained
> > solution,
> > > > you
> > > > > > > make a
> > > > > > > > > single
> > > > > > > > > > >> >> > > request to DFS per each state access.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Ah that's not accurate. Actually we buffer the
> > state
> > > > > > requests
> > > > > > > > > and process
> > > > > > > > > > >> >> > them in batch, multiple requests will correspond to
> > > one
> > > > > DFS
> > > > > > > > > access (One
> > > > > > > > > > >> >> > block access for multiple keys performed by
> > RocksDB).
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > In that benchmark you mentioned, are you requesting
> > > the
> > > > > > state
> > > > > > > > > > >> >> > > asynchronously from local disks into memory? If
> > the
> > > > > > benefit
> > > > > > > > > comes from
> > > > > > > > > > >> >> > > parallel I/O, then I would expect the benefit to
> > > > > > > > > disappear/shrink when
> > > > > > > > > > >> >> > > running multiple subtasks on the same machine, as
> > > > they
> > > > > > > would
> > > > > > > > > be making
> > > > > > > > > > >> >> > > their own parallel requests, right? Also enabling
> > > > > > > > > checkpointing would
> > > > > > > > > > >> >> > > further cut into the available I/O budget.
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > That's an interesting topic. Our proposal is
> > > > specifically
> > > > > > > aimed
> > > > > > > > > at the
> > > > > > > > > > >> >> > scenario where the machine I/O is not fully loaded
> > > but
> > > > > the
> > > > > > > I/O
> > > > > > > > > latency has
> > > > > > > > > > >> >> > indeed become a bottleneck for each subtask. While
> > > the
> > > > > > > > > distributed file
> > > > > > > > > > >> >> > system is a prime example of a scenario
> > characterized
> > > > by
> > > > > > > > > abundant and
> > > > > > > > > > >> >> > easily scalable I/O bandwidth coupled with higher
> > I/O
> > > > > > > latency.
> > > > > > > > > You may
> > > > > > > > > > >> >> > expect to increase the parallelism of a job to
> > > enhance
> > > > > the
> > > > > > > > > performance as
> > > > > > > > > > >> >> > well, but that also brings in more waste of CPU's
> > and
> > > > > > memory
> > > > > > > > for
> > > > > > > > > building
> > > > > > > > > > >> >> > up more subtasks. This is one drawback for the
> > > > > > > > > computation-storage tightly
> > > > > > > > > > >> >> > coupled nodes. While in our proposal, the parallel
> > > I/O
> > > > > with
> > > > > > > all
> > > > > > > > > the
> > > > > > > > > > >> >> > callbacks still running in one task, pre-allocated
> > > > > > > > computational
> > > > > > > > > resources
> > > > > > > > > > >> >> > are better utilized. It is a much more lightweight
> > > way
> > > > to
> > > > > > > > > perform parallel
> > > > > > > > > > >> >> > I/O.
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Just with what granularity those async requests
> > > should
> > > > be
> > > > > > > made.
> > > > > > > > > > >> >> > > Making state access asynchronous is definitely
> > the
> > > > > right
> > > > > > > way
> > > > > > > > > to go!
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > I think the current proposal is based on such core
> > > > ideas:
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >    - A pure cloud-native disaggregated state.
> > > > > > > > > > >> >> >    - Fully utilize the given resources and try not
> > to
> > > > > waste
> > > > > > > > them
> > > > > > > > > (including
> > > > > > > > > > >> >> >    I/O).
> > > > > > > > > > >> >> >    - The ability to scale isolated resources (I/O
> > or
> > > > CPU
> > > > > or
> > > > > > > > > memory)
> > > > > > > > > > >> >> >    independently.
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > We think a fine-grained granularity is more inline
> > > with
> > > > > > those
> > > > > > > > > ideas,
> > > > > > > > > > >> >> > especially without local disk assumptions and
> > without
> > > > any
> > > > > > > waste
> > > > > > > > > of I/O by
> > > > > > > > > > >> >> > prefetching. Please note that it is not a
> > replacement
> > > > of
> > > > > > the
> > > > > > > > > original local
> > > > > > > > > > >> >> > state with synchronous execution. Instead this is a
> > > > > > solution
> > > > > > > > > embracing the
> > > > > > > > > > >> >> > cloud-native era, providing much more scalability
> > and
> > > > > > > resource
> > > > > > > > > efficiency
> > > > > > > > > > >> >> > when handling a *huge state*.
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > What also worries me a lot in this fine grained
> > model
> > > > is
> > > > > > the
> > > > > > > > > effect on the
> > > > > > > > > > >> >> > > checkpointing times.
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Your concerns are very reasonable. Faster
> > > checkpointing
> > > > > is
> > > > > > > > > always a core
> > > > > > > > > > >> >> > advantage of disaggregated state, but only for the
> > > > async
> > > > > > > phase.
> > > > > > > > > There will
> > > > > > > > > > >> >> > be some complexity introduced by in-flight
> > requests,
> > > > but
> > > > > > I'd
> > > > > > > > > suggest a
> > > > > > > > > > >> >> > checkpoint containing those in-flight state
> > requests
> > > as
> > > > > > part
> > > > > > > of
> > > > > > > > > the state,
> > > > > > > > > > >> >> > to accelerate the sync phase by skipping the buffer
> > > > > > draining.
> > > > > > > > > This makes
> > > > > > > > > > >> >> > the buffer size have little impact on checkpoint
> > > time.
> > > > > And
> > > > > > > all
> > > > > > > > > the changes
> > > > > > > > > > >> >> > keep within the execution model we proposed while
> > the
> > > > > > > > checkpoint
> > > > > > > > > barrier
> > > > > > > > > > >> >> > alignment or handling will not be touched in our
> > > > > proposal,
> > > > > > > so I
> > > > > > > > > guess
> > > > > > > > > > >> >> > the complexity is relatively controllable. I have
> > > faith
> > > > > in
> > > > > > > that
> > > > > > > > > :)
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Also regarding the overheads, it would be great if
> > > you
> > > > > > could
> > > > > > > > > provide
> > > > > > > > > > >> >> > > profiling results of the benchmarks that you
> > > > conducted
> > > > > to
> > > > > > > > > verify the
> > > > > > > > > > >> >> > > results. Or maybe if you could describe the steps
> > > to
> > > > > > > > reproduce
> > > > > > > > > the
> > > > > > > > > > >> >> > results?
> > > > > > > > > > >> >> > > Especially "Hashmap (sync)" vs "Hashmap with
> > async
> > > > > API".
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Yes we could profile the benchmarks. And for the
> > > > > comparison
> > > > > > > of
> > > > > > > > > "Hashmap
> > > > > > > > > > >> >> > (sync)" vs "Hashmap with async API", we conduct a
> > > > > Wordcount
> > > > > > > job
> > > > > > > > > written
> > > > > > > > > > >> >> > with async APIs but disabling the async execution
> > by
> > > > > > directly
> > > > > > > > > completing
> > > > > > > > > > >> >> > the future using sync state access. This evaluates
> > > the
> > > > > > > overhead
> > > > > > > > > of newly
> > > > > > > > > > >> >> > introduced modules like 'AEC' in sync execution
> > (even
> > > > > > though
> > > > > > > > > they are not
> > > > > > > > > > >> >> > designed for it). The code will be provided later.
> > > For
> > > > > > other
> > > > > > > > > results of our
> > > > > > > > > > >> >> > PoC[1], you can follow the instructions here[2] to
> > > > > > reproduce.
> > > > > > > > > Since the
> > > > > > > > > > >> >> > compilation may take some effort, we will directly
> > > > > provide
> > > > > > > the
> > > > > > > > > jar for
> > > > > > > > > > >> >> > testing next week.
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > And @Yunfeng Zhou, I have noticed your mail but it
> > > is a
> > > > > bit
> > > > > > > > late
> > > > > > > > > in my
> > > > > > > > > > >> >> > local time and the next few days are weekends. So I
> > > > will
> > > > > > > reply
> > > > > > > > > to you
> > > > > > > > > > >> >> > later. Thanks for your response!
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > [1]
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> > > > > > > > > > >> >> > [2]
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > Best,
> > > > > > > > > > >> >> > Zakelly
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <
> > > > > > > > > flink.zhouyunf...@gmail.com>
> > > > > > > > > > >> >> > wrote:
> > > > > > > > > > >> >> >
> > > > > > > > > > >> >> > > Hi,
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > Thanks for proposing this design! I just read
> > > > FLIP-424
> > > > > > and
> > > > > > > > > FLIP-425
> > > > > > > > > > >> >> > > and have some questions about the proposed
> > changes.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > For Async API (FLIP-424)
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 1. Why do we need a close() method on
> > > StateIterator?
> > > > > This
> > > > > > > > > method seems
> > > > > > > > > > >> >> > > unused in the usage example codes.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is
> > > > stated
> > > > > > that
> > > > > > > > > "No null
> > > > > > > > > > >> >> > > entries are allowed". It might be better to
> > further
> > > > > > explain
> > > > > > > > > what will
> > > > > > > > > > >> >> > > happen if a null value is passed, ignoring the
> > > value
> > > > in
> > > > > > the
> > > > > > > > > returned
> > > > > > > > > > >> >> > > Collection or throwing exceptions. Given that
> > > > > > > > > > >> >> > > FutureUtils.emptyFuture() can be returned in the
> > > > > example
> > > > > > > > code,
> > > > > > > > > I
> > > > > > > > > > >> >> > > suppose the former one might be correct.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > For Async Execution (FLIP-425)
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 1. According to Fig 2 of this FLIP, if a recordB
> > > has
> > > > > its
> > > > > > > key
> > > > > > > > > collide
> > > > > > > > > > >> >> > > with an ongoing recordA, its processElement()
> > > method
> > > > > can
> > > > > > > > still
> > > > > > > > > be
> > > > > > > > > > >> >> > > triggered immediately, and then it might be moved
> > > to
> > > > > the
> > > > > > > > > blocking
> > > > > > > > > > >> >> > > buffer in AEC if it involves state operations.
> > This
> > > > > means
> > > > > > > > that
> > > > > > > > > > >> >> > > recordB's output will precede recordA's output in
> > > > > > > downstream
> > > > > > > > > > >> >> > > operators, if recordA involves state operations
> > > while
> > > > > > > recordB
> > > > > > > > > does
> > > > > > > > > > >> >> > > not. This will harm the correctness of Flink jobs
> > > in
> > > > > some
> > > > > > > use
> > > > > > > > > cases.
> > > > > > > > > > >> >> > > For example, in dim table join cases, recordA
> > could
> > > > be
> > > > > a
> > > > > > > > delete
> > > > > > > > > > >> >> > > operation that involves state access, while
> > recordB
> > > > > could
> > > > > > > be
> > > > > > > > > an insert
> > > > > > > > > > >> >> > > operation that needs to visit external storage
> > > > without
> > > > > > > state
> > > > > > > > > access.
> > > > > > > > > > >> >> > > If recordB's output precedes recordA's, then an
> > > entry
> > > > > > that
> > > > > > > is
> > > > > > > > > supposed
> > > > > > > > > > >> >> > > to finally exist with recordB's value in the sink
> > > > table
> > > > > > > might
> > > > > > > > > actually
> > > > > > > > > > >> >> > > be deleted according to recordA's command. This
> > > > > situation
> > > > > > > > > should be
> > > > > > > > > > >> >> > > avoided and the order of same-key records should
> > be
> > > > > > > strictly
> > > > > > > > > > >> >> > > preserved.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 2. The FLIP says that StateRequests submitted by
> > > > > > Callbacks
> > > > > > > > > will not
> > > > > > > > > > >> >> > > invoke further yield() methods. Given that
> > yield()
> > > is
> > > > > > used
> > > > > > > > > when there
> > > > > > > > > > >> >> > > is "too much" in-flight data, does it mean
> > > > > StateRequests
> > > > > > > > > submitted by
> > > > > > > > > > >> >> > > Callbacks will never be "too much"? What if the
> > > total
> > > > > > > number
> > > > > > > > of
> > > > > > > > > > >> >> > > StateRequests exceed the capacity of Flink
> > > operator's
> > > > > > > memory
> > > > > > > > > space?
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 3. In the "Watermark" section, this FLIP provided
> > > an
> > > > > > > > > out-of-order
> > > > > > > > > > >> >> > > execution mode apart from the default
> > > > strictly-ordered
> > > > > > > mode,
> > > > > > > > > which can
> > > > > > > > > > >> >> > > optimize performance by allowing more concurrent
> > > > > > > executions.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 3.1 I'm concerned that the out-of-order execution
> > > > mode,
> > > > > > > along
> > > > > > > > > with the
> > > > > > > > > > >> >> > > epoch mechanism, would bring more complexity to
> > the
> > > > > > > execution
> > > > > > > > > model
> > > > > > > > > > >> >> > > than the performance improvement it promises.
> > Could
> > > > we
> > > > > > add
> > > > > > > > some
> > > > > > > > > > >> >> > > benchmark results proving the benefit of this
> > mode?
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 3.2 The FLIP might need to add a public API
> > section
> > > > > > > > describing
> > > > > > > > > how
> > > > > > > > > > >> >> > > users or developers can switch between these two
> > > > > > execution
> > > > > > > > > modes.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 3.3 Apart from the watermark and checkpoint
> > > mentioned
> > > > > in
> > > > > > > this
> > > > > > > > > FLIP,
> > > > > > > > > > >> >> > > there are also more other events that might
> > appear
> > > in
> > > > > the
> > > > > > > > > stream of
> > > > > > > > > > >> >> > > data records. It might be better to generalize
> > the
> > > > > > > execution
> > > > > > > > > mode
> > > > > > > > > > >> >> > > mechanism to handle all possible events.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 4. It might be better to treat callback-handling
> > > as a
> > > > > > > > > > >> >> > > MailboxDefaultAction, instead of Mails, to avoid
> > > the
> > > > > > > overhead
> > > > > > > > > of
> > > > > > > > > > >> >> > > repeatedly creating Mail objects.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 5. Could this FLIP provide the current default
> > > values
> > > > > for
> > > > > > > > > things like
> > > > > > > > > > >> >> > > active buffer size thresholds and timeouts? These
> > > > could
> > > > > > > help
> > > > > > > > > with
> > > > > > > > > > >> >> > > memory consumption and latency analysis.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 6. Why do we need to record the hashcode of a
> > > record
> > > > in
> > > > > > its
> > > > > > > > > > >> >> > > RecordContext? It seems not used.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > 7. In "timers can be stored on the JVM heap or
> > > > > RocksDB",
> > > > > > > the
> > > > > > > > > link
> > > > > > > > > > >> >> > > points to a document in flink-1.15. It might be
> > > > better
> > > > > to
> > > > > > > > > verify the
> > > > > > > > > > >> >> > > referenced content is still valid in the latest
> > > Flink
> > > > > and
> > > > > > > > > update the
> > > > > > > > > > >> >> > > link accordingly. Same for other references if
> > any.
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > Best,
> > > > > > > > > > >> >> > > Yunfeng Zhou
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <
> > > > > > > > > yuanmei.w...@gmail.com> wrote:
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > Hi Devs,
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > This is a joint work of Yuan Mei, Zakelly Lan,
> > > > > Jinzhong
> > > > > > > Li,
> > > > > > > > > Hangxiang
> > > > > > > > > > >> >> > Yu,
> > > > > > > > > > >> >> > > > Yanfei Lei and Feng Wang. We'd like to start a
> > > > > > discussion
> > > > > > > > > about
> > > > > > > > > > >> >> > > introducing
> > > > > > > > > > >> >> > > > Disaggregated State Storage and Management in
> > > Flink
> > > > > > 2.0.
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > The past decade has witnessed a dramatic shift
> > in
> > > > > > Flink's
> > > > > > > > > deployment
> > > > > > > > > > >> >> > > mode,
> > > > > > > > > > >> >> > > > workload patterns, and hardware improvements.
> > > We've
> > > > > > moved
> > > > > > > > > from the
> > > > > > > > > > >> >> > > > map-reduce era where workers are
> > > > computation-storage
> > > > > > > > tightly
> > > > > > > > > coupled
> > > > > > > > > > >> >> > > nodes
> > > > > > > > > > >> >> > > > to a cloud-native world where containerized
> > > > > deployments
> > > > > > > on
> > > > > > > > > Kubernetes
> > > > > > > > > > >> >> > > > become standard. To enable Flink's Cloud-Native
> > > > > future,
> > > > > > > we
> > > > > > > > > introduce
> > > > > > > > > > >> >> > > > Disaggregated State Storage and Management that
> > > > uses
> > > > > > DFS
> > > > > > > as
> > > > > > > > > primary
> > > > > > > > > > >> >> > > storage
> > > > > > > > > > >> >> > > > in Flink 2.0, as promised in the Flink 2.0
> > > Roadmap.
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > Design Details can be found in FLIP-423[1].
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > This new architecture is aimed to solve the
> > > > following
> > > > > > > > > challenges
> > > > > > > > > > >> >> > brought
> > > > > > > > > > >> >> > > in
> > > > > > > > > > >> >> > > > the cloud-native era for Flink.
> > > > > > > > > > >> >> > > > 1. Local Disk Constraints in containerization
> > > > > > > > > > >> >> > > > 2. Spiky Resource Usage caused by compaction in
> > > the
> > > > > > > current
> > > > > > > > > state model
> > > > > > > > > > >> >> > > > 3. Fast Rescaling for jobs with large states
> > > > > (hundreds
> > > > > > of
> > > > > > > > > Terabytes)
> > > > > > > > > > >> >> > > > 4. Light and Fast Checkpoint in a native way
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > More specifically, we want to reach a consensus
> > > on
> > > > > the
> > > > > > > > > following issues
> > > > > > > > > > >> >> > > in
> > > > > > > > > > >> >> > > > this discussion:
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > 1. Overall design
> > > > > > > > > > >> >> > > > 2. Proposed Changes
> > > > > > > > > > >> >> > > > 3. Design details to achieve Milestone1
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > In M1, we aim to achieve an end-to-end baseline
> > > > > version
> > > > > > > > > using DFS as
> > > > > > > > > > >> >> > > > primary storage and complete core
> > > functionalities,
> > > > > > > > including:
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]:
> > > Introduce
> > > > > new
> > > > > > > APIs
> > > > > > > > > for
> > > > > > > > > > >> >> > > > asynchronous state access.
> > > > > > > > > > >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]:
> > > > > > Implement a
> > > > > > > > > non-blocking
> > > > > > > > > > >> >> > > > execution model leveraging the asynchronous
> > APIs
> > > > > > > introduced
> > > > > > > > > in
> > > > > > > > > > >> >> > FLIP-424.
> > > > > > > > > > >> >> > > > - Grouping Remote State Access (FLIP-426)[4]:
> > > > Enable
> > > > > > > > > retrieval of
> > > > > > > > > > >> >> > remote
> > > > > > > > > > >> >> > > > state data in batches to avoid unnecessary
> > > > round-trip
> > > > > > > costs
> > > > > > > > > for remote
> > > > > > > > > > >> >> > > > access
> > > > > > > > > > >> >> > > > - Disaggregated State Store (FLIP-427)[5]:
> > > > Introduce
> > > > > > the
> > > > > > > > > initial
> > > > > > > > > > >> >> > version
> > > > > > > > > > >> >> > > of
> > > > > > > > > > >> >> > > > the ForSt disaggregated state store.
> > > > > > > > > > >> >> > > > - Fault Tolerance/Rescale Integration
> > > > (FLIP-428)[6]:
> > > > > > > > > Integrate
> > > > > > > > > > >> >> > > > checkpointing mechanisms with the disaggregated
> > > > state
> > > > > > > store
> > > > > > > > > for fault
> > > > > > > > > > >> >> > > > tolerance and fast rescaling.
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > We will vote on each FLIP in separate threads
> > to
> > > > make
> > > > > > > sure
> > > > > > > > > each FLIP
> > > > > > > > > > >> >> > > > reaches a consensus. But we want to keep the
> > > > > discussion
> > > > > > > > > within a
> > > > > > > > > > >> >> > focused
> > > > > > > > > > >> >> > > > thread (this thread) for easier tracking of
> > > > contexts
> > > > > to
> > > > > > > > avoid
> > > > > > > > > > >> >> > duplicated
> > > > > > > > > > >> >> > > > questions/discussions and also to think of the
> > > > > > > > > problem/solution in a
> > > > > > > > > > >> >> > full
> > > > > > > > > > >> >> > > > picture.
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > Looking forward to your feedback
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > Best,
> > > > > > > > > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and
> > > Feng
> > > > > > > > > > >> >> > > >
> > > > > > > > > > >> >> > > > [1]
> > https://cwiki.apache.org/confluence/x/R4p3EQ
> > > > > > > > > > >> >> > > > [2]
> > https://cwiki.apache.org/confluence/x/SYp3EQ
> > > > > > > > > > >> >> > > > [3]
> > https://cwiki.apache.org/confluence/x/S4p3EQ
> > > > > > > > > > >> >> > > > [4]
> > https://cwiki.apache.org/confluence/x/TYp3EQ
> > > > > > > > > > >> >> > > > [5]
> > https://cwiki.apache.org/confluence/x/T4p3EQ
> > > > > > > > > > >> >> > > > [6]
> > https://cwiki.apache.org/confluence/x/UYp3EQ
> > > > > > > > > > >> >> > >
> > > > > > > > > > >> >> >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
