[
https://issues.apache.org/jira/browse/FLINK-37375?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17937879#comment-17937879
]
Jufang He commented on FLINK-37375:
-----------------------------------
[~zakelly] Yes, in this design, the async operations must finish before the
checkpoint is marked complete.
Perhaps a more detailed explanation of the specific issue we encountered will
help clarify this design. In our scenario of using Flink to
write data to Paimon, we faced the problem of slow nodes in DFS. The submission
of Paimon data is tightly integrated with Flink's checkpointing mechanism. The
writer operator flushes data to DFS during `prepareSnapshotPreBarrier`, and
after the data flush is completed, it generates metadata for the data files.
Before the barrier is sent downstream, this metadata is forwarded to the
downstream commit operator to ensure that the commit operator can obtain all
the file metadata required for this checkpoint when taking its snapshot (i.e., when
all barriers are received). During the `notifyCheckpointComplete` phase, this
metadata is finally committed.
We know that `prepareSnapshotPreBarrier` occurs during the synchronous phase of
the checkpoint. If a slow DFS node is encountered, it can take a long time,
affecting throughput. Through the optimization in this design, data can be
flushed to local storage during `prepareSnapshotPreBarrier`, and the upload to
DFS can be handled asynchronously during the checkpoint's async phase. Flushing
to local storage is generally more stable in terms of performance, and
uploading data to DFS is performed asynchronously, which significantly reduces
the blocking time during the synchronous phase.
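The two-phase flow described above can be sketched in plain Java. All names here (`TwoPhaseFlushSketch`, `flushToLocal`, `uploadToDfs`) are illustrative assumptions for this thread, not Flink's or Paimon's actual APIs; the point is only that the slow DFS write moves off the synchronous path, while the checkpoint still waits for the upload future before it can be acknowledged:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TwoPhaseFlushSketch {

    /** Synchronous phase: spill in-memory buffers to local storage (fast, stable). */
    static String flushToLocal(byte[] buffer) {
        // A real operator would write a local file and fsync here.
        return "local-spill-" + buffer.length;
    }

    /** Asynchronous phase: upload the local spill to DFS off the main thread. */
    static CompletableFuture<String> uploadToDfs(String localPath, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            // Simulates a potentially slow DFS write (e.g. a slow HDFS datanode).
            return localPath.replace("local-spill-", "dfs://bucket/part-");
        }, executor);
    }

    /** The checkpoint is acknowledged only after the upload future completes. */
    static String snapshot(byte[] buffer, Executor executor) {
        String localPath = flushToLocal(buffer);        // sync phase: blocks main thread briefly
        return uploadToDfs(localPath, executor).join(); // async phase: joined before the ack
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println(snapshot(new byte[128], pool));
        pool.shutdown();
    }
}
```

The main thread only pays for the local flush; the DFS upload runs on the checkpoint's async executor, and the `join()` models the constraint that the checkpoint cannot be marked complete until the upload finishes.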
Therefore, in this design, these time-consuming operations are strongly tied to
the Flink checkpoint. I believe similar issues might arise in other scenarios
(such as writing to other types of data lakes or implementing custom
time-consuming snapshot logic), so I propose providing an asyncOperate
interface in the checkpoint to universally address such problems.
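One hypothetical shape such a hook might take is sketched below. The name `asyncOperate` comes from this thread, but the signature and the surrounding interface name are assumptions for discussion, not the FLIP's actual API:

```java
import java.util.concurrent.CompletableFuture;

/**
 * Sketch of an operator-level async snapshot hook. The signature is an
 * assumption for illustration; see the linked FLIP for the real proposal.
 */
public interface AsyncSnapshotSketch {

    /** Runs in the synchronous phase, before the barrier is forwarded downstream. */
    void prepareSnapshot(long checkpointId);

    /**
     * Runs in the asynchronous phase. The returned future must complete before
     * the checkpoint is marked complete (the constraint discussed above).
     */
    CompletableFuture<Void> asyncOperate(long checkpointId);
}
```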
> Checkpoint supports the Operator to customize asynchronous snapshot state
> -------------------------------------------------------------------------
>
> Key: FLINK-37375
> URL: https://issues.apache.org/jira/browse/FLINK-37375
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / Checkpointing
> Affects Versions: 1.20.1
> Reporter: Jufang He
> Priority: Major
> Labels: pull-request-available
>
> In some Flink task operators, slow operations such as file uploads or data
> flushing may be performed during the synchronous phase of Checkpoint. Due to
> performance issues with external storage components, the synchronous phase
> may take too long to execute, significantly impacting the job's throughput.
> For example, during our internal use of Paimon, we observed that uploading
> files to HDFS during the Checkpoint synchronous phase could encounter random
> HDFS slow node issues, leading to a substantial negative impact on task
> throughput.
> To address this issue, I propose supporting a generic, operator-customizable
> asynchronous snapshot feature, allowing users to move time-consuming logic to
> the asynchronous phase of Checkpoint, thereby minimizing the blocking of the
> main thread and improving task throughput. For instance, the Paimon writer
> operator could write data locally during the Checkpoint synchronous phase and
> upload files to remote storage during the asynchronous phase. Beyond the
> Paimon data upload scenario, other operator logic may also experience slow
> execution during the synchronous phase. This approach aims to uniformly
> optimize such issues.
> I drafted a FLIP for this issue:
> [https://docs.google.com/document/d/1lwxLEQjD6jVhZUBMRGhzQNWKSvdbPbYNQsV265gR4kw]
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)