Hi Zakelly,

Thanks for the clarification!

Currently, I understand what you mean, and LGTM.

Best,
Rui Fan

On Fri, May 5, 2023 at 12:27 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi all,
>
> @Yun Tang and I have an offline discussion, and we agreed that:
>
> 1. The design of this FLIP is pretty much like the option 3 in design
> doc[1] for FLINK-23342, and it is almost the best solution in general.
> Based on our production experience, this FLIP can solve the file flood
> problem very well.
> 2. There is a corner case that the directory may be left over when the
> job stops, so I added some content in section 4.8.
>
> Best,
> Zakelly
>
>
> [1]
> https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
>
> On Fri, May 5, 2023 at 11:19 AM Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > Hi Rui Fan,
> >
> > Thanks for your reply.
> >
> > > Maybe re-uploaded in the next checkpoint is also a general solution
> > > for shared state? If yes, could we consider it as an optimization?
> > > And we can do it after the FLIP is done.
> >
> > Yes, it is a general solution for shared states. Maybe in the first
> > version we can let the shared states not re-use any previous state
> > handle after restoring, thus the state backend will do a full snapshot
> > and re-uploading the files it needs. This could cover the scenario
> > that rocksdb only uploads the base DB files. And later we could
> > consider performing fast copy in DFS to optimize the re-uploading.
> > WDYT?
> >
> >
> > > I'm not sure why we need 2 configurations, or whether 1 configuration
> > > is enough here.
> >
> > > The `max-file-pool-size` is hard to give a default value. For jobs with
> > > many tasks in a TM, it may be useful for file merging. However,
> > > it doesn't work well for jobs with a small number of tasks in a TM.
> >
> > > I prefer just adding the `max-file-pool-size`, and
> > > the `pool size = number of tasks / max-file-pool-size`. WDYT?
> >
> >
> > Sorry for not explaining clearly. The value of pool size is calculated
> by:
> >
> > 1. pool size = number of tasks / max-subtasks-per-file
> > 2. if pool size > max-file-pool-size then pool size = max-file-pool-size
> >
> > The `max-subtasks-per-file` addresses the issue of sequential file
> > writing, while the `max-file-pool-size` acts as a safeguard to prevent
> > an excessively large file pool. WDYT?
> >
> >
> > Thanks again for your thoughts.
> >
> > Best,
> > Zakelly
> >
> > On Thu, May 4, 2023 at 3:52 PM Rui Fan <1996fan...@gmail.com> wrote:
> > >
> > > Hi Zakelly,
> > >
> > > Sorry for the late reply, I still have some minor questions.
> > >
> > > >> (3) When rescaling, do all shared files need to be copied?
> > > >
> > > > I agree with you that only sst files of the base DB need to be copied
> > > > (or re-uploaded in the next checkpoint). However, section 4.2
> > > > simplifies file copying issues (copying all files), following the
> > > > concept of shared state.
> > >
> > > Maybe re-uploaded in the next checkpoint is also a general solution
> > > for shared state? If yes, could we consider it as an optimization?
> > > And we can do it after the FLIP is done.
> > >
> > > >> (5) How many physical files can a TM write at the same checkpoint
> at the
> > > same time?
> > > >
> > > > This is a very good point. Actually, there is a file reuse pool as
> > > > section 4.6 described. There could be multiple files within this
> pool,
> > > > supporting concurrent writing by multiple writers. I suggest
> providing
> > > > two configurations to control the file number:
> > > >
> > > >   state.checkpoints.file-merging.max-file-pool-size: Specifies the
> > > > upper limit of the file pool size.
> > > >   state.checkpoints.file-merging.max-subtasks-per-file: Specifies the
> > > > lower limit of the file pool size based on the number of subtasks
> > > > within each TM.
> > > >
> > > > The number of simultaneously open files is controlled by these two
> > > > options, and the first option takes precedence over the second.
> > >
> > > I'm not sure why we need 2 configurations, or whether 1 configuration
> > > is enough here.
> > >
> > > The `max-file-pool-size` is hard to give a default value. For jobs with
> > > many tasks in a TM, it may be useful for file merging. However,
> > > it doesn't work well for jobs with a small number of tasks in a TM.
> > >
> > > I prefer just adding the `max-file-pool-size`, and
> > > the `pool size = number of tasks / max-file-pool-size`. WDYT?
> > >
> > > Maybe I missed some information. Please correct me if I'm wrong,
> thanks.
> > >
> > > Best,
> > > Rui Fan
> > >
> > > On Fri, Apr 28, 2023 at 12:10 AM Zakelly Lan <zakelly....@gmail.com>
> wrote:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for all the feedback so far.
> > > >
> > > > The discussion has been going on for some time, and all the comments
> > > > and suggestions are addressed. So I would like to start a vote on
> this
> > > > FLIP, which begins a week later (May. 5th at 10:00 AM GMT).
> > > >
> > > > If you have any concerns, please don't hesitate to follow up on this
> > > > discussion.
> > > >
> > > >
> > > > Best regards,
> > > > Zakelly
> > > >
> > > > On Fri, Apr 28, 2023 at 12:03 AM Zakelly Lan <zakelly....@gmail.com>
> > > > wrote:
> > > > >
> > > > > Hi Yuan,
> > > > >
> > > > > Thanks for sharing your thoughts. Like you said, the code changes
> and
> > > > > complexities are shaded in the newly introduced file management in
> TM,
> > > > > while the old file management remains the same. It is safe for us
> to
> > > > > take a small step towards decentralized file management in this
> way. I
> > > > > put the POC branch here[1] so everyone can check the code change.
> > > > >
> > > > > Best regards,
> > > > > Zakelly
> > > > >
> > > > > [1] https://github.com/Zakelly/flink/tree/flip306_poc
> > > > >
> > > > > On Thu, Apr 27, 2023 at 8:13 PM Yuan Mei <yuanmei.w...@gmail.com>
> wrote:
> > > > > >
> > > > > > Hey all,
> > > > > >
> > > > > > Thanks @Zakelly for driving this effort and thanks everyone for
> the
> > > > warm
> > > > > > discussion. Sorry for the late response.
> > > > > >
> > > > > > As I and Zakelly have already discussed and reviewed the design
> > > > carefully
> > > > > > when drafting this FLIP, I do not have additional inputs here.
> But I
> > > > want
> > > > > > to highlight several points that I've been quoted and explain
> why I
> > > > think
> > > > > > the current design is a reasonable and clean one.
> > > > > >
> > > > > > *Why this FLIP is proposed*
> > > > > > File Flooding is a problem for Flink I've seen many people bring
> up
> > > > > > throughout the years, especially for large clusters.
> Unfortunately,
> > > > there
> > > > > > are not yet accepted solutions for the most commonly used state
> backend
> > > > > > like RocksDB. This FLIP was originally targeted to address
> merging
> > > > > > SST(KeyedState) checkpoint files.
> > > > > >
> > > > > > While we are comparing different design choices, we found that
> > > > different
> > > > > > types of checkpoint files (OPState, Unaligned CP channel state,
> > > > Changelog
> > > > > > incremental state) share similar considerations, for example,
> file
> > > > > > management, file merging granularity, and e.t.c. That's why we
> want to
> > > > > > abstract a unified framework for merging these different types of
> > > > > > checkpoint files and provide flexibility to choose between
> merging
> > > > > > efficiency, rescaling/restoring cost, File system capabilities
> > > > (affecting
> > > > > > File visibility), and e.t.c.
> > > > > >
> > > > > > *File Ownership moved from JM to TM, WHY*
> > > > > > One of the major differences in the proposed design is moving
> file
> > > > > > ownership from JM to TM. A lot of questions/concerns are coming
> from
> > > > here,
> > > > > > let me answer them one by one:
> > > > > >
> > > > > > *1. Why the current JM SharedRegistry is not enough and do we
> have to
> > > > > > introduce more complexity?*
> > > > > > SharedRegistry maintains the mapping between *a file -> max CP ID
> > > > using the
> > > > > > file*
> > > > > > For merging files, we have to introduce another level of mapping
> *a
> > > > file ->
> > > > > > checkpoint file segment (merged files)*
> > > > > > So yes, no matter what, the second level of mapping has to be
> managed
> > > > > > somewhere, either JM or TM.
> > > > > >
> > > > > > *2. Why the **complexity (second level of mapping)** cannot be
> > > > maintained
> > > > > > in JM?*
> > > > > > - As a centralized service, JM has already been complicated and
> > > > overloaded.
> > > > > > As mentioned by @Yanfei Lei <fredia...@gmail.com>, "triggering
> > > > checkpoints
> > > > > > can be delayed by discarding shared state when JM manages a large
> > > > number of
> > > > > > files FLINK-26590". This ends up setting the JM thread pool to
> 500!
> > > > > > - As explained by @Zakelly in the previous thread, the contract
> "for
> > > > > > Checkpoint N, only re-use shared state handles that have been
> already
> > > > > > referenced by checkpoint N-1" is not guaranteed for the
> concurrent
> > > > > > checkpoint in the current JM-owned design.  This problem can not
> be
> > > > > > addressed without significant changes in how SharedRegistry and
> > > > checkpoint
> > > > > > subsume work, which, I do not think is worth it since
> > > > "concurrent_CP>1" is
> > > > > > not used that much in prod.
> > > > > >
> > > > > > *3. We have similar discussions before, moving ownership from JM
> to
> > > > TM, why
> > > > > > it is not adopted at that time? *
> > > > > > As mentioned by Yun and Piotr, we have had similar discussions
> to move
> > > > > > ownership from JM to TM when designing the changelog state
> backend. The
> > > > > > reason why we stuck to JM ownership at that time is mainly due to
> > > > > > engineering time/effort constraints.
> > > > > > This time, since we need an extra level of mapping, which
> complicates
> > > > the
> > > > > > JM logic even further, we indeed need to shade the complexity
> within
> > > > the TM
> > > > > > to avoid more communications between JM and TM.
> > > > > > Zakelly has already shared the code branch (about 2000 lines),
> and it
> > > > is
> > > > > > simple.
> > > > > >
> > > > > > *4. Cloud-Native Trend*
> > > > > > The current centralized file management framework contradicts the
> > > > > > cloud-native trend. That's also one of the reasons moving
> ownership
> > > > from JM
> > > > > > to TM was first proposed. The proposed design and implementation
> is a
> > > > > > worthy try-out in this direction. I'd like to put some more
> effort in
> > > > this
> > > > > > direction if this really turns out working well.
> > > > > >
> > > > > > One more thing I want to mention is that the proposed design
> shaded
> > > > all the
> > > > > > code changes and complexities in the newly introduced File
> management
> > > > in
> > > > > > TM. That says without enabling File merging, the code path of
> File
> > > > managing
> > > > > > remains the same as before. So it is also a safe change in this
> sense.
> > > > > >
> > > > > > Best,
> > > > > > Yuan
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Apr 12, 2023 at 5:23 PM Zakelly Lan <
> zakelly....@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > > Hi Yun,
> > > > > > >
> > > > > > > I reorganized our discussion and added a comparison table of
> state
> > > > > > > ownership with some previous designs. Please take a look at
> section
> > > > > > > "4.9. State ownership comparison with other designs".
> > > > > > >
> > > > > > > But I don't see them as alternatives since the design of state
> > > > > > > ownership is integrated with this FLIP. That is to say, we are
> > > > > > > providing a file merging solution including file management
> for new
> > > > > > > merged files, other ownership models are not feasible for the
> current
> > > > > > > merging plan. If the state ownership changes, the design of
> merging
> > > > > > > files at different granularities also needs to be changed
> > > > accordingly.
> > > > > > > WDYT?
> > > > > > >
> > > > > > >
> > > > > > > Best regards,
> > > > > > > Zakelly
> > > > > > >
> > > > > > > On Tue, Apr 11, 2023 at 10:18 PM Yun Tang <myas...@live.com>
> wrote:
> > > > > > > >
> > > > > > > > Hi Zakelly,
> > > > > > > >
> > > > > > > > Since we already had some discussions on this topic in the
> doc I
> > > > > > > mentioned, could you please describe the difference in your
> FLIP?
> > > > > > > >
> > > > > > > > I think we should better have a comparing table across
> different
> > > > options
> > > > > > > just like the doc wrote. And we could also list some of them
> in your
> > > > > > > Rejected Alternatives part.
> > > > > > > >
> > > > > > > >
> > > > > > > > Best
> > > > > > > > Yun Tang
> > > > > > > > ________________________________
> > > > > > > > From: Zakelly Lan <zakelly....@gmail.com>
> > > > > > > > Sent: Tuesday, April 11, 2023 17:57
> > > > > > > > To: dev@flink.apache.org <dev@flink.apache.org>
> > > > > > > > Subject: Re: [DISCUSS] FLIP-306: Unified File Merging
> Mechanism for
> > > > > > > Checkpoints
> > > > > > > >
> > > > > > > > Hi Rui Fan,
> > > > > > > >
> > > > > > > > Thanks for your comments!
> > > > > > > >
> > > > > > > > > (1) The temporary segment will remain in the physical file
> for a
> > > > short
> > > > > > > time, right?
> > > > > > > >
> > > > > > > > Yes, any written segment will remain in the physical file
> until the
> > > > > > > > physical file is deleted. It is controlled by the reference
> > > > counting.
> > > > > > > > And as discussed in 4.7, this will result in a space
> amplification
> > > > > > > > problem.
> > > > > > > >
> > > > > > > >
> > > > > > > > > (2) Is subtask granularity confused with shared state?
> > > > > > > >
> > > > > > > > Merging files at granularity of subtask is a general
> solution for
> > > > > > > > shared states, considering the file may be reused by the
> following
> > > > > > > > checkpoint after job restore. This design is applicable to
> sst
> > > > files
> > > > > > > > and any other shared states that may arise in the future.
> However,
> > > > the
> > > > > > > > DSTL files are a special case of shared states, since these
> files
> > > > will
> > > > > > > > no longer be shared after job restore. Therefore, we may do
> an
> > > > > > > > optimization for these files and merge them at the TM level.
> > > > > > > > Currently, the DSTL files are not in the shared directory of
> > > > > > > > checkpoint storage, and I suggest we keep it as it is. I
> agree that
> > > > > > > > this may bring in some confusion, and I suggest the FLIP
> mainly
> > > > > > > > discuss the general situation and list the special situations
> > > > > > > > separately without bringing in new concepts. I will add
> another
> > > > > > > > paragraph describing the file merging for DSTL files. WDYT?
> > > > > > > >
> > > > > > > >
> > > > > > > > > (3) When rescaling, do all shared files need to be copied?
> > > > > > > >
> > > > > > > > I agree with you that only sst files of the base DB need to
> be
> > > > copied
> > > > > > > > (or re-uploaded in the next checkpoint). However, section 4.2
> > > > > > > > simplifies file copying issues (copying all files),
> following the
> > > > > > > > concept of shared state.
> > > > > > > >
> > > > > > > >
> > > > > > > > > (4) Does the space magnification ratio need a configuration
> > > > option?
> > > > > > > >
> > > > > > > > Thanks for the reminder, I will add an option in this FLIP.
> > > > > > > >
> > > > > > > >
> > > > > > > > > (5) How many physical files can a TM write at the same
> > > > checkpoint at
> > > > > > > the same time?
> > > > > > > >
> > > > > > > > This is a very good point. Actually, there is a file reuse
> pool as
> > > > > > > > section 4.6 described. There could be multiple files within
> this
> > > > pool,
> > > > > > > > supporting concurrent writing by multiple writers. I suggest
> > > > providing
> > > > > > > > two configurations to control the file number:
> > > > > > > >
> > > > > > > >   state.checkpoints.file-merging.max-file-pool-size:
> Specifies the
> > > > > > > > upper limit of the file pool size.
> > > > > > > >   state.checkpoints.file-merging.max-subtasks-per-file:
> Specifies
> > > > the
> > > > > > > > lower limit of the file pool size based on the number of
> subtasks
> > > > > > > > within each TM.
> > > > > > > >
> > > > > > > > The number of simultaneously open files is controlled by
> these two
> > > > > > > > options, and the first option takes precedence over the
> second.
> > > > > > > >
> > > > > > > > WDYT?
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Thanks a lot for your valuable insight.
> > > > > > > >
> > > > > > > > Best regards,
> > > > > > > > Zakelly
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Apr 10, 2023 at 7:08 PM Rui Fan <
> 1996fan...@gmail.com>
> > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > Thanks Zakelly driving this proposal, and thank you all for
> > > > > > > > > the warm discussions. It's really a useful feature.
> > > > > > > > >
> > > > > > > > > I have a few questions about this FLIP.
> > > > > > > > >
> > > > > > > > > (1) The temporary segment will remain in the physical file
> for
> > > > > > > > > a short time, right?
> > > > > > > > >
> > > > > > > > > FLIP proposes to write segments instead of physical files.
> > > > > > > > > If the physical files are written directly, these
> temporary files
> > > > > > > > > will be deleted after the checkpoint is aborted. When
> writing
> > > > > > > > > a segment, how to delete the temporary segment?
> > > > > > > > > Decrement the reference count value by 1?
> > > > > > > > >
> > > > > > > > > (2) Is subtask granularity confused with shared state?
> > > > > > > > >
> > > > > > > > > From the "4.1.2 Merge files within a subtask or a TM" part,
> > > > > > > > > based on the principle of sst files, it is concluded that
> > > > > > > > > "For shared states, files are merged within each subtask."
> > > > > > > > >
> > > > > > > > > I'm not sure whether this conclusion is general or just
> for sst.
> > > > > > > > > As Yanfei mentioned before:
> > > > > > > > >
> > > > > > > > > > DSTL files are shared between checkpoints, and are
> > > > > > > > > > currently merged in batches at the task manager level.
> > > > > > > > >
> > > > > > > > > DSTL files as the shared state in FLIP-306, however, it
> > > > > > > > > would be better to merge at TM granularity. So, I'm not
> > > > > > > > > sure whether the subtask granularity confused with
> > > > > > > > > shared state?
> > > > > > > > >
> > > > > > > > > And I'm not familiar with DSTL file merging, should
> > > > > > > > > shared state be divided into shared subtask state
> > > > > > > > > and shared TM state?
> > > > > > > > >
> > > > > > > > > (3) When rescaling, do all shared files need to be copied?
> > > > > > > > >
> > > > > > > > > From the "4.2 Rescaling and Physical File Lifecycle" part,
> > > > > > > > > I see a lot of file copying.
> > > > > > > > >
> > > > > > > > > As I understand, only sst files of the baseDB need to be
> copied.
> > > > > > > > > From the restore code[1], when restoreWithRescaling, flink
> will
> > > > > > > > > init a base DB instance, read all contents from other
> temporary
> > > > > > > > > rocksdb instances, and write them into the base DB, and
> then
> > > > > > > > > the temporary rocksdb instance will be discarded.
> > > > > > > > >
> > > > > > > > > So, I think copying the files of the base rocksdb is
> enough, and
> > > > > > > > > the files of other rocksdb instances aren't used.
> > > > > > > > >
> > > > > > > > > Or do not copy any files during recovery, upload all sst
> files
> > > > > > > > > at the first checkpoint.
> > > > > > > > >
> > > > > > > > > (4) Does the space magnification ratio need a configuration
> > > > option?
> > > > > > > > >
> > > > > > > > > From the step1 of  "4.7 Space amplification" part, I see:
> > > > > > > > >
> > > > > > > > > > Checking whether the space amplification of each file is
> > > > greater
> > > > > > > than a
> > > > > > > > > preset threshold and collecting files that exceed the
> threshold
> > > > for
> > > > > > > > > compaction.
> > > > > > > > >
> > > > > > > > > Should we add a configuration option about the compaction
> > > > threshold?
> > > > > > > > > I didn't see it at "5. Public interfaces and User Cases"
> part.
> > > > > > > > >
> > > > > > > > > (5) How many physical files can a TM write at the same
> > > > > > > > > checkpoint at the same time?
> > > > > > > > >
> > > > > > > > > From the "5. Public interfaces and User Cases" part, I see:
> > > > > > > > >
> > > > > > > > > > A configuration option that sets a maximum size limit for
> > > > physical
> > > > > > > files.
> > > > > > > > >
> > > > > > > > > I guess that each type of state(private or shared state)
> will
> > > > only
> > > > > > > > > write one file at the same time at the same checkpoint.
> > > > > > > > > When the file reaches the maximum size, flink will start
> writing
> > > > > > > > > the next file, right?
> > > > > > > > >
> > > > > > > > > If yes, for shared state, will
> > > > > > > > > "state.backend.rocksdb.checkpoint.transfer.thread.num"
> > > > > > > > > be invalid?
> > > > > > > > >
> > > > > > > > > For private state, a TM may have many tasks (because of
> slot
> > > > > > > > > sharing, more than 20 tasks may run in a slot), and the
> > > > > > > > > performance of all tasks serially writing files may be
> poor,
> > > > > > > > > eventually resulting in longer checkpoint time.
> > > > > > > > >
> > > > > > > > > That's why FLINK-26803[2] introduced a configuration
> option:
> > > > > > > > >
> > > > > > >
> > > >
> "execution.checkpointing.unaligned.max-subtasks-per-channel-state-file".
> > > > > > > > > Flink users can set the maximum number of subtasks that
> > > > > > > > > share the same channel state file.
> > > > > > > > >
> > > > > > > > > That's all my questions right now, please correct me if
> > > > > > > > > anything is wrong.
> > > > > > > > >
> > > > > > > > > Anyway, this FLIP is useful for the stability of
> large-scale
> > > > > > > > > flink production. Looking forward to its completion and
> > > > > > > > > eventual acceptance by the community.
> > > > > > > > >
> > > > > > > > > [1]
> > > > > > > > >
> > > > > > >
> > > >
> https://github.com/apache/flink/blob/65710b437318364ec19c0369d038ac2222c10498/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L292
> > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-26803
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Rui Fan
> > > > > > > > >
> > > > > > > > > On Fri, Apr 7, 2023 at 8:42 PM Jing Ge
> > > > <j...@ververica.com.invalid>
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > Jingsong, Yanfei, please check, if you can view the doc.
> > > > Thanks.
> > > > > > > > > >
> > > > > > > > > > Best regards,
> > > > > > > > > > Jing
> > > > > > > > > >
> > > > > > > > > > On Fri, Apr 7, 2023 at 2:19 PM Zakelly Lan <
> > > > zakelly....@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Yanfei,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for your comments.
> > > > > > > > > > >
> > > > > > > > > > > > Does this result in a larger space amplification?
> Maybe a
> > > > more
> > > > > > > > > > > suitable value can be determined through some
> experimental
> > > > > > > statistics
> > > > > > > > > > > after we implement this feature.
> > > > > > > > > > >
> > > > > > > > > > > Yes, it results in larger space amplification for
> shared
> > > > states. I
> > > > > > > > > > > will do more tests and investigation.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Thanks.
> > > > > > > > > > >
> > > > > > > > > > > Best regards,
> > > > > > > > > > > Zakelly
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Apr 7, 2023 at 8:15 PM Zakelly Lan <
> > > > zakelly....@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > Hi @Piotr and @Jingsong Li
> > > > > > > > > > > >
> > > > > > > > > > > > I have read access to the document, but I'm not sure
> > > > whether the
> > > > > > > owner
> > > > > > > > > > > > of this document wants to make it public. Actually,
> the
> > > > doc is
> > > > > > > for
> > > > > > > > > > > > FLINK-23342 and there is a candidate design very
> similar
> > > > to this
> > > > > > > FLIP,
> > > > > > > > > > > > but only for the shared state. Like Yun said, the
> previous
> > > > > > > design is
> > > > > > > > > > > > not taken because of the code complexity, however I
> think
> > > > it is
> > > > > > > > > > > > acceptable after implementing the POC[1]. I think we
> could
> > > > focus
> > > > > > > on
> > > > > > > > > > > > the current plan, WDTY?
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > [1] POC of this FLIP:
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > >
> https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Apr 7, 2023 at 8:13 PM Zakelly Lan <
> > > > > > > zakelly....@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Hi Piotr,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks for your comments!
> > > > > > > > > > > > >
> > > > > > > > > > > > > (1) Sorry for the misleading, let me make it more
> clear.
> > > > It is
> > > > > > > a
> > > > > > > > > > > > > concurrent checkpoint senario. Yes, the assumption
> you
> > > > said
> > > > > > > needs to
> > > > > > > > > > > > > be followed, but the state handles here refer to
> the
> > > > original
> > > > > > > SST
> > > > > > > > > > > > > files, not the underlying file. In this FLIP when
> > > > checkpoint N
> > > > > > > and
> > > > > > > > > > N+1
> > > > > > > > > > > > > are running concurrently, they reuse files from
> > > > checkpoint
> > > > > > > N-1, and
> > > > > > > > > > > > > some of the files may be deleted when checkpoint N
> > > > completes
> > > > > > > while
> > > > > > > > > > > > > checkpoint N+1 is still writing on it. There is no
> such
> > > > > > > problem for
> > > > > > > > > > > > > original shared states without file merging
> because when
> > > > a
> > > > > > > state
> > > > > > > > > > > > > handle (or sst file here) from checkpoint N-1 is
> not
> > > > > > > referenced by
> > > > > > > > > > > > > checkpoint N, it will not be referenced by
> checkpoint
> > > > N+1. So
> > > > > > > the
> > > > > > > > > > > > > subsumption of sst files from checkpoint N-1 are
> safe.
> > > > > > > > > > > > > For above example, when reaching step "d.", File 1
> > > > reached the
> > > > > > > size
> > > > > > > > > > > > > threshold and will not be used. The Chk-2 and
> Chk-3 are
> > > > running
> > > > > > > > > > > > > concurrently, and the File 3 is being written by
> Chk-2,
> > > > so it
> > > > > > > can not
> > > > > > > > > > > > > be used by Chk-3 (As described in section 4.6).
> Here
> > > > comes the
> > > > > > > > > > > > > problem.
> > > > > > > > > > > > >
> > > > > > > > > > > > > (2) Please correct me if I'm wrong. The purpose of
> the
> > > > > > > > > > > > > `RecoverableWriter` is to provide a reliable file
> writer
> > > > even
> > > > > > > > > > > > > tolerable with job failure and recovery. The
> > > > implementation
> > > > > > > varies
> > > > > > > > > > > > > among the file systems, some of which involves
> writing
> > > > into
> > > > > > > temporary
> > > > > > > > > > > > > files (such as HDFS). As a result, it may produce
> more
> > > > RPC
> > > > > > > requests
> > > > > > > > > > to
> > > > > > > > > > > > > the DFS.
> > > > > > > > > > > > > The goal of this FLIP is to reduce the pressure on
> DFS,
> > > > > > > especially
> > > > > > > > > > the
> > > > > > > > > > > > > number of files and RPC requests. Currently the
> TMs are
> > > > NOT
> > > > > > > using the
> > > > > > > > > > > > > RecoverableWriter to persist/upload the state
> files, and
> > > > a file
> > > > > > > > > > > > > closing is enough. The section 4.1.1 is trying to
> omit
> > > > this
> > > > > > > file
> > > > > > > > > > > > > closing but ensure file visibility in some DFS,
> thus
> > > > reducing
> > > > > > > > > > pressure
> > > > > > > > > > > > > on DFS. That's why I said the problems they want to
> > > > solve are
> > > > > > > > > > > > > different. I'm not sure if I made myself clear.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks!
> > > > > > > > > > > > >
> > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Apr 7, 2023 at 8:08 PM Zakelly Lan <
> > > > > > > zakelly....@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Yun,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for your suggestions!
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I have read the FLINK-23342 and its design doc
> as you
> > > > > > > provided.
> > > > > > > > > > First
> > > > > > > > > > > > > > of all the goal of this FLIP and the doc are
> similar,
> > > > and the
> > > > > > > > > > design
> > > > > > > > > > > > > > of this FLIP is pretty much like option 3. The
> main
> > > > > > > difference is
> > > > > > > > > > > that
> > > > > > > > > > > > > > we imply the concept of 'epoch' in the folder
> path for
> > > > each
> > > > > > > > > > > > > > granularity. For shared state, the folder for
> each
> > > > subtask
> > > > > > > is like
> > > > > > > > > > > > > >
> > > > "${checkpointBaseDir}/shared/subtask-{index}-{parallelism}",
> > > > > > > so if
> > > > > > > > > > > the
> > > > > > > > > > > > > > ${checkpointBaseDir} changes (when user restart
> a job
> > > > > > > manually) or
> > > > > > > > > > > the
> > > > > > > > > > > > > > ${parallelism} changes (when rescaling), there
> will be
> > > > a
> > > > > > > > > > > re-uploading,
> > > > > > > > > > > > > > and the JM takes care of the old artifacts. The
> folder
> > > > path
> > > > > > > for
> > > > > > > > > > > > > > private state is in the form of
> > > > > > > > > > > > > > "${checkpointBaseDir}/tm-owned/${tmResourceId}"
> and the
> > > > > > > division of
> > > > > > > > > > > > > > responsibilities between JM and TM is similar.
> The
> > > > design of
> > > > > > > this
> > > > > > > > > > > FLIP
> > > > > > > > > > > > > > inherits all the advantages of the design of
> option 3
> > > > in
> > > > > > > that doc,
> > > > > > > > > > > and
> > > > > > > > > > > > > > also avoids extra communication for epoch
> maintenance.
> > > > As
> > > > > > > for the
> > > > > > > > > > > code
> > > > > > > > > > > > > > complexity, you may check the POC commit[1] and
> find
> > > > that the
> > > > > > > > > > > > > > implementation is pretty clean and is a totally
> new
> > > > code path
> > > > > > > > > > making
> > > > > > > > > > > > > > nearly no influence on the old one. Comparing the
> > > > number of
> > > > > > > lines
> > > > > > > > > > of
> > > > > > > > > > > > > > code change with what's currently done for
> merging
> > > > channel
> > > > > > > state[2]
> > > > > > > > > > > > > > (5200 vs. 2500 additions), I think it is
> acceptable
> > > > > > > considering we
> > > > > > > > > > > are
> > > > > > > > > > > > > > providing a unified file merging framework, which
> > > > would save
> > > > > > > a lot
> > > > > > > > > > of
> > > > > > > > > > > > > > effort in future. WDYT?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > [1] POC of this FLIP:
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > >
> https://github.com/Zakelly/flink/commit/98538185182e33739828ee36ab96dcf2aebad80c
> > > > > > > > > > > > > > [2] Commit for FLINK-26803 (Merge the channel
> state
> > > > files) :
> > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > >
> https://github.com/apache/flink/commit/8be94e6663d8ac6e3d74bf4cd5f540cc96c8289e
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 7:22 PM Yanfei Lei <
> > > > > > > fredia...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for your explanation Zakelly.
> > > > > > > > > > > > > > > (1) Keeping these merging granularities for
> different
> > > > > > > types of
> > > > > > > > > > > files
> > > > > > > > > > > > > > > as presets that are not configurable is a good
> idea
> > > > to
> > > > > > > prevent
> > > > > > > > > > > > > > > performance degradation.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > (2)
> > > > > > > > > > > > > > > > For the third option, 64MB is an acceptable
> target
> > > > size.
> > > > > > > The
> > > > > > > > > > > RocksDB state backend in Flink also chooses 64MB as the
> > > > default
> > > > > > > target
> > > > > > > > > > file
> > > > > > > > > > > size.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Does this result in a larger space
> amplification?
> > > > Maybe a
> > > > > > > more
> > > > > > > > > > > > > > > suitable value can be determined through some
> > > > experimental
> > > > > > > > > > > statistics
> > > > > > > > > > > > > > > after we implement this feature.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > Yanfei
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Jingsong Li <jingsongl...@gmail.com>
> 于2023年4月7日周五
> > > > 17:09写道:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Yun,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > It looks like this doc needs permission to
> read?
> > > > [1]
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > [1]
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > >
> https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > Jingsong
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, Apr 7, 2023 at 4:34 PM Piotr
> Nowojski <
> > > > > > > > > > > pnowoj...@apache.org> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > +1 To what Yun Tang wrote. We don't seem
> to have
> > > > > > > access to
> > > > > > > > > > the
> > > > > > > > > > > design doc.
> > > > > > > > > > > > > > > > > Could you make it publicly visible or copy
> out
> > > > its
> > > > > > > content to
> > > > > > > > > > > another
> > > > > > > > > > > > > > > > > document?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks for your answers Zakelly.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > (1)
> > > > > > > > > > > > > > > > > Yes, the current mechanism introduced in
> > > > FLINK-24611
> > > > > > > allows
> > > > > > > > > > > for checkpoint
> > > > > > > > > > > > > > > > > N, to only re-use shared state handles
> that have
> > > > been
> > > > > > > already
> > > > > > > > > > > referenced by
> > > > > > > > > > > > > > > > > checkpoint N-1. But why do we need to
> break this
> > > > > > > assumption?
> > > > > > > > > > > In your step,
> > > > > > > > > > > > > > > > > "d.", TM could adhere to that assumption,
> and
> > > > instead
> > > > > > > of
> > > > > > > > > > > reusing File-2, it
> > > > > > > > > > > > > > > > > could either re-use File-1, File-3 or
> create a
> > > > new
> > > > > > > file.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > (2)
> > > > > > > > > > > > > > > > > Can you elaborate a bit more on this? As
> far as I
> > > > > > > recall, the
> > > > > > > > > > > purpose of
> > > > > > > > > > > > > > > > > the `RecoverableWriter` is to support
> exactly the
> > > > > > > things
> > > > > > > > > > > described in this
> > > > > > > > > > > > > > > > > FLIP, so what's the difference? If you are
> > > > saying that
> > > > > > > for
> > > > > > > > > > > this FLIP you
> > > > > > > > > > > > > > > > > can implement something more efficiently
> for a
> > > > given
> > > > > > > > > > > FileSystem, then why
> > > > > > > > > > > > > > > > > can it not be done the same way for the
> > > > > > > `RecoverableWriter`?
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > Piotrek
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > czw., 6 kwi 2023 o 17:24 Yun Tang <
> > > > myas...@live.com>
> > > > > > > > > > > napisał(a):
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Zakelly,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for driving this work!
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I'm not sure did you ever read the
> discussion
> > > > between
> > > > > > > > > > > Stephan, Roman,
> > > > > > > > > > > > > > > > > > Piotr, Yuan and I in the design doc [1]
> in
> > > > nearly
> > > > > > > two years
> > > > > > > > > > > ago.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > From my understanding, your proposal is
> also a
> > > > mixed
> > > > > > > state
> > > > > > > > > > > ownership: some
> > > > > > > > > > > > > > > > > > states are owned by the TM while some are
> > > > owned by
> > > > > > > the JM.
> > > > > > > > > > > If my memory is
> > > > > > > > > > > > > > > > > > correct, we did not take the option-3 or
> > > > option-5 in
> > > > > > > the
> > > > > > > > > > > design doc [1] for
> > > > > > > > > > > > > > > > > > the code complexity when implements the
> 1st
> > > > version
> > > > > > > of
> > > > > > > > > > > changelog
> > > > > > > > > > > > > > > > > > state-backend.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Could you also compare the current FLIP
> with
> > > > the
> > > > > > > proposals
> > > > > > > > > > > in the design
> > > > > > > > > > > > > > > > > > doc[1]? From my understanding, we should
> at
> > > > least
> > > > > > > consider
> > > > > > > > > > > to comapre with
> > > > > > > > > > > > > > > > > > option-3 and option-5 as they are all
> mixed
> > > > > > > solutions.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > >
> https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Best
> > > > > > > > > > > > > > > > > > Yun Tang
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > ------------------------------
> > > > > > > > > > > > > > > > > > *From:* Zakelly Lan <
> zakelly....@gmail.com>
> > > > > > > > > > > > > > > > > > *Sent:* Thursday, April 6, 2023 16:38
> > > > > > > > > > > > > > > > > > *To:* dev@flink.apache.org <
> > > > dev@flink.apache.org>
> > > > > > > > > > > > > > > > > > *Subject:* Re: [DISCUSS] FLIP-306:
> Unified File
> > > > > > > Merging
> > > > > > > > > > > Mechanism for
> > > > > > > > > > > > > > > > > > Checkpoints
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Piotr,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for all the feedback.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (1) Thanks for the reminder. I have just
> seen
> > > > the
> > > > > > > > > > > FLINK-24611, the delayed
> > > > > > > > > > > > > > > > > > deletion by JM resolves some sync
> problems
> > > > between
> > > > > > > JM and
> > > > > > > > > > > TM, but I'm
> > > > > > > > > > > > > > > > > > afraid it is still not feasible for the
> file
> > > > sharing
> > > > > > > in
> > > > > > > > > > this
> > > > > > > > > > > FLIP.
> > > > > > > > > > > > > > > > > > Considering a concurrent checkpoint
> scenario as
> > > > > > > follows:
> > > > > > > > > > > > > > > > > >    a. Checkpoint 1 finishes. 1.sst,
> 2.sst and
> > > > 3.sst
> > > > > > > are
> > > > > > > > > > > written in file 1,
> > > > > > > > > > > > > > > > > > and 4.sst is written in file 2.
> > > > > > > > > > > > > > > > > >    b. Checkpoint 2 starts based on
> checkpoint
> > > > 1,
> > > > > > > including
> > > > > > > > > > > 1.sst, 2.sst
> > > > > > > > > > > > > > > > > > and 5.sst.
> > > > > > > > > > > > > > > > > >    c. Checkpoint 3 starts based on
> checkpoint
> > > > 1,
> > > > > > > including
> > > > > > > > > > > 1.sst, 2.sst
> > > > > > > > > > > > > > > > > > and 5.sst as well.
> > > > > > > > > > > > > > > > > >    d. Checkpoint 3 reuses the file 2, TM
> writes
> > > > > > > 5.sst on
> > > > > > > > > > it.
> > > > > > > > > > > > > > > > > >    e. Checkpoint 2 creates a new file 3,
> TM
> > > > writes
> > > > > > > 5.sst on
> > > > > > > > > > > it.
> > > > > > > > > > > > > > > > > >    f. Checkpoint 2 finishes, checkpoint
> 1 is
> > > > > > > subsumed and
> > > > > > > > > > > the file 2 is
> > > > > > > > > > > > > > > > > > deleted, while checkpoint 3 still needs
> file 2.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I attached a diagram to describe the
> scenario.
> > > > > > > > > > > > > > > > > > [image: concurrent cp.jpg]
> > > > > > > > > > > > > > > > > > The core issue is that this FLIP
> introduces a
> > > > > > > mechanism
> > > > > > > > > > that
> > > > > > > > > > > allows
> > > > > > > > > > > > > > > > > > physical files to be potentially used by
> the
> > > > next
> > > > > > > several
> > > > > > > > > > > checkpoints. JM
> > > > > > > > > > > > > > > > > > is uncertain whether there will be a TM
> > > > continuing
> > > > > > > to write
> > > > > > > > > > > to a specific
> > > > > > > > > > > > > > > > > > file. So in this FLIP, TMs take the
> > > > responsibility to
> > > > > > > > > > delete
> > > > > > > > > > > the physical
> > > > > > > > > > > > > > > > > > files.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (2) IIUC, the RecoverableWriter is
> introduced
> > > > to
> > > > > > > persist
> > > > > > > > > > > data in the "in
> > > > > > > > > > > > > > > > > > progress" files after each checkpoint,
> and the
> > > > > > > > > > > implementation may be based
> > > > > > > > > > > > > > > > > > on the file sync in some file systems.
> However,
> > > > > > > since the
> > > > > > > > > > > sync is a heavy
> > > > > > > > > > > > > > > > > > operation for DFS, this FLIP wants to
> use flush
> > > > > > > instead of
> > > > > > > > > > > the sync with
> > > > > > > > > > > > > > > > > > the best effort. This only fits the case
> that
> > > > the
> > > > > > > DFS is
> > > > > > > > > > > considered
> > > > > > > > > > > > > > > > > > reliable. The problems they want to
> solve are
> > > > > > > different.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (3) Yes, if files are managed by JM via
> the
> > > > shared
> > > > > > > > > > registry,
> > > > > > > > > > > this problem
> > > > > > > > > > > > > > > > > > is solved. And as I mentioned in (1),
> there
> > > > are some
> > > > > > > other
> > > > > > > > > > > corner cases
> > > > > > > > > > > > > > > > > > hard to resolve via the shared registry.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > The goal of this FLIP is to have a
> common way
> > > > of
> > > > > > > merging
> > > > > > > > > > > files in all use
> > > > > > > > > > > > > > > > > > cases. For shared state it merges at
> subtask
> > > > level,
> > > > > > > while
> > > > > > > > > > > for private state
> > > > > > > > > > > > > > > > > > (and changelog files, as I replied to
> Yanfei),
> > > > files
> > > > > > > are
> > > > > > > > > > > merged at TM
> > > > > > > > > > > > > > > > > > level. So it is not contrary to the
> current
> > > > plan for
> > > > > > > the
> > > > > > > > > > > unaligned
> > > > > > > > > > > > > > > > > > checkpoint state (FLINK-26803). You are
> right
> > > > that
> > > > > > > the
> > > > > > > > > > > unaligned checkpoint
> > > > > > > > > > > > > > > > > > state would be merged with the
> operator's state
> > > > > > > file, so
> > > > > > > > > > > overall, it is
> > > > > > > > > > > > > > > > > > slightly better than what's currently
> done.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks again for the valuable comments!
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > On Wed, Apr 5, 2023 at 8:43 PM Piotr
> Nowojski <
> > > > > > > > > > > pnowoj...@apache.org>
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for coming up with the proposal,
> it's
> > > > > > > definitely
> > > > > > > > > > > valuable. I'm still
> > > > > > > > > > > > > > > > > > reading and trying to understand the
> proposal,
> > > > but a
> > > > > > > couple
> > > > > > > > > > > of comments
> > > > > > > > > > > > > > > > > > from my side.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (1)
> > > > > > > > > > > > > > > > > > > Ownership of a single checkpoint file
> is
> > > > > > > transferred to
> > > > > > > > > > > TM, while JM
> > > > > > > > > > > > > > > > > > manages the parent directory of these
> files.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Have you seen
> > > > > > > > > > > https://issues.apache.org/jira/browse/FLINK-24611
> before? I
> > > > > > > > > > > > > > > > > > don't fully remember why, but we have
> rejected
> > > > the
> > > > > > > idea of
> > > > > > > > > > > moving the file
> > > > > > > > > > > > > > > > > > ownership to TM and instead reworked the
> > > > shared file
> > > > > > > > > > > registry in a way that
> > > > > > > > > > > > > > > > > > I think should be sufficient for file
> sharing.
> > > > Could
> > > > > > > you
> > > > > > > > > > > elaborate why we
> > > > > > > > > > > > > > > > > > need to move the file ownership to TM,
> and why
> > > > is the
> > > > > > > > > > > current mechanism not
> > > > > > > > > > > > > > > > > > sufficient?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (2)
> > > > > > > > > > > > > > > > > > > File visibility is needed when a Flink
> job
> > > > > > > recovers after
> > > > > > > > > > > a checkpoint is
> > > > > > > > > > > > > > > > > > materialized. In some DFS, such as most
> object
> > > > > > > storages, a
> > > > > > > > > > > file is only
> > > > > > > > > > > > > > > > > > visible after it is closed
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Is that really the case?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > >
> `org.apache.flink.core.fs.FileSystem#createRecoverableWriter`
> > > > > > > seems to be
> > > > > > > > > > > > > > > > > > addressing exactly this issue, and the
> most
> > > > > > > frequently used
> > > > > > > > > > > FileSystem (S3)
> > > > > > > > > > > > > > > > > > AFAIK supports it with no problems?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > (3)
> > > > > > > > > > > > > > > > > > > 4.1.2 Merge files within a subtask or
> a TM
> > > > > > > > > > > > > > > > > > > Given that TMs are reassigned after
> > > > restoration,
> > > > > > > it is
> > > > > > > > > > > difficult to
> > > > > > > > > > > > > > > > > > manage physical files that contain data
> from
> > > > multiple
> > > > > > > > > > > subtasks scattered
> > > > > > > > > > > > > > > > > > across different TMs (as depicted in
> Fig.3).
> > > > There
> > > > > > > is no
> > > > > > > > > > > synchronization
> > > > > > > > > > > > > > > > > > mechanism between TMs, making file
> management
> > > > in this
> > > > > > > > > > > scenario challenging.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > I think this is solved in many places
> already
> > > > via the
> > > > > > > > > > shared
> > > > > > > > > > > state managed
> > > > > > > > > > > > > > > > > > by the JM, as I mentioned in (1).
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > If I understand it correctly you are
> proposing
> > > > to
> > > > > > > have a
> > > > > > > > > > > common
> > > > > > > > > > > > > > > > > > interface/way of merging small files, in
> all
> > > > use
> > > > > > > cases,
> > > > > > > > > > that
> > > > > > > > > > > would work
> > > > > > > > > > > > > > > > > > only across a single subtask? That's
> contrary
> > > > to
> > > > > > > what's
> > > > > > > > > > > currently done for
> > > > > > > > > > > > > > > > > > unaligned checkpoints, right? But if this
> > > > generic
> > > > > > > mechanism
> > > > > > > > > > > was to be used
> > > > > > > > > > > > > > > > > > for unaligned checkpoints, unaligned
> checkpoint
> > > > > > > state would
> > > > > > > > > > > have been
> > > > > > > > > > > > > > > > > > merged with the operators state file, so
> all
> > > > in all
> > > > > > > there
> > > > > > > > > > > would be no
> > > > > > > > > > > > > > > > > > regression visible to a user? The limit
> is
> > > > that we
> > > > > > > always
> > > > > > > > > > > have at least a
> > > > > > > > > > > > > > > > > > single file per subtask, but in exchange
> we are
> > > > > > > getting a
> > > > > > > > > > > simpler threading
> > > > > > > > > > > > > > > > > > model?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Bets,
> > > > > > > > > > > > > > > > > > Piotrek
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > wt., 4 kwi 2023 o 08:51 Zakelly Lan <
> > > > > > > zakelly....@gmail.com
> > > > > > > > > > >
> > > > > > > > > > > napisał(a):
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi Yanfei,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thank you for your prompt response.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > I agree that managing (deleting) only
> some
> > > > folders
> > > > > > > with
> > > > > > > > > > JM
> > > > > > > > > > > can greatly
> > > > > > > > > > > > > > > > > > > relieve JM's burden. Thanks for
> pointing
> > > > this out.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > In general, merging at the TM level is
> more
> > > > > > > effective
> > > > > > > > > > > since there are
> > > > > > > > > > > > > > > > > > > usually more files to merge.
> Therefore, I
> > > > believe
> > > > > > > it is
> > > > > > > > > > > better to
> > > > > > > > > > > > > > > > > > > merge files per TM as much as possible.
> > > > However,
> > > > > > > for
> > > > > > > > > > > shared state,
> > > > > > > > > > > > > > > > > > > merging at the subtask level is the
> best
> > > > choice to
> > > > > > > > > > prevent
> > > > > > > > > > > significant
> > > > > > > > > > > > > > > > > > > data transfer over the network after
> > > > restoring. I
> > > > > > > think
> > > > > > > > > > it
> > > > > > > > > > > is better
> > > > > > > > > > > > > > > > > > > to keep these merging granularities for
> > > > different
> > > > > > > types
> > > > > > > > > > of
> > > > > > > > > > > files as
> > > > > > > > > > > > > > > > > > > presets that are not configurable.
> WDYT?
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > As for the DSTL files, they are merged
> per
> > > > TM and
> > > > > > > placed
> > > > > > > > > > > in the
> > > > > > > > > > > > > > > > > > > task-owned folder. These files can be
> > > > classified as
> > > > > > > > > > shared
> > > > > > > > > > > state since
> > > > > > > > > > > > > > > > > > > they are shared across checkpoints.
> However,
> > > > the
> > > > > > > DSTL
> > > > > > > > > > file
> > > > > > > > > > > is a
> > > > > > > > > > > > > > > > > > > special case that will be subsumed by
> the
> > > > first
> > > > > > > > > > checkpoint
> > > > > > > > > > > of the
> > > > > > > > > > > > > > > > > > > newly restored job. Therefore, there
> is no
> > > > need
> > > > > > > for new
> > > > > > > > > > > TMs to keep
> > > > > > > > > > > > > > > > > > > these files after the old checkpoint is
> > > > subsumed,
> > > > > > > just
> > > > > > > > > > > like the
> > > > > > > > > > > > > > > > > > > private state files. Thus, it is
> feasible to
> > > > merge
> > > > > > > DSTL
> > > > > > > > > > > files per TM
> > > > > > > > > > > > > > > > > > > without introducing complex file
> management
> > > > across
> > > > > > > job
> > > > > > > > > > > attempts. So
> > > > > > > > > > > > > > > > > > > the possible performance degradation is
> > > > avoided.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > The three newly introduced options have
> > > > recommended
> > > > > > > > > > > defaults. For
> > > > > > > > > > > > > > > > > > > upcoming versions, this feature is
> turned
> > > > off by
> > > > > > > default.
> > > > > > > > > > > For the
> > > > > > > > > > > > > > > > > > > second option,
> SEGMENTED_ACROSS_CP_BOUNDARY
> > > > is the
> > > > > > > > > > > recommended default
> > > > > > > > > > > > > > > > > > > as it is more effective. Of course, if
> > > > > > > encountering some
> > > > > > > > > > > DFS that does
> > > > > > > > > > > > > > > > > > > not support file visibility until the
> file is
> > > > > > > closed, it
> > > > > > > > > > > is possible
> > > > > > > > > > > > > > > > > > > to fall back to another option
> > > > automatically. For
> > > > > > > the
> > > > > > > > > > > third option,
> > > > > > > > > > > > > > > > > > > 64MB is an acceptable target size. The
> > > > RocksDB
> > > > > > > state
> > > > > > > > > > > backend in Flink
> > > > > > > > > > > > > > > > > > > also chooses 64MB as the default
> target file
> > > > size.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thank you again for your quick
> response.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Mon, Apr 3, 2023 at 11:27 PM Yanfei
> Lei <
> > > > > > > > > > > fredia...@gmail.com> wrote:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Hi Zakelly,
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Thanks for driving this,  this
> proposal
> > > > enables
> > > > > > > the
> > > > > > > > > > > files merging of
> > > > > > > > > > > > > > > > > > > > different types of states to be
> grouped
> > > > under a
> > > > > > > unified
> > > > > > > > > > > framework. I
> > > > > > > > > > > > > > > > > > > > think it has the added benefit of
> > > > lightening the
> > > > > > > load
> > > > > > > > > > on
> > > > > > > > > > > JM. As
> > > > > > > > > > > > > > > > > > > > FLINK-26590[1] described,  triggered
> > > > checkpoints
> > > > > > > can be
> > > > > > > > > > > delayed by
> > > > > > > > > > > > > > > > > > > > discarding shared state when JM
> manages a
> > > > large
> > > > > > > number
> > > > > > > > > > > of files. After
> > > > > > > > > > > > > > > > > > > > this FLIP, JM only needs to manage
> some
> > > > folders,
> > > > > > > which
> > > > > > > > > > > greatly reduces
> > > > > > > > > > > > > > > > > > > > the burden on JM.
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > In Section 4.1, two types of merging
> > > > > > > granularities(per
> > > > > > > > > > > subtask and per
> > > > > > > > > > > > > > > > > > > > task manager) are proposed, the
> shared
> > > > state is
> > > > > > > managed
> > > > > > > > > > > by per subtask
> > > > > > > > > > > > > > > > > > > > granularity, but for the changelog
> state
> > > > > > > backend, its
> > > > > > > > > > > DSTL files are
> > > > > > > > > > > > > > > > > > > > shared between checkpoints, and are
> > > > currently
> > > > > > > merged in
> > > > > > > > > > > batches at the
> > > > > > > > > > > > > > > > > > > > task manager level. When merging
> with the
> > > > > > > > > > > SEGMENTED_WITHIN_CP_BOUNDARY
> > > > > > > > > > > > > > > > > > > > mode, I'm concerned about the
> performance
> > > > > > > degradation
> > > > > > > > > > of
> > > > > > > > > > > its merging,
> > > > > > > > > > > > > > > > > > > > hence I wonder if the merge
> granularities
> > > > are
> > > > > > > > > > > configurable? Further,
> > > > > > > > > > > > > > > > > > > > from a user perspective, three new
> options
> > > > are
> > > > > > > > > > > introduced in this
> > > > > > > > > > > > > > > > > > > > FLIP, do they have recommended
> defaults?
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > [1]
> > > > > > > https://issues.apache.org/jira/browse/FLINK-26590
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > Yanfei
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Zakelly Lan <zakelly....@gmail.com>
> > > > 于2023年4月3日周一
> > > > > > > > > > > 18:36写道:
> > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Hi everyone,
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > I would like to open a discussion
> on
> > > > providing
> > > > > > > a
> > > > > > > > > > > unified file merging
> > > > > > > > > > > > > > > > > > > > > mechanism for checkpoints[1].
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Currently, many files are uploaded
> to
> > > > the DFS
> > > > > > > during
> > > > > > > > > > > checkpoints,
> > > > > > > > > > > > > > > > > > > > > leading to the 'file flood'
> problem when
> > > > > > > running
> > > > > > > > > > > > > > > > > > > > > intensive workloads in a cluster.
> To
> > > > tackle
> > > > > > > this
> > > > > > > > > > > problem, various
> > > > > > > > > > > > > > > > > > > > > solutions have been proposed for
> > > > different
> > > > > > > types
> > > > > > > > > > > > > > > > > > > > > of state files. Although these
> methods
> > > > are
> > > > > > > similar,
> > > > > > > > > > > they lack a
> > > > > > > > > > > > > > > > > > > > > systematic view and approach. We
> believe
> > > > that
> > > > > > > it is
> > > > > > > > > > > > > > > > > > > > > better to consider this problem as
> a
> > > > whole and
> > > > > > > > > > > introduce a unified
> > > > > > > > > > > > > > > > > > > > > framework to address the file flood
> > > > problem for
> > > > > > > > > > > > > > > > > > > > > all types of state files. A POC
> has been
> > > > > > > implemented
> > > > > > > > > > > based on current
> > > > > > > > > > > > > > > > > > > > > FLIP design, and the test results
> are
> > > > > > > promising.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Looking forward to your comments or
> > > > feedback.
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > Best regards,
> > > > > > > > > > > > > > > > > > > > > Zakelly
> > > > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > >
> > > > > > >
> > > >
> > > >
>

Reply via email to