Hi, I'm trying to load a list state using the State Processor API (Flink 1.14.3).

Cluster settings:

state.backend: rocksdb
state.backend.incremental: true
(...)


Code:

val env = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(env, pathToSavepoint, new EmbeddedRocksDBStateBackend(true))

// using Flink Stateful Functions
val tpe = new MessageTypeInformation(
  MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null))
val envelopeSerializer: TypeSerializer[Message] = tpe.createSerializer(env.getConfig)
val listDescriptor = new ListStateDescriptor[Message](
  "delayed-message-buffer", envelopeSerializer.duplicate)

(...)
override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getListState(listDescriptor) // fails with error [1]
}


Error [1]:

Caused by: java.io.IOException: Failed to restore timer state
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
    at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
    at x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
    at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
    ... 7 more
Caused by: org.apache.flink.util.StateMigrationException: The new namespace serializer (org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da) must be compatible with the old namespace serializer (org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
    ... 13 more




It seems that DefaultKeyedStateStore always wants to use
VoidNamespaceSerializer.INSTANCE, despite my state being created with a
LongSerializer namespace serializer.


Can anyone immediately see what I'm doing wrong?


Thank you

Fil
