AFAIK the serializer used here is the CoderTypeSerializer, which may not
be recoverable because of changes to the contained Coder
(TaggedKvCoder). It doesn't currently have a serialVersionUID, so even
small changes could break backwards compatibility of the serialized form.
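
To illustrate the serialVersionUID point, here is a minimal sketch in plain
Java. The classes PinnedCoder and UnpinnedCoder are hypothetical stand-ins,
not Beam's TaggedKvCoder or CoderTypeSerializer. It only shows that a class
without an explicit serialVersionUID gets one derived from its structure,
while a declared one stays fixed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionSketch {

  // Hypothetical stand-in for a coder (an assumption for illustration only).
  // The explicit serialVersionUID stays the same across recompiles.
  static class PinnedCoder implements Serializable {
    private static final long serialVersionUID = 1L;
    final String tag;

    PinnedCoder(String tag) {
      this.tag = tag;
    }
  }

  // No explicit serialVersionUID: Java serialization computes one from the
  // class name, fields, and methods, so structural edits can change it.
  static class UnpinnedCoder implements Serializable {
    final String tag;

    UnpinnedCoder(String tag) {
      this.tag = tag;
    }
  }

  // Returns the UID that Java serialization would write for this class.
  static long uidOf(Class<?> cls) {
    return ObjectStreamClass.lookup(cls).getSerialVersionUID();
  }

  // Serializes a value to bytes using standard Java serialization.
  static byte[] serialize(Serializable value) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    return bytes.toByteArray();
  }

  // Reads a value back; this throws InvalidClassException when the stored
  // UID does not match the UID of the class on the classpath.
  static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
      return in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("pinned uid:   " + uidOf(PinnedCoder.class));
    System.out.println("unpinned uid: " + uidOf(UnpinnedCoder.class));

    PinnedCoder restored =
        (PinnedCoder) deserialize(serialize(new PinnedCoder("bundle-buffer-tag")));
    System.out.println("restored tag: " + restored.tag);
  }
}
```

Note that declaring a serialVersionUID only avoids the UID mismatch; an
incompatible change to the fields themselves can still break restore.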

As of now Beam doesn't offer the same upgrade guarantees as Flink [1].
This should be improved for the next release.

Thanks,
Max

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table

On 20.08.18 17:46, Stephan Ewen wrote:
> Hi Jozef!
> 
> When restoring state, the serializer that created the state must still
> be available, so the state can be read.
> It looks like some serializer classes were removed between Beam versions
> (or changed in an incompatible manner).
> 
> Backwards compatibility of an operator implementation needs cooperation
> from the operator. Within Flink itself, when we change the way an
> operator uses state, we keep the old codepath and classes in a
> "backwards compatibility restore" that takes the old state and brings it
> into the shape of the new state. 
> 
> I am not deeply familiar with the details of how Beam and the Flink
> runner implement their use of state, but it looks like this part is not
> present, which could mean that savepoints taken from Beam applications
> are not backwards compatible.
> 
> 
> On Mon, Aug 20, 2018 at 4:03 PM, Jozef Vilcek <jozo.vil...@gmail.com
> <mailto:jozo.vil...@gmail.com>> wrote:
> 
>     Hello,
> 
>     I am attempting to upgrade a Beam app from 2.5.0 running on Flink
>     1.4.0 to Beam 2.6.0 running on Flink 1.5.0. I am not aware of any
>     state migration changes needed for Flink 1.4.0 -> 1.5.0, so I am just
>     starting a new app with updated libs from a Flink savepoint captured
>     by the previous version of the app.
> 
>     There is no change in topology. The job is accepted without error by
>     the new cluster, which suggests that all operators are matched with
>     state based on IDs. However, the app runs for only a few seconds and
>     then crashes with:
> 
>     java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:745)
>     Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
>       ... 5 more
>     Caused by: java.io.IOException: Unable to restore operator state [bundle-buffer-tag]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
>       at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
>       at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:63)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>       ... 7 more
> 
> 
>     Does this mean anything to anyone? Am I doing anything wrong, or did
>     FlinkRunner change in some way? The mentioned "bundle-buffer-tag"
>     seems to be too deep inside the runner internals for me to reach.
> 
>     Any help is much appreciated.
> 
>     Best,
>     Jozo
> 
> 
