Hi Zakelly,

Thanks for driving this work!

I'm not sure whether you have read the discussion among Stephan, Roman, Piotr, 
Yuan and me in the design doc [1] from nearly two years ago.

From my understanding, your proposal is also a form of mixed state ownership: 
some states are owned by the TM while others are owned by the JM. If my memory 
is correct, we did not choose option-3 or option-5 in the design doc [1] because 
of the code complexity when implementing the first version of the changelog 
state backend.

Could you also compare the current FLIP with the proposals in the design 
doc [1]? From my understanding, we should at least compare it with option-3 
and option-5, as they are all mixed solutions.


[1] 
https://docs.google.com/document/d/1NJJQ30P27BmUvD7oa4FChvkYxMEgjRPTVdO1dHLl_9I/edit#

Best
Yun Tang

________________________________
From: Zakelly Lan <zakelly....@gmail.com>
Sent: Thursday, April 6, 2023 16:38
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: Re: [DISCUSS] FLIP-306: Unified File Merging Mechanism for Checkpoints

Hi Piotr,

Thanks for all the feedback.

(1) Thanks for the reminder. I have just read FLINK-24611. The delayed deletion 
by JM resolves some sync problems between JM and TM, but I'm afraid it is still 
not feasible for the file sharing in this FLIP. Consider the following 
concurrent checkpoint scenario:
   a. Checkpoint 1 finishes. 1.sst, 2.sst and 3.sst are written in file 1, and 
4.sst is written in file 2.
   b. Checkpoint 2 starts based on checkpoint 1, including 1.sst, 2.sst and 
5.sst.
   c. Checkpoint 3 starts based on checkpoint 1, including 1.sst, 2.sst and 
5.sst as well.
   d. Checkpoint 3 reuses file 2, and the TM writes 5.sst to it.
   e. Checkpoint 2 creates a new file 3, and the TM writes 5.sst to it.
   f. Checkpoint 2 finishes, checkpoint 1 is subsumed and file 2 is deleted, 
while checkpoint 3 still needs file 2.

I attached a diagram to describe the scenario.
[concurrent cp.jpg]
The core issue is that this FLIP introduces a mechanism that allows a physical 
file to be used by the next several checkpoints. The JM cannot know whether a 
TM will continue writing to a specific file. So in this FLIP, the TMs take 
responsibility for deleting the physical files.
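
The scenario in (1) can be replayed with a small sketch. This is only an 
illustration under stated assumptions, not Flink code: `SharedRegistry` is a 
hypothetical stand-in for a JM-side registry that keeps a physical file only 
while a completed checkpoint references it, and the names `file-1`, `file-2` 
and `file-3` are made up for the example.

```python
from collections import defaultdict


class SharedRegistry:
    """Hypothetical JM-side registry: a physical file is kept only while
    at least one completed checkpoint references it."""

    def __init__(self):
        self.refs = defaultdict(set)  # physical file -> completed checkpoint ids
        self.deleted = set()

    def register(self, cp_id, files):
        for f in files:
            self.refs[f].add(cp_id)

    def subsume(self, cp_id):
        for f, cps in self.refs.items():
            cps.discard(cp_id)
            if not cps:  # no completed checkpoint needs the file any more
                self.deleted.add(f)


def run_scenario():
    jm = SharedRegistry()
    # a. Checkpoint 1 finishes: file-1 holds 1/2/3.sst, file-2 holds 4.sst.
    jm.register(1, {"file-1", "file-2"})
    # d. Checkpoint 3 (still in flight) appends 5.sst to file-2.
    cp3_needs = {"file-1", "file-2"}
    # e. Checkpoint 2 writes 5.sst to a new file-3.
    cp2_needs = {"file-1", "file-3"}
    # f. Checkpoint 2 finishes and checkpoint 1 is subsumed.
    jm.register(2, cp2_needs)
    jm.subsume(1)
    # Files that the in-flight checkpoint 3 needs but the JM already deleted.
    return jm.deleted, cp3_needs & jm.deleted


deleted, broken = run_scenario()
print(deleted, broken)
```

In this sketch the registry deletes `file-2` as soon as checkpoint 1 is 
subsumed, because it only sees completed checkpoints and has no record that 
the in-flight checkpoint 3 appended to that file.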

(2) IIUC, the RecoverableWriter was introduced to persist data in "in-progress" 
files after each checkpoint, and its implementation may rely on file sync in 
some file systems. However, since sync is a heavy operation for a DFS, this 
FLIP uses flush instead, on a best-effort basis. This only fits the case where 
the DFS is considered reliable. The two mechanisms solve different problems.
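
As a rough local-file analogy for the flush/sync distinction in (2), the sketch 
below appends segments to one file and only optionally forces them to stable 
storage. It uses Python's standard `flush()` and `os.fsync()`, not Flink's 
stream API, and `append_segment` and `demo` are hypothetical helper names.

```python
import os
import tempfile


def append_segment(f, data, durable):
    """Append data to an open binary file and return its start offset."""
    offset = f.tell()
    f.write(data)
    f.flush()  # push Python's buffer to the OS; cheap, but not yet durable
    if durable:
        os.fsync(f.fileno())  # ask the OS to persist the bytes; the costly step
    return offset


def demo():
    fd, path = tempfile.mkstemp()
    os.close(fd)
    try:
        with open(path, "wb") as f:
            first = append_segment(f, b"abc", durable=False)   # flush only
            second = append_segment(f, b"def", durable=True)   # flush + sync
        with open(path, "rb") as f:
            content = f.read()
    finally:
        os.remove(path)
    return first, second, content
```

Both calls leave the bytes readable afterwards; the difference is only in the 
durability guarantee and the cost, which is why flush-only is acceptable when 
the underlying storage is trusted.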

(3) Yes, if files are managed by JM via the shared registry, this problem is 
solved. However, as I mentioned in (1), there are other corner cases that are 
hard to resolve via the shared registry.

The goal of this FLIP is to have a common way of merging files in all use 
cases. For shared state it merges at subtask level, while for private state 
(and changelog files, as I replied to Yanfei), files are merged at TM level. So 
it is not contrary to the current plan for the unaligned checkpoint state 
(FLINK-26803). You are right that the unaligned checkpoint state would be 
merged with the operator's state file, so overall, it is slightly better than 
what's currently done.
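
The two merging granularities described above can be sketched as a grouping 
rule. This is a simplified illustration, not the FLIP's implementation: 
`merge_key`, `group_for_merging`, the state-kind strings and the sample IDs are 
all assumptions made for the example.

```python
from collections import defaultdict


def merge_key(state_kind, tm_id, subtask_id):
    """Return the group whose logical files may share one physical file."""
    if state_kind == "shared":
        return ("subtask", subtask_id)  # shared state: merge per subtask
    return ("tm", tm_id)  # private state and changelog files: merge per TM


def group_for_merging(files):
    """files: iterable of (state_kind, tm_id, subtask_id, logical_file_name)."""
    groups = defaultdict(list)
    for state_kind, tm_id, subtask_id, name in files:
        groups[merge_key(state_kind, tm_id, subtask_id)].append(name)
    return dict(groups)


sample = [
    ("shared", "tm-1", "map-0", "1.sst"),
    ("shared", "tm-1", "map-1", "2.sst"),
    ("private", "tm-1", "map-0", "a.state"),
    ("private", "tm-1", "map-1", "b.state"),
]
groups = group_for_merging(sample)
```

With this sample, the two shared files stay in separate per-subtask groups, 
while the two private files from the same TM land in a single group.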


Thanks again for the valuable comments!

Best regards,
Zakelly



On Wed, Apr 5, 2023 at 8:43 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
Hi,

Thanks for coming up with the proposal; it's definitely valuable. I'm still
reading and trying to understand it, but here are a couple of comments
from my side.

(1)
> Ownership of a single checkpoint file is transferred to TM, while JM
> manages the parent directory of these files.

Have you seen https://issues.apache.org/jira/browse/FLINK-24611 before? I
don't fully remember why, but we have rejected the idea of moving the file
ownership to TM and instead reworked the shared file registry in a way that
I think should be sufficient for file sharing. Could you elaborate on why we
need to move the file ownership to TM, and why the current mechanism is not
sufficient?

(2)
> File visibility is needed when a Flink job recovers after a checkpoint is
> materialized. In some DFS, such as most object storages, a file is only
> visible after it is closed

Is that really the case?
`org.apache.flink.core.fs.FileSystem#createRecoverableWriter` seems to be
addressing exactly this issue, and the most frequently used FileSystem (S3)
AFAIK supports it with no problems?

(3)
> 4.1.2 Merge files within a subtask or a TM
> Given that TMs are reassigned after restoration, it is difficult to
> manage physical files that contain data from multiple subtasks scattered
> across different TMs (as depicted in Fig.3). There is no synchronization
> mechanism between TMs, making file management in this scenario challenging.

I think this is solved in many places already via the shared state managed
by the JM, as I mentioned in (1).


If I understand it correctly you are proposing to have a common
interface/way of merging small files, in all use cases, that would work
only across a single subtask? That's contrary to what's currently done for
unaligned checkpoints, right? But if this generic mechanism was to be used
for unaligned checkpoints, unaligned checkpoint state would have been
merged with the operator's state file, so all in all there would be no
regression visible to a user? The limit is that we always have at least a
single file per subtask, but in exchange we are getting a simpler threading
model?

Best,
Piotrek

On Tue, Apr 4, 2023 at 08:51 Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Yanfei,
>
> Thank you for your prompt response.
>
> I agree that managing (deleting) only some folders with JM can greatly
> relieve JM's burden. Thanks for pointing this out.
>
> In general, merging at the TM level is more effective since there are
> usually more files to merge. Therefore, I believe it is better to
> merge files per TM as much as possible.  However, for shared state,
> merging at the subtask level is the best choice to prevent significant
> data transfer over the network after restoring. I think it is better
> to keep these merging granularities for different types of files as
> presets that are not configurable. WDYT?
>
> As for the DSTL files, they are merged per TM and placed in the
> task-owned folder. These files can be classified as shared state since
> they are shared across checkpoints. However, the DSTL file is a
> special case that will be subsumed by the first checkpoint of the
> newly restored job. Therefore, there is no need for new TMs to keep
> these files after the old checkpoint is subsumed, just like the
> private state files. Thus, it is feasible to merge DSTL files per TM
> without introducing complex file management across job attempts. So
> the possible performance degradation is avoided.
>
> The three newly introduced options have recommended defaults. For
> upcoming versions, this feature is turned off by default. For the
> second option, SEGMENTED_ACROSS_CP_BOUNDARY is the recommended default
> as it is more effective. Of course, if the DFS does not make a file
> visible until it is closed, it is possible
> to fall back to another option automatically. For the third option,
> 64MB is an acceptable target size. The RocksDB state backend in Flink
> also chooses 64MB as the default target file size.
>
>
> Thank you again for your quick response.
>
>
> Best regards,
> Zakelly
>
>
> On Mon, Apr 3, 2023 at 11:27 PM Yanfei Lei <fredia...@gmail.com> wrote:
> >
> > Hi Zakelly,
> >
> > Thanks for driving this. This proposal brings the file merging of
> > different types of state under a unified framework. I
> > think it has the added benefit of lightening the load on JM. As
> > FLINK-26590[1] describes, triggered checkpoints can be delayed by
> > discarding shared state when JM manages a large number of files. After
> > this FLIP, JM only needs to manage some folders, which greatly reduces
> > the burden on JM.
> >
> > In Section 4.1, two merging granularities (per subtask and per
> > task manager) are proposed. The shared state is merged at per-subtask
> > granularity, but for the changelog state backend, its DSTL files are
> > shared between checkpoints and are currently merged in batches at the
> > task manager level. When merging with the SEGMENTED_WITHIN_CP_BOUNDARY
> > mode, I'm concerned about performance degradation of this merging,
> > so I wonder whether the merge granularities are configurable. Further,
> > from a user perspective, three new options are introduced in this
> > FLIP. Do they have recommended defaults?
> >
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-26590
> >
> > Best,
> > Yanfei
> >
> > On Mon, Apr 3, 2023 at 18:36 Zakelly Lan <zakelly....@gmail.com> wrote:
> >
> > >
> > > Hi everyone,
> > >
> > > I would like to open a discussion on providing a unified file merging
> > > mechanism for checkpoints[1].
> > >
> > > Currently, many files are uploaded to the DFS during checkpoints,
> > > leading to the 'file flood' problem when running
> > > intensive workloads in a cluster.  To tackle this problem, various
> > > solutions have been proposed for different types
> > > of state files. Although these methods are similar, they lack a
> > > systematic view and approach. We believe that it is
> > > better to consider this problem as a whole and introduce a unified
> > > framework to address the file flood problem for
> > > all types of state files. A POC has been implemented based on current
> > > FLIP design, and the test results are promising.
> > >
> > >
> > > Looking forward to your comments or feedback.
> > >
> > > Best regards,
> > > Zakelly
> > >
> > > [1]
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
>
