AFAIK the serializer used here is the CoderTypeSerializer, which may not be restorable after changes to the contained Coder (TaggedKvCoder). It doesn't currently declare a serialVersionUID, so even small changes to the class could break backwards compatibility of its serialized form.
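A minimal Java sketch of why the missing serialVersionUID matters: when a `Serializable` class declares no UID, the JVM computes one from the class's structure, so adding a field or a non-private method changes it and bytes written by the older build are rejected with `InvalidClassException`. With an explicit UID, compatible changes such as adding a field can still read the older bytes. `PinnedSerializer` is a hypothetical stand-in, not Beam's actual CoderTypeSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionDemo {

    // Hypothetical stand-in for a serializer that is Java-serialized into a snapshot.
    // The explicit UID stays fixed across compatible edits to the class body,
    // so bytes written by an older build can still be read by a newer one.
    static class PinnedSerializer implements Serializable {
        private static final long serialVersionUID = 1L;
        final String coderName;

        PinnedSerializer(String coderName) {
            this.coderName = coderName;
        }
    }

    // Java-serializes an object into a byte array, as a snapshot would.
    static byte[] write(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Reads the object back; throws InvalidClassException if the UID in the stream
    // does not match the UID of the class currently on the classpath.
    static Object read(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        long uid = ObjectStreamClass.lookup(PinnedSerializer.class).getSerialVersionUID();
        System.out.println("serialVersionUID = " + uid); // prints "serialVersionUID = 1"

        PinnedSerializer restored =
            (PinnedSerializer) read(write(new PinnedSerializer("TaggedKvCoder")));
        System.out.println("restored coder: " + restored.coderName);
    }
}
```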
As of now Beam doesn't offer the same upgrade guarantees as Flink [1]. This should be improved for the next release.

Thanks,
Max

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/upgrading.html#compatibility-table

On 20.08.18 17:46, Stephan Ewen wrote:
> Hi Jozef!
>
> When restoring state, the serializer that created the state must still
> be available, so the state can be read.
> It looks like some serializer classes were removed between Beam versions
> (or changed in an incompatible manner).
>
> Backwards compatibility of an operator implementation needs cooperation
> from the operator. Within Flink itself, when we change the way an
> operator uses state, we keep the old codepath and classes in a
> "backwards compatibility restore" that takes the old state and brings it
> into the shape of the new state.
>
> I am not deeply into the details of how Beam and the Flink runner implement
> their use of state, but it looks like this part is not present, which could
> mean that savepoints taken from Beam applications are not backwards
> compatible.
>
>
> On Mon, Aug 20, 2018 at 4:03 PM, Jozef Vilcek <jozo.vil...@gmail.com
> <mailto:jozo.vil...@gmail.com>> wrote:
>
> > Hello,
> >
> > I am attempting to upgrade a Beam app from 2.5.0 running on Flink
> > 1.4.0 to Beam 2.6.0 running on Flink 1.5.0. I am not aware of any
> > state migration changes needed for Flink 1.4.0 -> 1.5.0, so I am just
> > starting the new app with updated libs from a Flink savepoint captured
> > by the previous version of the app.
> >
> > There is no change in topology. The job is accepted by the new cluster
> > without error, which suggests that all operators are matched with
> > their state based on IDs. However, the app runs for only a few seconds
> > and then crashes with:
> >
> > java.lang.Exception: Exception while creating StreamOperatorStateContext.
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> >     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
> >     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1 provided restore options.
> >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
> >     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
> >     ... 5 more
> > Caused by: java.io.IOException: Unable to restore operator state [bundle-buffer-tag]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
> >     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
> >     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:63)
> >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
> >     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
> >     ... 7 more
> >
> > Does this mean anything to anyone? Am I doing anything wrong, or did
> > the FlinkRunner change in some way? The "bundle-buffer-tag" mentioned
> > in the error seems to be too deep inside the runner for me to reach.
> >
> > Any help is much appreciated.
> >
> > Best,
> > Jozo
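The "backwards compatibility restore" Stephan describes can be sketched as a versioned state format whose reader keeps the old decoding path and converts old entries into the new shape. This is a minimal plain-Java sketch under a made-up layout (it uses a Java 16+ record): the version constants, `TaggedElement`, and the `main` default tag are hypothetical and do not describe how Beam's Flink runner or Flink itself actually encodes operator state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CompatRestoreDemo {

    // Hypothetical layout versions (not Beam's or Flink's real format).
    static final int V1 = 1; // old layout: plain elements
    static final int V2 = 2; // new layout: (tag, element) pairs
    // Assumed tag assigned to elements migrated from the V1 layout.
    static final String DEFAULT_TAG = "main";

    record TaggedElement(String tag, String value) {}

    // Legacy writer, kept only so the demo can produce an "old savepoint".
    static byte[] writeV1(List<String> values) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(V1);
            out.writeInt(values.size());
            for (String v : values) {
                out.writeUTF(v);
            }
        }
        return bytes.toByteArray();
    }

    // Current writer: always emits the newest layout, prefixed with its version.
    static byte[] writeV2(List<TaggedElement> elements) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(V2);
            out.writeInt(elements.size());
            for (TaggedElement e : elements) {
                out.writeUTF(e.tag());
                out.writeUTF(e.value());
            }
        }
        return bytes.toByteArray();
    }

    // Reader: keeps the V1 codepath and converts old entries into the V2 shape.
    static List<TaggedElement> restore(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != V1 && version != V2) {
                throw new IOException("Unknown state version " + version);
            }
            int count = in.readInt();
            List<TaggedElement> result = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                if (version == V1) {
                    // Backwards-compatibility path: old entries carried no tag.
                    result.add(new TaggedElement(DEFAULT_TAG, in.readUTF()));
                } else {
                    String tag = in.readUTF();
                    String value = in.readUTF();
                    result.add(new TaggedElement(tag, value));
                }
            }
            return result;
        }
    }

    public static void main(String[] args) throws IOException {
        List<TaggedElement> fromOld = restore(writeV1(List.of("a", "b")));
        List<TaggedElement> fromNew = restore(writeV2(List.of(new TaggedElement("side", "c"))));
        System.out.println(fromOld);
        System.out.println(fromNew);
    }
}
```

The writer only ever emits the newest layout, while the reader keeps every older branch. Removing the V1 branch (or the classes it depends on) is the kind of change that leaves old savepoints unreadable.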