Hi Zakelly,

Since we already had some discussions on this topic in the doc I mentioned, 
could you please describe the difference in your FLIP?

I think it would be better to include a comparison table across the different
options, as the doc does. We could also list some of them in your Rejected
Alternatives section.


Best
Yun Tang
________________________________
From: Zakelly Lan <zakelly....@gmail.com>
Sent: Tuesday, April 11, 2023 17:57
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints

Hi Rui Fan,

Thanks for your comments!

> (1) The temporary segment will remain in the physical file for a short time, 
> right?

Yes, any written segment will remain in the physical file until the
physical file is deleted. It is controlled by the reference counting.
And as discussed in 4.7, this will result in a space amplification
problem.
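
To make the reference counting concrete, here is a minimal sketch of how a
physical file could track its segments. The class and method names are
hypothetical and are not taken from the FLIP or the POC; the sketch also
assumes that the file is deleted as soon as the last segment is released.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of one physical file that holds several logical segments. */
final class PhysicalFileSketch {
    // segment id -> size in bytes, for segments that are still referenced
    private final Map<String, Long> liveSegments = new HashMap<>();
    private long writtenBytes = 0L;
    private boolean deleted = false;

    /** Appends a segment; every live segment counts as one reference. */
    void addSegment(String segmentId, long bytes) {
        if (deleted) {
            throw new IllegalStateException("physical file already deleted");
        }
        if (liveSegments.putIfAbsent(segmentId, bytes) != null) {
            throw new IllegalArgumentException("duplicate segment: " + segmentId);
        }
        writtenBytes += bytes;
    }

    /**
     * Drops one reference. The bytes of the segment stay in the file, which is
     * why space amplification appears; the file goes away only with the last
     * reference (an assumption of this sketch).
     */
    void releaseSegment(String segmentId) {
        if (liveSegments.remove(segmentId) == null) {
            throw new IllegalArgumentException("unknown segment: " + segmentId);
        }
        if (liveSegments.isEmpty()) {
            deleted = true;
        }
    }

    int referenceCount() {
        return liveSegments.size();
    }

    long writtenBytes() {
        return writtenBytes;
    }

    long liveBytes() {
        return liveSegments.values().stream().mapToLong(Long::longValue).sum();
    }

    boolean isDeleted() {
        return deleted;
    }
}
```

Releasing a temporary segment of an aborted checkpoint would then just be a
releaseSegment call: the segment's bytes remain on the DFS until the other
segments in the same file are released as well.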


> (2) Is subtask granularity confused with shared state?

Merging files at subtask granularity is a general solution for
shared states, considering that a file may be reused by the following
checkpoints after job restore. This design is applicable to sst files
and any other shared states that may arise in the future. However, the
DSTL files are a special case of shared states, since these files will
no longer be shared after job restore. Therefore, we may do an
optimization for these files and merge them at the TM level.
Currently, the DSTL files are not in the shared directory of
checkpoint storage, and I suggest we keep it as it is. I agree that
this may bring in some confusion, and I suggest the FLIP mainly
discuss the general situation and list the special situations
separately without bringing in new concepts. I will add another
paragraph describing the file merging for DSTL files. WDYT?


> (3) When rescaling, do all shared files need to be copied?

I agree with you that only sst files of the base DB need to be copied
(or re-uploaded in the next checkpoint). However, section 4.2
simplifies file copying issues (copying all files), following the
concept of shared state.


> (4) Does the space magnification ratio need a configuration option?

Thanks for the reminder, I will add an option in this FLIP.
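
As a rough illustration of what such an option would control, the check in
step 1 of section 4.7 could look like the sketch below. The names are
hypothetical, and defining amplification as physical bytes divided by
still-referenced bytes is my assumption for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the per-file space amplification check. */
final class SpaceAmplificationSketch {

    /** Physical size of a file and the bytes in it that are still referenced. */
    static final class FileStats {
        final long physicalBytes;
        final long liveBytes;

        FileStats(long physicalBytes, long liveBytes) {
            this.physicalBytes = physicalBytes;
            this.liveBytes = liveBytes;
        }
    }

    /** Assumed definition: bytes occupied on DFS divided by bytes still in use. */
    static double amplification(FileStats stats) {
        if (stats.liveBytes <= 0) {
            throw new IllegalArgumentException("file has no live bytes");
        }
        return (double) stats.physicalBytes / stats.liveBytes;
    }

    /** Collects the files whose amplification is greater than the threshold. */
    static List<String> filesToCompact(Map<String, FileStats> files, double threshold) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, FileStats> entry : files.entrySet()) {
            // Files without live bytes are skipped here; the sketch assumes
            // they are simply deleted once nothing references them.
            if (entry.getValue().liveBytes > 0
                    && amplification(entry.getValue()) > threshold) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

The threshold parameter is what the new configuration option would expose.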


> (5) How many physical files can a TM write at the same checkpoint at the same 
> time?

This is a very good point. Actually, there is a file reuse pool, as
described in section 4.6. There could be multiple files within this pool,
supporting concurrent writing by multiple writers. I suggest providing
two configuration options to control the number of files:

  state.checkpoints.file-merging.max-file-pool-size: Specifies the
upper limit of the file pool size.
  state.checkpoints.file-merging.max-subtasks-per-file: Specifies the
lower limit of the file pool size based on the number of subtasks
within each TM.

The number of simultaneously open files is controlled by these two
options, and the first option takes precedence over the second.
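
One possible reading of how the two options interact is sketched below. The
method name is hypothetical, and the rounding and the minimum of one file
are my assumptions for this example.

```java
/** Hypothetical sketch of how the two proposed options could bound the pool. */
final class FilePoolSizeSketch {

    /**
     * @param subtasksInTm       number of subtasks running in this TM
     * @param maxSubtasksPerFile value of max-subtasks-per-file (lower limit)
     * @param maxFilePoolSize    value of max-file-pool-size (upper limit)
     */
    static int poolSize(int subtasksInTm, int maxSubtasksPerFile, int maxFilePoolSize) {
        if (subtasksInTm < 0 || maxSubtasksPerFile <= 0 || maxFilePoolSize <= 0) {
            throw new IllegalArgumentException("arguments out of range");
        }
        // Files needed so that no file is shared by more than
        // maxSubtasksPerFile subtasks (ceiling division).
        int wanted = (subtasksInTm + maxSubtasksPerFile - 1) / maxSubtasksPerFile;
        // The upper limit takes precedence; at least one file is assumed.
        return Math.min(maxFilePoolSize, Math.max(1, wanted));
    }
}
```

For example, with 20 subtasks in a TM and at most 4 subtasks per file, the
lower limit asks for 5 files; if at most 2 subtasks may share a file, 10
files would be wanted, and a pool limit of 8 caps the result at 8.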

WDYT?



Thanks a lot for your valuable insight.

Best regards,
Zakelly


On Mon, Apr 10, 2023 at 7:08 PM Rui Fan <1996fan...@gmail.com> wrote:
>
> Hi all,
>
> Thanks Zakelly for driving this proposal, and thank you all for
> the warm discussions. It's really a useful feature.
>
> I have a few questions about this FLIP.
>
> (1) The temporary segment will remain in the physical file for
> a short time, right?
>
> The FLIP proposes to write segments instead of physical files.
> If the physical files are written directly, these temporary files
> will be deleted after the checkpoint is aborted. When writing
> a segment, how is the temporary segment deleted?
> By decrementing the reference count by 1?
>
> (2) Is subtask granularity confused with shared state?
>
> From the "4.1.2 Merge files within a subtask or a TM" part,
> based on the principle of sst files, it is concluded that
> "For shared states, files are merged within each subtask."
>
> I'm not sure whether this conclusion is general or just for sst.
> As Yanfei mentioned before:
>
> > DSTL files are shared between checkpoints, and are
> > currently merged in batches at the task manager level.
>
> DSTL files are treated as shared state in FLIP-306; however, it
> would be better to merge them at TM granularity. So I'm not
> sure whether subtask granularity is being confused with
> shared state.
>
> Also, I'm not familiar with DSTL file merging. Should
> shared state be divided into shared subtask state
> and shared TM state?
>
> (3) When rescaling, do all shared files need to be copied?
>
> From the "4.2 Rescaling and Physical File Lifecycle" part,
> I see a lot of file copying.
>
> As I understand, only sst files of the baseDB need to be copied.
> From the restore code[1], in restoreWithRescaling, Flink will
> init a base DB instance, read all contents from the other temporary
> RocksDB instances, and write them into the base DB, and then
> the temporary RocksDB instances will be discarded.
>
> So, I think copying the files of the base RocksDB is enough, and
> the files of the other RocksDB instances aren't used.
>
> Alternatively, do not copy any files during recovery, and upload
> all sst files at the first checkpoint.
>
> (4) Does the space magnification ratio need a configuration option?
>
> From step 1 of the "4.7 Space amplification" part, I see:
>
> > Checking whether the space amplification of each file is greater than a
> > preset threshold and collecting files that exceed the threshold for
> > compaction.
>
> Should we add a configuration option for the compaction threshold?
> I didn't see it in the "5. Public interfaces and User Cases" part.
>
> (5) How many physical files can a TM write at the same
> checkpoint at the same time?
>
> From the "5. Public interfaces and User Cases" part, I see:
>
> > A configuration option that sets a maximum size limit for physical files.
>
> I guess that each type of state (private or shared) will
> write only one file at a time within a checkpoint.
> When the file reaches the maximum size, Flink will start writing
> the next file, right?
>
> If yes, for shared state, will
> "state.backend.rocksdb.checkpoint.transfer.thread.num"
> be invalid?
>
> For private state, a TM may have many tasks (because of slot
> sharing, more than 20 tasks may run in a slot), and the
> performance of all tasks serially writing files may be poor,
> eventually resulting in longer checkpoint time.
>
> That's why FLINK-26803[2] introduced a configuration option:
> "execution.checkpointing.unaligned.max-subtasks-per-channel-state-file".
> Flink users can set the maximum number of subtasks that
> share the same channel state file.
>
> Those are all my questions for now; please correct me if
> anything is wrong.
>
> Anyway, this FLIP is useful for the stability of large-scale
> flink production. Looking forward to its completion and
> eventual acceptance by the community.
>
> [1]
> https://github.com/apache/flink/blob/65710b437318364ec19c0369d038ac2222c10498/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L292
> [2] https://issues.apache.org/jira/browse/FLINK-26803
>
> Best,
> Rui Fan
>
> On Fri, Apr 7, 2023 at 8:42 PM Jing Ge <j...@ververica.com.invalid> wrote:
>
> > Hi,
> >
> > Jingsong, Yanfei, please check whether you can view the doc. Thanks.
> >
> > Best regards,
> > Jing
> >
> > On Fri, Apr 7, 2023 at 2:19 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > > Hi Yanfei,
> > >
> > > Thanks for your comments.
> > >
> > > > Does this result in a larger space amplification? Maybe a more
> > > > suitable value can be determined through some experimental statistics
> > > > after we implement this feature.
> > >
> > > Yes, it results in larger space amplification for shared states. I
> > > will do more tests and investigation.
> > >
> > >
> > > Thanks.
> > >
> > > Best regards,
> > > Zakelly
> > >
> > > On Fri, Apr 7, 2023 at 8:15 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > >
> > > > Hi @Piotr and @Jingsong Li
> > > >
> > > > I have read access to the document, but I'm not sure whether the owner
> > > > of this document wants to make it public. Actually, the doc is for
> > > > FLINK-23342 and there is a candidate design very similar to this FLIP,
> > > > but only for the shared state. As Yun said, the previous design was
> > > > not adopted because of the code complexity; however, I think the
> > > > complexity is acceptable after implementing the POC[1]. I think we
> > > > could focus on the current plan, WDYT?
> > > >
> > > >
> > > > [1] POC of this FLIP:
> > > > https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> > > >
> > > > On Fri, Apr 7, 2023 at 8:13 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > >
> > > > > Hi Piotr,
> > > > >
> > > > > Thanks for your comments!
> > > > >
> > > > > > (1) Sorry for the confusion; let me make it clearer. It is a
> > > > > > concurrent checkpoint scenario. Yes, the assumption you mentioned
> > > > > > needs to be followed, but the state handles here refer to the
> > > > > > original SST files, not the underlying file. In this FLIP, when
> > > > > > checkpoints N and N+1 are running concurrently, they reuse files
> > > > > > from checkpoint N-1, and some of those files may be deleted when
> > > > > > checkpoint N completes while checkpoint N+1 is still writing to
> > > > > > them. There is no such problem for the original shared states
> > > > > > without file merging, because when a state handle (or sst file
> > > > > > here) from checkpoint N-1 is not referenced by checkpoint N, it
> > > > > > will not be referenced by checkpoint N+1 either. So the
> > > > > > subsumption of sst files from checkpoint N-1 is safe.
> > > > > > For the above example, when reaching step "d.", File 1 has reached
> > > > > > the size threshold and will not be used. Chk-2 and Chk-3 are
> > > > > > running concurrently, and File 3 is being written by Chk-2, so it
> > > > > > cannot be used by Chk-3 (as described in section 4.6). This is
> > > > > > where the problem arises.
> > > > >
> > > > > > (2) Please correct me if I'm wrong. The purpose of the
> > > > > > `RecoverableWriter` is to provide a reliable file writer that
> > > > > > tolerates job failure and recovery. The implementation varies
> > > > > > among file systems, and some implementations involve writing
> > > > > > into temporary files (such as HDFS). As a result, it may produce
> > > > > > more RPC requests to the DFS.
> > > > > > The goal of this FLIP is to reduce the pressure on the DFS,
> > > > > > especially the number of files and RPC requests. Currently the
> > > > > > TMs are NOT using the RecoverableWriter to persist/upload the
> > > > > > state files, and a file close is enough. Section 4.1.1 tries to
> > > > > > omit this file close while still ensuring file visibility in
> > > > > > some DFS, thus reducing the pressure on the DFS. That's why I
> > > > > > said the problems they want to solve are different. I'm not sure
> > > > > > if I made myself clear.
> > > > >
> > > > >
> > > > > Thanks!
> > > > >
> > > > > Best regards,
> > > > > Zakelly
> > > > >
> > > > > > On Fri, Apr 7, 2023 at 8:08 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > >
> > > > > > Hi Yun,
> > > > > >
> > > > > > Thanks for your suggestions!
> > > > > >
> > > > > > > I have read FLINK-23342 and its design doc as you provided.
> > > > > > > First of all, the goals of this FLIP and the doc are similar,
> > > > > > > and the design of this FLIP is pretty much like option 3. The
> > > > > > > main difference is that we imply the concept of 'epoch' in the
> > > > > > > folder path for each granularity. For shared state, the folder
> > > > > > > for each subtask is like
> > > > > > > "${checkpointBaseDir}/shared/subtask-{index}-{parallelism}",
> > > > > > > so if the ${checkpointBaseDir} changes (when the user restarts
> > > > > > > a job manually) or the ${parallelism} changes (when rescaling),
> > > > > > > there will be a re-uploading, and the JM takes care of the old
> > > > > > > artifacts. The folder path for private state is in the form of
> > > > > > > "${checkpointBaseDir}/tm-owned/${tmResourceId}" and the division
> > > > > > > of responsibilities between JM and TM is similar. The design of
> > > > > > > this FLIP inherits all the advantages of the design of option 3
> > > > > > > in that doc, and also avoids extra communication for epoch
> > > > > > > maintenance. As for the code complexity, you may check the POC
> > > > > > > commit[1] and find that the implementation is pretty clean and
> > > > > > > is a totally new code path with nearly no influence on the old
> > > > > > > one. Comparing the number of lines of code change with what's
> > > > > > > currently done for merging channel state[2] (5200 vs. 2500
> > > > > > > additions), I think it is acceptable considering we are
> > > > > > > providing a unified file merging framework, which would save a
> > > > > > > lot of effort in the future. WDYT?
> > > > > >
> > > > > > Thanks.
> > > > > >
> > > > > > Best regards,
> > > > > > Zakelly
> > > > > >
> > > > > > > [1] POC of this FLIP:
> > > > > > > https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> > > > > > > [2] Commit for FLINK-26803 (Merge the channel state files):
> > > > > > > https://github.com/apache/flink/commit/8be94e6663d8ac6e3d74bf4cd5f540cc96c8289e
> > > > > >
> > > > > >
> > > > > > > On Fri, Apr 7, 2023 at 7:22 PM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > >
> > > > > > > Thanks for your explanation Zakelly.
> > > > > > > > (1) Keeping these merging granularities for different types of
> > > > > > > > files as presets that are not configurable is a good idea to
> > > > > > > > prevent performance degradation.
> > > > > > > >
> > > > > > > > (2)
> > > > > > > > > For the third option, 64MB is an acceptable target size. The
> > > > > > > > > RocksDB state backend in Flink also chooses 64MB as the
> > > > > > > > > default target file size.
> > > > > > > >
> > > > > > > > Does this result in a larger space amplification? Maybe a more
> > > > > > > > suitable value can be determined through some experimental
> > > > > > > > statistics after we implement this feature.
> > > > > > >
> > > > > > > Best,
> > > > > > > Yanfei
> > > > > > >
> > > > > > > > On Fri, Apr 7, 2023 at 5:09 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi Yun,
> > > > > > > >
> > > > > > > > It looks like this doc needs permission to read? [1]
> > > > > > > >
> > > > > > > > [1]
> > >
> > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jingsong
> > > > > > > >
> > > > > > > > > On Fri, Apr 7, 2023 at 4:34 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > > +1 to what Yun Tang wrote. We don't seem to have access to
> > > > > > > > > > the design doc. Could you make it publicly visible or copy
> > > > > > > > > > out its content to another document?
> > > > > > > > > >
> > > > > > > > > > Thanks for your answers Zakelly.
> > > > > > > > > >
> > > > > > > > > > (1)
> > > > > > > > > > Yes, the current mechanism introduced in FLINK-24611 allows
> > > > > > > > > > checkpoint N to re-use only shared state handles that have
> > > > > > > > > > already been referenced by checkpoint N-1. But why do we
> > > > > > > > > > need to break this assumption? In your step "d.", the TM
> > > > > > > > > > could adhere to that assumption, and instead of reusing
> > > > > > > > > > File-2, it could either re-use File-1, File-3 or create a
> > > > > > > > > > new file.
> > > > > > > > > >
> > > > > > > > > > (2)
> > > > > > > > > > Can you elaborate a bit more on this? As far as I recall,
> > > > > > > > > > the purpose of the `RecoverableWriter` is to support exactly
> > > > > > > > > > the things described in this FLIP, so what's the difference?
> > > > > > > > > > If you are saying that for this FLIP you can implement
> > > > > > > > > > something more efficiently for a given FileSystem, then why
> > > > > > > > > > can it not be done the same way for the `RecoverableWriter`?
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Piotrek
> > > > > > > > >
> > > > > > > > > > On Thu, Apr 6, 2023 at 5:24 PM Yun Tang <myas...@live.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Zakelly,
> > > > > > > > > >
> > > > > > > > > > Thanks for driving this work!
> > > > > > > > > >
> > > > > > > > > > > I'm not sure whether you have read the discussion among
> > > > > > > > > > > Stephan, Roman, Piotr, Yuan and me in the design doc [1]
> > > > > > > > > > > from nearly two years ago.
> > > > > > > > > > >
> > > > > > > > > > > From my understanding, your proposal is also a mixed state
> > > > > > > > > > > ownership: some states are owned by the TM while some are
> > > > > > > > > > > owned by the JM. If my memory is correct, we did not take
> > > > > > > > > > > option-3 or option-5 in the design doc [1] because of the
> > > > > > > > > > > code complexity when implementing the first version of the
> > > > > > > > > > > changelog state backend.
> > > > > > > > > > >
> > > > > > > > > > > Could you also compare the current FLIP with the proposals
> > > > > > > > > > > in the design doc[1]? From my understanding, we should at
> > > > > > > > > > > least consider comparing with option-3 and option-5, as
> > > > > > > > > > > they are all mixed solutions.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > [1]
> > > > > > > > > >
> > >
> > https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > > > > > >
> > > > > > > > > > Best
> > > > > > > > > > Yun Tang
> > > > > > > > > >
> > > > > > > > > > ------------------------------
> > > > > > > > > > *From:* Zakelly Lan <zakelly....@gmail.com>
> > > > > > > > > > *Sent:* Thursday, April 6, 2023 16:38
> > > > > > > > > > *To:* dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > > > > *Subject:* Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints
> > > > > > > > > >
> > > > > > > > > > Hi Piotr,
> > > > > > > > > >
> > > > > > > > > > Thanks for all the feedback.
> > > > > > > > > >
> > > > > > > > > > > (1) Thanks for the reminder. I have just seen FLINK-24611;
> > > > > > > > > > > the delayed deletion by JM resolves some sync problems
> > > > > > > > > > > between JM and TM, but I'm afraid it is still not feasible
> > > > > > > > > > > for the file sharing in this FLIP. Consider a concurrent
> > > > > > > > > > > checkpoint scenario as follows:
> > > > > > > > > > >    a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst are
> > > > > > > > > > >       written in file 1, and 4.sst is written in file 2.
> > > > > > > > > > >    b. Checkpoint 2 starts based on checkpoint 1, including
> > > > > > > > > > >       1.sst, 2.sst and 5.sst.
> > > > > > > > > > >    c. Checkpoint 3 starts based on checkpoint 1, including
> > > > > > > > > > >       1.sst, 2.sst and 5.sst as well.
> > > > > > > > > > >    d. Checkpoint 3 reuses file 2; the TM writes 5.sst to it.
> > > > > > > > > > >    e. Checkpoint 2 creates a new file 3; the TM writes 5.sst
> > > > > > > > > > >       to it.
> > > > > > > > > > >    f. Checkpoint 2 finishes, checkpoint 1 is subsumed and
> > > > > > > > > > >       file 2 is deleted, while checkpoint 3 still needs
> > > > > > > > > > >       file 2.
> > > > > > > > > >
> > > > > > > > > > I attached a diagram to describe the scenario.
> > > > > > > > > > [image: concurrent cp.jpg]
> > > > > > > > > > > The core issue is that this FLIP introduces a mechanism
> > > > > > > > > > > that allows physical files to be potentially used by the
> > > > > > > > > > > next several checkpoints. JM is uncertain whether there
> > > > > > > > > > > will be a TM continuing to write to a specific file. So in
> > > > > > > > > > > this FLIP, TMs take the responsibility to delete the
> > > > > > > > > > > physical files.
> > > > > > > > > >
> > > > > > > > > > > (2) IIUC, the RecoverableWriter is introduced to persist
> > > > > > > > > > > data in the "in progress" files after each checkpoint, and
> > > > > > > > > > > the implementation may be based on the file sync in some
> > > > > > > > > > > file systems. However, since sync is a heavy operation for
> > > > > > > > > > > DFS, this FLIP wants to use flush instead of sync on a
> > > > > > > > > > > best-effort basis. This only fits the case where the DFS
> > > > > > > > > > > is considered reliable. The problems they want to solve
> > > > > > > > > > > are different.
> > > > > > > > > > >
> > > > > > > > > > > (3) Yes, if files are managed by JM via the shared
> > > > > > > > > > > registry, this problem is solved. And as I mentioned in
> > > > > > > > > > > (1), there are some other corner cases that are hard to
> > > > > > > > > > > resolve via the shared registry.
> > > > > > > > > >
> > > > > > > > > > > The goal of this FLIP is to have a common way of merging
> > > > > > > > > > > files in all use cases. For shared state it merges at
> > > > > > > > > > > subtask level, while for private state (and changelog
> > > > > > > > > > > files, as I replied to Yanfei), files are merged at TM
> > > > > > > > > > > level. So it is not contrary to the current plan for the
> > > > > > > > > > > unaligned checkpoint state (FLINK-26803). You are right
> > > > > > > > > > > that the unaligned checkpoint state would be merged with
> > > > > > > > > > > the operator's state file, so overall, it is slightly
> > > > > > > > > > > better than what's currently done.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Thanks again for the valuable comments!
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Zakelly
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > On Wed, Apr 5, 2023 at 8:43 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > > Thanks for coming up with the proposal, it's definitely
> > > > > > > > > > > valuable. I'm still reading and trying to understand the
> > > > > > > > > > > proposal, but a couple of comments from my side.
> > > > > > > > > > >
> > > > > > > > > > > (1)
> > > > > > > > > > > > Ownership of a single checkpoint file is transferred to
> > > > > > > > > > > > TM, while JM manages the parent directory of these files.
> > > > > > > > > > >
> > > > > > > > > > > Have you seen
> > > > > > > > > > > https://issues.apache.org/jira/browse/FLINK-24611 before?
> > > > > > > > > > > I don't fully remember why, but we have rejected the idea
> > > > > > > > > > > of moving the file ownership to TM and instead reworked
> > > > > > > > > > > the shared file registry in a way that I think should be
> > > > > > > > > > > sufficient for file sharing. Could you elaborate why we
> > > > > > > > > > > need to move the file ownership to TM, and why is the
> > > > > > > > > > > current mechanism not sufficient?
> > > > > > > > > >
> > > > > > > > > > (2)
> > > > > > > > > > > > File visibility is needed when a Flink job recovers
> > > > > > > > > > > > after a checkpoint is materialized. In some DFS, such as
> > > > > > > > > > > > most object storages, a file is only visible after it is
> > > > > > > > > > > > closed
> > > > > > > > > > >
> > > > > > > > > > > Is that really the case?
> > > > > > > > > > > `org.apache.flink.core.fs.FileSystem#createRecoverableWriter`
> > > > > > > > > > > seems to be addressing exactly this issue, and the most
> > > > > > > > > > > frequently used FileSystem (S3) AFAIK supports it with no
> > > > > > > > > > > problems?
> > > > > > > > > >
> > > > > > > > > > (3)
> > > > > > > > > > > > 4.1.2 Merge files within a subtask or a TM
> > > > > > > > > > > > Given that TMs are reassigned after restoration, it is
> > > > > > > > > > > > difficult to manage physical files that contain data
> > > > > > > > > > > > from multiple subtasks scattered across different TMs
> > > > > > > > > > > > (as depicted in Fig.3). There is no synchronization
> > > > > > > > > > > > mechanism between TMs, making file management in this
> > > > > > > > > > > > scenario challenging.
> > > > > > > > > > >
> > > > > > > > > > > I think this is solved in many places already via the
> > > > > > > > > > > shared state managed by the JM, as I mentioned in (1).
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > If I understand it correctly you are proposing to have a
> > > > > > > > > > > common interface/way of merging small files, in all use
> > > > > > > > > > > cases, that would work only across a single subtask?
> > > > > > > > > > > That's contrary to what's currently done for unaligned
> > > > > > > > > > > checkpoints, right? But if this generic mechanism was to
> > > > > > > > > > > be used for unaligned checkpoints, unaligned checkpoint
> > > > > > > > > > > state would have been merged with the operator's state
> > > > > > > > > > > file, so all in all there would be no regression visible
> > > > > > > > > > > to a user? The limit is that we always have at least a
> > > > > > > > > > > single file per subtask, but in exchange we are getting a
> > > > > > > > > > > simpler threading model?
> > > > > > > > > > >
> > > > > > > > > > > Best,
> > > > > > > > > > Piotrek
> > > > > > > > > >
> > > > > > > > > > > On Tue, Apr 4, 2023 at 8:51 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Yanfei,
> > > > > > > > > > >
> > > > > > > > > > > Thank you for your prompt response.
> > > > > > > > > > >
> > > > > > > > > > > > I agree that managing (deleting) only some folders with
> > > > > > > > > > > > JM can greatly relieve JM's burden. Thanks for pointing
> > > > > > > > > > > > this out.
> > > > > > > > > > > >
> > > > > > > > > > > > In general, merging at the TM level is more effective
> > > > > > > > > > > > since there are usually more files to merge. Therefore,
> > > > > > > > > > > > I believe it is better to merge files per TM as much as
> > > > > > > > > > > > possible. However, for shared state, merging at the
> > > > > > > > > > > > subtask level is the best choice to prevent significant
> > > > > > > > > > > > data transfer over the network after restoring. I think
> > > > > > > > > > > > it is better to keep these merging granularities for
> > > > > > > > > > > > different types of files as presets that are not
> > > > > > > > > > > > configurable. WDYT?
> > > > > > > > > > > >
> > > > > > > > > > > > As for the DSTL files, they are merged per TM and placed
> > > > > > > > > > > > in the task-owned folder. These files can be classified
> > > > > > > > > > > > as shared state since they are shared across
> > > > > > > > > > > > checkpoints. However, the DSTL file is a special case
> > > > > > > > > > > > that will be subsumed by the first checkpoint of the
> > > > > > > > > > > > newly restored job. Therefore, there is no need for new
> > > > > > > > > > > > TMs to keep these files after the old checkpoint is
> > > > > > > > > > > > subsumed, just like the private state files. Thus, it is
> > > > > > > > > > > > feasible to merge DSTL files per TM without introducing
> > > > > > > > > > > > complex file management across job attempts. So the
> > > > > > > > > > > > possible performance degradation is avoided.
> > > > > > > > > > > >
> > > > > > > > > > > > The three newly introduced options have recommended
> > > > > > > > > > > > defaults. For upcoming versions, this feature is turned
> > > > > > > > > > > > off by default. For the second option,
> > > > > > > > > > > > SEGMENTED_ACROSS_CP_BOUNDARY is the recommended default
> > > > > > > > > > > > as it is more effective. Of course, if encountering some
> > > > > > > > > > > > DFS that does not support file visibility until the file
> > > > > > > > > > > > is closed, it is possible to fall back to another option
> > > > > > > > > > > > automatically. For the third option, 64MB is an
> > > > > > > > > > > > acceptable target size. The RocksDB state backend in
> > > > > > > > > > > > Flink also chooses 64MB as the default target file size.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Thank you again for your quick response.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > > Zakelly
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <fredia...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi Zakelly,
> > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for driving this. This proposal enables the file
> > > > > > > > > > > > > merging of different types of states to be grouped
> > > > > > > > > > > > > under a unified framework. I think it has the added
> > > > > > > > > > > > > benefit of lightening the load on JM. As FLINK-26590[1]
> > > > > > > > > > > > > described, triggered checkpoints can be delayed by
> > > > > > > > > > > > > discarding shared state when JM manages a large number
> > > > > > > > > > > > > of files. After this FLIP, JM only needs to manage some
> > > > > > > > > > > > > folders, which greatly reduces the burden on JM.
> > > > > > > > > > > > >
> > > > > > > > > > > > > In Section 4.1, two types of merging granularities (per
> > > > > > > > > > > > > subtask and per task manager) are proposed. The shared
> > > > > > > > > > > > > state is managed at per-subtask granularity, but for
> > > > > > > > > > > > > the changelog state backend, its DSTL files are shared
> > > > > > > > > > > > > between checkpoints, and are currently merged in
> > > > > > > > > > > > > batches at the task manager level. When merging with
> > > > > > > > > > > > > the SEGMENTED_WITHIN_CP_BOUNDARY mode, I'm concerned
> > > > > > > > > > > > > about the performance degradation of its merging, hence
> > > > > > > > > > > > > I wonder if the merge granularities are configurable?
> > > > > > > > > > > > > Further, from a user perspective, three new options are
> > > > > > > > > > > > > introduced in this FLIP, do they have recommended
> > > > > > > > > > > > > defaults?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-26590
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Yanfei
> > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Apr 3, 2023 at 6:36 PM Zakelly Lan <zakelly....@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > >
> > > > > > > > > > > > > > I would like to open a discussion on providing a
> > > > > > > > > > > > > > unified file merging mechanism for checkpoints[1].
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Currently, many files are uploaded to the DFS during
> > > > > > > > > > > > > > checkpoints, leading to the 'file flood' problem when
> > > > > > > > > > > > > > running intensive workloads in a cluster. To tackle
> > > > > > > > > > > > > > this problem, various solutions have been proposed
> > > > > > > > > > > > > > for different types of state files. Although these
> > > > > > > > > > > > > > methods are similar, they lack a systematic view and
> > > > > > > > > > > > > > approach. We believe that it is better to consider
> > > > > > > > > > > > > > this problem as a whole and introduce a unified
> > > > > > > > > > > > > > framework to address the file flood problem for all
> > > > > > > > > > > > > > types of state files. A POC has been implemented
> > > > > > > > > > > > > > based on the current FLIP design, and the test
> > > > > > > > > > > > > > results are promising.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Looking forward to your comments or feedback.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > >
> > > > > > > > > > > > > [1]
> > > > > > > > > > >
> > > > > > > > > >
> > >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > >
> >
