Hello,

We have run into a strange issue with a POJO MapState in a streaming job: checkpointing fails after we remove the state, modify the state POJO, and restore the job.
Caused by: java.lang.NullPointerException
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
    at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)

Reproduced in Flink 1.10 & 1.11 (full stack below).

Context:
We have a streaming job with a state named "buffer" holding a POJO Buffer, inside a CoFlatMap function MyCoFlat:

    public class MyCoFlat extends RichCoFlatMapFunction<Pojo1, Pojo1, V> {

        transient MapState<String, Buffer> buffer;

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("buffer", String.class, Buffer.class));
        }
        ...

Buffer:

    public class Buffer {
        private String field1;
        private String field2;
        private String field3;
        ...
        // + empty constructor + getters/setters so Flink treats it as a POJO
    }

We had some trouble with our job, so we reworked two things:
- we removed field2 from the Buffer class,
- we stopped using the "buffer" state altogether.

When restoring from the savepoint (with --allowNonRestoredState) we get the NullPointerException above (full stack below). The job is submitted to the cluster but fails on checkpointing; the job is completely stuck.

Debug:
Debugging showed us where the exception is raised (as expected):

    public PojoSerializer(
            Class<T> clazz,
            TypeSerializer<?>[] fieldSerializers,
            Field[] fields,
            ExecutionConfig executionConfig) {

        this.clazz = checkNotNull(clazz);
        this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
        this.fields = checkNotNull(fields);
        this.numFields = fieldSerializers.length;
        this.executionConfig = checkNotNull(executionConfig);

        for (int i = 0; i < numFields; i++) {
            this.fields[i].setAccessible(true);   // <---- HERE
        }
    }

In our fields array, fields[0] and fields[2] are present but fields[1] is missing entirely (null), which is why we get the NPE when i = 1.

So what we did was put the state back into the streaming job (with the missing field restored in the POJO), redeploy from the old savepoint, and that went completely fine. Then we redeployed a job without this state. Removing it therefore took two deployments (1 -> put the POJO back as it was, 2 -> remove the state that uses this POJO).

But the no-longer-used state (at least its serializer) is still present in the savepoints, so we could face this problem again the next time we modify the Buffer POJO. In the end we simply rewrote the savepoint with the API, removed this state once and for all, and restarted from it (a sketch of what we ran is below my signature).

I have a couple of questions here:
- Why does Flink keep a no-longer-used state in a savepoint, even though it cannot map it onto the new topology and allowNonRestoredState is set?
- Why does Flink not handle this case? The behaviour seems to differ between an existing POJO state and this unused POJO state.
- How can I clean my savepoints? I don't want them to contain unused state.

If anybody has experienced an issue like this before or knows how to handle it, I would be glad to discuss!

Best regards,
------------------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
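
PS: for reference, here is roughly how we cleaned the savepoint. This is only a minimal sketch using the State Processor API (flink-state-processor-api module), assuming that is the API meant above, that the CoFlatMap operator has an explicit uid, and that nothing else registered under that operator needs to be kept. The uid "my-co-flat-uid", the paths and the state backend are placeholders to adapt to your own job.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.ExistingSavepoint;
    import org.apache.flink.state.api.Savepoint;

    public class RemoveUnusedState {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Load the existing savepoint; the state backend should match the one used by the job.
            ExistingSavepoint savepoint = Savepoint.load(
                    env, "hdfs:///savepoints/savepoint-xxxx", new MemoryStateBackend());

            // Drop everything registered under the uid of the operator that owned the
            // "buffer" MapState, and write the remaining state to a new savepoint path.
            savepoint
                    .removeOperator("my-co-flat-uid")
                    .write("hdfs:///savepoints/savepoint-cleaned");

            env.execute("remove unused buffer state");
        }
    }

Note that removeOperator drops all state registered under that uid; if the operator also holds state you want to keep, you would instead read its keyed state and re-write only the wanted states through a BootstrapTransformation.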