Hello,
We have experienced some weird issues with a POJO MapState in a streaming
job: checkpointing fails after removing the state, modifying the state POJO,
and restoring the job.

Caused by: java.lang.NullPointerException
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
at
org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)

Reproduced in Flink 1.10 & 1.11
(full stack below)


*Context:* We have a streaming job with a MapState named "buffer" holding a
POJO Buffer inside a CoFlatMap function.

MyCoFlat:

    public class MyCoFlat extends RichCoFlatMapFunction<Pojo1, Pojo1, V> {

        transient MapState<String, Buffer> buffer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buffer", String.class, Buffer.class));
        }
        // ...

Buffer:

    public class Buffer {
        private String field1;
        private String field2;
        private String field3;
        // ... plus an empty constructor and getters/setters,
        // so that Flink treats it as a POJO
    }

We had some trouble with our job, so we reworked two things:
 - we removed field2 from the Buffer class,
 - we stopped using the "buffer" state.

When restoring from a savepoint (with --allowNonRestoredState) we get the
exception below. The job is submitted to the cluster but fails on
checkpointing; the job is completely stuck.


*Debug:* Debugging showed that the exception is raised here (as expected):

    public PojoSerializer(
            Class<T> clazz,
            TypeSerializer<?>[] fieldSerializers,
            Field[] fields,
            ExecutionConfig executionConfig) {

        this.clazz = checkNotNull(clazz);
        this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
        this.fields = checkNotNull(fields);
        this.numFields = fieldSerializers.length;
        this.executionConfig = checkNotNull(executionConfig);

        for (int i = 0; i < numFields; i++) {
            this.fields[i].setAccessible(true);  // <---- HERE
        }
    }

In our fields array, fields[0] and fields[2] are present but fields[1] (the
removed field2) is null, which is why we get the NPE when i = 1.
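The failure mode can be reproduced outside Flink with a minimal sketch
(class and field names are hypothetical, chosen to mirror our Buffer): a
Field[] with a null slot in the middle makes setAccessible throw the same
NullPointerException:

```java
import java.lang.reflect.Field;

public class NullFieldSlot {

    static class Buffer {
        String field1;
        String field3; // field2 has been removed from the class
    }

    public static void main(String[] args) throws Exception {
        // Simulates the restored serializer's field array: the lookup for
        // the removed field2 left a null slot in the middle of the array.
        Field[] fields = {
            Buffer.class.getDeclaredField("field1"),
            null,
            Buffer.class.getDeclaredField("field3")
        };
        for (int i = 0; i < fields.length; i++) {
            fields[i].setAccessible(true); // NullPointerException when i == 1
        }
    }
}
```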

So what we did was put this state back into our streaming job (with the
missing field and POJO), redeploy from the old savepoint, and that went
totally fine. Then we redeployed a job without this state. So removing it
took two deployments (1 -> restore the POJO field, 2 -> remove the state
using this POJO).
But the no-longer-used state (or at least its serializer) is still in the
savepoints, so we could face this problem again when we modify the Buffer
POJO later.
Finally we just modified a savepoint with the API, removed this state once
and for all, and restarted from it.
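For reference, that savepoint surgery can be sketched with the State
Processor API (flink-state-processor-api, available since Flink 1.9). The
savepoint paths and operator uid below are placeholders, and the state
backend must match your job's:

```java
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class DropBufferState {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load the old savepoint (path and backend are placeholders).
        ExistingSavepoint savepoint = Savepoint.load(
            env, "hdfs:///savepoints/savepoint-old", new MemoryStateBackend());

        // Drop the operator that owned the "buffer" MapState entirely,
        // then write a new savepoint to restart from.
        savepoint
            .removeOperator("my-coflat-uid")
            .write("hdfs:///savepoints/savepoint-clean");

        env.execute("remove unused buffer state");
    }
}
```

Restarting the job from savepoint-clean then no longer drags the stale
serializer along.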

I have a couple of questions here:
 - Why does Flink keep a no-longer-used state in a savepoint (even if it
   cannot map it into the new topology and allowNonRestoredState is set)?
 - Why does Flink not handle this case? The behaviour seems to differ
   between an existing POJO state and this unused POJO state.
 - How can I clean my savepoints? I don't want them to contain unused state.

If anybody has run into an issue like this before or knows how to handle
it, I would be glad to discuss!
Best regards,

------------------

Bastien DINE
Data Architect / Software Engineer / Sysadmin
