Hi Roman,
Thanks for the quick response. It wasn't that, but your comment about erasure
made me realize I should have debugged the code and looked at the types.
Apparently setting TTL changes the serializer, so I also had to add TTL in the
WindowReaderFunction.
Regards,
Alexis.
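
For anyone hitting the same exception, here is a minimal sketch of what "adding TTL in the WindowReaderFunction" could look like. It assumes Flink 1.14 on the classpath. The value type List<String>, the one-day TTL, and the class name TtlDescriptorSketch are placeholders chosen for illustration, since the real generic parameters and TTL settings are not shown in this thread. The idea is only that the reader builds its descriptor the same way as the job, including the enableTimeToLive call.

```java
import java.util.List;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class TtlDescriptorSketch {

    // Builds the descriptor in one place so that the job and the reader
    // function can share exactly the same definition.
    // List<String> is a placeholder value type (assumption, not from the thread).
    public static MapStateDescriptor<Long, List<String>> descriptor() {
        MapStateDescriptor<Long, List<String>> msd = new MapStateDescriptor<>(
                "descriptorName",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<String>>() {}));

        // Placeholder TTL of one day (assumption). Enabling TTL is the part
        // that was missing on the reader side according to the message above.
        StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.days(1)).build();
        msd.enableTimeToLive(ttl);
        return msd;
    }
}
```

Inside readWindow, the descriptor returned by descriptor() would then be passed to context.globalState().getMapState(...) instead of one built without TTL.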
-----Original Message-----
From: Roman Khachatryan
Sent: Friday, 8 April 2022 11:48
To: Alexis Sarda-Espinosa
Cc: user@flink.apache.org
Subject: Re: Using state processor API to read state defined with a TypeHint
Hi Alexis,
I think your setup is fine, but probably Java type erasure makes Flink consider
the two serializers as different.
Could you try creating a MapStateDescriptor by explicitly providing serializers
(constructed manually)?
Regards,
Roman
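
As background on the erasure point, here is a small plain-Java sketch (no Flink involved) of why a bare class token cannot distinguish generic parameters, and how an anonymous subclass, the pattern that TypeHint relies on, can keep them. The names ErasureDemo and Hint are made up for this illustration and are not Flink classes.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    // Hypothetical stand-in for a TypeHint-like class: an anonymous subclass
    // created with "new Hint<...>() {}" records T in its generic superclass.
    abstract static class Hint<T> {
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // After erasure, both lists share the same runtime class.
    public static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Long> longs = new ArrayList<>();
        return strings.getClass() == longs.getClass();
    }

    // The anonymous subclass keeps the full parameterized type.
    public static String capturedTypeName() {
        return new Hint<List<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass());   // prints "true"
        System.out.println(capturedTypeName());   // prints "java.util.List<java.lang.String>"
    }
}
```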
On Fri, Apr 8, 2022 at 10:01 AM Alexis Sarda-Espinosa wrote:
>
> Hi everyone,
>
>
>
> I have a ProcessWindowFunction that uses Global window state. It uses
> MapState with a descriptor defined like this:
>
>
>
> MapStateDescriptor> msd = new MapStateDescriptor<>(
>     "descriptorName",
>     TypeInformation.of(Long.class),
>     TypeInformation.of(new TypeHint>() {})
> );
>
>
>
> Now I’m trying to read that state from a checkpoint (created with RocksDB as
> the backend in Flink 1.14.4). I have a WindowReaderFunction Integer, String, TimeWindow> that defines the same descriptor and calls this
> in readWindow:
>
>
>
> MapState> mapState =
> context.globalState().getMapState(msd);
>
>
>
> After loading the savepoint with EmbeddedRocksDBStateBackend(true), I try to
> configure the reader function like this:
>
>
>
> savepoint
>     .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
>     .process(
>         "my-uid",
>         new StateReaderFunction(),
>         Types.STRING,
>         TypeInformation.of(MyPojo.class),
>         Types.INT
>     )
>     .print();
>
>
>
> But I am getting this exception:
>
>
>
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@a07c9263) must not
> be incompatible with the old state serializer
> (org.apache.flink.api.common.typeutils.base.MapSerializer@706b3103).
>
>
>
> Does someone know what I’m doing wrong in my setup?
>
>
>
> Regards,
>
> Alexis.
>
>