Yep, that is a valid bug! State is apparently not resolved with the correct classloader.
As a workaround, you can checkpoint byte arrays and serialize/deserialize the state into byte arrays yourself. You can use the apache commons SerializationUtil class, or Flinks InstantiationUtil class for that. You can get the ClassLoader for the user code (needed for deserialization) via "getRuntimeContext().getUserCodeClassLoader()". Let us know if that workaround works. We'll try to get a fix for that out very soon! Greetings, Stephan On Tue, Aug 18, 2015 at 12:23 PM, Robert Metzger <rmetz...@apache.org> wrote: > Java's HashMap is serializable. > If it is only the map, you can just use the HashMap<> as the state. > > If you have more data, you can use TupleX, for example: > > Tuple2<HashMap<Integer, String>, Long>(myMap, myLong); > > > On Tue, Aug 18, 2015 at 12:21 PM, Rico Bergmann <i...@ricobergmann.de> > wrote: > >> Hi! >> >> Using TupleX is not possible since the state is very big (a Hashtable). >> >> How would I have to do serialization into a byte array? >> >> Greets. Rico. >> >> >> >> Am 18.08.2015 um 11:44 schrieb Robert Metzger <rmetz...@apache.org>: >> >> Hi Rico, >> >> I'm pretty sure that this is a valid bug you've found, since this case is >> not yet tested (afaik). >> We'll fix the issue asap, until then, are you able to encapsulate your >> state in something that is available in Flink, for example a TupleX or just >> serialize it yourself into a byte[] ? >> >> On Tue, Aug 18, 2015 at 11:32 AM, Rico Bergmann <i...@ricobergmann.de> >> wrote: >> >>> Hi! >>> Is it possible to use your own class? >>> I'm using the file state handler at the Jobmanager and implemented the >>> Checkpointed interface. >>> >>> I tried this and got an exception: >>> >>> Error: java.lang.RuntimeException: Failed to deserialize state handle >>> and setup initial operator state. >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:544) >>> at java.lang.Thread.run(Thread.java:745) >>> Caused by: java.lang.ClassNotFoundException: >>> com.ottogroup.bi.searchlab.searchsessionizer.OperatorState >>> >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>> at java.lang.Class.forName0(Native Method) >>> at java.lang.Class.forName(Class.java:348) >>> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626) >>> at >>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) >>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) >>> at >>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) >>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) >>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) >>> at >>> org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:63) >>> at >>> org.apache.flink.runtime.state.ByteStreamStateHandle.getState(ByteStreamStateHandle.java:33) >>> at >>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.restoreInitialState(AbstractUdfStreamOperator.java:83) >>> at >>> org.apache.flink.streaming.runtime.tasks.StreamTask.setInitialState(StreamTask.java:276) >>> at >>> org.apache.flink.runtime.state.StateUtils.setOperatorState(StateUtils.java:51) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:541) >>> >>> >> >