Nico Kruber created FLINK-6791:
----------------------------------

Summary: Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
Key: FLINK-6791
URL: https://issues.apache.org/jira/browse/FLINK-6791
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Affects Versions: 1.2.1, 1.3.0
Reporter: Nico Kruber
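If the `MemoryStateBackend` is used as the checkpoint stream back-end, e.g. inside the RocksDBStateBackend, it blocks further checkpoint/savepoint creation once the checkpoint data reaches the back-end's maximum state size. In that case, an error message is logged on the TaskManager, but the savepoint/checkpoint never completes; although the job keeps running, no further checkpoints are made.

The following example should reproduce the problem:

{code:java}
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class FailingSnapshots {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB keyed state whose snapshots go through a MemoryStateBackend
        // (max state size 1000 * 1024 * 1024 bytes, asynchronous snapshots off).
        env.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend(1000 * 1024 * 1024, false), false));
        env.enableCheckpointing(100L); // trigger a checkpoint every 100 ms

        final long numKeys = 100_000L;

        // Endless source cycling through the keys 0 .. numKeys - 1.
        DataStreamSource<Tuple1<Long>> source1 = env.addSource(new RichParallelSourceFunction<Tuple1<Long>>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple1<Long>> ctx) throws Exception {
                long counter = 0;
                while (running) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(Tuple1.of(counter % numKeys));
                        counter++;
                    }
                    Thread.yield();
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Identity map that keeps a list of 100 copies of the key as per-key state.
        source1.keyBy(0)
            .map(new RichMapFunction<Tuple1<Long>, Tuple1<Long>>() {
                private transient ValueState<List<Long>> val;

                @Override
                public Tuple1<Long> map(Tuple1<Long> value) throws Exception {
                    val.update(Collections.nCopies(100, value.f0));
                    return value;
                }

                @Override
                public void open(final Configuration parameters) throws Exception {
                    ValueStateDescriptor<List<Long>> descriptor = new ValueStateDescriptor<>(
                        "data", // the state name
                        TypeInformation.of(new TypeHint<List<Long>>() { }) // type information
                    );
                    val = getRuntimeContext().getState(descriptor);
                }
            }).uid("identity-map-with-state")
            .addSink(new DiscardingSink<Tuple1<Long>>());

        env.execute("failingsnapshots");
    }
}
{code}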
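To make the reported symptom concrete, here is a toy model in plain Java. It is a simplified sketch, not Flink's code: `ToyCheckpointModel`, `BoundedMemoryStream` and `Coordinator` are hypothetical names, and the assumptions that only one checkpoint may be pending at a time and that a failed snapshot is merely logged (never acknowledged or declined) are taken from the behaviour described in this issue, not from Flink's checkpoint coordinator.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the reported symptom; NOT Flink's implementation.
class ToyCheckpointModel {

    /** In-memory checkpoint stream that rejects data beyond a fixed maximum size. */
    static final class BoundedMemoryStream {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final int maxStateSize;

        BoundedMemoryStream(int maxStateSize) {
            this.maxStateSize = maxStateSize;
        }

        void write(byte[] data) throws IOException {
            int newSize = buffer.size() + data.length;
            if (newSize > maxStateSize) {
                throw new IOException("checkpoint data (" + newSize
                        + " bytes) exceeds maxStateSize (" + maxStateSize + " bytes)");
            }
            buffer.write(data, 0, data.length);
        }
    }

    /** Assumption: at most one checkpoint may be pending at any time. */
    static final class Coordinator {
        private final int maxStateSize;
        private Long pendingCheckpointId = null;
        private long nextId = 1;
        final List<Long> completed = new ArrayList<>();
        final List<String> log = new ArrayList<>();

        Coordinator(int maxStateSize) {
            this.maxStateSize = maxStateSize;
        }

        /** Returns true if a checkpoint was started, false if skipped because one is still pending. */
        boolean trigger(byte[] snapshot) {
            if (pendingCheckpointId != null) {
                return false; // the earlier checkpoint never finished, so nothing new starts
            }
            long id = nextId++;
            pendingCheckpointId = id;
            try {
                new BoundedMemoryStream(maxStateSize).write(snapshot);
                completed.add(id);
                pendingCheckpointId = null; // acknowledged: the next trigger may proceed
            } catch (IOException e) {
                // Assumption from the report: the error is only logged, so the
                // checkpoint is neither acknowledged nor declined and stays pending.
                log.add("checkpoint " + id + " failed: " + e.getMessage());
            }
            return true;
        }
    }
}
{code}

With a limit of 16 bytes, an 8-byte snapshot completes, a 32-byte snapshot fails and stays pending, and every later trigger is skipped even though the job itself keeps running.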