Hi,

I'm trying to load a list state using the State Processor API (Flink 1.14.3).

Cluster settings:

    state.backend: rocksdb
    state.backend.incremental: true
    (...)

Code:

    val env = ExecutionEnvironment.getExecutionEnvironment
    val savepoint = Savepoint.load(env, pathToSavepoint, new EmbeddedRocksDBStateBackend(true))
    val tpe = new MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null)) // using Flink Stateful Functions
    val envelopeSerializer: TypeSerializer[Message] = tpe.createSerializer(env.getConfig)
    val listDescriptor = new ListStateDescriptor[Message]("delayed-message-buffer", envelopeSerializer.duplicate)
    (...)
    override def open(parameters: Configuration): Unit = {
      getRuntimeContext.getListState(listDescriptor) // fails with error [1]
    }

Error [1]:

    Caused by: java.io.IOException: Failed to restore timer state
        at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
        at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
        at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
        at x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
        at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
        at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
        at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
        ... 7 more
    Caused by: org.apache.flink.util.StateMigrationException: The new namespace serializer (org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da) must be compatible with the old namespace serializer (org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
        at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
        ... 13 more

It seems that DefaultKeyedStateStore always wants to use VoidNamespaceSerializer.INSTANCE, even though my state was created with a LongSerializer as its namespace serializer.

Is there anything anyone can immediately see me doing wrong?

Thank you
Fil
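
To make the failing check concrete, here is a toy Scala model of how I read the StateMigrationException above. It is only a sketch under assumptions, not Flink's code: NamespaceSerializer, VoidNamespaceSerializerModel, LongSerializerModel, RestoredStateMeta and register are hypothetical stand-ins I made up for illustration, and plain equality replaces whatever compatibility check Flink really performs on the serializers.

```scala
// Toy model only: none of these types are Flink classes.
object NamespaceCheckSketch {
  // Hypothetical stand-ins for the two namespace serializers named in the error.
  sealed trait NamespaceSerializer
  case object VoidNamespaceSerializerModel extends NamespaceSerializer
  case object LongSerializerModel extends NamespaceSerializer

  // What the savepoint is assumed to record for one piece of keyed state.
  final case class RestoredStateMeta(name: String, namespaceSerializer: NamespaceSerializer)

  // Registering a state with a namespace serializer that differs from the
  // restored one is rejected, mirroring the message in the stack trace.
  // Equality is a simplification of the real compatibility check.
  def register(restored: RestoredStateMeta, requested: NamespaceSerializer): Either[String, String] =
    if (restored.namespaceSerializer == requested) Right(restored.name)
    else Left(
      s"The new namespace serializer ($requested) must be compatible with " +
        s"the old namespace serializer (${restored.namespaceSerializer})."
    )
}
```

In this model, a state written with LongSerializerModel and requested with VoidNamespaceSerializerModel yields a Left, which corresponds to what I see when getListState is called from the reader function.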