Zakelly commented on code in PR #22973:
URL: https://github.com/apache/flink/pull/22973#discussion_r1560576402


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##########
@@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() {
      * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
      */
     public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
-            @Nonnull JobID jobId) {
+            @Nonnull JobID jobId,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
+        boolean mergingEnabled =
+                jobConfiguration
+                        .getOptional(FILE_MERGING_ENABLED)
+                        .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED));
         synchronized (lock) {
             if (closed) {
                 throw new IllegalStateException(
                         "TaskExecutorFileMergingManager is already closed and cannot "
                                 + "register a new FileMergingSnapshotManager.");
             }
+            if (!mergingEnabled) {
+                return null;
+            }
             FileMergingSnapshotManager fileMergingSnapshotManager =
                     fileMergingSnapshotManagerByJobId.get(jobId);
-            if (fileMergingSnapshotManager == null) {
-                // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration
+            if (fileMergingSnapshotManager == null && mergingEnabled) {
+                FileMergingType fileMergingType =
+                        jobConfiguration
+                                        .getOptional(FILE_MERGING_ACROSS_BOUNDARY)
+                                        .orElse(
+                                                clusterConfiguration.getBoolean(
+                                                        FILE_MERGING_ACROSS_BOUNDARY))
+                                ? FileMergingType.MERGE_ACROSS_CHECKPOINT
+                                : FileMergingType.MERGE_WITHIN_CHECKPOINT;
+                MemorySize maxFileSize =
+                        jobConfiguration
+                                .getOptional(FILE_MERGING_MAX_FILE_SIZE)
+                                .orElse(clusterConfiguration.get(FILE_MERGING_MAX_FILE_SIZE));
+                Boolean usingBlockingPoll =

Review Comment:
   nit: typo, this should be `usingBlockingPool`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##########
@@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() {
      * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
      */
     public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
-            @Nonnull JobID jobId) {
+            @Nonnull JobID jobId,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
+        boolean mergingEnabled =
+                jobConfiguration
+                        .getOptional(FILE_MERGING_ENABLED)
+                        .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED));
         synchronized (lock) {
             if (closed) {
                 throw new IllegalStateException(
                         "TaskExecutorFileMergingManager is already closed and cannot "
                                 + "register a new FileMergingSnapshotManager.");
             }
+            if (!mergingEnabled) {
+                return null;
+            }
             FileMergingSnapshotManager fileMergingSnapshotManager =
                     fileMergingSnapshotManagerByJobId.get(jobId);
-            if (fileMergingSnapshotManager == null) {
-                // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration
+            if (fileMergingSnapshotManager == null && mergingEnabled) {

Review Comment:
   No need to check `mergingEnabled` here: if it is false, the function has 
already returned above.
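
A minimal sketch of the control flow in question, using a simplified, hypothetical `RegistrySketch` class (plain `Object` values and `String` job IDs stand in for the Flink types in the diff). Once the `!mergingEnabled` branch has returned, the flag is necessarily true for the rest of the method, so the null check alone is enough:

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical stand-in for the per-job manager registry in the diff. */
final class RegistrySketch {
    private final Object lock = new Object();
    private final Map<String, Object> managerByJobId = new HashMap<>();
    private boolean closed;

    /** Returns null when merging is disabled, otherwise the (lazily created) per-job manager. */
    Object managerForJob(String jobId, boolean mergingEnabled) {
        synchronized (lock) {
            if (closed) {
                throw new IllegalStateException("Registry is already closed.");
            }
            if (!mergingEnabled) {
                return null; // early return: below this line mergingEnabled is always true
            }
            Object manager = managerByJobId.get(jobId);
            if (manager == null) { // no "&& mergingEnabled" needed here
                manager = new Object();
                managerByJobId.put(jobId, manager);
            }
            return manager;
        }
    }

    /** Marks the registry as closed; later lookups throw. */
    void close() {
        synchronized (lock) {
            closed = true;
        }
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        System.out.println(registry.managerForJob("job-1", false) == null);
        System.out.println(registry.managerForJob("job-1", true) != null);
    }
}
```

The behaviour is the same with or without the extra condition; dropping it only removes a check that can never be false at that point.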



##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
             <td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>

Review Comment:
   Is it possible to adjust the order of the documentation entries? This one 
should come first.



##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
             <td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>

Review Comment:
   ```suggestion
               <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
   ```
   



##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
             <td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-file-size</h5></td>
+            <td style="word-wrap: break-word;">32 mb</td>
+            <td>MemorySize</td>
+            <td>Max size of a physical file for merged checkpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-space-amplification</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>
+            <td>Float</td>
+            <td>A threshold that triggers a compaction (re-uploading) of one physical file. If the amount of invalid data in a physical file exceeds the threshold, a new physical file will be created and uploaded.</td>

Review Comment:
   ```suggestion
               <td>Space amplification is the ratio of the occupied space to the amount of valid data. The higher the space amplification, the more space is wasted. This option sets the space amplification above which physical files are re-uploaded to reclaim space.</td>
   ```



##########
flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java:
##########
@@ -107,6 +107,8 @@ public static final class Sections {
 
         public static final String TRACE_REPORTERS = "trace_reporters";
 
+        public static final String FILE_MERGING = "file_merging";

Review Comment:
   How about naming it `String CHECKPOINT_FILE_MERGING = "checkpoint_file_merging";`, 
since `FILE_MERGING` alone seems ambiguous here?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##########
@@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() {
      * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
      */
     public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
-            @Nonnull JobID jobId) {
+            @Nonnull JobID jobId,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
+        boolean mergingEnabled =
+                jobConfiguration
+                        .getOptional(FILE_MERGING_ENABLED)
+                        .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED));

Review Comment:
   Use `get` instead of `getBoolean`, since the latter is deprecated.
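
The job-over-cluster fallback in this hunk can be sketched without Flink on the classpath. `OptionSketch`, `ConfSketch` and `FallbackSketch` below are hypothetical stand-ins, not Flink classes; they only mirror the roles of `getOptional` (the explicitly set value, if any) and the generic `get` (the value or the option's default) used in the diff:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical typed option key with a default value; not Flink's ConfigOption. */
final class OptionSketch<T> {
    final String key;
    final T defaultValue;

    OptionSketch(String key, T defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }
}

/** Hypothetical configuration holder; not Flink's Configuration. */
final class ConfSketch {
    private final Map<String, Object> values = new HashMap<>();

    <T> ConfSketch set(OptionSketch<T> option, T value) {
        values.put(option.key, value);
        return this;
    }

    /** Returns the explicitly set value, or empty if the option was never set. */
    @SuppressWarnings("unchecked")
    <T> Optional<T> getOptional(OptionSketch<T> option) {
        return Optional.ofNullable((T) values.get(option.key));
    }

    /** Generic typed getter: the set value, or the option's default. */
    <T> T get(OptionSketch<T> option) {
        return getOptional(option).orElse(option.defaultValue);
    }
}

final class FallbackSketch {
    static final OptionSketch<Boolean> FILE_MERGING_ENABLED =
            new OptionSketch<>("state.checkpoints.file-merging.enabled", false);

    /** A job-level setting wins; otherwise the cluster-level value (or default) applies. */
    static <T> T resolve(ConfSketch job, ConfSketch cluster, OptionSketch<T> option) {
        return job.getOptional(option).orElse(cluster.get(option));
    }

    public static void main(String[] args) {
        ConfSketch cluster = new ConfSketch().set(FILE_MERGING_ENABLED, true);
        ConfSketch job = new ConfSketch();
        System.out.println(resolve(job, cluster, FILE_MERGING_ENABLED)); // cluster value applies
        job.set(FILE_MERGING_ENABLED, false);
        System.out.println(resolve(job, cluster, FILE_MERGING_ENABLED)); // job value wins
    }
}
```

A single generic getter covers every option type, which is why the type-specific variants become redundant. In the PR itself the change would presumably amount to replacing `clusterConfiguration.getBoolean(FILE_MERGING_ENABLED)` with `clusterConfiguration.get(FILE_MERGING_ENABLED)`.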



##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
             <td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td>

Review Comment:
   Maybe add a note to the description, e.g. "This is an experimental feature under evaluation."



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java:
##########
@@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() {
      * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}.
      */
     public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob(
-            @Nonnull JobID jobId) {
+            @Nonnull JobID jobId,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
+        boolean mergingEnabled =
+                jobConfiguration
+                        .getOptional(FILE_MERGING_ENABLED)
+                        .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED));
         synchronized (lock) {
             if (closed) {
                 throw new IllegalStateException(
                         "TaskExecutorFileMergingManager is already closed and cannot "
                                 + "register a new FileMergingSnapshotManager.");
             }
+            if (!mergingEnabled) {
+                return null;
+            }
             FileMergingSnapshotManager fileMergingSnapshotManager =
                     fileMergingSnapshotManagerByJobId.get(jobId);
-            if (fileMergingSnapshotManager == null) {
-                // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration
+            if (fileMergingSnapshotManager == null && mergingEnabled) {
+                FileMergingType fileMergingType =
+                        jobConfiguration
+                                        .getOptional(FILE_MERGING_ACROSS_BOUNDARY)
+                                        .orElse(
+                                                clusterConfiguration.getBoolean(

Review Comment:
   Same as above: use `get` instead of the deprecated `getBoolean`.



##########
docs/layouts/shortcodes/generated/checkpointing_configuration.html:
##########
@@ -44,6 +44,42 @@
             <td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-file-size</h5></td>
+            <td style="word-wrap: break-word;">32 mb</td>
+            <td>MemorySize</td>
+            <td>Max size of a physical file for merged checkpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-space-amplification</h5></td>
+            <td style="word-wrap: break-word;">0.75</td>

Review Comment:
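   And the default here should be a value greater than 1, since space amplification (occupied space relative to valid data) cannot be lower than 1.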
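
A small worked example of why a default below 1 does not fit, assuming space amplification is the ratio of occupied space to valid data, as in the description suggested earlier in this review. `SpaceAmplificationSketch` and its method names are hypothetical and are not part of the PR:

```java
/** Hypothetical helper; the names and the ratio definition are assumptions for illustration. */
final class SpaceAmplificationSketch {

    /** Occupied bytes divided by valid bytes; 1.0 means no space is wasted. */
    static double spaceAmplification(long occupiedBytes, long validBytes) {
        if (validBytes <= 0 || occupiedBytes < validBytes) {
            throw new IllegalArgumentException("expected 0 < validBytes <= occupiedBytes");
        }
        return (double) occupiedBytes / validBytes;
    }

    /** True when the file wastes more space than the configured limit allows. */
    static boolean exceedsLimit(long occupiedBytes, long validBytes, double maxSpaceAmplification) {
        return spaceAmplification(occupiedBytes, validBytes) > maxSpaceAmplification;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 32 MB physical file of which 8 MB is still valid: amplification 4.0.
        System.out.println(spaceAmplification(32 * mb, 8 * mb));
        // A fully valid file has amplification 1.0, which already exceeds a limit of 0.75.
        System.out.println(exceedsLimit(32 * mb, 32 * mb, 0.75));
        // With a limit above 1 (2.0 here), the same file is left alone.
        System.out.println(exceedsLimit(32 * mb, 32 * mb, 2.0));
    }
}
```

Under this reading, a limit of 0.75 would flag every physical file for re-uploading, including files with no invalid data at all.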



##########
flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java:
##########
@@ -331,4 +332,103 @@ public class CheckpointingOptions {
                                     + StateRecoveryOptions.LOCAL_RECOVERY.key()
                                     + ". By default, local backup is deactivated. Local backup currently only "
                                     + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend).");
+
+    // ------------------------------------------------------------------------
+    //  Options related to file merging
+    // ------------------------------------------------------------------------
+
+    /**
+     * Whether to enable merging multiple checkpoint files into one, which will greatly reduce the
+     * number of small checkpoint files. See FLIP-306 for details.
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Boolean> FILE_MERGING_ENABLED =
+            ConfigOptions.key("state.checkpoints.file-merging.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to enable merging multiple checkpoint files into one, which will greatly reduce"
+                                    + " the number of small checkpoint files.");
+
+    /**
+     * Whether to allow merging data of multiple checkpoints into one physical file. If this option
+     * is set to false, only files within checkpoint boundaries will be merged. Otherwise, it is
+     * possible for the logical files of different checkpoints to share the same physical file.
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Boolean> FILE_MERGING_ACROSS_BOUNDARY =
+            ConfigOptions.key("state.checkpoints.file-merging.across-checkpoint-boundary")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Only relevant if %s is enabled.",
+                                            TextElement.code(FILE_MERGING_ENABLED.key()))
+                                    .linebreak()
+                                    .text(
+                                            "Whether to allow merging data of multiple checkpoints into one physical file. "
+                                                    + "If this option is set to false, "
+                                                    + "only merge files within checkpoint boundaries will be merged. "
+                                                    + "Otherwise, it is possible for the logical files of different "
+                                                    + "checkpoints to share the same physical file.")
+                                    .build());
+
+    /** The max size of a physical file for merged checkpoints. */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<MemorySize> FILE_MERGING_MAX_FILE_SIZE =
+            ConfigOptions.key("state.checkpoints.file-merging.max-file-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.parse("32MB"))
+                    .withDescription("Max size of a physical file for merged checkpoints.");
+
+    /**
+     * Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool
+     * will always provide usable physical file without blocking. It may create many physical files
+     * if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until
+     * the file is returned.
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Boolean> FILE_MERGING_POOL_BLOCKING =
+            ConfigOptions.key("state.checkpoints.file-merging.pool-blocking")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to use Blocking or Non-Blocking pool for merging physical files. "
+                                    + "A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. "
+                                    + "When poll a small file from a Blocking pool, it may be blocked until the file is returned.");
+
+    /**
+     * The upper limit of the file pool size based on the number of subtasks within each TM (only
+     * for merging private state at Task Manager level).
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Integer> FILE_MERGING_MAX_SUBTASKS_PER_FILE =
+            ConfigOptions.key("state.checkpoints.file-merging.max-subtasks-per-file")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "The upper limit of the file pool size based on the number of subtasks within each TM"
+                                    + "(only for merging private state at Task Manager level).");
+
+    /**
+     * A threshold that triggers a compaction (re-uploading) of one physical file. If the amount of
+     * invalid data in a physical file exceeds the threshold, a new physical file will be created
+     * and uploaded.
+     */
+    @Experimental
+    @Documentation.Section(Documentation.Sections.FILE_MERGING)
+    public static final ConfigOption<Float> FILE_MERGING_MAX_SPACE_AMPLIFICATION =

Review Comment:
   I'd suggest annotating `FILE_MERGING_MAX_SPACE_AMPLIFICATION`, 
`FILE_MERGING_MAX_SUBTASKS_PER_FILE` and `FILE_MERGING_POOL_BLOCKING` with 
`@Documentation.ExcludeFromDocumentation`, since these options are either not 
yet implemented or cannot currently be set by users.


