Hi everyone,

I'm aware of Flink's compatibility matrix, available at
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/,
which suggests that moving between 1.13.x releases should work just fine.

However, if we try to restore a 1.13.0 checkpoint with 1.13.2,
deserialization fails, I think (correct me if I'm way off the mark!)
because of this in 1.13.2:

    private static class NoRescalingDescriptor extends InflightDataRescalingDescriptor {
        private static final long serialVersionUID = 1L;

whereas in 1.13.0 it is:

    private static class NoRescalingDescriptor extends InflightDataRescalingDescriptor {
        private static final long serialVersionUID = -5544173933105855751L;

Restoring the 1.13.0 checkpoint on 1.13.2 then fails with:

Caused by: java.io.InvalidClassException: org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor; local class incompatible: stream classdesc serialVersionUID = -5544173933105855751, local class serialVersionUID = 1
        at java.io.ObjectStreamClass.initNonProxy(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readClassDesc(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.util.HashMap.readObject(Unknown Source) ~[?:?]
        at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.util.HashMap.readObject(Unknown Source) ~[?:?]
        at jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:615) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:593) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:59) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:298) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at <unknown class>.get(Unknown Source) ~[?:?]
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist_2.11-1.13.2.jar:1.13.2]
        at <unknown class>.get(Unknown Source) ~[?:?]
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
        at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
        at java.lang.Thread.run(Unknown Source) ~[?:?]
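
For anyone who wants to see the mechanism in isolation, here is a minimal standalone sketch (plain JDK, no Flink involved) that should reproduce the same "local class incompatible" failure. NoRescalingDescriptor below is a hypothetical stand-in, not Flink's class. Instead of actually writing the object with a 1.13.0 build, it patches the UID bytes in the serialized stream, which assumes the standard Java serialization layout (class name immediately followed by the 8-byte serialVersionUID in the class descriptor).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SerialVersionUidMismatchDemo {

    // Hypothetical stand-in for the 1.13.2 class: it declares serialVersionUID = 1L.
    static class NoRescalingDescriptor implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // The serialVersionUID the 1.13.0 class declared (from the 1.13.0 snippet in this message).
    static final long OLD_UID = -5544173933105855751L;

    /** Serializes obj, then overwrites the serialVersionUID recorded in the stream's class descriptor. */
    static byte[] serializeWithUid(Serializable obj, long uid) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        byte[] bytes = bos.toByteArray();
        // Class descriptor layout: name length (2 bytes), class name, serialVersionUID (8 bytes).
        byte[] name = obj.getClass().getName().getBytes(StandardCharsets.UTF_8);
        int pos = indexOf(bytes, name);
        if (pos < 0) {
            throw new IllegalStateException("class name not found in stream");
        }
        // ByteBuffer is big-endian by default, matching how ObjectOutputStream writes longs.
        ByteBuffer.wrap(bytes, pos + name.length, Long.BYTES).putLong(uid);
        return bytes;
    }

    /** Returns the first index of needle in haystack, or -1 if absent. */
    static int indexOf(byte[] haystack, byte[] needle) {
        outer:
        for (int i = 0; i + needle.length <= haystack.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (haystack[i + j] != needle[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }

    /** Deserializes bytes; returns "restored" on success or the InvalidClassException message. */
    static String tryRestore(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            ois.readObject();
            return "restored";
        } catch (InvalidClassException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Stream UID matches the local class: restores fine.
        System.out.println(tryRestore(serializeWithUid(new NoRescalingDescriptor(), 1L)));
        // Stream claims the 1.13.0 UID while the local class declares 1L: rejected.
        System.out.println(tryRestore(serializeWithUid(new NoRescalingDescriptor(), OLD_UID)));
    }
}
```

The first call should print "restored", and the second should report the same "stream classdesc serialVersionUID = -5544173933105855751, local class serialVersionUID = 1" mismatch as the trace above. The class shape itself is unchanged; only the declared UID differs, which is enough for Java serialization to refuse the stream.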


I'm happy to open a JIRA if people think it's worthwhile. I was wondering
whether we could revert the serialVersionUID change in 1.13.4, as that's
obviously a very important release. I'm thinking of a few users I know who
are currently on 1.13.0 and looking to upgrade; as things stand, I think
this will be a hurdle, and we'll have to find an alternative means (if one
exists) so they can "upgrade" their Flink version while keeping their
existing jobs.

This is the particular commit where we noticed the serialVersionUID
changing:
https://github.com/apache/flink/commit/8327f4486841cd1d6beb05418e6d4206a6f4858b
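
To check which serialVersionUID a given Flink build actually uses for this class, without reading each release's source, one option might be to ask the JDK directly via ObjectStreamClass.lookup. A minimal sketch, assuming the relevant flink-dist jar is on the classpath when it runs; SerialVersionUidProbe is a hypothetical name, and the default class name is simply the one from the trace:

```java
import java.io.ObjectStreamClass;
import java.util.OptionalLong;

public class SerialVersionUidProbe {

    // Default target (the class from the trace); pass another binary class name as args[0].
    static final String DEFAULT_CLASS =
            "org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor$NoRescalingDescriptor";

    /**
     * Returns the serialVersionUID Java serialization would use for the named class,
     * or empty if the class cannot be loaded or is not Serializable.
     */
    static OptionalLong serialVersionUidOf(String className) {
        try {
            // initialize = false: load the class without running its static initializers.
            Class<?> cls = Class.forName(className, false, SerialVersionUidProbe.class.getClassLoader());
            // lookup() returns null for classes that do not implement Serializable.
            ObjectStreamClass desc = ObjectStreamClass.lookup(cls);
            return desc == null ? OptionalLong.empty() : OptionalLong.of(desc.getSerialVersionUID());
        } catch (ClassNotFoundException | LinkageError e) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        String className = args.length > 0 ? args[0] : DEFAULT_CLASS;
        OptionalLong uid = serialVersionUidOf(className);
        if (uid.isPresent()) {
            System.out.println(className + " serialVersionUID = " + uid.getAsLong());
        } else {
            System.out.println(className + " is not on the classpath or is not Serializable");
        }
    }
}
```

Compiled once and run against each release's jar (for example java -cp flink-dist_2.11-1.13.0.jar:. SerialVersionUidProbe, then the same with the 1.13.2 jar), it should print the two differing values. For a class without an explicit serialVersionUID, the printed value is the one the JDK derives from the class structure, so this would also catch accidental changes of that kind.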

We'll be experimenting with a savepoint, which will hopefully save the day
when the upgrade happens, but I figured I'd raise it here in case anyone
has seen this before or knows why the change was made. Unfortunately, I'm
unsure what the ideal solution is. Perhaps a more fine-grained
compatibility matrix is needed (i.e. 1.13.0 to 1.13.0 is fine, moving
between 1.13.1, 1.13.2 and 1.13.3 is fine, and anything onwards should be
fine too; you just can't move off 1.13.0 unless you use savepoints?).

Many thanks as always,
