Hi,

Thanks for the great proposals. I have a few comments:

- Backpressure Handling. Flink's original backpressure handling is quite
robust and its semantics are quite "simple" (simple is beautiful).
This mechanism has proven to perform better and be more robust than those of
other open source streaming systems, which relied on some loopback
information.
Now that the proposal also relies on loopback (yield in this case), it is
not clear how robust the new backpressure handling proposed in FLIP-425 is
and how well it handles fluctuating workloads.
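
To make the concern concrete, here is a minimal sketch of what I mean by
loopback via yield. It is only my simplified reading, not the FLIP-425
implementation: the class, the maxInFlight threshold and the mailbox queue are
hypothetical names, and state requests are assumed to complete immediately.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical sketch of yield-based backpressure; names are NOT from FLIP-425. */
class YieldBackpressureSketch {
    private final int maxInFlight;                              // assumed buffer threshold
    private final Queue<Runnable> mailbox = new ArrayDeque<>(); // callbacks of finished requests
    private int inFlight = 0;
    private int yields = 0;

    YieldBackpressureSketch(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    /** Task thread: accept one record, yielding first while the buffer is full. */
    void processRecord(Runnable callback) {
        while (inFlight >= maxInFlight) {
            yieldOnce(); // loopback: intake stalls until a callback frees a slot
        }
        inFlight++;
        // Assumption: the state request completes immediately, so its callback
        // is enqueued right away. A real store would enqueue it later.
        mailbox.add(() -> {
            callback.run();
            inFlight--;
        });
    }

    /** Runs one pending callback instead of reading new input. */
    void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("buffer full but no callback to run");
        }
        yields++;
        mail.run();
    }

    int inFlight() { return inFlight; }

    int yields() { return yields; }

    public static void main(String[] args) {
        YieldBackpressureSketch task = new YieldBackpressureSketch(2);
        for (int i = 0; i < 5; i++) {
            task.processRecord(() -> { });
        }
        System.out.println("yields=" + task.yields() + ", inFlight=" + task.inFlight());
    }
}
```

In this model, how often the task yields depends on the threshold and on how
fast callbacks arrive, which is exactly the part I would like to see evaluated
under fluctuating load.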

- Watermark/Timer Handling: Similar arguments apply to watermark and timer
handling. IMHO, we need more benchmarks showing the overhead
of epoch management with different parameters (e.g., window size, watermark
strategy, etc.).
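
For reference, this is how I understand the epoch idea, as a minimal sketch.
It is an assumption on my side, not the FLIP-425 implementation, and
EpochSketch and its methods are hypothetical names: records join the open
epoch, a watermark closes it, and the watermark is released only once that
epoch and all earlier ones have no records in flight.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of epoch bookkeeping; names are NOT from FLIP-425. */
class EpochSketch {
    static final class Epoch {
        int pending;      // records of this epoch still in flight
        boolean closed;   // true once a watermark has ended the epoch
        long watermark;   // the watermark that closed the epoch
    }

    private final Deque<Epoch> epochs = new ArrayDeque<>();
    final List<Long> emitted = new ArrayList<>(); // watermarks released downstream

    EpochSketch() {
        epochs.addLast(new Epoch()); // the first open epoch
    }

    /** A record arrives: it belongs to the currently open (last) epoch. */
    Epoch onRecord() {
        Epoch open = epochs.peekLast();
        open.pending++;
        return open;
    }

    /** All async state access for a record of the given epoch has finished. */
    void onRecordFinished(Epoch epoch) {
        epoch.pending--;
        releaseFinishedEpochs();
    }

    /** A watermark arrives: close the open epoch and start a new one. */
    void onWatermark(long watermark) {
        Epoch open = epochs.peekLast();
        open.closed = true;
        open.watermark = watermark;
        epochs.addLast(new Epoch());
        releaseFinishedEpochs();
    }

    /** Release watermarks in order, stopping at the first epoch with pending records. */
    private void releaseFinishedEpochs() {
        while (epochs.size() > 1) {
            Epoch head = epochs.peekFirst();
            if (!head.closed || head.pending > 0) {
                break;
            }
            epochs.pollFirst();
            emitted.add(head.watermark);
        }
    }

    public static void main(String[] args) {
        EpochSketch sketch = new EpochSketch();
        Epoch first = sketch.onRecord();
        sketch.onWatermark(10L);
        sketch.onRecordFinished(first);
        System.out.println(sketch.emitted);
    }
}
```

Even in this simplified form there is per-record counting plus a scan on every
completion, so the cost should depend on how many epochs are open at once,
hence the request for numbers across watermark strategies.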

- DFS consistency guarantees. The proposal in FLIP-427 is DFS-agnostic.
However, different cloud providers have different storage consistency
models.
How do we want to deal with them?

Regards,
Jeyhun




On Fri, Mar 1, 2024 at 6:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Thanks Piotr for sharing your thoughts!
>
> > I guess it depends how we would like to treat the local disks. I've always
> > thought about them that almost always eventually all state from the DFS
> > should end up cached in the local disks.
>
>
> OK I got it. In our proposal we treat local disk as an optional cache, so
> the basic design will handle the case with state residing in DFS only. It
> is a more 'cloud-native' approach that does not rely on any local storage
> assumptions, which allows users to dynamically adjust the capacity or I/O
> bound of remote storage to gain performance or save cost, even without
> a job restart.
>
> > In the currently proposed more fine grained solution, you make a single
> > request to DFS per each state access.
> >
>
> Ah that's not accurate. Actually we buffer the state requests and process
> them in batches, so multiple requests correspond to one DFS access (one
> block access for multiple keys performed by RocksDB).
>
> > In that benchmark you mentioned, are you requesting the state
> > asynchronously from local disks into memory? If the benefit comes from
> > parallel I/O, then I would expect the benefit to disappear/shrink when
> > running multiple subtasks on the same machine, as they would be making
> > their own parallel requests, right? Also enabling checkpointing would
> > further cut into the available I/O budget.
>
>
> That's an interesting topic. Our proposal is specifically aimed at the
> scenario where the machine I/O is not fully loaded but the I/O latency has
> indeed become a bottleneck for each subtask. The distributed file system
> is a prime example of such a scenario: abundant and easily scalable I/O
> bandwidth coupled with higher I/O latency. You may expect to increase the
> parallelism of a job to enhance the performance as well, but that also
> wastes more CPU and memory on building up more subtasks. This is one
> drawback of computation-storage tightly coupled nodes. In our proposal,
> by contrast, the parallel I/O and all the callbacks still run in one task,
> so pre-allocated computational resources are better utilized. It is a much
> more lightweight way to perform parallel I/O.
>
> > Just with what granularity those async requests should be made.
> > Making state access asynchronous is definitely the right way to go!
>
>
> I think the current proposal is based on the following core ideas:
>
>    - A pure cloud-native disaggregated state.
>    - Fully utilize the given resources and try not to waste them (including
>    I/O).
>    - The ability to scale isolated resources (I/O or CPU or memory)
>    independently.
>
> We think a fine-grained granularity is more in line with those ideas,
> especially without local disk assumptions and without any waste of I/O by
> prefetching. Please note that it is not a replacement for the original local
> state with synchronous execution. Instead, this is a solution embracing the
> cloud-native era, providing much more scalability and resource efficiency
> when handling a *huge state*.
>
> > What also worries me a lot in this fine grained model is the effect on the
> > checkpointing times.
>
>
> Your concerns are very reasonable. Faster checkpointing is always a core
> advantage of disaggregated state, but only for the async phase. There will
> be some complexity introduced by in-flight requests, but I'd suggest a
> checkpoint containing those in-flight state requests as part of the state,
> to accelerate the sync phase by skipping the buffer draining. This makes
> the buffer size have little impact on checkpoint time. All the changes
> stay within the execution model we proposed, while checkpoint barrier
> alignment and handling will not be touched by our proposal, so I guess
> the complexity is relatively controllable. I have faith in that :)
>
> > Also regarding the overheads, it would be great if you could provide
> > profiling results of the benchmarks that you conducted to verify the
> > results. Or maybe if you could describe the steps to reproduce the
> > results?
> > Especially "Hashmap (sync)" vs "Hashmap with async API".
> >
>
> Yes, we could profile the benchmarks. For the comparison of "Hashmap
> (sync)" vs "Hashmap with async API", we ran a WordCount job written
> with the async APIs but with async execution disabled, by directly completing
> the future using sync state access. This evaluates the overhead of newly
> introduced modules like 'AEC' in sync execution (even though they are not
> designed for it). The code will be provided later. For other results of our
> PoC[1], you can follow the instructions here[2] to reproduce them. Since the
> compilation may take some effort, we will directly provide the jar for
> testing next week.
>
>
> And @Yunfeng Zhou, I have noticed your mail but it is a bit late in my
> local time and the next few days are weekends. So I will reply to you
> later. Thanks for your response!
>
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-PoCResults
> [2]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855#FLIP423:DisaggregatedStateStorageandManagement(UmbrellaFLIP)-Appendix:HowtorunthePoC
>
>
> Best,
> Zakelly
>
>
> On Fri, Mar 1, 2024 at 6:38 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Thanks for proposing this design! I just read FLIP-424 and FLIP-425
> > and have some questions about the proposed changes.
> >
> > For Async API (FLIP-424)
> >
> > 1. Why do we need a close() method on StateIterator? This method seems
> > unused in the usage example code.
> >
> > 2. In FutureUtils.combineAll()'s JavaDoc, it is stated that "No null
> > entries are allowed". It might be better to further explain what will
> > happen if a null value is passed: is the value ignored in the returned
> > Collection, or is an exception thrown? Given that
> > FutureUtils.emptyFuture() can be returned in the example code, I
> > suppose the former one might be correct.
> >
> >
> > For Async Execution (FLIP-425)
> >
> > 1. According to Fig 2 of this FLIP, if a recordB has its key collide
> > with an ongoing recordA, its processElement() method can still be
> > triggered immediately, and then it might be moved to the blocking
> > buffer in AEC if it involves state operations. This means that
> > recordB's output will precede recordA's output in downstream
> > operators, if recordA involves state operations while recordB does
> > not. This will harm the correctness of Flink jobs in some use cases.
> > For example, in dim table join cases, recordA could be a delete
> > operation that involves state access, while recordB could be an insert
> > operation that needs to visit external storage without state access.
> > If recordB's output precedes recordA's, then an entry that is supposed
> > to finally exist with recordB's value in the sink table might actually
> > be deleted according to recordA's command. This situation should be
> > avoided and the order of same-key records should be strictly
> > preserved.
> >
> > 2. The FLIP says that StateRequests submitted by Callbacks will not
> > invoke further yield() methods. Given that yield() is used when there
> > is "too much" in-flight data, does it mean StateRequests submitted by
> > Callbacks will never be "too much"? What if the total number of
> > StateRequests exceeds the capacity of the Flink operator's memory space?
> >
> > 3. In the "Watermark" section, this FLIP provides an out-of-order
> > execution mode apart from the default strictly-ordered mode, which can
> > optimize performance by allowing more concurrent executions.
> >
> > 3.1 I'm concerned that the out-of-order execution mode, along with the
> > epoch mechanism, would bring more complexity to the execution model
> > than the performance improvement it promises. Could we add some
> > benchmark results proving the benefit of this mode?
> >
> > 3.2 The FLIP might need to add a public API section describing how
> > users or developers can switch between these two execution modes.
> >
> > 3.3 Apart from the watermark and checkpoint mentioned in this FLIP,
> > there are also other events that might appear in the stream of
> > data records. It might be better to generalize the execution mode
> > mechanism to handle all possible events.
> >
> > 4. It might be better to treat callback-handling as a
> > MailboxDefaultAction, instead of Mails, to avoid the overhead of
> > repeatedly creating Mail objects.
> >
> > 5. Could this FLIP provide the current default values for things like
> > active buffer size thresholds and timeouts? These could help with
> > memory consumption and latency analysis.
> >
> > 6. Why do we need to record the hashcode of a record in its
> > RecordContext? It seems not used.
> >
> > 7. In "timers can be stored on the JVM heap or RocksDB", the link
> > points to a document in flink-1.15. It might be better to verify the
> > referenced content is still valid in the latest Flink and update the
> > link accordingly. Same for other references if any.
> >
> > Best,
> > Yunfeng Zhou
> >
> > On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
> > >
> > > Hi Devs,
> > >
> > > This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu,
> > > Yanfei Lei and Feng Wang. We'd like to start a discussion about introducing
> > > Disaggregated State Storage and Management in Flink 2.0.
> > >
> > > The past decade has witnessed a dramatic shift in Flink's deployment mode,
> > > workload patterns, and hardware improvements. We've moved from the
> > > map-reduce era where workers are computation-storage tightly coupled nodes
> > > to a cloud-native world where containerized deployments on Kubernetes
> > > become standard. To enable Flink's Cloud-Native future, we introduce
> > > Disaggregated State Storage and Management that uses DFS as primary storage
> > > in Flink 2.0, as promised in the Flink 2.0 Roadmap.
> > >
> > > Design Details can be found in FLIP-423[1].
> > >
> > > This new architecture is aimed to solve the following challenges brought in
> > > the cloud-native era for Flink.
> > > 1. Local Disk Constraints in containerization
> > > 2. Spiky Resource Usage caused by compaction in the current state model
> > > 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
> > > 4. Light and Fast Checkpoint in a native way
> > >
> > > More specifically, we want to reach a consensus on the following issues in
> > > this discussion:
> > >
> > > 1. Overall design
> > > 2. Proposed Changes
> > > 3. Design details to achieve Milestone1
> > >
> > > In M1, we aim to achieve an end-to-end baseline version using DFS as
> > > primary storage and complete core functionalities, including:
> > >
> > > - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> > > asynchronous state access.
> > > - Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking
> > > execution model leveraging the asynchronous APIs introduced in FLIP-424.
> > > - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote
> > > state data in batches to avoid unnecessary round-trip costs for remote
> > > access.
> > > - Disaggregated State Store (FLIP-427)[5]: Introduce the initial version of
> > > the ForSt disaggregated state store.
> > > - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> > > checkpointing mechanisms with the disaggregated state store for fault
> > > tolerance and fast rescaling.
> > >
> > > We will vote on each FLIP in separate threads to make sure each FLIP
> > > reaches a consensus. But we want to keep the discussion within a focused
> > > thread (this thread) for easier tracking of contexts to avoid duplicated
> > > questions/discussions and also to think of the problem/solution in a full
> > > picture.
> > >
> > > Looking forward to your feedback
> > >
> > > Best,
> > > Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
> > >
> > > [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> > > [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> > > [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> > > [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> > > [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> > > [6] https://cwiki.apache.org/confluence/x/UYp3EQ
> >
>
