Hello Yun,

Thank you very much for your response; that's what I thought. However, it does not seem possible to remove only one state using the State Processor API. We use it a lot, and we can only remove all of an operator's state, not one specific state. Am I missing something?
Best Regards,
Bastien

------------------
Bastien DINE
Data Architect / Software Engineer / Sysadmin
bastiendine.io

On Tue, Dec 8, 2020 at 08:54, Yun Tang <[email protected]> wrote:

> Hi Bastien,
>
> Flink registers state through a state descriptor when you call
> runtimeContext.getState(). However, once a state is registered, it cannot
> be removed. When you restore from a savepoint, the previous state is
> registered again [1]. Flink does not support dropping state directly, but
> you can use the State Processor API [2] to remove the related state.
>
> [1] https://github.com/apache/flink/blob/d94c7a451d22f861bd3f79435f777b427020eba0/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/AbstractRocksDBRestoreOperation.java#L171
> [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> ------------------------------
> *From:* bastien dine <[email protected]>
> *Sent:* Tuesday, December 8, 2020 0:28
> *To:* user <[email protected]>
> *Subject:* Problem when restoring from savepoint with missing state &
> POJO modification
>
> Hello,
> We have run into a strange issue with a POJO MapState in a streaming job.
> After we stop using a state, modify its POJO class, and restore the job
> from a savepoint, the job fails on checkpointing with:
>
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.<init>(PojoSerializer.java:123)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.duplicate(PojoSerializer.java:186)
>
> Reproduced in Flink 1.10 & 1.11
> (full stack below)
>
> *Context:* We have a streaming job with a state named "buffer" holding a
> POJO Buffer inside a CoFlatMap function.
>
> MyCoFlat:
>
>     public class MyCoFlat extends RichCoFlatMapFunction<Pojo1, Pojo1, v> {
>         transient MapState<String, Buffer> buffer;
>
>         @Override
>         public void open(Configuration parameters) {
>             buffer = getRuntimeContext().getMapState(
>                 new MapStateDescriptor<>("buffer", String.class, Buffer.class));
>         }
>         ....
>
> Buffer:
>
>     public class Buffer {
>         private String field1;
>         private String field2;
>         private String field3;
>         ... + empty constructor + getter / setter for POJO consideration
>
> We had some trouble with our job, so we reworked two things:
> - we removed field2 from the Buffer class,
> - we stopped using the "buffer" state.
>
> When restoring from the savepoint (with --allowNonRestoredState) we get
> the exception above. The job is submitted to the cluster but fails on
> checkpointing, and it is completely stuck.
>
> *Debug:* Debugging showed that the exception is raised here (as expected):
>
>     public PojoSerializer(
>             Class<T> clazz,
>             TypeSerializer<?>[] fieldSerializers,
>             Field[] fields,
>             ExecutionConfig executionConfig) {
>         this.clazz = checkNotNull(clazz);
>         this.fieldSerializers = (TypeSerializer<Object>[]) checkNotNull(fieldSerializers);
>         this.fields = checkNotNull(fields);
>         this.numFields = fieldSerializers.length;
>         this.executionConfig = checkNotNull(executionConfig);
>         for (int i = 0; i < numFields; i++) {
>             this.fields[i].setAccessible(true); // <---- NPE raised HERE
>         }
>
> In our fields array, fields[0] and fields[2] are set but fields[1] is
> null, which is why we get the NPE here when i = 1.
>
> As a workaround, we put this state back into our streaming job (with the
> missing field restored in the POJO), redeployed from the old savepoint,
> and this went totally fine. Then we redeployed a job without this state.
> So it took two deployments of our job (1 -> modify the POJO, 2 -> remove
> the state using this POJO).
> But the no-longer-used state (or at least its serializer) is still in the
> savepoints, so we could face this problem again when we modify the Buffer
> POJO later.
> In the end we modified a savepoint with the API to remove this state once
> and for all, and restarted from it.
>
> I have a couple of questions here:
> - Why does Flink keep an unused state in a savepoint (even if it cannot
>   map it into the new topology and allowNonRestoredState is set)?
> - Why does Flink not handle this case? The behaviour seems to differ
>   between an existing POJO state and this unused POJO state.
> - How can I clean my savepoints? I don't want them to contain unused
>   state.
>
> If anybody has experienced an issue like this before or knows how to
> handle it, I would be glad to discuss!
> Best regards,
>
> ------------------
>
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
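
For readers following the debugging notes in the quoted message, here is a minimal, Flink-free sketch of the failure mode described there. It is not Flink's code. It assumes that the restored field array is built by looking up the snapshotted field names on the current class, so that a removed field ends up as a null entry. The class MissingFieldDemo and the helpers resolveFields and firstFailingIndex are hypothetical names used only for illustration.

```java
import java.lang.reflect.Field;

public class MissingFieldDemo {

    // Hypothetical stand-in for the modified POJO: field2 has been removed.
    public static class Buffer {
        private String field1;
        private String field3;

        public Buffer() {}
    }

    // Assumption: field names recorded in the savepoint are resolved against
    // the current class; a name that no longer exists yields a null entry.
    public static Field[] resolveFields(Class<?> clazz, String[] names) {
        Field[] fields = new Field[names.length];
        for (int i = 0; i < names.length; i++) {
            try {
                fields[i] = clazz.getDeclaredField(names[i]);
            } catch (NoSuchFieldException e) {
                fields[i] = null; // removed field leaves a hole in the array
            }
        }
        return fields;
    }

    // Same loop shape as the quoted constructor excerpt; returns the index
    // at which setAccessible hits a null entry, or -1 if none does.
    public static int firstFailingIndex(Field[] fields) {
        for (int i = 0; i < fields.length; i++) {
            try {
                fields[i].setAccessible(true);
            } catch (NullPointerException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        String[] snapshotNames = {"field1", "field2", "field3"};
        Field[] fields = resolveFields(Buffer.class, snapshotNames);
        System.out.println("first failing index: " + firstFailingIndex(fields));
    }
}
```

Under that assumption, the loop fails at index 1, which matches the observation in the quoted message that fields[0] and fields[2] are present while fields[1] is missing.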
