See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the full stacktrace, but it's in the BlockManager/BlockManagerWorker, where it's trying to fulfil a "getBlock" request for another node. The objects that would be in the block haven't yet been serialised, and that causes the serialisation to happen on that thread. See MemoryStore.scala:102.
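To make the failure mode concrete: the registrator class is resolved by name against the *current thread's* context class loader, so the same lookup succeeds on a task-runner thread and fails on a ConnectionManager thread. The sketch below is illustrative, not Spark code: `DemoRegistrator` is a hypothetical registrator, and `NoAppJarLoader` stands in for a loader that lacks the application jar.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ContextLoaderDemo {
    // Hypothetical stand-in for the custom Kryo registrator class.
    static class DemoRegistrator {}

    // Stand-in for a ConnectionManager thread's class loader, which does not
    // have the application jar on its path: it cannot resolve the registrator.
    static class NoAppJarLoader extends ClassLoader {
        NoAppJarLoader(ClassLoader parent) { super(parent); }
        @Override
        protected Class<?> loadClass(String name, boolean resolve)
                throws ClassNotFoundException {
            if (name.endsWith("DemoRegistrator")) {
                throw new ClassNotFoundException(name); // app jar not visible here
            }
            return super.loadClass(name, resolve);
        }
    }

    // Roughly how a class named in configuration gets resolved:
    // by name, against the current thread's context class loader.
    static String lookup(String className) {
        try {
            Class.forName(className, true,
                    Thread.currentThread().getContextClassLoader());
            return "loaded";
        } catch (ClassNotFoundException e) {
            return "ClassNotFoundException";
        }
    }

    static String runOnThread(ClassLoader contextLoader, String className)
            throws InterruptedException {
        AtomicReference<String> result = new AtomicReference<>();
        Thread t = new Thread(() -> result.set(lookup(className)));
        t.setContextClassLoader(contextLoader);
        t.start();
        t.join();
        return result.get();
    }

    public static void main(String[] args) throws InterruptedException {
        String registrator = DemoRegistrator.class.getName();
        ClassLoader withAppJar = ContextLoaderDemo.class.getClassLoader();
        ClassLoader withoutAppJar = new NoAppJarLoader(withAppJar);

        // "Executor task launch worker": the jar is on its context loader.
        System.out.println("task-runner thread: "
                + runOnThread(withAppJar, registrator));
        // "ConnectionManager" thread: same lookup, different loader.
        System.out.println("connection-manager thread: "
                + runOnThread(withoutAppJar, registrator));
    }
}
```

Identical code, different thread, opposite outcome; that is the whole bug in miniature.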
On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
> I don't think it was a conscious design decision to not include the
> application classes in the connection manager serializer. We should fix
> that. Where is it deserializing data in that thread?
>
> 4 might make sense in the long run, but it adds a lot of complexity to the
> code base (whole separate code base, task queue, blocking/non-blocking
> logic within task threads) that can be error prone, so I think it is best
> to stay away from that right now.
>
> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com>
> wrote:
>
>> Hi Spark devs,
>>
>> I’ve posted an issue on JIRA (
>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
>> using Kryo serialisation with a custom Kryo registrator to register
>> custom classes with Kryo. This is an insidious issue that
>> non-deterministically causes Kryo to have different ID number => class
>> name maps on different nodes, which then causes weird exceptions
>> (ClassCastException, ClassNotFoundException,
>> ArrayIndexOutOfBoundsException) at deserialisation time. I’ve created a
>> reliable reproduction for the issue here:
>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>
>> I’m happy to put a pull request together to address this, but the right
>> way to solve it isn’t obvious to me, and I’d like feedback / ideas on
>> how to approach it.
>>
>> The root cause of the problem is a “Failed to run
>> spark.kryo.registrator” error which non-deterministically occurs in
>> some executor processes during operation. My custom Kryo registrator is
>> in the application jar, and it is accessible on the worker nodes; this
>> is demonstrated by the fact that most of the time the custom Kryo
>> registrator is successfully run.
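The "different ID number => class name maps" failure mode can be illustrated without Kryo at all. The sketch below is a toy model of registration-order ID assignment, not Kryo's actual implementation, and `com.example.Foo` / `com.example.Bar` are hypothetical class names: a node whose registrator silently failed resolves the same wire ID to a different (or missing) class.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KryoIdDivergence {
    // Toy model of Kryo's registration table: IDs are handed out in
    // registration order, so two JVMs only agree on the ID => class mapping
    // if they register the same classes in the same order.
    static Map<Integer, String> register(boolean registratorRan) {
        Map<Integer, String> idToClass = new LinkedHashMap<>();
        int nextId = 0;
        idToClass.put(nextId++, "scala.Tuple2");        // built-in registration
        if (registratorRan) {
            // Classes a hypothetical custom registrator would add.
            idToClass.put(nextId++, "com.example.Foo");
            idToClass.put(nextId++, "com.example.Bar");
        }
        return idToClass;
    }

    public static void main(String[] args) {
        Map<Integer, String> sender = register(true);    // registrator ran
        Map<Integer, String> receiver = register(false); // failure was swallowed

        int wireId = 1; // the ID the sender wrote for com.example.Foo
        System.out.println("sender wrote id " + wireId + " = "
                + sender.get(wireId));
        System.out.println("receiver reads id " + wireId + " = "
                + receiver.get(wireId));
    }
}
```

The receiver resolves the ID to nothing at all here; with a different registration history it would resolve to the wrong class instead, which matches the mix of ClassNotFoundException, ClassCastException, and ArrayIndexOutOfBoundsException seen in practice.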
>>
>> What’s happening is that Kryo serialisation/deserialisation happens
>> most of the time on an “Executor task launch worker” thread, whose
>> context class loader is set to include the application jar. This
>> happens in `org.apache.spark.executor.Executor.TaskRunner.run`, and
>> from what I can tell, only these threads have access to the
>> application jar (which contains the custom Kryo registrator).
>> However, the ConnectionManager threads sometimes need to
>> serialise/deserialise objects to satisfy “getBlock” requests when the
>> objects haven’t previously been serialised. As the ConnectionManager
>> threads don’t have the application jar available from their class
>> loader, the lookup of the custom Kryo registrator fails. Spark then
>> swallows this exception, which results in a different ID number =>
>> class mapping for this Kryo instance, and that in turn causes
>> deserialisation errors later on a different node.
>>
>> A related issue to the one reported in SPARK-2878 is that Spark
>> probably shouldn’t swallow the ClassNotFound exception for custom
>> Kryo registrators. The user has explicitly specified this class, and
>> if it deterministically can’t be found, then it may cause problems at
>> serialisation / deserialisation time. If it can’t be found only some
>> of the time (as in this case), it leads to a data corruption issue
>> later on. Either way, we’re better off dying from the ClassNotFound
>> exception early than hitting the weirder errors later on.
>>
>> I have some ideas on potential solutions to this issue, but I’m keen
>> for experienced eyes to critique these approaches:
>>
>> 1. The simplest approach would be to just make the application jar
>> available to the ConnectionManager threads, but I’m guessing it’s a
>> design decision to isolate the application jar to just the executor
>> task runner threads.
>> Also, I don’t know if there are any other threads that might be
>> interacting with Kryo serialisation / deserialisation.
>> 2. Before looking up the custom Kryo registrator, change the thread’s
>> class loader to include the application jar, then restore the class
>> loader after the registrator has been run. I don’t know if this would
>> have any other side-effects.
>> 3. Always serialise / deserialise on the existing TaskRunner threads,
>> rather than delaying serialisation until later, when it can be done
>> only if needed. This approach would probably have negative performance
>> consequences.
>> 4. Create a new dedicated thread pool for lazy serialisation /
>> deserialisation that has the application jar on the class path.
>> Serialisation / deserialisation would be the only thing these threads
>> do, which would minimise conflicts / interactions between the
>> application jar and other jars.
>>
>> #4 sounds like the best approach to me, but I think it would require
>> considerable knowledge of Spark internals, which is beyond me at
>> present. Does anyone have any better (and ideally simpler) ideas?
>>
>> Cheers,
>>
>> Graham
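For what it's worth, option 2 in the list above can be sketched as a swap-and-restore of the thread's context class loader. This is an illustrative sketch, not Spark API: `withApplicationLoader` is a made-up helper, and the empty URLClassLoader stands in for a loader that would actually contain the application jar.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class SwapLoaderDemo {
    // Temporarily point the current thread's context class loader at a
    // loader that includes the application jar, run the given action
    // (e.g. the registrator lookup), then restore the previous loader.
    static void withApplicationLoader(ClassLoader appJarLoader, Runnable action) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(appJarLoader);
        try {
            action.run();
        } finally {
            current.setContextClassLoader(original); // always restore
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Empty URLClassLoader stands in for "loader with the application jar".
        ClassLoader appJarLoader = new URLClassLoader(new URL[0], original);

        withApplicationLoader(appJarLoader, () -> {
            boolean swapped =
                    Thread.currentThread().getContextClassLoader() == appJarLoader;
            System.out.println("inside action, app loader active: " + swapped);
        });
        System.out.println("restored original loader: "
                + (Thread.currentThread().getContextClassLoader() == original));
    }
}
```

The try/finally guarantees the original loader comes back even if the registrator throws; whether the temporary swap has other side-effects on code sharing that thread is exactly the open question raised above.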