Feng Jiajie created FLINK-15152:
-----------------------------------
Summary: Job running without periodic checkpoint for stop failed
at the beginning
Key: FLINK-15152
URL: https://issues.apache.org/jira/browse/FLINK-15152
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.9.1
Reporter: Feng Jiajie
I have a streaming job configured with periodically checkpoint, but after one
week running, I found there isn't any checkpoint file.
h2. Reproduce the problem:
# Job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m
flink-example-1.0-SNAPSHOT.jar{code}
# Then immediately, before all the task switch to RUNNING (about seconds),
I(actually a job control script) send a stop with savepoint command by flink
cli:
{code:java}
bin/flink stop -yid application_1575872737452_0019
f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir
{code}
Then the job task continues to run normally, but no checkpointing.
h2. The cause of the problem:
# "stop with savepoint" command call the code
stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612)
and then triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
# but "before all the task switch to RUNNING", checkpoint failed at
org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {}
instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,
ExecutionState.RUNNING,
ee.getState());
throw new
CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
# finally, "stop with savepoint" failed, with
"checkpointCoordinator.stopCheckpointScheduler()" but without the termination
of the job.
sample code:
{code:java}
public class StreamingJob {
private static StateBackend makeRocksdbBackend() throws IOException {
RocksDBStateBackend rocksdbBackend = new
RocksDBStateBackend("file:///tmp/aaa");
rocksdbBackend.enableTtlCompactionFilter();
rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
return rocksdbBackend;
}
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
// 10 sec
env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(makeRocksdbBackend());
env.setRestartStrategy(RestartStrategies.noRestart());
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
checkpointConfig.setFailOnCheckpointingErrors(true);
DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
text.map(new MapFunction<String, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> map(String s) {
String[] s1 = s.split(" ");
return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
}
}).keyBy(0).flatMap(new CountWindowAverage()).print();
env.execute("Flink Streaming Java API Skeleton");
}
public static class CountWindowAverage extends
RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>>
out) throws Exception {
Tuple2<Long, Long> currentSum = sum.value();
currentSum.f0 += 1;
currentSum.f1 += input.f1;
sum.update(currentSum);
out.collect(new Tuple2<>(input.f0, currentSum.f1));
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {
}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was
set
sum = getRuntimeContext().getState(descriptor);
}
}
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)