Hi Thias

Thank you for your reply. I can re-create a simplified use case at home and
stick it on github if you think it will help.

What I'm trying to access is pretty internal to Flink Stateful Functions.
It seems that a custom operator (
https://github.com/apache/flink-statefun/blob/09a5cba521e9f994896c746ec9f8cc6479403612/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/functions/FunctionGroupOperator.java#L188)
is accessing a KeyedStateBackend and creating an InternalListState, which
I'm not sure I'll be able to get my hands on using the State Processor API.

The only reason why I need to get my hands on all the states from this
Stateful Functions operator is because later I (think I) have to use
.removeOperator(uid) on a savepoint and replace it .withOperator(uid,
myTransformation) in order to transform my own, non-stateful-functions
keyed state which also belongs to this operator.

Kind regards
Fil

On Tue, 25 Oct 2022 at 16:24, Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:

> Hi Filip,
>
>
>
> It looks like, your state primitive is used in the context of Windows:
>
> Keyed state works like this:
>
>    - It uses a cascade of key types to store and retrieve values:
>       - The key (set by .keyBy)
>       - A namespace (usually a VoidNamespace), unless it is used in
>       context of a specific window
>       - An optional key of the state primitive (if it is a MapState)
>
>
>
> In your case the state primitive is (probably) declared in the context of
> a window and hence when loading the state by means of StateProcessorAPI you
> also need to specify the correct Namespace TypeInformation.
>
> If I am in doubt, how a state primitive is set up, I let the debugger stop
> in a process function and walk up the call stack to find the proper
> components implementing it.
>
>
>
> If you share a little more of your code it is much easier to provide
> specific guidance 😊
>
> (e.g. ‘savepoint’ is never used again in your code snippet …)
>
>
>
> Sincere greeting
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Filip Karnicki <filip.karni...@gmail.com>
> *Sent:* Tuesday, October 25, 2022 10:08 AM
> *To:* user <user@flink.apache.org>
> *Subject:* State Processor API - VoidNamespaceSerializer must be
> compatible with the old namespace serializer LongSerializer
>
>
>
> Hi, I'm trying to load a list state using the State Processor API (Flink
> 1.14.3)
>
>
>
> Cluster settings:
>
>
>
> state.backend: rocksdb
>
> state.backend.incremental: true
>
> (...)
>
>
>
> Code:
>
>
>
> val env = ExecutionEnvironment.getExecutionEnvironment
>
> val savepoint = Savepoint.load(env, pathToSavepoint, new
> EmbeddedRocksDBStateBackend(true))
>
>
> val tpe = new
> MessageTypeInformation(MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS,
> null) // using Flink Stateful Functions
> val envelopeSerializer: TypeSerializer[Message] =
> tpe.createSerializer(env.getConfig)
>
> val listDescriptor = new
> ListStateDescriptor[Message]("delayed-message-buffer",
> envelopeSerializer.duplicate)
>
> (...)
> override def open(parameters: Configuration): Unit = {
>
>     getRuntimeContext.getListState(listDescriptor) // fails with error [1]
>
> }
>
>
>
>
>
> Error [1]:
>
>
>
> Caused by: java.io.IOException: Failed to restore timer state
>
>             at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
>
>             at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
>
>             at
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
>
>             at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>
>             at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>
>             at
> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>
>             at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>
>             at java.base/java.lang.Thread.run(Thread.java:829)
>
> Caused by: java.lang.RuntimeException: Error while getting state
>
>             at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>
>             at
> org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
>
>             at
> x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
>
>             at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
>
>             at
> org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
>
>             at
> org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
>
>             at
> org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
>
>             ... 7 more
>
> Caused by: org.apache.flink.util.StateMigrationException: The new
> namespace serializer (
> org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da) must be
> compatible with the old namespace serializer (
> org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).
>
>             at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
>
>             at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
>
>             at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
>
>             at
> org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
>
>             at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
>
>             at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
>
>             at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
>
>             at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>
>             at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(
> *DefaultKeyedStateStore*.java:71)
>
>             ... 13 more
>
>
>
>
>
>
>
> It seems that *DefaultKeyedStateStore *always wants to use
> VoidNamespaceSerializer.INSTANCE despite my state being created with a
> LongSerializer namespace serializer.
>
>
>
> Is there anything anyone can immediately see me doing wrong?
>
>
>
> Thank you
>
> Fil
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>

Reply via email to