Hi,

Just to clarify -
I quickly went through the README of the project, and saw this:
"This error is seen after trying to read from a savepoint that was created
using the same case class as a key."

So, if I understood correctly, you were attempting to use the State
Processor API to access a savepoint that was written with a Scala
DataStream job, correct?

If that's the case, I'm afraid this would not work as of now. See [1] for a
similar scenario that others had also bumped into.
TL;DR: the State Processor API is currently not guaranteed to work for
snapshots written by Scala DataStream jobs.
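In case it helps others who land on this thread, here is a rough sketch of the failing read path (step 2 of the plan in the quoted mail). The class name, operator uid, and paths are made up for illustration, and this needs the flink-state-processor-api and RocksDB state backend dependencies plus a real savepoint, so it is a sketch rather than something runnable as-is. The savepoint was written by a Scala job whose keyBy used the macro-generated CaseClassSerializer for the case class key; the Java-based State Processor API cannot reproduce a compatible key serializer for that class, hence the StateMigrationException quoted below. With a plain String key the same read works, which matches what Mark observed.

```scala
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.Savepoint
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

// Hypothetical two-field case class key, standing in for the one in the repo.
case class EventKey(userId: String, source: String)

// Emits every key found in the operator's keyed state.
class KeyReader extends KeyedStateReaderFunction[EventKey, EventKey] {
  override def readKey(key: EventKey,
                       ctx: KeyedStateReaderFunction.Context,
                       out: Collector[EventKey]): Unit =
    out.collect(key)
}

object ReadKeys {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val savepoint = Savepoint.load(
      env,
      "hdfs:///savepoints/savepoint-xyz",               // hypothetical path
      new RocksDBStateBackend("hdfs:///checkpoints"))   // hypothetical path

    // Restoring here fails with "The new key serializer must be compatible":
    // the savepoint's key serializer is the Scala macro-generated
    // CaseClassSerializer, which the Java-side type extraction cannot match.
    savepoint
      .readKeyedState("my-operator-uid", new KeyReader) // hypothetical uid
      .print()
  }
}
```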

For now, I'll add a big warning about this to the docs.
But in general, it seems like we might want to consider bumping up the
priority for enabling this, as quite a few users are using the Scala
DataStream API for their jobs.

Just as a side comment: this repo looks like a very interesting project!

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-15719

On Wed, Feb 19, 2020 at 7:03 AM Mark Niehe <mark.ni...@segment.com> wrote:

> Hey all,
>
> I've run into an issue with the State Processor API. To highlight the
> issues I've been having, I've created a reference repository that will
> demonstrate the issue (repository:
> https://github.com/segmentio/flink-state-management).
>
> The current implementation of the pipeline has left us with keyed state
> that we no longer need, and we don't have references to some of the old keys.
> My plan was to:
> 1. create a savepoint
> 2. read the keys from each operator (using State Processor API)
> 3. filter out all the keys that are no longer used
> 4. bootstrap a new savepoint that contains the filtered state
>
> I managed to get this working using a sample pipeline and a very basic key
> (a string), but when I switched the key to be something more complex (a
> case class of two strings), I started seeing this exception:
> Caused by: org.apache.flink.util.StateMigrationException: The new key
> serializer must be compatible.
>   at org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
>   at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:170)
>   at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:157)
>   at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:141)
>   at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
>   ... 13 more
>
> Has anyone come across this before and figured out a fix? Any help you can
> give would be greatly appreciated!
>
> Thanks,
> --
> Mark Niehe · Software Engineer
> Integrations
>
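For completeness, the filter-and-bootstrap half of Mark's plan (steps 3 and 4) maps onto the State Processor API roughly as follows, e.g. with a simple String key, which does work today. Again a sketch under stated assumptions: `isStillUsed`, the `Writer` body, the operator uid, the max parallelism of 128, and the paths are all made up, and the code needs the same Flink dependencies as above.

```scala
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction

// Hypothetical predicate deciding which keys are still in use.
def isStillUsed(key: String): Boolean = ???

// Writes each surviving key's state into the new savepoint.
class Writer extends KeyedStateBootstrapFunction[String, String] {
  override def processElement(key: String,
                              ctx: KeyedStateBootstrapFunction[String, String]#Context): Unit = {
    // re-register whatever ValueState/MapState the operator expects here
  }
}

def rewriteSavepoint(keys: DataSet[String]): Unit = {
  // Step 3: drop the keys that are no longer used.
  val kept = keys.filter(new FilterFunction[String] {
    override def filter(k: String): Boolean = isStillUsed(k)
  })

  // Step 4: bootstrap a new savepoint containing only the filtered state.
  val transformation = OperatorTransformation
    .bootstrapWith(kept)
    .keyBy(new KeySelector[String, String] {
      override def getKey(k: String): String = k
    })
    .transform(new Writer)

  Savepoint
    .create(new RocksDBStateBackend("hdfs:///checkpoints"), 128)
    .withOperator("my-operator-uid", transformation)
    .write("hdfs:///savepoints/savepoint-filtered")
}
```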
