Nico Kruber created FLINK-6791:
----------------------------------

             Summary: Using MemoryStateBackend as checkpoint stream back-end may block checkpoint/savepoint creation
                 Key: FLINK-6791
                 URL: https://issues.apache.org/jira/browse/FLINK-6791
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.2.1, 1.3.0
            Reporter: Nico Kruber


If the `MemoryStateBackend` is used as the checkpoint stream back-end of, e.g.,
the `RocksDBStateBackend`, it blocks all further checkpoint/savepoint creation
once the checkpoint data reaches the back-end's maximum state size. In that
case, an error message is logged on the TaskManager, but the savepoint/checkpoint
never completes: the job keeps running, yet no further checkpoints are made.
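The failure hinges on the memory back-end refusing to buffer more than its configured maximum state size. The following is a minimal, simplified sketch of such a size check in plain Java, assuming a stream that buffers snapshot bytes in memory. The class `BoundedMemoryOutputStream` and its error message are hypothetical illustrations and are not Flink's actual classes or log output.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical, simplified model of a memory-backed checkpoint stream that
 * enforces a maximum state size. Not Flink's implementation.
 */
public class BoundedMemoryOutputStream extends OutputStream {
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    private final int maxSize;

    public BoundedMemoryOutputStream(int maxSize) {
        this.maxSize = maxSize;
    }

    @Override
    public void write(int b) throws IOException {
        ensureCapacity(1);
        buffer.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        ensureCapacity(len);
        buffer.write(b, off, len);
    }

    // Rejects a write if the buffered snapshot would grow beyond maxSize bytes.
    private void ensureCapacity(int len) throws IOException {
        long newSize = (long) buffer.size() + len;
        if (newSize > maxSize) {
            throw new IOException("State size " + newSize
                    + " exceeds the maximum of " + maxSize + " bytes");
        }
    }

    public int size() {
        return buffer.size();
    }

    public static void main(String[] args) throws IOException {
        BoundedMemoryOutputStream out = new BoundedMemoryOutputStream(16);
        out.write(new byte[10]);     // fits: 10 <= 16
        try {
            out.write(new byte[10]); // would reach 20 > 16, so it is rejected
        } catch (IOException e) {
            System.out.println("snapshot failed: " + e.getMessage());
        }
        System.out.println("bytes buffered: " + out.size());
    }
}
```

In the reported bug, the equivalent failure only surfaces as a log message on the TaskManager, and the checkpoint never completes instead of being reported as failed.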

The following example should reproduce the issue:

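{code:java}
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class FailingSnapshots {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // RocksDB holds the working state; its snapshots are written through a
        // MemoryStateBackend checkpoint stream limited to 1000 MiB (synchronous
        // snapshots); incremental checkpointing is disabled.
        env.setStateBackend(new RocksDBStateBackend(
                new MemoryStateBackend(1000 * 1024 * 1024, false), false));

        // Trigger a checkpoint every 100 ms.
        env.enableCheckpointing(100L);

        final long numKeys = 100_000L;

        // Endless source cycling through numKeys distinct keys.
        DataStreamSource<Tuple1<Long>> source1 =
                env.addSource(new RichParallelSourceFunction<Tuple1<Long>>() {
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<Tuple1<Long>> ctx) throws Exception {
                        long counter = 0;

                        while (running) {
                            // Emit while holding the checkpoint lock so that emitting
                            // a record is atomic with respect to checkpoints.
                            synchronized (ctx.getCheckpointLock()) {
                                ctx.collect(Tuple1.of(counter % numKeys));
                                counter++;
                            }

                            Thread.yield();
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                });

        source1.keyBy(0)
                .map(new RichMapFunction<Tuple1<Long>, Tuple1<Long>>() {
                    private transient ValueState<List<Long>> val;

                    @Override
                    public Tuple1<Long> map(Tuple1<Long> value) throws Exception {
                        // Store a 100-element list per key to inflate the keyed state.
                        val.update(Collections.nCopies(100, value.f0));
                        return value;
                    }

                    @Override
                    public void open(final Configuration parameters) throws Exception {
                        ValueStateDescriptor<List<Long>> descriptor =
                                new ValueStateDescriptor<>(
                                        "data", // the state name
                                        TypeInformation.of(new TypeHint<List<Long>>() {
                                        }) // type information
                                );
                        val = getRuntimeContext().getState(descriptor);
                    }
                }).uid("identity-map-with-state")
                .addSink(new DiscardingSink<Tuple1<Long>>());

        env.execute("failingsnapshots");
    }
}
{code}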



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
