[ https://issues.apache.org/jira/browse/FLINK-30412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651167#comment-17651167 ]
Yun Gao commented on FLINK-30412:
---------------------------------

Hi [~xtsong] [~xiaodao], I double-checked the issue. The logic seems to be:
# The `ExecutionGraphBuilder` decides whether to create the CheckpointCoordinator based on `snapshotSettings == null`. One unexpected point is that in DataStream the snapshotSettings seem to always be set, which means we also create the CheckpointCoordinator in batch mode. This might need some double-checking.
# For streaming mode this should be reasonable: even if users disable periodic checkpoints, they may still take savepoints, so we still need to create the CheckpointCoordinator.

But I still don't get why the directory is leaked. After the fix in https://issues.apache.org/jira/browse/FLINK-23180, the directory should not be created if the checkpoint interval == Long.MAX_VALUE: [https://github.com/apache/flink/blob/4ea67f63eb1c43d7adf07c37946b20b525fb015d/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L335].

[~xiaodao] could you also attach the paths of the leaked directories?
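To make the two conditions above concrete, here is a minimal sketch in plain Java. It is not Flink's code: the class `CheckpointDecisionSketch`, the nested `SnapshotSettings` stand-in, the helper names, and the value 10 for `MINIMAL_CHECKPOINT_TIME` are all assumptions for illustration. It only models the logic as described in this thread: the coordinator is created whenever snapshot settings are non-null, while periodic checkpointing counts as disabled when the interval is Long.MAX_VALUE.

```java
class CheckpointDecisionSketch {

    // Assumed value, for illustration only.
    static final long MINIMAL_CHECKPOINT_TIME = 10L;

    // Hypothetical stand-in for the job's snapshot settings (not Flink's class).
    static final class SnapshotSettings {
        final long checkpointInterval;

        SnapshotSettings(long checkpointInterval) {
            this.checkpointInterval = checkpointInterval;
        }
    }

    // Mirrors the snippet quoted in the issue: an interval below the minimum
    // is mapped to Long.MAX_VALUE, meaning "no periodic checkpoints".
    static long normalizeInterval(long configuredInterval) {
        return configuredInterval < MINIMAL_CHECKPOINT_TIME ? Long.MAX_VALUE : configuredInterval;
    }

    // As described above: the coordinator is created unless the settings are null.
    static boolean createsCoordinator(SnapshotSettings settings) {
        return settings != null;
    }

    // Periodic checkpointing is considered enabled only for a finite interval.
    static boolean periodicCheckpointsEnabled(SnapshotSettings settings) {
        return settings != null && settings.checkpointInterval != Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // A job that never enabled checkpointing still carries settings.
        SnapshotSettings disabled = new SnapshotSettings(normalizeInterval(-1L));
        System.out.println("coordinator created: " + createsCoordinator(disabled));
        System.out.println("periodic checkpoints: " + periodicCheckpointsEnabled(disabled));
    }
}
```

In this model a job with checkpointing disabled still gets a coordinator (needed for savepoints) but no periodic checkpoints, which is why a leaked checkpoint directory in that case is surprising.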

> create many checkpoint empty dir when job not enable checkpoint
> ---------------------------------------------------------------
>
>                 Key: FLINK-30412
>                 URL: https://issues.apache.org/jira/browse/FLINK-30412
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.12.7, 1.13.6, 1.15.2
>            Reporter: xiaodao
>            Priority: Major
>
> When we submit jobs to a Flink session cluster, after a long time we find that
> it has created too many empty checkpoint directories, and they exceed the HDFS
> max node limit.
> I found that StreamingJobGraphGenerator sets the snapshot settings regardless
> of whether the job enables checkpointing:
> jobGraph.setSnapshotSettings(settings)
> {code:java}
> private void configureCheckpointing() {
>     CheckpointConfig cfg = streamGraph.getCheckpointConfig();
>     long interval = cfg.getCheckpointInterval();
>     if (interval < MINIMAL_CHECKPOINT_TIME) {
>         // interval of max value means disable periodic checkpoint
>         interval = Long.MAX_VALUE;
>     }
>     // ... (rest of the method not quoted in the report)
> }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)