Hi Piotr,

Thanks for your comments!

(1) Sorry for the misleading, let me make it more clear. It is a
concurrent checkpoint senario. Yes, the assumption you said needs to
be followed, but the state handles here refer to the original SST
files, not the underlying file. In this FLIP when checkpoint N and N+1
are running concurrently, they reuse files from checkpoint N-1, and
some of the files may be deleted when checkpoint N completes while
checkpoint N+1 is still writing on it. There is no such problem for
original shared states without file merging because when a state
handle (or sst file here) from checkpoint N-1 is not referenced by
checkpoint N, it will not be referenced by checkpoint N+1. So the
subsumption of sst files from checkpoint N-1 are safe.
For above example, when reaching step "d.", File 1 reached the size
threshold and will not be used. The Chk-2 and Chk-3 are running
concurrently, and the File 3 is being written by Chk-2, so it can not
be used by Chk-3 (As described in section 4.6). Here comes the
problem.

(2) Please correct me if I'm wrong. The purpose of the
`RecoverableWriter` is to provide a reliable file writer even
tolerable with job failure and recovery. The implementation varies
among the file systems, some of which involves writing into temporary
files (such as HDFS). As a result, it may produce more RPC requests to
the DFS.
The goal of this FLIP is to reduce the pressure on DFS, especially the
number of files and RPC requests. Currently the TMs are NOT using the
RecoverableWriter to persist/upload the state files, and a file
closing is enough. The section 4.1.1 is trying to omit this file
closing but ensure file visibility in some DFS, thus reducing pressure
on DFS. That's why I said the problems they want to solve are
different. I'm not sure if I made myself clear.


Thanks!

Best regards,
Zakelly

On Fri, Apr 7, 2023 at 8:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:
>
> Hi Yun,
>
> Thanks for your suggestions!
>
> I have read the FLINK-23342 and its design doc as you provided. First
> of all the goal of this FLIP and the doc are similar, and the design
> of this FLIP is pretty much like option 3. The main difference is that
> we imply the concept of 'epoch' in the folder path for each
> granularity. For shared state, the folder for each subtask is like
> "${checkpointBaseDir}/shared/subtask-{index}-{parallelism}", so if the
> ${checkpointBaseDir} changes (when user restart a job manually) or the
> ${parallelism} changes (when rescaling), there will be a re-uploading,
> and the JM takes care of the old artifacts. The folder path for
> private state is in the form of
> "${checkpointBaseDir}/tm-owned/${tmResourceId}" and the division of
> responsibilities between JM and TM is similar. The design of this FLIP
> inherits all the advantages of the design of option 3 in that doc, and
> also avoids extra communication for epoch maintenance. As for the code
> complexity, you may check the POC commit[1] and find that the
> implementation is pretty clean and is a totally new code path making
> nearly no influence on the old one. Comparing the number of lines of
> code change with what's currently done for merging channel state[2]
> (5200 vs. 2500 additions), I think it is acceptable considering we are
> providing a unified file merging framework, which would save a lot of
> effort in future. WDYT?
>
> Thanks.
>
> Best regards,
> Zakelly
>
> [1] POC of this FLIP:
> https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> [2] Commit for FLINK-26803 (Merge the channel state files) :
> https://github.com/apache/flink/commit/8be94e6663d8ac6e3d74bf4cd5f540cc96c8289e
>
>
> On Fri, Apr 7, 2023 at 7:22 PM Yanfei Lei <fredia...@gmail.com> wrote:
> >
> > Thanks for your explanation Zakelly.
> > (1) Keeping these merging granularities for different types of files
> > as presets that are not configurable is a good idea to prevent
> > performance degradation.
> >
> > (2)
> > > For the third option, 64MB is an acceptable target size. The RocksDB 
> > > state backend in Flink also chooses 64MB as the default target file size.
> >
> > Does this result in a larger space amplification? Maybe a more
> > suitable value can be determined through some experimental statistics
> > after we implement this feature.
> >
> > Best,
> > Yanfei
> >
> > Jingsong Li <jingsongl...@gmail.com> 于2023年4月7日周五 17:09写道:
> > >
> > > Hi Yun,
> > >
> > > It looks like this doc needs permission to read? [1]
> > >
> > > [1] 
> > > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > >
> > > Best,
> > > Jingsong
> > >
> > > On Fri, Apr 7, 2023 at 4:34 PM Piotr Nowojski <pnowoj...@apache.org> 
> > > wrote:
> > > >
> > > > Hi,
> > > >
> > > > +1 To what Yun Tang wrote. We don't seem to have access to the design 
> > > > doc.
> > > > Could you make it publicly visible or copy out its content to another
> > > > document?
> > > >
> > > > Thanks for your answers Zakelly.
> > > >
> > > > (1)
> > > > Yes, the current mechanism introduced in FLINK-24611 allows for 
> > > > checkpoint
> > > > N, to only re-use shared state handles that have been already 
> > > > referenced by
> > > > checkpoint N-1. But why do we need to break this assumption? In your 
> > > > step,
> > > > "d.", TM could adhere to that assumption, and instead of reusing 
> > > > File-2, it
> > > > could either re-use File-1, File-3 or create a new file.
> > > >
> > > > (2)
> > > > Can you elaborate a bit more on this? As far as I recall, the purpose of
> > > > the `RecoverableWriter` is to support exactly the things described in 
> > > > this
> > > > FLIP, so what's the difference? If you are saying that for this FLIP you
> > > > can implement something more efficiently for a given FileSystem, then 
> > > > why
> > > > can it not be done the same way for the `RecoverableWriter`?
> > > >
> > > > Best,
> > > > Piotrek
> > > >
> > > > czw., 6 kwi 2023 o 17:24 Yun Tang <myas...@live.com> napisał(a):
> > > >
> > > > > Hi Zakelly,
> > > > >
> > > > > Thanks for driving this work!
> > > > >
> > > > > I'm not sure did you ever read the discussion between Stephan, Roman,
> > > > > Piotr, Yuan and I in the design doc [1] in nearly two years ago.
> > > > >
> > > > > From my understanding, your proposal is also a mixed state ownership: 
> > > > > some
> > > > > states are owned by the TM while some are owned by the JM. If my 
> > > > > memory is
> > > > > correct, we did not take the option-3 or option-5 in the design doc 
> > > > > [1] for
> > > > > the code complexity when implements the 1st version of changelog
> > > > > state-backend.
> > > > >
> > > > > Could you also compare the current FLIP with the proposals in the 
> > > > > design
> > > > > doc[1]? From my understanding, we should at least consider to comapre 
> > > > > with
> > > > > option-3 and option-5 as they are all mixed solutions.
> > > > >
> > > > >
> > > > > [1]
> > > > > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > >
> > > > > Best
> > > > > Yun Tang
> > > > >
> > > > > ------------------------------
> > > > > *From:* Zakelly Lan <zakelly....@gmail.com>
> > > > > *Sent:* Thursday, April 6, 2023 16:38
> > > > > *To:* dev@flink.apache.org <dev@flink.apache.org>
> > > > > *Subject:* Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for
> > > > > Checkpoints
> > > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thanks for all the feedback.
> > > > >
> > > > > (1) Thanks for the reminder. I have just seen the FLINK-24611, the 
> > > > > delayed
> > > > > deletion by JM resolves some sync problems between JM and TM, but I'm
> > > > > afraid it is still not feasible for the file sharing in this FLIP.
> > > > > Considering a concurrent checkpoint scenario as follows:
> > > > >    a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst are written in 
> > > > > file 1,
> > > > > and 4.sst is written in file 2.
> > > > >    b. Checkpoint 2 starts based on checkpoint 1, including 1.sst, 
> > > > > 2.sst
> > > > > and 5.sst.
> > > > >    c. Checkpoint 3 starts based on checkpoint 1, including 1.sst, 
> > > > > 2.sst
> > > > > and 5.sst as well.
> > > > >    d. Checkpoint 3 reuses the file 2, TM writes 5.sst on it.
> > > > >    e. Checkpoint 2 creates a new file 3, TM writes 5.sst on it.
> > > > >    f. Checkpoint 2 finishes, checkpoint 1 is subsumed and the file 2 
> > > > > is
> > > > > deleted, while checkpoint 3 still needs file 2.
> > > > >
> > > > > I attached a diagram to describe the scenario.
> > > > > [image: concurrent cp.jpg]
> > > > > The core issue is that this FLIP introduces a mechanism that allows
> > > > > physical files to be potentially used by the next several 
> > > > > checkpoints. JM
> > > > > is uncertain whether there will be a TM continuing to write to a 
> > > > > specific
> > > > > file. So in this FLIP, TMs take the responsibility to delete the 
> > > > > physical
> > > > > files.
> > > > >
> > > > > (2) IIUC, the RecoverableWriter is introduced to persist data in the 
> > > > > "in
> > > > > progress" files after each checkpoint, and the implementation may be 
> > > > > based
> > > > > on the file sync in some file systems. However, since the sync is a 
> > > > > heavy
> > > > > operation for DFS, this FLIP wants to use flush instead of the sync 
> > > > > with
> > > > > the best effort. This only fits the case that the DFS is considered
> > > > > reliable. The problems they want to solve are different.
> > > > >
> > > > > (3) Yes, if files are managed by JM via the shared registry, this 
> > > > > problem
> > > > > is solved. And as I mentioned in (1), there are some other corner 
> > > > > cases
> > > > > hard to resolve via the shared registry.
> > > > >
> > > > > The goal of this FLIP is to have a common way of merging files in all 
> > > > > use
> > > > > cases. For shared state it merges at subtask level, while for private 
> > > > > state
> > > > > (and changelog files, as I replied to Yanfei), files are merged at TM
> > > > > level. So it is not contrary to the current plan for the unaligned
> > > > > checkpoint state (FLINK-26803). You are right that the unaligned 
> > > > > checkpoint
> > > > > state would be merged with the operator's state file, so overall, it 
> > > > > is
> > > > > slightly better than what's currently done.
> > > > >
> > > > >
> > > > > Thanks again for the valuable comments!
> > > > >
> > > > > Best regards,
> > > > > Zakelly
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Apr 5, 2023 at 8:43 PM Piotr Nowojski <pnowoj...@apache.org>
> > > > > wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > Thanks for coming up with the proposal, it's definitely valuable. I'm 
> > > > > still
> > > > > reading and trying to understand the proposal, but a couple of 
> > > > > comments
> > > > > from my side.
> > > > >
> > > > > (1)
> > > > > > Ownership of a single checkpoint file is transferred to TM, while JM
> > > > > manages the parent directory of these files.
> > > > >
> > > > > Have you seen https://issues.apache.org/jira/browse/FLINK-24611 
> > > > > before? I
> > > > > don't fully remember why, but we have rejected the idea of moving the 
> > > > > file
> > > > > ownership to TM and instead reworked the shared file registry in a 
> > > > > way that
> > > > > I think should be sufficient for file sharing. Could you elaborate 
> > > > > why we
> > > > > need to move the file ownership to TM, and why is the current 
> > > > > mechanism not
> > > > > sufficient?
> > > > >
> > > > > (2)
> > > > > > File visibility is needed when a Flink job recovers after a 
> > > > > > checkpoint is
> > > > > materialized. In some DFS, such as most object storages, a file is 
> > > > > only
> > > > > visible after it is closed
> > > > >
> > > > > Is that really the case?
> > > > > `org.apache.flink.core.fs.FileSystem#createRecoverableWriter` seems 
> > > > > to be
> > > > > addressing exactly this issue, and the most frequently used 
> > > > > FileSystem (S3)
> > > > > AFAIK supports it with no problems?
> > > > >
> > > > > (3)
> > > > > > 4.1.2 Merge files within a subtask or a TM
> > > > > > Given that TMs are reassigned after restoration, it is difficult to
> > > > > manage physical files that contain data from multiple subtasks 
> > > > > scattered
> > > > > across different TMs (as depicted in Fig.3). There is no 
> > > > > synchronization
> > > > > mechanism between TMs, making file management in this scenario 
> > > > > challenging.
> > > > >
> > > > > I think this is solved in many places already via the shared state 
> > > > > managed
> > > > > by the JM, as I mentioned in (1).
> > > > >
> > > > >
> > > > > If I understand it correctly you are proposing to have a common
> > > > > interface/way of merging small files, in all use cases, that would 
> > > > > work
> > > > > only across a single subtask? That's contrary to what's currently 
> > > > > done for
> > > > > unaligned checkpoints, right? But if this generic mechanism was to be 
> > > > > used
> > > > > for unaligned checkpoints, unaligned checkpoint state would have been
> > > > > merged with the operators state file, so all in all there would be no
> > > > > regression visible to a user? The limit is that we always have at 
> > > > > least a
> > > > > single file per subtask, but in exchange we are getting a simpler 
> > > > > threading
> > > > > model?
> > > > >
> > > > > Bets,
> > > > > Piotrek
> > > > >
> > > > > wt., 4 kwi 2023 o 08:51 Zakelly Lan <zakelly....@gmail.com> 
> > > > > napisał(a):
> > > > >
> > > > > > Hi Yanfei,
> > > > > >
> > > > > > Thank you for your prompt response.
> > > > > >
> > > > > > I agree that managing (deleting) only some folders with JM can 
> > > > > > greatly
> > > > > > relieve JM's burden. Thanks for pointing this out.
> > > > > >
> > > > > > In general, merging at the TM level is more effective since there 
> > > > > > are
> > > > > > usually more files to merge. Therefore, I believe it is better to
> > > > > > merge files per TM as much as possible.  However, for shared state,
> > > > > > merging at the subtask level is the best choice to prevent 
> > > > > > significant
> > > > > > data transfer over the network after restoring. I think it is better
> > > > > > to keep these merging granularities for different types of files as
> > > > > > presets that are not configurable. WDYT?
> > > > > >
> > > > > > As for the DSTL files, they are merged per TM and placed in the
> > > > > > task-owned folder. These files can be classified as shared state 
> > > > > > since
> > > > > > they are shared across checkpoints. However, the DSTL file is a
> > > > > > special case that will be subsumed by the first checkpoint of the
> > > > > > newly restored job. Therefore, there is no need for new TMs to keep
> > > > > > these files after the old checkpoint is subsumed, just like the
> > > > > > private state files. Thus, it is feasible to merge DSTL files per TM
> > > > > > without introducing complex file management across job attempts. So
> > > > > > the possible performance degradation is avoided.
> > > > > >
> > > > > > The three newly introduced options have recommended defaults. For
> > > > > > upcoming versions, this feature is turned off by default. For the
> > > > > > second option, SEGMENTED_ACROSS_CP_BOUNDARY is the recommended 
> > > > > > default
> > > > > > as it is more effective. Of course, if encountering some DFS that 
> > > > > > does
> > > > > > not support file visibility until the file is closed, it is possible
> > > > > > to fall back to another option automatically. For the third option,
> > > > > > 64MB is an acceptable target size. The RocksDB state backend in 
> > > > > > Flink
> > > > > > also chooses 64MB as the default target file size.
> > > > > >
> > > > > >
> > > > > > Thank you again for your quick response.
> > > > > >
> > > > > >
> > > > > > Best regards,
> > > > > > Zakelly
> > > > > >
> > > > > >
> > > > > > On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <fredia...@gmail.com> 
> > > > > > wrote:
> > > > > > >
> > > > > > > Hi Zakelly,
> > > > > > >
> > > > > > > Thanks for driving this,  this proposal enables the files merging 
> > > > > > > of
> > > > > > > different types of states to be grouped under a unified 
> > > > > > > framework. I
> > > > > > > think it has the added benefit of lightening the load on JM. As
> > > > > > > FLINK-26590[1] described,  triggered checkpoints can be delayed by
> > > > > > > discarding shared state when JM manages a large number of files. 
> > > > > > > After
> > > > > > > this FLIP, JM only needs to manage some folders, which greatly 
> > > > > > > reduces
> > > > > > > the burden on JM.
> > > > > > >
> > > > > > > In Section 4.1, two types of merging granularities(per subtask 
> > > > > > > and per
> > > > > > > task manager) are proposed, the shared state is managed by per 
> > > > > > > subtask
> > > > > > > granularity, but for the changelog state backend, its DSTL files 
> > > > > > > are
> > > > > > > shared between checkpoints, and are currently merged in batches 
> > > > > > > at the
> > > > > > > task manager level. When merging with the 
> > > > > > > SEGMENTED_WITHIN_CP_BOUNDARY
> > > > > > > mode, I'm concerned about the performance degradation of its 
> > > > > > > merging,
> > > > > > > hence I wonder if the merge granularities are configurable? 
> > > > > > > Further,
> > > > > > > from a user perspective, three new options are introduced in this
> > > > > > > FLIP, do they have recommended defaults?
> > > > > > >
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-26590
> > > > > > >
> > > > > > > Best,
> > > > > > > Yanfei
> > > > > > >
> > > > > > > Zakelly Lan <zakelly....@gmail.com> 于2023年4月3日周一 18:36写道:
> > > > > > >
> > > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I would like to open a discussion on providing a unified file 
> > > > > > > > merging
> > > > > > > > mechanism for checkpoints[1].
> > > > > > > >
> > > > > > > > Currently, many files are uploaded to the DFS during 
> > > > > > > > checkpoints,
> > > > > > > > leading to the 'file flood' problem when running
> > > > > > > > intensive workloads in a cluster.  To tackle this problem, 
> > > > > > > > various
> > > > > > > > solutions have been proposed for different types
> > > > > > > > of state files. Although these methods are similar, they lack a
> > > > > > > > systematic view and approach. We believe that it is
> > > > > > > > better to consider this problem as a whole and introduce a 
> > > > > > > > unified
> > > > > > > > framework to address the file flood problem for
> > > > > > > > all types of state files. A POC has been implemented based on 
> > > > > > > > current
> > > > > > > > FLIP design, and the test results are promising.
> > > > > > > >
> > > > > > > >
> > > > > > > > Looking forward to your comments or feedback.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Zakelly
> > > > > > > >
> > > > > > > > [1]
> > > > > >
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
> > > > > >
> > > > >
> > > > >

Reply via email to