Hello,

I am attempting to upgrade  Beam app from 2.5.0 running on Flink 1.4.0 to
Beam 2.6.0 running on Flink 1.5.0. I am not aware of any state migration
changes needed for Flink 1.4.0 -> 1.5.0 so I am just starting a new App
with updated libs from Flink save-point captured by previous version of the
app.

There is not change in topology. Job is accepted without error to the new
cluster which suggests that all operators are matched with state based on
IDs. However, app runs only few seccons and then crash with:

java.lang.Exception: Exception while creating StreamOperatorStateContext.
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:730)
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:295)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.util.FlinkException: Could not restore
operator state backend for
DoFnOperator_43996aa2908fa46bb50160f751f8cc09_(1/100) from any of the
1 provided restore options.
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
        ... 5 more
Caused by: java.io.IOException: Unable to restore operator state
[bundle-buffer-tag]. The previous serializer of the operator state
must be present; the serializer could have been removed from the
classpath, or its implementation have changed and could not be loaded.
This is a temporary restriction that will be fixed in future versions.
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:514)
        at 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:63)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
        ... 7 more


Does this mean anything to anyone? Am I doing anything wrong or did
FlinkRunner change in some way? The mentioned "bundle-buffer-tag" seems to
be too deep internal in runner for my reach.

Any help is much appreciated.

Best,
Jozo

Reply via email to