Hi,

Thanks for proposing this design! I just read FLIP-424 and FLIP-425
and have some questions about the proposed changes.

For Async API (FLIP-424)

1. Why do we need a close() method on StateIterator? This method does not
seem to be used in the example code.

2. The JavaDoc of FutureUtils.combineAll() states that "No null entries
are allowed". It might be better to further explain what happens if a
null value is passed: is the value ignored in the returned Collection,
or is an exception thrown? Given that FutureUtils.emptyFuture() can be
returned in the example code, I suppose the former is correct.
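To make the first reading concrete, here is a minimal sketch. It uses java.util.concurrent.CompletableFuture instead of Flink's StateFuture, and `combineAllSkippingNulls` is a hypothetical name, not the FLIP's API. Futures whose result is null (such as an "empty" future) are dropped from the returned Collection, while a null future entry itself is rejected eagerly:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

final class CombineAllSketch {
    // Hypothetical helper: waits for all input futures and collects their
    // results, skipping null results. A null future entry throws immediately.
    static <T> CompletableFuture<Collection<T>> combineAllSkippingNulls(
            Collection<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> copy = new ArrayList<>(futures.size());
        for (CompletableFuture<T> f : futures) {
            copy.add(Objects.requireNonNull(f, "null future entry"));
        }
        return CompletableFuture.allOf(copy.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    Collection<T> results = new ArrayList<>();
                    for (CompletableFuture<T> f : copy) {
                        T value = f.join(); // all futures are complete at this point
                        if (value != null) {
                            results.add(value); // null results are silently dropped
                        }
                    }
                    return results;
                });
    }
}
```

If the intended semantics are the second reading instead, the null check on `value` would throw rather than skip, and the JavaDoc should say so.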


For Async Execution (FLIP-425)

1. According to Fig. 2 of this FLIP, if recordB's key collides with
that of an in-flight recordA, recordB's processElement() method can
still be triggered immediately, and then recordB might be moved to the
blocking buffer in AEC if it involves state operations. This means that
if recordA involves state operations while recordB does not, recordB's
output will reach downstream operators before recordA's. This can break
the correctness of Flink jobs in some use cases. For example, in
dimension table join cases, recordA could be a delete operation that
involves state access, while recordB could be an insert operation that
visits external storage without state access. If recordB's output
precedes recordA's, an entry that should ultimately exist in the sink
table with recordB's value might instead be deleted by recordA's
command. This situation should be avoided, and the order of same-key
records should be strictly preserved.
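The ordering requirement can be illustrated with a small sketch. The class and method names are hypothetical and do not reflect the AEC design in the FLIP. Every record whose key collides with an in-flight record is parked in a per-key FIFO queue, whether or not it accesses state, and is released only after the preceding same-key record has fully finished:

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical per-key ordering guard, for illustration only.
final class KeyOrderGuard<K, R> {
    // For each key with an in-flight record: the records waiting behind it.
    private final Map<K, ArrayDeque<R>> waiting = new HashMap<>();
    private final Consumer<R> processor;

    KeyOrderGuard(Consumer<R> processor) {
        this.processor = processor;
    }

    // A record arrives. If a same-key record is still in flight, buffer the
    // new record even if it would never touch state.
    void onRecord(K key, R record) {
        ArrayDeque<R> queue = waiting.get(key);
        if (queue != null) {
            queue.addLast(record);
            return;
        }
        waiting.put(key, new ArrayDeque<>()); // mark key as in flight
        processor.accept(record);
    }

    // The in-flight record for `key` finished, including all of its state
    // callbacks. Start the next buffered record for that key, if any.
    void onFinished(K key) {
        ArrayDeque<R> queue = waiting.get(key);
        if (queue == null) {
            throw new IllegalStateException("no in-flight record for key " + key);
        }
        R next = queue.pollFirst();
        if (next == null) {
            waiting.remove(key); // key is idle again
        } else {
            processor.accept(next);
        }
    }
}
```

With this rule, the stateless insert (recordB) can only start after the stateful delete (recordA) completes, at the cost of making stateless records wait behind stateful ones with the same key.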

2. The FLIP says that StateRequests submitted by Callbacks will not
invoke further yield() calls. Given that yield() is used when there is
"too much" in-flight data, does this mean StateRequests submitted by
Callbacks can never be "too much"? What if the total number of
StateRequests exceeds the capacity of the operator's memory?
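A toy model of this concern, not the FLIP's implementation: it assumes a single in-flight counter with a hypothetical threshold, and assumes callback-submitted requests skip the capacity check entirely. Record-initiated requests are then bounded, but callback-initiated ones are not, so the peak in-flight count can exceed the threshold:

```java
// Toy model: counts in-flight StateRequests and applies the capacity
// check only to requests issued while processing a record.
final class InFlightModel {
    private final int threshold; // hypothetical capacity
    private int inFlight;
    private int peak;

    InFlightModel(int threshold) {
        this.threshold = threshold;
    }

    // Record path: refuses when at capacity. A real runtime would yield()
    // here and retry once some requests complete.
    boolean trySubmitFromRecord() {
        if (inFlight >= threshold) {
            return false;
        }
        submit();
        return true;
    }

    // Callback path: by assumption there is no capacity check at all.
    void submitFromCallback() {
        submit();
    }

    void complete() {
        if (inFlight == 0) {
            throw new IllegalStateException("nothing in flight");
        }
        inFlight--;
    }

    private void submit() {
        inFlight++;
        peak = Math.max(peak, inFlight);
    }

    int inFlight() {
        return inFlight;
    }

    int peak() {
        return peak;
    }
}
```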

3. In the "Watermark" section, this FLIP provides an out-of-order
execution mode in addition to the default strictly-ordered mode, which
can improve performance by allowing more concurrent executions.

3.1 I'm concerned that the out-of-order execution mode, along with the
epoch mechanism, would add more complexity to the execution model than
its promised performance improvement justifies. Could we add some
benchmark results demonstrating the benefit of this mode?

3.2 The FLIP might need to add a public API section describing how
users or developers can switch between these two execution modes.

3.3 Apart from the watermarks and checkpoints mentioned in this FLIP,
other events might also appear in the stream of data records. It might
be better to generalize the execution mode mechanism to handle all
possible events.

4. It might be better to treat callback handling as a
MailboxDefaultAction instead of as Mails, to avoid the overhead of
repeatedly creating Mail objects.

5. Could this FLIP provide the current default values of parameters
such as the active buffer size thresholds and timeouts? These would help
with memory consumption and latency analysis.

6. Why do we need to record the hashcode of a record in its
RecordContext? It does not seem to be used.

7. For "timers can be stored on the JVM heap or RocksDB", the link
points to a document for Flink 1.15. It might be better to verify that
the referenced content is still valid in the latest Flink version and
update the link accordingly. The same applies to other references, if
any.

Best,
Yunfeng Zhou

On Thu, Feb 29, 2024 at 2:17 PM Yuan Mei <yuanmei.w...@gmail.com> wrote:
>
> Hi Devs,
>
> This is a joint work of Yuan Mei, Zakelly Lan, Jinzhong Li, Hangxiang Yu,
> Yanfei Lei and Feng Wang. We'd like to start a discussion about introducing
> Disaggregated State Storage and Management in Flink 2.0.
>
> The past decade has witnessed a dramatic shift in Flink's deployment mode,
> workload patterns, and hardware improvements. We've moved from the
> map-reduce era where workers are computation-storage tightly coupled nodes
> to a cloud-native world where containerized deployments on Kubernetes
> have become standard. To enable Flink's Cloud-Native future, we introduce
> Disaggregated State Storage and Management that uses DFS as primary storage
> in Flink 2.0, as promised in the Flink 2.0 Roadmap.
>
> Design Details can be found in FLIP-423[1].
>
> This new architecture aims to solve the following challenges that the
> cloud-native era brings to Flink.
> 1. Local Disk Constraints in containerization
> 2. Spiky Resource Usage caused by compaction in the current state model
> 3. Fast Rescaling for jobs with large states (hundreds of Terabytes)
> 4. Light and Fast Checkpoint in a native way
>
> More specifically, we want to reach a consensus on the following issues in
> this discussion:
>
> 1. Overall design
> 2. Proposed Changes
> 3. Design details to achieve Milestone1
>
> In M1, we aim to achieve an end-to-end baseline version using DFS as
> primary storage and complete core functionalities, including:
>
> - Asynchronous State APIs (FLIP-424)[2]: Introduce new APIs for
> asynchronous state access.
> - Asynchronous Execution Model (FLIP-425)[3]: Implement a non-blocking
> execution model leveraging the asynchronous APIs introduced in FLIP-424.
> - Grouping Remote State Access (FLIP-426)[4]: Enable retrieval of remote
> state data in batches to avoid unnecessary round-trip costs for remote
> access
> - Disaggregated State Store (FLIP-427)[5]: Introduce the initial version of
> the ForSt disaggregated state store.
> - Fault Tolerance/Rescale Integration (FLIP-428)[6]: Integrate
> checkpointing mechanisms with the disaggregated state store for fault
> tolerance and fast rescaling.
>
> We will vote on each FLIP in separate threads to make sure each FLIP
> reaches a consensus. But we want to keep the discussion within a focused
> thread (this thread) for easier tracking of contexts to avoid duplicated
> questions/discussions and also to think of the problem/solution in a full
> picture.
>
> Looking forward to your feedback
>
> Best,
> Yuan, Zakelly, Jinzhong, Hangxiang, Yanfei and Feng
>
> [1] https://cwiki.apache.org/confluence/x/R4p3EQ
> [2] https://cwiki.apache.org/confluence/x/SYp3EQ
> [3] https://cwiki.apache.org/confluence/x/S4p3EQ
> [4] https://cwiki.apache.org/confluence/x/TYp3EQ
> [5] https://cwiki.apache.org/confluence/x/T4p3EQ
> [6] https://cwiki.apache.org/confluence/x/UYp3EQ
