[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Feng Jiajie updated FLINK-15152:
--------------------------------
    Description: 
I have a streaming job configured with periodic checkpointing, but after it had been running for one week I found that no checkpoint files had been written.

h2. Reproduce the problem:

1. Submit the job to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
2. Immediately afterwards, before all tasks have switched to RUNNING (a matter of seconds), I (actually a job control script) send a "stop with savepoint" command through the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
The job's tasks then continue to run normally, but no checkpoints are taken.

h2. The cause of the problem:

1. The "stop with savepoint" command calls stopCheckpointScheduler (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. Because not all tasks have switched to RUNNING yet, the checkpoint fails at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509:
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
        job,
        ExecutionState.RUNNING,
        ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. In the end, "stop with savepoint" fails: checkpointCoordinator.stopCheckpointScheduler() has already been called, but the job is not terminated, so it keeps running with the periodic checkpoint scheduler stopped.
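The sequence in steps 1 to 3 can be sketched with a small toy model in plain Java. This is not Flink code: ToyCoordinator, stopAsReported and stopWithRestartOnFailure are hypothetical names used only for illustration, and restarting the scheduler when the savepoint fails is just one possible mitigation, not necessarily how Flink should or does handle it.

```java
/** Toy model of the failure sequence; every name in this file is hypothetical. */
class StopWithSavepointSketch {

    /** Stand-in for the checkpoint coordinator; it only tracks whether periodic checkpoints are on. */
    static final class ToyCoordinator {
        private boolean periodicSchedulerRunning = true;

        void stopCheckpointScheduler() {
            periodicSchedulerRunning = false;
        }

        void startCheckpointScheduler() {
            periodicSchedulerRunning = true;
        }

        boolean isPeriodicSchedulerRunning() {
            return periodicSchedulerRunning;
        }

        /** Fails when some task is not RUNNING yet, mirroring step 2 of the report. */
        void triggerSynchronousSavepoint(boolean allTasksRunning) {
            if (!allTasksRunning) {
                throw new IllegalStateException("NOT_ALL_REQUIRED_TASKS_RUNNING");
            }
        }
    }

    /** Sequence described in the report: the scheduler is stopped and never restarted on failure. */
    static boolean stopAsReported(ToyCoordinator coordinator, boolean allTasksRunning) {
        coordinator.stopCheckpointScheduler();
        try {
            coordinator.triggerSynchronousSavepoint(allTasksRunning);
            return true;
        } catch (IllegalStateException e) {
            // The job keeps running, but periodic checkpoints stay switched off.
            return false;
        }
    }

    /** Assumed mitigation: switch periodic checkpointing back on if the savepoint fails. */
    static boolean stopWithRestartOnFailure(ToyCoordinator coordinator, boolean allTasksRunning) {
        coordinator.stopCheckpointScheduler();
        try {
            coordinator.triggerSynchronousSavepoint(allTasksRunning);
            return true;
        } catch (IllegalStateException e) {
            coordinator.startCheckpointScheduler();
            return false;
        }
    }

    public static void main(String[] args) {
        ToyCoordinator reported = new ToyCoordinator();
        stopAsReported(reported, false);
        // prints "as reported, scheduler running: false"
        System.out.println("as reported, scheduler running: " + reported.isPeriodicSchedulerRunning());

        ToyCoordinator mitigated = new ToyCoordinator();
        stopWithRestartOnFailure(mitigated, false);
        // prints "with restart on failure, scheduler running: true"
        System.out.println("with restart on failure, scheduler running: " + mitigated.isPeriodicSchedulerRunning());
    }
}
```

In the first case the toy job ends up in the state described above: still running, with no periodic checkpoints.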

Sample code:
{code:java}
import java.io.IOException;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamingJob {

  private static StateBackend makeRocksdbBackend() throws IOException {
    RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
    rocksdbBackend.enableTtlCompactionFilter();
    rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
    return rocksdbBackend;
  }

  public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 10 sec
    env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
    env.setStateBackend(makeRocksdbBackend());
    env.setRestartStrategy(RestartStrategies.noRestart());

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    checkpointConfig.setFailOnCheckpointingErrors(true);

    DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
    text.map(new MapFunction<String, Tuple2<Long, Long>>() {
      @Override
      public Tuple2<Long, Long> map(String s) {
        String[] s1 = s.split(" ");
        return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
      }
    }).keyBy(0).flatMap(new CountWindowAverage()).print();

    env.execute("Flink Streaming Java API Skeleton");
  }

  public static class CountWindowAverage
      extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out)
        throws Exception {
      Tuple2<Long, Long> currentSum = sum.value();
      currentSum.f0 += 1;
      currentSum.f1 += input.f1;
      sum.update(currentSum);
      out.collect(new Tuple2<>(input.f0, currentSum.f1));
    }

    @Override
    public void open(Configuration config) {
      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
          new ValueStateDescriptor<>(
              "average", // the state name
              TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
              }), // type information
              Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
      sum = getRuntimeContext().getState(descriptor);
    }
  }
}
{code}


> Job running without periodic checkpoint for stop failed at the beginning
> ------------------------------------------------------------------------
>
>                 Key: FLINK-15152
>                 URL: https://issues.apache.org/jira/browse/FLINK-15152
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.9.1
>            Reporter: Feng Jiajie
>            Priority: Major
>              Labels: checkpoint



--
This message was sent by Atlassian Jira
(v8.3.4#803005)