Hi Jozef!

When restoring state, the serializer that created the state must still be
available, so the state can be read.
It looks like some serializer classes were removed between Beam versions
(or changed in an incompatible manner).
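To illustrate the failure mode, here is a toy plain-Java sketch. It has no Flink dependency, and all names in it (StateSnapshot, REGISTRY, the serializer ids) are hypothetical, not Flink's API. The snapshot records which serializer wrote the bytes, and restore can only proceed if that serializer can still be found. This roughly mirrors the check behind the IOException in your trace, but Flink's real mechanism differs in detail.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class SerializerLookupSketch {

    // Hypothetical snapshot entry: the id of the serializer that wrote the
    // bytes is stored alongside the bytes themselves.
    static final class StateSnapshot {
        final String stateName;
        final String serializerId;
        final byte[] bytes;

        StateSnapshot(String stateName, String serializerId, byte[] bytes) {
            this.stateName = stateName;
            this.serializerId = serializerId;
            this.bytes = bytes;
        }
    }

    // Hypothetical registry of the deserializers present on the current classpath.
    static final Map<String, Function<byte[], Object>> REGISTRY = new HashMap<>();

    // Restore looks up the serializer recorded in the snapshot; if it is gone,
    // the bytes cannot be interpreted and restore has to fail.
    static Object restore(StateSnapshot snapshot) {
        Function<byte[], Object> deserializer = REGISTRY.get(snapshot.serializerId);
        if (deserializer == null) {
            throw new IllegalStateException("Unable to restore operator state ["
                    + snapshot.stateName + "]: serializer " + snapshot.serializerId
                    + " is not available");
        }
        return deserializer.apply(snapshot.bytes);
    }

    public static void main(String[] args) {
        // Only the new version's serializer is registered.
        REGISTRY.put("StringSerializerV2", b -> new String(b, StandardCharsets.UTF_8));
        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);

        System.out.println(restore(new StateSnapshot("ok-state", "StringSerializerV2", data)));

        try {
            // State written by a serializer that no longer exists.
            restore(new StateSnapshot("bundle-buffer-tag", "RemovedSerializerV1", data));
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```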

Backwards compatibility of an operator implementation needs cooperation
from the operator. Within Flink itself, when we change the way an operator
uses state, we keep the old code path and classes in a "backwards
compatibility restore" that takes the old state and brings it into the
shape of the new state.
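To make the "backwards compatibility restore" idea a bit more concrete, here is a small plain-Java sketch. It is not Flink's or Beam's actual API; the class, the method names and both byte layouts are made up for illustration. The snapshot carries a version number, the current code always writes the new layout, and the old reading code is kept around only to convert legacy bytes into the new shape.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CompatRestoreSketch {

    // Hypothetical layouts: v1 stored the buffer as one comma-joined string,
    // v2 stores an element count followed by each element.
    static final int LEGACY_VERSION = 1;
    static final int CURRENT_VERSION = 2;

    // The current code path always writes the new (v2) layout.
    static byte[] write(List<String> buffer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(CURRENT_VERSION);
        out.writeInt(buffer.size());
        for (String element : buffer) {
            out.writeUTF(element);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Restore dispatches on the version stored in the snapshot.
    static List<String> restore(byte[] snapshot) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot));
        int version = in.readInt();
        if (version == CURRENT_VERSION) {
            int count = in.readInt();
            List<String> buffer = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                buffer.add(in.readUTF());
            }
            return buffer;
        } else if (version == LEGACY_VERSION) {
            return restoreLegacy(in);
        }
        throw new IOException("Unknown state version: " + version);
    }

    // Old code path, kept only for backwards compatibility: it reads the v1
    // layout and brings it into the shape the current code expects.
    static List<String> restoreLegacy(DataInputStream in) throws IOException {
        String joined = in.readUTF();
        if (joined.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(Arrays.asList(joined.split(",")));
    }

    // Produces v1 bytes, standing in for a savepoint taken by the old version.
    static byte[] writeLegacy(List<String> buffer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(LEGACY_VERSION);
        out.writeUTF(String.join(",", buffer));
        out.flush();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        List<String> buffer = Arrays.asList("a", "b", "c");
        byte[] oldSavepoint = writeLegacy(buffer);
        // Restoring old state works, and re-snapshotting writes the new layout.
        List<String> restored = restore(oldSavepoint);
        System.out.println(restored);
        System.out.println(restore(write(restored)));
    }
}
```

The essential point is that restoreLegacy has to stay in the codebase for as long as savepoints in the old layout may still be around; once it is deleted, old savepoints can no longer be read.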

I am not deeply familiar with the details of how Beam and the Flink runner
implement their use of state, but it looks like this part is missing, which
could mean that savepoints taken from Beam applications are not backwards
compatible.


On Mon, Aug 20, 2018 at 4:03 PM, Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Hello,
>
> I am attempting to upgrade a Beam app from Beam 2.5.0 running on Flink 1.4.0 to
> Beam 2.6.0 running on Flink 1.5.0. I am not aware of any state migration
> changes needed for Flink 1.4.0 -> 1.5.0, so I am simply starting the new app
> with updated libraries from a Flink savepoint taken by the previous version of
> the app.
>
> There is no change in the topology. The job is accepted by the new cluster
> without error, which suggests that all operators are matched with their state
> based on IDs. However, the app runs for only a few seconds and then crashes with:
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
>       ... 5 more
> Caused by: java.io.IOException: Unable to restore operator state [bundle-buffer-tag]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
>       at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
>       at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:63)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>       ... 7 more
>
>
> Does this mean anything to anyone? Am I doing something wrong, or did the
> FlinkRunner change in some way? The mentioned "bundle-buffer-tag" seems to
> be too deep inside the runner internals for me to reach.
>
> Any help is much appreciated.
>
> Best,
> Jozo
>
