Hi,

+1 for the suggestion.
Maybe we can start the discussion with the FLIPs that have the fewest
dependencies (on the other new/proposed FLIPs).
Based on our discussion of a particular FLIP, the subsequent (or
dependent) FLIP(s) can then be updated accordingly?

Regards,
Jeyhun

On Wed, Mar 6, 2024 at 5:34 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hey all!
>
> This is a massive improvement / piece of work. I just started going through
> the FLIPs and have a more or less meta comment.
>
> While it's good to keep the overall architecture discussion here, I think
> we should still have separate discussions for each FLIP where we can
> discuss interface details etc. With so much content, adding minor comments
> here will lead nowhere, but those discussions are still important and we
> should have them in separate threads (one for each FLIP).
>
> What do you think?
> Gyula
>
> On Wed, Mar 6, 2024 at 8:50 AM Yanfei Lei <fredia...@gmail.com> wrote:
>
> > Hi team,
> >
> > Thanks for your discussion. Regarding FLIP-425, we have added
> > several updates to answer frequently asked questions:
> >
> > 1. We captured a flame graph of the Hashmap state backend in
> > "Synchronous execution with asynchronous APIs"[1], which reveals that
> > the framework overhead (including reference counting, future-related
> > code and so on) consumes about 9% of the keyed operator CPU time.
> > 2. We added a set of comparative experiments for watermark processing:
> > the performance of out-of-order mode is 70% better than that of
> > strictly-ordered mode under ~140MB state size. Instructions on how to
> > run this test have also been added[2].
> > 3. Regarding the order of StreamRecords, whether or not they access
> > state, we added a new section, *Strict order of 'processElement'*[3].
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-SynchronousexecutionwithasynchronousAPIs
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-Strictly-orderedmodevs.Out-of-ordermodeforwatermarkprocessing
> > [3] https://cwiki.apache.org/confluence/display/FLINK/FLIP-425%3A+Asynchronous+Execution+Model#FLIP425:AsynchronousExecutionModel-ElementOrder
> >
> >
> > Best regards,
> > Yanfei
> >
> > On Tue, Mar 5, 2024 at 09:25 Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > >
> > > Hi Zakelly,
> > >
> > > > 5. I'm not very sure ... revisiting this later since it is not important.
> > >
> > > It seems that we still have some details to confirm about this
> > > question. Let's postpone this to after the critical parts of the
> > > design are settled.
> > >
> > > > 8. Yes, we had considered ... metrics should be like afterwards.
> > >
> > > Oh sorry, I missed FLIP-431. I'm fine with discussing this topic in
> > > milestone 2.
> > >
> > > Looking forward to the detailed design about the strict mode between
> > > same-key records and the benchmark results about the epoch mechanism.
> > >
> > > Best regards,
> > > Yunfeng
> > >
> > > On Mon, Mar 4, 2024 at 7:59 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > >
> > > > Hi Yunfeng,
> > > >
> > > > For 1:
> > > > I had a discussion with Lincoln Lee, and I realized that it is a common
> > > > case that a same-key record should be blocked before `processElement`.
> > > > This is easier for users to understand. Thus I will introduce a strict
> > > > mode for this and make it the default. My rough idea is just like yours:
> > > > invoking some method of the AEC instance before `processElement`. The
> > > > detailed design will be described in the FLIP later.
> > > >
> > > > For 2:
> > > > I agree with you. We could throw exceptions for now and optimize this
> > > > later.
> > > >
> > > > For 5:
> > > >>
> > > >> It might be better to move the default values to the Proposed Changes
> > > >> section instead of making them public for now, as there will be
> > > >> compatibility issues once we want to dynamically adjust the thresholds
> > > >> and timeouts in future.
> > > >
> > > > Agreed. The whole framework is experimental until we think it is
> > > > complete in 2.0 or later. The default values would be better determined
> > > > with more testing results and production experience.
> > > >
> > > >> The configuration execution.async-state.enabled seems unnecessary, as
> > > >> the infrastructure may automatically get this information from the
> > > >> detailed state backend configurations. We may revisit this part after
> > > >> the core designs have reached an agreement. WDYT?
> > > >
> > > >
> > > > I'm not very sure whether there is any use case where users write their
> > > > code using async APIs but run their job in a synchronous way. The first
> > > > two scenarios that come to mind are benchmarking and jobs with a small
> > > > state, where users don't want to rewrite their code. Actually it is easy
> > > > to support, so I'd suggest providing it. But I'm fine with revisiting
> > > > this later since it is not important. WDYT?
> > > >
> > > > For 8:
> > > > Yes, we had considered the I/O metrics group, especially the
> > > > back-pressure, idle and task busy time per second. In the current plan
> > > > we can do state access during back-pressure, meaning that those metrics
> > > > for input would better be redefined. I suggest we discuss these existing
> > > > metrics, as well as some new metrics that should be introduced, in
> > > > FLIP-431 later in milestone 2, since by then we will have basically
> > > > finished the framework and will have a better view of what the metrics
> > > > should look like. WDYT?
> > > >
> > > >
> > > > Best,
> > > > Zakelly
> > > >
> > > > On Mon, Mar 4, 2024 at 6:49 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > >>
> > > >> Hi Zakelly,
> > > >>
> > > >> Thanks for the responses!
> > > >>
> > > >> > 1. I will discuss this with some expert SQL developers. ... mode for StreamRecord processing.
> > > >>
> > > >> In the DataStream API there should also be use cases where the order of
> > > >> output is strictly required. I agree that SQL experts may help
> > > >> provide more concrete use cases that can accelerate our discussion,
> > > >> but please allow me to search for DataStream use cases that can prove
> > > >> the necessity of this strict order preservation mode, if the answers
> > > >> from SQL experts turn out to be negative.
> > > >>
> > > >> For your convenience, my current rough idea is that we can add a
> > > >> module between the Input(s) and processElement() module in Fig 2 of
> > > >> FLIP-425. The module will be responsible for caching records whose
> > > >> keys collide with in-flight records, and AEC will only be responsible
> > > >> for handling async state calls, without knowing the record each call
> > > >> belongs to. We may revisit this topic once the necessity of the strict
> > > >> order mode is clarified.
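
The caching module described in the paragraph above could be sketched roughly as follows. This is only an illustration under stated assumptions: `KeyOrderGate`, `tryAdmit` and `release` are hypothetical names that appear in neither FLIP-425 nor Flink, records are assumed to be non-null, and access is assumed to be single-threaded (as on a task's mailbox thread).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Hypothetical sketch, not a Flink class: holds back records whose key is in flight. */
final class KeyOrderGate<K, R> {
    // One entry per in-flight key; the queue holds the records waiting behind it.
    private final Map<K, Queue<R>> waiting = new HashMap<>();

    /** Returns true if the record may enter processElement() now; otherwise caches it. */
    boolean tryAdmit(K key, R record) {
        Queue<R> queue = waiting.get(key);
        if (queue == null) {
            waiting.put(key, new ArrayDeque<>()); // the key becomes in-flight
            return true;
        }
        queue.add(record); // the key collides with an in-flight record
        return false;
    }

    /**
     * Call when the in-flight record of the key has finished. Returns the next
     * cached record of that key (which then becomes in-flight), or null if none.
     */
    R release(K key) {
        Queue<R> queue = waiting.get(key);
        if (queue == null) {
            throw new IllegalStateException("No in-flight record for key " + key);
        }
        R next = queue.poll();
        if (next == null) {
            waiting.remove(key); // nothing cached: the key is no longer in-flight
        }
        return next;
    }

    public static void main(String[] args) {
        KeyOrderGate<String, String> gate = new KeyOrderGate<>();
        System.out.println(gate.tryAdmit("k1", "recordA")); // true
        System.out.println(gate.tryAdmit("k1", "recordB")); // false
        System.out.println(gate.release("k1"));             // recordB
    }
}
```

With such a gate in front of processElement(), the AEC itself would not need to know which record a state call belongs to, which matches the separation of responsibilities suggested above.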
> > > >>
> > > >>
> > > >> > 2. The amount of parallel StateRequests ... instead of invoking yield
> > > >>
> > > >> Your suggestions generally appeal to me. I think we may let the
> > > >> corresponding Flink jobs fail with OOM for now, since the majority of
> > > >> a StateRequest should just be references to existing Java objects,
> > > >> which occupy very little memory and can hardly cause OOM
> > > >> in common cases. We can monitor the pending StateRequests and if there
> > > >> is really a risk of OOM in extreme cases, we can throw Exceptions with
> > > >> proper messages notifying users what to do, like increasing memory
> > > >> through configurations.
> > > >>
> > > >> Your suggestions to adjust the threshold adaptively or to use the blocking
> > > >> buffer sound good, and in my opinion we can postpone them to future
> > > >> FLIPs since they seem to only benefit users in rare cases. Given that
> > > >> FLIP-423~428 are already a big enough design, it might be better
> > > >> to focus on the most critical design for now and postpone
> > > >> optimizations like this. WDYT?
> > > >>
> > > >>
> > > >> > 5. Sure, we will introduce new configs as well as their default value.
> > > >>
> > > >> Thanks for adding the default values; the values themselves LGTM.
> > > >> It might be better to move the default values to the Proposed Changes
> > > >> section instead of making them public for now, as there will be
> > > >> compatibility issues once we want to dynamically adjust the thresholds
> > > >> and timeouts in future.
> > > >>
> > > >> The configuration execution.async-state.enabled seems unnecessary, as
> > > >> the infrastructure may automatically get this information from the
> > > >> detailed state backend configurations. We may revisit this part after
> > > >> the core designs have reached an agreement. WDYT?
> > > >>
> > > >>
> > > >> Besides, inspired by Jeyhun's comments, it comes to me that
> > > >>
> > > >> 8. Should this FLIP introduce metrics that measure the time a Flink
> > > >> job is back-pressured by State IOs? Under the current design, this
> > > >> metric could measure the time when the blocking buffer is full and
> > > >> yield() cannot get callbacks to process, which means the operator is
> > > >> fully waiting for state responses.
> > > >>
> > > >> Best regards,
> > > >> Yunfeng
> > > >>
> > > >> On Mon, Mar 4, 2024 at 12:33 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > >> >
> > > >> > Hi Yunfeng,
> > > >> >
> > > >> > Thanks for your detailed comments!
> > > >> >
> > > >> >> 1. Why do we need a close() method on StateIterator? This method seems
> > > >> >> unused in the usage example code.
> > > >> >
> > > >> >
> > > >> > The `close()` is introduced to release internal resources, but it
> > > >> > does not seem to require the user to call it. I removed this.
> > > >> >
> > > >> >> 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> > > >> >> entries are allowed". It might be better to further explain what will
> > > >> >> happen if a null value is passed: ignoring the value in the returned
> > > >> >> Collection or throwing exceptions. Given that
> > > >> >> FutureUtils.emptyFuture() can be returned in the example code, I
> > > >> >> suppose the former one might be correct.
> > > >> >
> > > >> >
> > > >> > The statement "No null entries are allowed" refers to the parameters:
> > > >> > a list such as [null, StateFuture1, StateFuture2] is not allowed as an
> > > >> > argument, and an Exception will be thrown.
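
The null-entry rule explained above can be illustrated with a small sketch. It deliberately uses the JDK's `CompletableFuture` rather than the FLIP-424 `StateFuture`, whose exact API is not shown in this thread; `CombineAllSketch.combineAll` is a hypothetical stand-in, and the choice of `NullPointerException` is an assumption (the thread only says that "an Exception will be thrown").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a combineAll() helper; not the FLIP-424 implementation. */
final class CombineAllSketch {

    /** Combines all futures into one; rejects null *entries* up front. */
    static <T> CompletableFuture<List<T>> combineAll(Collection<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> copy = new ArrayList<>(futures.size());
        for (CompletableFuture<T> future : futures) {
            // Assumption: a null entry is reported with a NullPointerException.
            copy.add(Objects.requireNonNull(future, "No null entries are allowed"));
        }
        return CompletableFuture.allOf(copy.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<T> results = new ArrayList<>(copy.size());
                    for (CompletableFuture<T> future : copy) {
                        // A future that completes with a null *value* is still a valid entry.
                        results.add(future.join());
                    }
                    return results;
                });
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.completedFuture(1),
                CompletableFuture.completedFuture(2));
        System.out.println(combineAll(futures).join()); // [1, 2]
    }
}
```

In this sketch a null entry in the input list fails fast, while a future whose result is null simply contributes a null element to the combined list.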
> > > >> >
> > > >> >> 1. According to Fig 2 of this FLIP, ... . This situation should be
> > > >> >> avoided and the order of same-key records should be strictly
> > > >> >> preserved.
> > > >> >
> > > >> >
> > > >> > I will discuss this with some expert SQL developers. And if it is
> > > >> > valid and common, I suggest a strict order preservation mode for
> > > >> > StreamRecord processing. WDYT?
> > > >> >
> > > >> >> 2. The FLIP says that StateRequests submitted by Callbacks will not
> > > >> >> invoke further yield() methods. Given that yield() is used when there
> > > >> >> is "too much" in-flight data, does it mean StateRequests submitted by
> > > >> >> Callbacks will never be "too much"? What if the total number of
> > > >> >> StateRequests exceeds the capacity of the Flink operator's memory space?
> > > >> >
> > > >> >
> > > >> > The amount of parallel StateRequests for one StreamRecord cannot be
> > > >> > determined since the code is written by users. So the in-flight
> > > >> > requests may be "too much", and may cause OOM. Users should
> > > >> > re-configure their job, controlling the number of ongoing
> > > >> > StreamRecords. And I suggest two ways to avoid this:
> > > >> >
> > > >> > - Adaptively adjust the count of ongoing StreamRecords according to
> > > >> >   the historical amount of StateRequests.
> > > >> > - Also control the max number of StateRequests that can be executed in
> > > >> >   parallel for each StreamRecord, and if it is exceeded, put the new
> > > >> >   StateRequest in the blocking buffer to wait for execution (instead
> > > >> >   of invoking yield()).
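
The second option above (a cap on parallel StateRequests per StreamRecord, with overflow parked in a blocking buffer) could look roughly like the sketch below. `PerRecordRequestLimiter` and its methods are hypothetical names, a `Runnable` stands in for "start this StateRequest", and single-threaded access is assumed; none of this is taken from the FLIP.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch, not a Flink class: caps parallel requests of one record. */
final class PerRecordRequestLimiter {
    private final int maxParallel;
    private int running;
    // Requests over the limit wait here instead of triggering yield().
    private final Queue<Runnable> blocked = new ArrayDeque<>();

    PerRecordRequestLimiter(int maxParallel) {
        if (maxParallel <= 0) {
            throw new IllegalArgumentException("maxParallel must be positive");
        }
        this.maxParallel = maxParallel;
    }

    /** Starts the request if below the limit, otherwise parks it. Returns true if started. */
    boolean submit(Runnable startRequest) {
        if (running < maxParallel) {
            running++;
            startRequest.run();
            return true;
        }
        blocked.add(startRequest);
        return false;
    }

    /** Call when a running request has completed; starts the oldest parked request, if any. */
    void onCompleted() {
        if (running == 0) {
            throw new IllegalStateException("No running request");
        }
        running--;
        Runnable next = blocked.poll();
        if (next != null) {
            running++;
            next.run();
        }
    }

    int blockedCount() {
        return blocked.size();
    }

    public static void main(String[] args) {
        PerRecordRequestLimiter limiter = new PerRecordRequestLimiter(1);
        limiter.submit(() -> System.out.println("request 1 started"));
        limiter.submit(() -> System.out.println("request 2 started"));
        System.out.println("blocked: " + limiter.blockedCount()); // blocked: 1
        limiter.onCompleted(); // prints "request 2 started"
    }
}
```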
> > > >> >
> > > >> > WDYT?
> > > >> >
> > > >> >
> > > >> >> 3.1 I'm concerned that the out-of-order execution mode, along with the
> > > >> >> epoch mechanism, would bring more complexity to the execution model
> > > >> >> than the performance improvement it promises. Could we add some
> > > >> >> benchmark results proving the benefit of this mode?
> > > >> >
> > > >> >
> > > >> > Agreed, will do.
> > > >> >
> > > >> >> 3.2 The FLIP might need to add a public API section describing how
> > > >> >> users or developers can switch between these two execution modes.
> > > >> >
> > > >> >
> > > >> > Good point. We will add a Public API section.
> > > >> >
> > > >> >> 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> > > >> >> there are also other events that might appear in the stream of
> > > >> >> data records. It might be better to generalize the execution mode
> > > >> >> mechanism to handle all possible events.
> > > >> >
> > > >> >
> > > >> > Yes, I missed this point. Thanks for the reminder.
> > > >> >
> > > >> >> 4. It might be better to treat callback-handling as a
> > > >> >> MailboxDefaultAction, instead of Mails, to avoid the overhead of
> > > >> >> repeatedly creating Mail objects.
> > > >> >
> > > >> >
> > > >> > I think the intermediate wrapper for the callback cannot be
> > > >> > omitted, since there will be some context switch before each execution.
> > > >> > The MailboxDefaultAction in most cases is processInput, right? The
> > > >> > callback, however, should be executed with higher priority. I'd suggest
> > > >> > not changing the basic logic of the Mailbox and the default action since
> > > >> > it is very critical for performance. But yes, we will try our best to
> > > >> > avoid creating intermediate objects.
> > > >> >
> > > >> >> 5. Could this FLIP provide the current default values for things like
> > > >> >> active buffer size thresholds and timeouts? These could help with
> > > >> >> memory consumption and latency analysis.
> > > >> >
> > > >> >
> > > >> > Sure, we will introduce new configs as well as their default value.
> > > >> >
> > > >> >> 6. Why do we need to record the hashcode of a record in its
> > > >> >> RecordContext? It seems not used.
> > > >> >
> > > >> >
> > > >> > The context switch before each callback execution involves
> > > >> > setCurrentKey, where the hashCode is re-calculated. We cache it to
> > > >> > speed this up.
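
The caching described above amounts to computing the key's hash once per record and reusing it on every context switch. A minimal sketch, assuming a hypothetical `RecordContextSketch` class (the real RecordContext in FLIP-425 may carry more fields and different names):

```java
/** Hypothetical sketch, not the FLIP-425 RecordContext: caches the key's hash code. */
final class RecordContextSketch<K> {
    private final K key;
    private final int keyHash; // computed once when the context is created

    RecordContextSketch(K key) {
        this.key = key;
        this.keyHash = key.hashCode();
    }

    K key() {
        return key;
    }

    /** Returns the cached hash without calling key.hashCode() again. */
    int keyHash() {
        return keyHash;
    }

    public static void main(String[] args) {
        RecordContextSketch<String> context = new RecordContextSketch<>("user-42");
        System.out.println(context.keyHash() == "user-42".hashCode()); // true
    }
}
```

The benefit grows with the cost of the key's hashCode() (for example composite keys) and with the number of callbacks executed per record.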
> > > >> >
> > > >> >> 7. In "timers can be stored on the JVM heap or RocksDB", the link
> > > >> >> points to a document in flink-1.15. It might be better to verify the
> > > >> >> referenced content is still valid in the latest Flink and update the
> > > >> >> link accordingly. Same for other references if any.
> > > >> >
> > > >> >
> > > >> > Thanks for the reminder! Will check.
> > > >> >
> > > >> >
> > > >> > Thanks a lot & Best,
> > > >> > Zakelly
> > > >> >
> > > >> > On Sat, Mar 2, 2024 at 6:18 AM Jeyhun Karimov <je.kari...@gmail.com> wrote:
> > > >> >>
> > > >> >> Hi,
> > > >> >>
> > > >> >> Thanks for the great proposals. I have a few comments:
> > > >> >>
> > > >> >> - Backpressure Handling. Flink's original backpressure handling is quite
> > > >> >> robust and the semantics is quite "simple" (simple is beautiful).
> > > >> >> This mechanism has proven to be more performant/robust than those of other
> > > >> >> open source streaming systems, which rely on some loopback
> > > >> >> information.
> > > >> >> Now that the proposal also relies on loopback (yield in this case), it is
> > > >> >> not clear how robust the new backpressure handling proposed in FLIP-425
> > > >> >> is and how well it handles fluctuating workloads.
> > > >> >>
> > > >> >> - Watermark/Timer Handling: Similar arguments apply for watermark and timer
> > > >> >> handling. IMHO, we need more benchmarks showing the overhead
> > > >> >> of epoch management with different parameters (e.g., window size, watermark
> > > >> >> strategy, etc.)
> > > >> >>
> > > >> >> - DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
> > > >> >> However, different cloud providers have different storage consistency
> > > >> >> models.
> > > >> >> How do we want to deal with them?
> > > >> >>
> > > >> >>  Regards,
> > > >> >> Jeyhun
> > > >> >>
> > > >> >>
> > > >> >>
> > > >> >>
> > > >> >> On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > >> >>
> > > >> >> > Thanks Piotr for sharing your thoughts!
> > > >> >> >
> > > >> >> > > I guess it depends how we would like to treat the local disks. I've
> > > >> >> > > always thought about them that almost always eventually all state
> > > >> >> > > from the DFS should end up cached in the local disks.
> > > >> >> >
> > > >> >> >
> > > >> >> > OK I got it. In our proposal we treat local disk as an optional cache, so
> > > >> >> > the basic design will handle the case with state residing in DFS only. It
> > > >> >> > is a more 'cloud-native' approach that does not rely on any local storage
> > > >> >> > assumptions, which allows users to dynamically adjust the capacity or I/O
> > > >> >> > bound of remote storage to gain performance or save cost, even without
> > > >> >> > a job restart.
> > > >> >> >
> > > >> >> > > In the currently proposed more fine grained solution, you make a single
> > > >> >> > > request to DFS per each state access.
> > > >> >> > >
> > > >> >> >
> > > >> >> > Ah that's not accurate. Actually we buffer the state requests and process
> > > >> >> > them in batches; multiple requests will correspond to one DFS access (one
> > > >> >> > block access for multiple keys, performed by RocksDB).
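
The batching idea above (several buffered state requests served by one block access) can be illustrated with a toy grouping step. Everything in this sketch is an assumption made for illustration: `BatchedStateReads` is a hypothetical name, keys are plain integers, and the key-to-block mapping by integer division merely stands in for whatever block layout the state store actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: groups buffered key lookups so each block is accessed once. */
final class BatchedStateReads {

    /** Maps each block id to the buffered keys stored in that block. */
    static Map<Integer, List<Integer>> groupByBlock(List<Integer> bufferedKeys, int keysPerBlock) {
        Map<Integer, List<Integer>> batches = new TreeMap<>();
        for (int key : bufferedKeys) {
            // Assumption for illustration only: block id = key / keysPerBlock.
            int blockId = Math.floorDiv(key, keysPerBlock);
            batches.computeIfAbsent(blockId, id -> new ArrayList<>()).add(key);
        }
        return batches;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> batches = groupByBlock(Arrays.asList(1, 2, 9, 3, 10), 8);
        // Five buffered requests, but only two block accesses.
        System.out.println(batches); // {0=[1, 2, 3], 1=[9, 10]}
    }
}
```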
> > > >> >> >
> > > >> >> > > In that benchmark you mentioned, are you requesting the state
> > > >> >> > > asynchronously from local disks into memory? If the benefit comes from
> > > >> >> > > parallel I/O, then I would expect the benefit to disappear/shrink when
> > > >> >> > > running multiple subtasks on the same machine, as they would be making
> > > >> >> > > their own parallel requests, right? Also enabling checkpointing would
> > > >> >> > > further cut into the available I/O budget.
> > > >> >> >
> > > >> >> >
> > > >> >> > That's an interesting topic. Our proposal is specifically aimed at the
> > > >> >> > scenario where the machine I/O is not fully loaded but the I/O latency has
> > > >> >> > indeed become a bottleneck for each subtask. The distributed file
> > > >> >> > system is a prime example of such a scenario, characterized by abundant
> > > >> >> > and easily scalable I/O bandwidth coupled with higher I/O latency. You may
> > > >> >> > expect to increase the parallelism of a job to enhance the performance as
> > > >> >> > well, but that also wastes more CPU and memory on building
> > > >> >> > up more subtasks. This is one drawback of the computation-storage tightly
> > > >> >> > coupled nodes. In our proposal, by contrast, with parallel I/O and all the
> > > >> >> > callbacks still running in one task, pre-allocated computational resources
> > > >> >> > are better utilized. It is a much more lightweight way to perform parallel
> > > >> >> > I/O.
> > > >> >> >
> > > >> >> > > Just with what granularity those async requests should be made.
> > > >> >> > > Making state access asynchronous is definitely the right way to go!
> > > >> >> >
> > > >> >> >
> > > >> >> > I think the current proposal is based on the following core ideas:
> > > >> >> >
> > > >> >> >    - A pure cloud-native disaggregated state.
> > > >> >> >    - Fully utilize the given resources and try not to waste them
> > > >> >> >    (including I/O).
> > > >> >> >    - The ability to scale isolated resources (I/O or CPU or memory)
> > > >> >> >    independently.
> > > >> >> >
> > > >> >> > We think a fine-grained granularity is more in line with those ideas,
> > > >> >> > especially without local disk assumptions and without any waste of I/O by
> > > >> >> > prefetching. Please note that it is not a replacement of the original local
> > > >> >> > state with synchronous execution. Instead this is a solution embracing the
> > > >> >> > cloud-native era, providing much more scalability and resource efficiency
> > > >> >> > when handling a *huge state*.
> > > >> >> >
> > > >> >> > > What also worries me a lot in this fine grained model is the effect on
> > > >> >> > > the checkpointing times.
> > > >> >> >
> > > >> >> >
> > > >> >> > Your concerns are very reasonable. Faster checkpointing is always a core
> > > >> >> > advantage of disaggregated state, but only for the async phase. There will
> > > >> >> > be some complexity introduced by in-flight requests, but I'd suggest a
> > > >> >> > checkpoint containing those in-flight state requests as part of the state,
> > > >> >> > to accelerate the sync phase by skipping the buffer draining. This makes
> > > >> >> > the buffer size have little impact on checkpoint time. All the changes
> > > >> >> > stay within the execution model we proposed, while checkpoint barrier
> > > >> >> > alignment and handling will not be touched in our proposal, so I guess
> > > >> >> > the complexity is relatively controllable. I have faith in that :)
> > > >> >> >
> > > >> >> > > Also regarding the overheads, it would be great if you could provide
> > > >> >> > > profiling results of the benchmarks that you conducted to verify the
> > > >> >> > > results. Or maybe if you could describe the steps to reproduce the
> > > >> >> > > results?
> > > >> >> > > Especially "Hashmap (sync)" vs "Hashmap with async API".
> > > >> >> > >
> > > >> >> >
> > > >> >> > Yes, we could profile the benchmarks. And for the comparison of "Hashmap
> > > >> >> > (sync)" vs "Hashmap with async API", we ran a WordCount job written
> > > >> >> > with async APIs but with async execution disabled by directly completing
> > > >> >> > the future using sync state access. This evaluates the overhead of newly
> > > >> >> > introduced modules like 'AEC' in sync execution (even though they are not
> > > >> >> > designed for it). The code will be provided later. For other results of our
> > > >> >> > PoC[1], you can follow the instructions here[2] to reproduce. Since the
> > > >> >> > compilation may take some effort, we will directly provide the jar for
> > > >> >> > testing next week.
> > > >> >> >
> > > >> >> >
> > > >> >> > And @Yunfeng Zhou, I have noticed your mail but it is a bit late in my
> > > >> >> > local time and the next few days are weekends. So I will reply to you
> > > >> >> > later. Thanks for your response!
> > > >> >> >
> > > >> >> >
> > > >> >> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> > > >> >> > [2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
> > > >> >> >
> > > >> >> >
> > > >> >> > Best,
> > > >> >> > Zakelly
> > > >> >> >
> > > >> >> >
> > > >> >> > On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> > > >> >> >
> > > >> >> > > Hi,
> > > >> >> > >
> > > >> >> > > Thanks for proposing this design! I just read FLIP-424 and FLIP-425
> > > >> >> > > and have some questions about the proposed changes.
> > > >> >> > >
> > > >> >> > > For Async API (FLIP-424)
> > > >> >> > >
> > > >> >> > > 1. Why do we need a close() method on StateIterator? This method seems
> > > >> >> > > unused in the usage example code.
> > > >> >> > >
> > > >> >> > > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> > > >> >> > > entries are allowed". It might be better to further explain what will
> > > >> >> > > happen if a null value is passed: ignoring the value in the returned
> > > >> >> > > Collection or throwing exceptions. Given that
> > > >> >> > > FutureUtils.emptyFuture() can be returned in the example code, I
> > > >> >> > > suppose the former one might be correct.
> > > >> >> > >
> > > >> >> > >
> > > >> >> > > For Async Execution (FLIP-425)
> > > >> >> > >
> > > >> >> > > 1. According to Fig 2 of this FLIP, if a recordB has its key collide
> > > >> >> > > with an ongoing recordA, its processElement() method can still be
> > > >> >> > > triggered immediately, and then it might be moved to the blocking
> > > >> >> > > buffer in AEC if it involves state operations. This means that
> > > >> >> > > recordB's output will precede recordA's output in downstream
> > > >> >> > > operators, if recordA involves state operations while recordB does
> > > >> >> > > not. This will harm the correctness of Flink jobs in some use cases.
> > > >> >> > > For example, in dim table join cases, recordA could be a delete
> > > >> >> > > operation that involves state access, while recordB could be an insert
> > > >> >> > > operation that needs to visit external storage without state access.
> > > >> >> > > If recordB's output precedes recordA's, then an entry that is supposed
> > > >> >> > > to finally exist with recordB's value in the sink table might actually
> > > >> >> > > be deleted according to recordA's command. This situation should be
> > > >> >> > > avoided and the order of same-key records should be strictly
> > > >> >> > > preserved.
> > > >> >> > >
> > > >> >> > > 2. The FLIP says that StateRequests submitted by Callbacks will not
> > > >> >> > > invoke further yield() methods. Given that yield() is used when there
> > > >> >> > > is "too much" in-flight data, does it mean StateRequests submitted by
> > > >> >> > > Callbacks will never be "too much"? What if the total number of
> > > >> >> > > StateRequests exceeds the capacity of the Flink operator's memory space?
> > > >> >> > >
> > > >> >> > > 3. In the "Watermark" section, this FLIP provided an out-of-order
> > > >> >> > > execution mode apart from the default strictly-ordered mode, which can
> > > >> >> > > optimize performance by allowing more concurrent executions.
> > > >> >> > >
> > > >> >> > > 3.1 I'm concerned that the out-of-order execution mode, along with the
> > > >> >> > > epoch mechanism, would bring more complexity to the execution model
> > > >> >> > > than the performance improvement it promises. Could we add some
> > > >> >> > > benchmark results proving the benefit of this mode?
> > > >> >> > >
> > > >> >> > > 3.2 The FLIP might need to add a public API section describing how
> > > >> >> > > users or developers can switch between these two execution modes.
> > > >> >> > >
> > > >> >> > > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> > > >> >> > > there are also other events that might appear in the stream of
> > > >> >> > > data records. It might be better to generalize the execution mode
> > > >> >> > > mechanism to handle all possible events.
> > > >> >> > >
> > > >> >> > > 4. It might be better to treat callback-handling as a
> > > >> >> > > MailboxDefaultAction, instead of Mails, to avoid the overhead of
> > > >> >> > > repeatedly creating Mail objects.
> > > >> >> > >
> > > >> >> > > 5. Could this FLIP provide the current default values for things like
> > > >> >> > > active buffer size thresholds and timeouts? These could help with
> > > >> >> > > memory consumption and latency analysis.
> > > >> >> > >
> > > >> >> > > 6. Why do we need to record the hashcode of a record in its
> > > >> >> > > RecordContext? It seems not used.
> > > >> >> > >
> > > >> >> > > 7. In "timers can be stored on the JVM heap or RocksDB", the link
> > > >> >> > > points to a document in flink-1.15. It might be better to verify the
> > > >> >> > > referenced content is still valid in the latest Flink and update the
> > > >> >> > > link accordingly. Same for other references if any.
> > > >> >> > >
> > > >> >> > > Best,
> > > >> >> > > Yunfeng Zhou
> > > >> >> > >
> > > >> >> > > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > > >> >> > > >
> > > >> >> > > > Hi Devs,
> > > >> >> > > >
> > > >> >> > > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu,
> > > >> >> > > > Yanfei Lei and Feng Wang. We'd like to start a discussion about
> > > >> >> > > > introducing Disaggregated State Storage and Management in Flink 2.0.
> > > >> >> > > >
> > > >> >> > > > The past decade has witnessed a dramatic shift in Flink's deployment
> > > >> >> > > > mode, workload patterns, and hardware improvements. We've moved from the
> > > >> >> > > > map-reduce era where workers are computation-storage tightly coupled
> > > >> >> > > > nodes to a cloud-native world where containerized deployments on
> > > >> >> > > > Kubernetes become standard. To enable Flink's Cloud-Native future, we
> > > >> >> > > > introduce Disaggregated State Storage and Management that uses DFS as
> > > >> >> > > > primary storage in Flink 2.0, as promised in the Flink 2.0 Roadmap.
> > > >> >> > > >
> > > >> >> > > > Design Details can be found in FLIP-423[1].
> > > >> >> > > >
> > > >> >> > > > This new architecture aims to solve the following challenges that the
> > > >> >> > > > cloud-native era brings to Flink:
> > > >> >> > > > 1. Local Disk Constraints in containerization
> > > >> >> > > > 2. Spiky Resource Usage caused by compaction in the current state model
> > > >> >> > > > 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
> > > >> >> > > > 4. Light and Fast Checkpoint in a native way
> > > >> >> > > >
> > > >> >> > > > More specifically, we want to reach a consensus on the following issues
> > > >> >> > > > in this discussion:
> > > >> >> > > >
> > > >> >> > > > 1. Overall design
> > > >> >> > > > 2. Proposed Changes
> > > >> >> > > > 3. Design details to achieve Milestone1
> > > >> >> > > >
> > > >> >> > > > In M1, we aim to achieve an end-to-end baseline version using DFS as
> > > >> >> > > > primary storage and complete core functionalities, including:
> > > >> >> > > >
> > > >> >> > > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> > > >> >> > > > asynchronous state access.
> > > >> >> > > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking
> > > >> >> > > > execution model leveraging the asynchronous APIs introduced in FLIP-424.
> > > >> >> > > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote
> > > >> >> > > > state data in batches to avoid unnecessary round-trip costs for remote
> > > >> >> > > > access.
> > > >> >> > > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial version
> > > >> >> > > > of the ForSt disaggregated state store.
> > > >> >> > > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> > > >> >> > > > checkpointing mechanisms with the disaggregated state store for fault
> > > >> >> > > > tolerance and fast rescaling.
> > > >> >> > > >
> > > >> >> > > > We will vote on each FLIP in separate threads to make sure each FLIP
> > > >> >> > > > reaches a consensus. But we want to keep the discussion within a focused
> > > >> >> > > > thread (this thread) for easier tracking of contexts to avoid duplicated
> > > >> >> > > > questions/discussions and also to think of the problem/solution in a full
> > > >> >> > > > picture.
> > > >> >> > > >
> > > >> >> > > > Looking forward to your feedback
> > > >> >> > > >
> > > >> >> > > > Best,
> > > >> >> > > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> > > >> >> > > >
> > > >> >> > > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > >> >> > > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > >> >> > > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > >> >> > > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> > > >> >> > > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> > > >> >> > > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
> > > >> >> > >
> > > >> >> >
> >
>
