Hi,

Jingsong, Yanfei, please check, if you can view the doc. Thanks.

Best regards,
Jing

On Fri, Apr 7, 2023 at 2:19 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Yanfei,
>
> Thanks for your comments.
>
> > Does this result in a larger space amplification? Maybe a more
> suitable value can be determined through some experimental statistics
> after we implement this feature.
>
> Yes, it results in larger space amplification for shared states. I
> will do more tests and investigation.
>
>
> Thanks.
>
> Best regards,
> Zakelly
>
> On Fri, Apr 7, 2023 at 8:15 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > Hi @Piotr and @Jingsong Li
> >
> > I have read access to the document, but I'm not sure whether the owner
> > of this document wants to make it public. Actually, the doc is for
> > FLINK-23342 and there is a candidate design very similar to this FLIP,
> > but only for the shared state. Like Yun said, the previous design is
> > not taken because of the code complexity, however I think it is
> > acceptable after implementing the POC[1]. I think we could focus on
> > the current plan, WDTY?
> >
> >
> > [1] POC of this FLIP:
> >
> https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> >
> > On Fri, Apr 7, 2023 at 8:13 PM Zakelly Lan <zakelly....@gmail.com>
> wrote:
> > >
> > > Hi Piotr,
> > >
> > > Thanks for your comments!
> > >
> > > (1) Sorry for the misleading, let me make it more clear. It is a
> > > concurrent checkpoint senario. Yes, the assumption you said needs to
> > > be followed, but the state handles here refer to the original SST
> > > files, not the underlying file. In this FLIP when checkpoint N and N+1
> > > are running concurrently, they reuse files from checkpoint N-1, and
> > > some of the files may be deleted when checkpoint N completes while
> > > checkpoint N+1 is still writing on it. There is no such problem for
> > > original shared states without file merging because when a state
> > > handle (or sst file here) from checkpoint N-1 is not referenced by
> > > checkpoint N, it will not be referenced by checkpoint N+1. So the
> > > subsumption of sst files from checkpoint N-1 are safe.
> > > For above example, when reaching step "d.", File 1 reached the size
> > > threshold and will not be used. The Chk-2 and Chk-3 are running
> > > concurrently, and the File 3 is being written by Chk-2, so it can not
> > > be used by Chk-3 (As described in section 4.6). Here comes the
> > > problem.
> > >
> > > (2) Please correct me if I'm wrong. The purpose of the
> > > `RecoverableWriter` is to provide a reliable file writer even
> > > tolerable with job failure and recovery. The implementation varies
> > > among the file systems, some of which involves writing into temporary
> > > files (such as HDFS). As a result, it may produce more RPC requests to
> > > the DFS.
> > > The goal of this FLIP is to reduce the pressure on DFS, especially the
> > > number of files and RPC requests. Currently the TMs are NOT using the
> > > RecoverableWriter to persist/upload the state files, and a file
> > > closing is enough. The section 4.1.1 is trying to omit this file
> > > closing but ensure file visibility in some DFS, thus reducing pressure
> > > on DFS. That's why I said the problems they want to solve are
> > > different. I'm not sure if I made myself clear.
> > >
> > >
> > > Thanks!
> > >
> > > Best regards,
> > > Zakelly
> > >
> > > On Fri, Apr 7, 2023 at 8:08 PM Zakelly Lan <zakelly....@gmail.com>
> wrote:
> > > >
> > > > Hi Yun,
> > > >
> > > > Thanks for your suggestions!
> > > >
> > > > I have read the FLINK-23342 and its design doc as you provided. First
> > > > of all the goal of this FLIP and the doc are similar, and the design
> > > > of this FLIP is pretty much like option 3. The main difference is
> that
> > > > we imply the concept of 'epoch' in the folder path for each
> > > > granularity. For shared state, the folder for each subtask is like
> > > > "${checkpointBaseDir}/shared/subtask-{index}-{parallelism}", so if
> the
> > > > ${checkpointBaseDir} changes (when user restart a job manually) or
> the
> > > > ${parallelism} changes (when rescaling), there will be a
> re-uploading,
> > > > and the JM takes care of the old artifacts. The folder path for
> > > > private state is in the form of
> > > > "${checkpointBaseDir}/tm-owned/${tmResourceId}" and the division of
> > > > responsibilities between JM and TM is similar. The design of this
> FLIP
> > > > inherits all the advantages of the design of option 3 in that doc,
> and
> > > > also avoids extra communication for epoch maintenance. As for the
> code
> > > > complexity, you may check the POC commit[1] and find that the
> > > > implementation is pretty clean and is a totally new code path making
> > > > nearly no influence on the old one. Comparing the number of lines of
> > > > code change with what's currently done for merging channel state[2]
> > > > (5200 vs. 2500 additions), I think it is acceptable considering we
> are
> > > > providing a unified file merging framework, which would save a lot of
> > > > effort in future. WDYT?
> > > >
> > > > Thanks.
> > > >
> > > > Best regards,
> > > > Zakelly
> > > >
> > > > [1] POC of this FLIP:
> > > >
> https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> > > > [2] Commit for FLINK-26803 (Merge the channel state files) :
> > > >
> https://github.com/apache/flink/commit/8be94e6663d8ac6e3d74bf4cd5f540cc96c8289e
> > > >
> > > >
> > > > On Fri, Apr 7, 2023 at 7:22 PM Yanfei Lei <fredia...@gmail.com>
> wrote:
> > > > >
> > > > > Thanks for your explanation Zakelly.
> > > > > (1) Keeping these merging granularities for different types of
> files
> > > > > as presets that are not configurable is a good idea to prevent
> > > > > performance degradation.
> > > > >
> > > > > (2)
> > > > > > For the third option, 64MB is an acceptable target size. The
> RocksDB state backend in Flink also chooses 64MB as the default target file
> size.
> > > > >
> > > > > Does this result in a larger space amplification? Maybe a more
> > > > > suitable value can be determined through some experimental
> statistics
> > > > > after we implement this feature.
> > > > >
> > > > > Best,
> > > > > Yanfei
> > > > >
> > > > > Jingsong Li <jingsongl...@gmail.com> 于2023年4月7日周五 17:09写道:
> > > > > >
> > > > > > Hi Yun,
> > > > > >
> > > > > > It looks like this doc needs permission to read? [1]
> > > > > >
> > > > > > [1]
> https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > >
> > > > > > Best,
> > > > > > Jingsong
> > > > > >
> > > > > > On Fri, Apr 7, 2023 at 4:34 PM Piotr Nowojski <
> pnowoj...@apache.org> wrote:
> > > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > +1 To what Yun Tang wrote. We don't seem to have access to the
> design doc.
> > > > > > > Could you make it publicly visible or copy out its content to
> another
> > > > > > > document?
> > > > > > >
> > > > > > > Thanks for your answers Zakelly.
> > > > > > >
> > > > > > > (1)
> > > > > > > Yes, the current mechanism introduced in FLINK-24611 allows
> for checkpoint
> > > > > > > N, to only re-use shared state handles that have been already
> referenced by
> > > > > > > checkpoint N-1. But why do we need to break this assumption?
> In your step,
> > > > > > > "d.", TM could adhere to that assumption, and instead of
> reusing File-2, it
> > > > > > > could either re-use File-1, File-3 or create a new file.
> > > > > > >
> > > > > > > (2)
> > > > > > > Can you elaborate a bit more on this? As far as I recall, the
> purpose of
> > > > > > > the `RecoverableWriter` is to support exactly the things
> described in this
> > > > > > > FLIP, so what's the difference? If you are saying that for
> this FLIP you
> > > > > > > can implement something more efficiently for a given
> FileSystem, then why
> > > > > > > can it not be done the same way for the `RecoverableWriter`?
> > > > > > >
> > > > > > > Best,
> > > > > > > Piotrek
> > > > > > >
> > > > > > > czw., 6 kwi 2023 o 17:24 Yun Tang <myas...@live.com>
> napisał(a):
> > > > > > >
> > > > > > > > Hi Zakelly,
> > > > > > > >
> > > > > > > > Thanks for driving this work!
> > > > > > > >
> > > > > > > > I'm not sure did you ever read the discussion between
> Stephan, Roman,
> > > > > > > > Piotr, Yuan and I in the design doc [1] in nearly two years
> ago.
> > > > > > > >
> > > > > > > > From my understanding, your proposal is also a mixed state
> ownership: some
> > > > > > > > states are owned by the TM while some are owned by the JM.
> If my memory is
> > > > > > > > correct, we did not take the option-3 or option-5 in the
> design doc [1] for
> > > > > > > > the code complexity when implements the 1st version of
> changelog
> > > > > > > > state-backend.
> > > > > > > >
> > > > > > > > Could you also compare the current FLIP with the proposals
> in the design
> > > > > > > > doc[1]? From my understanding, we should at least consider
> to comapre with
> > > > > > > > option-3 and option-5 as they are all mixed solutions.
> > > > > > > >
> > > > > > > >
> > > > > > > > [1]
> > > > > > > >
> https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > > > >
> > > > > > > > Best
> > > > > > > > Yun Tang
> > > > > > > >
> > > > > > > > ------------------------------
> > > > > > > > *From:* Zakelly Lan <zakelly....@gmail.com>
> > > > > > > > *Sent:* Thursday, April 6, 2023 16:38
> > > > > > > > *To:* dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > *Subject:* Re: [DISCUSS] FLIP-306: Unified File Merging
> Mechanism for
> > > > > > > > Checkpoints
> > > > > > > >
> > > > > > > > Hi Piotr,
> > > > > > > >
> > > > > > > > Thanks for all the feedback.
> > > > > > > >
> > > > > > > > (1) Thanks for the reminder. I have just seen the
> FLINK-24611, the delayed
> > > > > > > > deletion by JM resolves some sync problems between JM and
> TM, but I'm
> > > > > > > > afraid it is still not feasible for the file sharing in this
> FLIP.
> > > > > > > > Considering a concurrent checkpoint scenario as follows:
> > > > > > > >    a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst are
> written in file 1,
> > > > > > > > and 4.sst is written in file 2.
> > > > > > > >    b. Checkpoint 2 starts based on checkpoint 1, including
> 1.sst, 2.sst
> > > > > > > > and 5.sst.
> > > > > > > >    c. Checkpoint 3 starts based on checkpoint 1, including
> 1.sst, 2.sst
> > > > > > > > and 5.sst as well.
> > > > > > > >    d. Checkpoint 3 reuses the file 2, TM writes 5.sst on it.
> > > > > > > >    e. Checkpoint 2 creates a new file 3, TM writes 5.sst on
> it.
> > > > > > > >    f. Checkpoint 2 finishes, checkpoint 1 is subsumed and
> the file 2 is
> > > > > > > > deleted, while checkpoint 3 still needs file 2.
> > > > > > > >
> > > > > > > > I attached a diagram to describe the scenario.
> > > > > > > > [image: concurrent cp.jpg]
> > > > > > > > The core issue is that this FLIP introduces a mechanism that
> allows
> > > > > > > > physical files to be potentially used by the next several
> checkpoints. JM
> > > > > > > > is uncertain whether there will be a TM continuing to write
> to a specific
> > > > > > > > file. So in this FLIP, TMs take the responsibility to delete
> the physical
> > > > > > > > files.
> > > > > > > >
> > > > > > > > (2) IIUC, the RecoverableWriter is introduced to persist
> data in the "in
> > > > > > > > progress" files after each checkpoint, and the
> implementation may be based
> > > > > > > > on the file sync in some file systems. However, since the
> sync is a heavy
> > > > > > > > operation for DFS, this FLIP wants to use flush instead of
> the sync with
> > > > > > > > the best effort. This only fits the case that the DFS is
> considered
> > > > > > > > reliable. The problems they want to solve are different.
> > > > > > > >
> > > > > > > > (3) Yes, if files are managed by JM via the shared registry,
> this problem
> > > > > > > > is solved. And as I mentioned in (1), there are some other
> corner cases
> > > > > > > > hard to resolve via the shared registry.
> > > > > > > >
> > > > > > > > The goal of this FLIP is to have a common way of merging
> files in all use
> > > > > > > > cases. For shared state it merges at subtask level, while
> for private state
> > > > > > > > (and changelog files, as I replied to Yanfei), files are
> merged at TM
> > > > > > > > level. So it is not contrary to the current plan for the
> unaligned
> > > > > > > > checkpoint state (FLINK-26803). You are right that the
> unaligned checkpoint
> > > > > > > > state would be merged with the operator's state file, so
> overall, it is
> > > > > > > > slightly better than what's currently done.
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks again for the valuable comments!
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Zakelly
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Apr 5, 2023 at 8:43 PM Piotr Nowojski <
> pnowoj...@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Thanks for coming up with the proposal, it's definitely
> valuable. I'm still
> > > > > > > > reading and trying to understand the proposal, but a couple
> of comments
> > > > > > > > from my side.
> > > > > > > >
> > > > > > > > (1)
> > > > > > > > > Ownership of a single checkpoint file is transferred to
> TM, while JM
> > > > > > > > manages the parent directory of these files.
> > > > > > > >
> > > > > > > > Have you seen
> https://issues.apache.org/jira/browse/FLINK-24611 before? I
> > > > > > > > don't fully remember why, but we have rejected the idea of
> moving the file
> > > > > > > > ownership to TM and instead reworked the shared file
> registry in a way that
> > > > > > > > I think should be sufficient for file sharing. Could you
> elaborate why we
> > > > > > > > need to move the file ownership to TM, and why is the
> current mechanism not
> > > > > > > > sufficient?
> > > > > > > >
> > > > > > > > (2)
> > > > > > > > > File visibility is needed when a Flink job recovers after
> a checkpoint is
> > > > > > > > materialized. In some DFS, such as most object storages, a
> file is only
> > > > > > > > visible after it is closed
> > > > > > > >
> > > > > > > > Is that really the case?
> > > > > > > >
> `org.apache.flink.core.fs.FileSystem#createRecoverableWriter` seems to be
> > > > > > > > addressing exactly this issue, and the most frequently used
> FileSystem (S3)
> > > > > > > > AFAIK supports it with no problems?
> > > > > > > >
> > > > > > > > (3)
> > > > > > > > > 4.1.2 Merge files within a subtask or a TM
> > > > > > > > > Given that TMs are reassigned after restoration, it is
> difficult to
> > > > > > > > manage physical files that contain data from multiple
> subtasks scattered
> > > > > > > > across different TMs (as depicted in Fig.3). There is no
> synchronization
> > > > > > > > mechanism between TMs, making file management in this
> scenario challenging.
> > > > > > > >
> > > > > > > > I think this is solved in many places already via the shared
> state managed
> > > > > > > > by the JM, as I mentioned in (1).
> > > > > > > >
> > > > > > > >
> > > > > > > > If I understand it correctly you are proposing to have a
> common
> > > > > > > > interface/way of merging small files, in all use cases, that
> would work
> > > > > > > > only across a single subtask? That's contrary to what's
> currently done for
> > > > > > > > unaligned checkpoints, right? But if this generic mechanism
> was to be used
> > > > > > > > for unaligned checkpoints, unaligned checkpoint state would
> have been
> > > > > > > > merged with the operators state file, so all in all there
> would be no
> > > > > > > > regression visible to a user? The limit is that we always
> have at least a
> > > > > > > > single file per subtask, but in exchange we are getting a
> simpler threading
> > > > > > > > model?
> > > > > > > >
> > > > > > > > Bets,
> > > > > > > > Piotrek
> > > > > > > >
> > > > > > > > wt., 4 kwi 2023 o 08:51 Zakelly Lan <zakelly....@gmail.com>
> napisał(a):
> > > > > > > >
> > > > > > > > > Hi Yanfei,
> > > > > > > > >
> > > > > > > > > Thank you for your prompt response.
> > > > > > > > >
> > > > > > > > > I agree that managing (deleting) only some folders with JM
> can greatly
> > > > > > > > > relieve JM's burden. Thanks for pointing this out.
> > > > > > > > >
> > > > > > > > > In general, merging at the TM level is more effective
> since there are
> > > > > > > > > usually more files to merge. Therefore, I believe it is
> better to
> > > > > > > > > merge files per TM as much as possible.  However, for
> shared state,
> > > > > > > > > merging at the subtask level is the best choice to prevent
> significant
> > > > > > > > > data transfer over the network after restoring. I think it
> is better
> > > > > > > > > to keep these merging granularities for different types of
> files as
> > > > > > > > > presets that are not configurable. WDYT?
> > > > > > > > >
> > > > > > > > > As for the DSTL files, they are merged per TM and placed
> in the
> > > > > > > > > task-owned folder. These files can be classified as shared
> state since
> > > > > > > > > they are shared across checkpoints. However, the DSTL file
> is a
> > > > > > > > > special case that will be subsumed by the first checkpoint
> of the
> > > > > > > > > newly restored job. Therefore, there is no need for new
> TMs to keep
> > > > > > > > > these files after the old checkpoint is subsumed, just
> like the
> > > > > > > > > private state files. Thus, it is feasible to merge DSTL
> files per TM
> > > > > > > > > without introducing complex file management across job
> attempts. So
> > > > > > > > > the possible performance degradation is avoided.
> > > > > > > > >
> > > > > > > > > The three newly introduced options have recommended
> defaults. For
> > > > > > > > > upcoming versions, this feature is turned off by default.
> For the
> > > > > > > > > second option, SEGMENTED_ACROSS_CP_BOUNDARY is the
> recommended default
> > > > > > > > > as it is more effective. Of course, if encountering some
> DFS that does
> > > > > > > > > not support file visibility until the file is closed, it
> is possible
> > > > > > > > > to fall back to another option automatically. For the
> third option,
> > > > > > > > > 64MB is an acceptable target size. The RocksDB state
> backend in Flink
> > > > > > > > > also chooses 64MB as the default target file size.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Thank you again for your quick response.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best regards,
> > > > > > > > > Zakelly
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <
> fredia...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Zakelly,
> > > > > > > > > >
> > > > > > > > > > Thanks for driving this,  this proposal enables the
> files merging of
> > > > > > > > > > different types of states to be grouped under a unified
> framework. I
> > > > > > > > > > think it has the added benefit of lightening the load on
> JM. As
> > > > > > > > > > FLINK-26590[1] described,  triggered checkpoints can be
> delayed by
> > > > > > > > > > discarding shared state when JM manages a large number
> of files. After
> > > > > > > > > > this FLIP, JM only needs to manage some folders, which
> greatly reduces
> > > > > > > > > > the burden on JM.
> > > > > > > > > >
> > > > > > > > > > In Section 4.1, two types of merging granularities(per
> subtask and per
> > > > > > > > > > task manager) are proposed, the shared state is managed
> by per subtask
> > > > > > > > > > granularity, but for the changelog state backend, its
> DSTL files are
> > > > > > > > > > shared between checkpoints, and are currently merged in
> batches at the
> > > > > > > > > > task manager level. When merging with the
> SEGMENTED_WITHIN_CP_BOUNDARY
> > > > > > > > > > mode, I'm concerned about the performance degradation of
> its merging,
> > > > > > > > > > hence I wonder if the merge granularities are
> configurable? Further,
> > > > > > > > > > from a user perspective, three new options are
> introduced in this
> > > > > > > > > > FLIP, do they have recommended defaults?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-26590
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Yanfei
> > > > > > > > > >
> > > > > > > > > > Zakelly Lan <zakelly....@gmail.com> 于2023年4月3日周一
> 18:36写道:
> > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > I would like to open a discussion on providing a
> unified file merging
> > > > > > > > > > > mechanism for checkpoints[1].
> > > > > > > > > > >
> > > > > > > > > > > Currently, many files are uploaded to the DFS during
> checkpoints,
> > > > > > > > > > > leading to the 'file flood' problem when running
> > > > > > > > > > > intensive workloads in a cluster.  To tackle this
> problem, various
> > > > > > > > > > > solutions have been proposed for different types
> > > > > > > > > > > of state files. Although these methods are similar,
> they lack a
> > > > > > > > > > > systematic view and approach. We believe that it is
> > > > > > > > > > > better to consider this problem as a whole and
> introduce a unified
> > > > > > > > > > > framework to address the file flood problem for
> > > > > > > > > > > all types of state files. A POC has been implemented
> based on current
> > > > > > > > > > > FLIP design, and the test results are promising.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Looking forward to your comments or feedback.
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > > Zakelly
> > > > > > > > > > >
> > > > > > > > > > > [1]
> > > > > > > > >
> > > > > > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
> > > > > > > > >
> > > > > > > >
> > > > > > > >
>

Reply via email to