Ah, this might be in code that runs at a different layer from the StateBackend. Can you pinpoint which of your user classes this anonymous class is, and where it is used? One way is to replace your anonymous classes with non-anonymous classes and check which replacement fixes the problem.
- Aljoscha

On Fri, 1 Jul 2016 at 16:27 Josh <jof...@gmail.com> wrote:

> I've just double checked and I do still get the ClassNotFound error for an anonymous class, on a job which uses the RocksDBStateBackend.
>
> In case it helps, this was the full stack trace:
>
> java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: com.me.flink.job.MyJob$$anon$1$$anon$4$$anon$3
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1620)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1521)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1781)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:503)
>     at org.apache.flink.api.common.state.StateDescriptor.readObject(StateDescriptor.java:268)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>     at java.util.ArrayList.readObject(ArrayList.java:791)
>     at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>     at java.util.HashMap.readObject(HashMap.java:1396)
>     at sun.reflect.GeneratedMethodAccessor27.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1714)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1347)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>     at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:58)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>
> On Fri, Jul 1, 2016 at 10:21 AM, Josh <jof...@gmail.com> wrote:
>
>> Thanks guys, that's very helpful info!
>>
>> @Aljoscha I thought I saw this exception on a job that was using the RocksDB state backend, but I'm not sure. I will do some more tests today to double check. If it's still a problem I'll try the explicit class definitions solution.
>>
>> Josh
>>
>> On Thu, Jun 30, 2016 at 1:27 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Also, you're using the FsStateBackend, correct?
>>>
>>> Reason I'm asking is that the problem should not occur for the RocksDB state backend. There, we don't serialize any user code, only binary data. A while back I wanted to change the FsStateBackend to also work like this. Now might be a good time to actually do this. :-)
>>>
>>> On Thu, 30 Jun 2016 at 14:10 Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> you could also try to replace your anonymous classes with explicit class definitions. This should assign these classes a fixed name independent of the other anonymous classes. Then the class loader should be able to deserialize your serialized data.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Jun 30, 2016 at 1:55 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Hi Josh,
>>>>> I think in your case the problem is that Scala might choose different names for synthetic/generated classes. This will trip up the code that is trying to restore from a snapshot that was done with an earlier version of the code where classes were named differently.
>>>>>
>>>>> I'm afraid I don't know how to solve this one right now, except by switching to Java.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Thu, 30 Jun 2016 at 13:38 Maximilian Michels <m...@apache.org> wrote:
>>>>>
>>>>>> Hi Josh,
>>>>>>
>>>>>> You have to assign UIDs to all operators to change the topology. Plus, you have to add dummy operators for all UIDs which you removed; this is a limitation currently because Flink will attempt to find all UIDs of the old job.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Wed, Jun 29, 2016 at 9:00 PM, Josh <jof...@gmail.com> wrote:
>>>>>> > Hi all,
>>>>>> > Is there any information out there on how to avoid breaking saved states/savepoints when making changes to a Flink job and redeploying it?
>>>>>> >
>>>>>> > I want to know how to avoid exceptions like this:
>>>>>> >
>>>>>> > java.lang.RuntimeException: Failed to deserialize state handle and setup initial operator state.
>>>>>> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:551)
>>>>>> >     at java.lang.Thread.run(Thread.java:745)
>>>>>> > Caused by: java.lang.ClassNotFoundException: com.me.flink.MyJob$$anon$1$$anon$7$$anon$4
>>>>>> >
>>>>>> > The best information I could find in the docs is here:
>>>>>> >
>>>>>> > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>>>>>> >
>>>>>> > Having made the suggested changes to my job (i.e. giving a uid to every stateful sink and map function), what changes to the job/topology are then allowed/not allowed?
>>>>>> >
>>>>>> > If I'm 'naming' my states by providing uids, why does Flink need to look for a specific class, like com.me.flink.MyJob$$anon$1$$anon$7$$anon$4 ?
>>>>>> >
>>>>>> > Thanks for any advice,
>>>>>> >
>>>>>> > Josh
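
For anyone reading this thread later: the stack trace above goes through java.io.ObjectInputStream and Class.forName, i.e. plain Java serialization, which stores each object's class name in the stream and looks that name up again on restore. That is why Till's suggestion of explicit class definitions can help. The sketch below illustrates only that mechanism, using the JDK and no Flink code. The names `ClassNameDemo`, `Mapper` and `UpperCaseMapper` are hypothetical, made up for this example, and it is written in Java rather than Scala, so the generated names will not look like the `$$anon$` names in the trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class ClassNameDemo {

    /** Hypothetical stand-in for a user function that ends up in serialized state. */
    interface Mapper extends Serializable {
        String map(String in);
    }

    /** Explicit class: its name is fixed by this declaration. */
    static class UpperCaseMapper implements Mapper {
        private static final long serialVersionUID = 1L;

        @Override
        public String map(String in) {
            return in.toUpperCase();
        }
    }

    /** Anonymous class: the compiler chooses the name (a numeric suffix after '$'). */
    static Mapper anonymousMapper() {
        return new Mapper() {
            @Override
            public String map(String in) {
                return in.toUpperCase();
            }
        };
    }

    /** Serializes obj with plain Java serialization and returns the raw bytes. */
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** True if the serialized form of obj embeds the name of obj's class. */
    static boolean streamContainsClassName(Object obj) {
        // ISO-8859-1 maps every byte to one char, so ASCII class names survive intact.
        String raw = new String(serialize(obj), StandardCharsets.ISO_8859_1);
        return raw.contains(obj.getClass().getName());
    }

    public static void main(String[] args) {
        Mapper named = new UpperCaseMapper();
        Mapper anon = anonymousMapper();

        // Print the runtime class names that serialization will record.
        System.out.println("named class:     " + named.getClass().getName());
        System.out.println("anonymous class: " + anon.getClass().getName());

        // Both serialized forms carry the class name, so a restore needs a class with that exact name.
        System.out.println("name in stream (named):     " + streamContainsClassName(named));
        System.out.println("name in stream (anonymous): " + streamContainsClassName(anon));
    }
}
```

The named class keeps the same name however the surrounding code changes, while the anonymous class gets whatever numbered name the compiler assigns in that build. If a later build assigns a different name, data serialized by the earlier build refers to a class that no longer exists, which would match the ClassNotFoundException reported above.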