Hi Bastien,

Flink lets you register state via a state descriptor when calling 
runtimeContext.getState(). However, once state is registered, it cannot be 
removed. When you restore from a savepoint, the previous state is 
registered again [1]. Flink does not support dropping state directly, but you 
can use the State Processor API [2] to remove the state in question.
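
As a rough illustration of the State Processor API route, the sketch below 
loads a savepoint, drops one operator's state, and writes a new savepoint. It 
assumes the flink-state-processor-api dependency from the 1.10/1.11 line; the 
paths and the operator uid are hypothetical placeholders. Note that 
removeOperator drops all state of that operator, not a single named state, so 
if the operator holds other state you still need, that state would have to be 
read and written back with withOperator. Behaviour may differ between versions.

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.Savepoint;

public class DropOperatorState {
    public static void main(String[] args) throws Exception {
        // Hypothetical locations and operator uid; replace with your own.
        String oldSavepoint = "hdfs:///savepoints/old";
        String newSavepoint = "hdfs:///savepoints/cleaned";
        String operatorUid = "my-coflatmap-uid";

        // The State Processor API in these versions runs on the batch (DataSet) environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Savepoint.load(env, oldSavepoint, new MemoryStateBackend())
            .removeOperator(operatorUid) // drops ALL state registered under this uid
            .write(newSavepoint);        // only defines the output; nothing runs yet

        // Triggers the batch job that actually writes the new savepoint.
        env.execute("drop-operator-state");
    }
}
```

The job can then be restarted from the newly written savepoint.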


From: bastien dine <bastien.d...@gmail.com>
Sent: Tuesday, December 8, 2020 0:28
To: user <user@flink.apache.org>
Subject: Problem when restoring from savepoint with missing state & POJO 

We have run into a strange issue with a POJO MapState in a streaming job: 
checkpointing fails after we remove the state, modify the state POJO, and 
restore the job:

Caused by: java.lang.NullPointerException

Reproduced in Flink 1.10 & 1.11
(full stack below)

Context:
We have a streaming job with a state named "buffer" holding a Buffer POJO 
inside a CoFlatMap function

public class MyCoFlat extends RichCoFlatMapFunction<Pojo1, Pojo1, v> {
    transient MapState<String, Buffer> buffer;

    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("buffer", String.class, Buffer.class));
    }
    // ...
}

Buffer:
public class Buffer {
    private String field1;
    private String field2;
    private String field3;
    // ... plus an empty constructor and getters / setters,
    // so that Flink treats the class as a POJO
}

We had some trouble with our job, so we reworked two things:
 - we removed field2 from the Buffer class,
 - we stopped using the "buffer" state

When restoring from the savepoint (with --allowNonRestoredState) we get the 
exception above. The job is submitted to the cluster but fails on 
checkpointing, job is totally 

Debugging showed that the exception is raised here (as expected):

public PojoSerializer(
        Class<T> clazz,
        TypeSerializer<?>[] fieldSerializers,
        Field[] fields,
        ExecutionConfig executionConfig) {
    this.clazz = checkNotNull(clazz);
    this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
    this.fields = checkNotNull(fields);
    this.numFields = fieldSerializers.length;
    this.executionConfig = checkNotNull(executionConfig);
    for (int i = 0; i < numFields; i++) {
        this.fields[i].setAccessible(true); // <---- HERE
    }
    // ...
}

In our fields array, fields[0] and fields[2] are present but fields[1] is 
missing (the slot is null), which is why we get the NPE here when i = 1
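
To make the failure mode concrete, here is a small standalone sketch (plain 
Java reflection, not Flink code) of how a removed field could leave a null 
slot in such an array. The class MissingFieldDemo and its helper methods are 
hypothetical; it only assumes that field names recorded in an old snapshot 
are looked up on the current class, which may not match Flink's internals 
exactly.

```java
import java.lang.reflect.Field;

public class MissingFieldDemo {
    // Current version of the POJO: field2 has been removed.
    static class Buffer {
        private String field1;
        private String field3;
    }

    // Looks up each snapshotted field name on the current class. A name that
    // no longer exists leaves a null slot at its index.
    static Field[] resolveFields(Class<?> clazz, String[] snapshotFieldNames) {
        Field[] fields = new Field[snapshotFieldNames.length];
        for (int i = 0; i < snapshotFieldNames.length; i++) {
            try {
                fields[i] = clazz.getDeclaredField(snapshotFieldNames[i]);
            } catch (NoSuchFieldException e) {
                fields[i] = null; // removed field: nothing to bind to
            }
        }
        return fields;
    }

    // Returns the index of the first null slot, or -1 if every field resolved.
    static int firstMissingIndex(Field[] fields) {
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        Field[] fields = resolveFields(
            Buffer.class, new String[] {"field1", "field2", "field3"});
        try {
            // Same pattern as the constructor loop: no null check per slot.
            for (int i = 0; i < fields.length; i++) {
                fields[i].setAccessible(true);
            }
        } catch (NullPointerException e) {
            System.out.println("NPE at index " + firstMissingIndex(fields));
        }
    }
}
```

A null check before setAccessible would avoid the exception in this sketch; 
whether that is the right fix inside Flink is a separate question.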

So we put this state back into our streaming job (along with the missing 
field and the POJO), redeployed from the old savepoint, and this went fine.
Then we redeployed the job without this state.
So this took two deployments for our job (1 -> modify the POJO, 2 -> 
remove the state using this POJO).
But the no-longer-used state (or at least its serializer) is still in the 
savepoints, so we could face this problem again when we modify the Buffer 
POJO later.
In the end we modified a savepoint with the API, removed this state once and 
for all, and restarted from it.

I have a couple of questions here:
 - Why does Flink keep unused state in a savepoint (even if it cannot map it 
   to the new topology and allowNonRestoredState is set)?
 - Why does Flink not handle this case? The behaviour seems to differ between 
   an existing POJO state and this unused POJO state.
 - How can I clean my savepoints? I don't want them to contain unused state.

If anybody has experienced an issue like this before or knows how to handle 
it, I would be glad to discuss!
Best regards,


Bastien DINE
Data Architect / Software Engineer / Sysadmin
