Zakelly commented on code in PR #22973: URL: https://github.com/apache/flink/pull/22973#discussion_r1560576402
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java: ########## @@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() { * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}. */ public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob( - @Nonnull JobID jobId) { + @Nonnull JobID jobId, + Configuration clusterConfiguration, + Configuration jobConfiguration) { + boolean mergingEnabled = + jobConfiguration + .getOptional(FILE_MERGING_ENABLED) + .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED)); synchronized (lock) { if (closed) { throw new IllegalStateException( "TaskExecutorFileMergingManager is already closed and cannot " + "register a new FileMergingSnapshotManager."); } + if (!mergingEnabled) { + return null; + } FileMergingSnapshotManager fileMergingSnapshotManager = fileMergingSnapshotManagerByJobId.get(jobId); - if (fileMergingSnapshotManager == null) { - // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration + if (fileMergingSnapshotManager == null && mergingEnabled) { + FileMergingType fileMergingType = + jobConfiguration + .getOptional(FILE_MERGING_ACROSS_BOUNDARY) + .orElse( + clusterConfiguration.getBoolean( + FILE_MERGING_ACROSS_BOUNDARY)) + ? FileMergingType.MERGE_ACROSS_CHECKPOINT + : FileMergingType.MERGE_WITHIN_CHECKPOINT; + MemorySize maxFileSize = + jobConfiguration + .getOptional(FILE_MERGING_MAX_FILE_SIZE) + .orElse(clusterConfiguration.get(FILE_MERGING_MAX_FILE_SIZE)); + Boolean usingBlockingPoll = Review Comment: nit. `usingBlockingPool` ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java: ########## @@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() { * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}. 
*/ public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob( - @Nonnull JobID jobId) { + @Nonnull JobID jobId, + Configuration clusterConfiguration, + Configuration jobConfiguration) { + boolean mergingEnabled = + jobConfiguration + .getOptional(FILE_MERGING_ENABLED) + .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED)); synchronized (lock) { if (closed) { throw new IllegalStateException( "TaskExecutorFileMergingManager is already closed and cannot " + "register a new FileMergingSnapshotManager."); } + if (!mergingEnabled) { + return null; + } FileMergingSnapshotManager fileMergingSnapshotManager = fileMergingSnapshotManagerByJobId.get(jobId); - if (fileMergingSnapshotManager == null) { - // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration + if (fileMergingSnapshotManager == null && mergingEnabled) { Review Comment: No need to check `mergingEnabled` here? If it is false, the function has already returned above. ########## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ########## @@ -44,6 +44,42 @@ <td>String</td> <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td> </tr> + <tr> + <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged.
Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td> + </tr> + <tr> + <td><h5>state.checkpoints.file-merging.enabled</h5></td> Review Comment: Is it possible to adjust the order of the documented items/entries? This one should come first. ########## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ########## @@ -44,6 +44,42 @@ <td>String</td> <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td> </tr> + <tr> + <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td> Review Comment: ```suggestion <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries.
Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td> ``` ########## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ########## @@ -44,6 +44,42 @@ <td>String</td> <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td> </tr> + <tr> + <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td> + </tr> + <tr> + <td><h5>state.checkpoints.file-merging.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td> + </tr> + <tr> + <td><h5>state.checkpoints.file-merging.max-file-size</h5></td> + <td style="word-wrap: break-word;">32 mb</td> + <td>MemorySize</td> + <td>Max size of a physical file for merged checkpoints.</td> + </tr> + <tr> + <td><h5>state.checkpoints.file-merging.max-space-amplification</h5></td> + <td style="word-wrap: break-word;">0.75</td> + <td>Float</td> + <td>A threshold that triggers a compaction (re-uploading) of one physical file. 
If the amount of invalid data in a physical file exceeds the threshold, a new physical file will be created and uploaded.</td> Review Comment: ```suggestion <td>Space amplification stands for the magnification of the occupied space compared to the amount of valid data. The larger the space amplification is, the more space is wasted. This option configures the space amplification above which a re-upload of physical files is triggered to reclaim space.</td> ``` ########## flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java: ########## @@ -107,6 +107,8 @@ public static final class Sections { public static final String TRACE_REPORTERS = "trace_reporters"; + public static final String FILE_MERGING = "file_merging"; Review Comment: How about naming it `String CHECKPOINT_FILE_MERGING = "checkpoint_file_merging";` since it seems ambiguous here. ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java: ########## @@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() { * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}. */ public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob( - @Nonnull JobID jobId) { + @Nonnull JobID jobId, + Configuration clusterConfiguration, + Configuration jobConfiguration) { + boolean mergingEnabled = + jobConfiguration + .getOptional(FILE_MERGING_ENABLED) + .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED)); Review Comment: Use `get` instead of `getBoolean`, since the latter is deprecated. ########## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ########## @@ -44,6 +44,42 @@ <td>String</td> <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers).
If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td> </tr> + <tr> + <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td> + </tr> + <tr> + <td><h5>state.checkpoints.file-merging.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td> Review Comment: Maybe add some description: This is an experimental feature under evaluation. ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java: ########## @@ -67,20 +75,51 @@ public TaskExecutorFileMergingManager() { * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}. 
*/ public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob( - @Nonnull JobID jobId) { + @Nonnull JobID jobId, + Configuration clusterConfiguration, + Configuration jobConfiguration) { + boolean mergingEnabled = + jobConfiguration + .getOptional(FILE_MERGING_ENABLED) + .orElse(clusterConfiguration.getBoolean(FILE_MERGING_ENABLED)); synchronized (lock) { if (closed) { throw new IllegalStateException( "TaskExecutorFileMergingManager is already closed and cannot " + "register a new FileMergingSnapshotManager."); } + if (!mergingEnabled) { + return null; + } FileMergingSnapshotManager fileMergingSnapshotManager = fileMergingSnapshotManagerByJobId.get(jobId); - if (fileMergingSnapshotManager == null) { - // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration + if (fileMergingSnapshotManager == null && mergingEnabled) { + FileMergingType fileMergingType = + jobConfiguration + .getOptional(FILE_MERGING_ACROSS_BOUNDARY) + .orElse( + clusterConfiguration.getBoolean( Review Comment: Same as above ########## docs/layouts/shortcodes/generated/checkpointing_configuration.html: ########## @@ -44,6 +44,42 @@ <td>String</td> <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td> </tr> + <tr> + <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries will be merged. 
Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td> + </tr> + <tr> + <td><h5>state.checkpoints.file-merging.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files.</td> + </tr> + <tr> + <td><h5>state.checkpoints.file-merging.max-file-size</h5></td> + <td style="word-wrap: break-word;">32 mb</td> + <td>MemorySize</td> + <td>Max size of a physical file for merged checkpoints.</td> + </tr> + <tr> + <td><h5>state.checkpoints.file-merging.max-space-amplification</h5></td> + <td style="word-wrap: break-word;">0.75</td> Review Comment: And this should be some value bigger than 1 ########## flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java: ########## @@ -331,4 +332,103 @@ public class CheckpointingOptions { + StateRecoveryOptions.LOCAL_RECOVERY.key() + ". By default, local backup is deactivated. Local backup currently only " + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."); + + // ------------------------------------------------------------------------ + // Options related to file merging + // ------------------------------------------------------------------------ + + /** + * Whether to enable merging multiple checkpoint files into one, which will greatly reduce the + * number of small checkpoint files. See FLIP-306 for details. 
+ */ + @Experimental + @Documentation.Section(Documentation.Sections.FILE_MERGING) + public static final ConfigOption<Boolean> FILE_MERGING_ENABLED = + ConfigOptions.key("state.checkpoints.file-merging.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable merging multiple checkpoint files into one, which will greatly reduce" + + " the number of small checkpoint files."); + + /** + * Whether to allow merging data of multiple checkpoints into one physical file. If this option + * is set to false, only files within checkpoint boundaries will be merged. Otherwise, it is + * possible for the logical files of different checkpoints to share the same physical file. + */ + @Experimental + @Documentation.Section(Documentation.Sections.FILE_MERGING) + public static final ConfigOption<Boolean> FILE_MERGING_ACROSS_BOUNDARY = + ConfigOptions.key("state.checkpoints.file-merging.across-checkpoint-boundary") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Only relevant if %s is enabled.", + TextElement.code(FILE_MERGING_ENABLED.key())) + .linebreak() + .text( + "Whether to allow merging data of multiple checkpoints into one physical file. " + + "If this option is set to false, " + + "only merge files within checkpoint boundaries will be merged. " + + "Otherwise, it is possible for the logical files of different " + + "checkpoints to share the same physical file.") + .build()); + + /** The max size of a physical file for merged checkpoints. */ + @Experimental + @Documentation.Section(Documentation.Sections.FILE_MERGING) + public static final ConfigOption<MemorySize> FILE_MERGING_MAX_FILE_SIZE = + ConfigOptions.key("state.checkpoints.file-merging.max-file-size") + .memoryType() + .defaultValue(MemorySize.parse("32MB")) + .withDescription("Max size of a physical file for merged checkpoints."); + + /** + * Whether to use Blocking or Non-Blocking pool for merging physical files. 
A Non-Blocking pool + * will always provide usable physical file without blocking. It may create many physical files + * if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until + * the file is returned. + */ + @Experimental + @Documentation.Section(Documentation.Sections.FILE_MERGING) + public static final ConfigOption<Boolean> FILE_MERGING_POOL_BLOCKING = + ConfigOptions.key("state.checkpoints.file-merging.pool-blocking") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to use Blocking or Non-Blocking pool for merging physical files. " + + "A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. " + + "When poll a small file from a Blocking pool, it may be blocked until the file is returned."); + + /** + * The upper limit of the file pool size based on the number of subtasks within each TM (only + * for merging private state at Task Manager level). + */ + @Experimental + @Documentation.Section(Documentation.Sections.FILE_MERGING) + public static final ConfigOption<Integer> FILE_MERGING_MAX_SUBTASKS_PER_FILE = + ConfigOptions.key("state.checkpoints.file-merging.max-subtasks-per-file") + .intType() + .defaultValue(0) + .withDescription( + "The upper limit of the file pool size based on the number of subtasks within each TM" + + "(only for merging private state at Task Manager level)."); + + /** + * A threshold that triggers a compaction (re-uploading) of one physical file. If the amount of + * invalid data in a physical file exceeds the threshold, a new physical file will be created + * and uploaded. 
+ */ + @Experimental + @Documentation.Section(Documentation.Sections.FILE_MERGING) + public static final ConfigOption<Float> FILE_MERGING_MAX_SPACE_AMPLIFICATION = Review Comment: I'd suggest annotating `FILE_MERGING_MAX_SPACE_AMPLIFICATION`, `FILE_MERGING_MAX_SUBTASKS_PER_FILE` and `FILE_MERGING_POOL_BLOCKING` with `@Documentation.ExcludeFromDocumentation`, since these are yet to be implemented or users are currently unable to specify them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
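The job-over-cluster configuration fallback that recurs in the hunks above (`jobConfiguration.getOptional(...).orElse(clusterConfiguration.get(...))`) can be sketched in isolation. This is a minimal, self-contained illustration only: `MiniConf` and `resolveMergingEnabled` are hypothetical stand-ins for Flink's `Configuration` API, and the non-deprecated `get`-style lookup stands in for the `get`-over-`getBoolean` point raised in the review.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for Flink's Configuration: only what the sketch needs.
class MiniConf {
    private final Map<String, Boolean> values = new HashMap<>();

    void set(String key, boolean value) {
        values.put(key, value);
    }

    // Empty when the option was never set, mirroring a getOptional-style lookup.
    Optional<Boolean> getOptional(String key) {
        return Optional.ofNullable(values.get(key));
    }

    // Non-deprecated, typed lookup with an explicit default.
    boolean get(String key, boolean defaultValue) {
        return values.getOrDefault(key, defaultValue);
    }
}

public class FileMergingConfigSketch {
    static final String FILE_MERGING_ENABLED = "state.checkpoints.file-merging.enabled";

    // A job-level setting wins; otherwise fall back to the cluster-level value,
    // which itself falls back to the option's default (false).
    static boolean resolveMergingEnabled(MiniConf clusterConf, MiniConf jobConf) {
        return jobConf
                .getOptional(FILE_MERGING_ENABLED)
                .orElse(clusterConf.get(FILE_MERGING_ENABLED, false));
    }

    public static void main(String[] args) {
        MiniConf cluster = new MiniConf();
        MiniConf job = new MiniConf();

        // Neither level sets the option: the default (false) applies.
        System.out.println(resolveMergingEnabled(cluster, job)); // false

        // Cluster enables it, job says nothing: the cluster value applies.
        cluster.set(FILE_MERGING_ENABLED, true);
        System.out.println(resolveMergingEnabled(cluster, job)); // true

        // Job explicitly disables it: the job value overrides the cluster.
        job.set(FILE_MERGING_ENABLED, false);
        System.out.println(resolveMergingEnabled(cluster, job)); // false
    }
}
```

Note that a plain `jobConf.get(key, clusterDefault)` would not work here: it cannot distinguish "unset" from "explicitly set to the default", which is exactly what `getOptional(...).orElse(...)` preserves.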