I don't think it was a conscious design decision to not include the application classes in the connection manager serializer. We should fix that. Where is it deserializing data in that thread?
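To make "include the application classes in the connection manager serializer" concrete, here is a minimal, dependency-free Java sketch of the class-loader swap (essentially option #2 in the quoted mail). It installs an application class loader as the thread's context class loader while the registrator is looked up, then restores the previous loader in a `finally` block. `ContextLoaders`, `withContextClassLoader` and `loadViaContext` are hypothetical names, not Spark or Kryo APIs. The sketch also assumes the registrator class is resolved by name through the calling thread's context class loader.

```java
import java.util.function.Supplier;

// Hypothetical helper (not a Spark or Kryo API): run some code with a given
// class loader installed as the current thread's context class loader.
final class ContextLoaders {
    private ContextLoaders() {}

    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> body) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            // Always restore, even if body throws, so a pooled thread is not
            // left holding the application loader.
            current.setContextClassLoader(previous);
        }
    }

    // Assumption: the registrator class is resolved via the calling thread's
    // context class loader, as a by-name lookup typically would be.
    static Class<?> loadViaContext(String className) {
        try {
            return Class.forName(className, true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            // Fail loudly rather than swallowing the error.
            throw new IllegalStateException("Cannot load registrator class " + className, e);
        }
    }
}
```

Restoring the loader in `finally` matters because connection manager threads are long-lived. A swap that leaked would silently change class resolution for every later request served by that thread.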
Option #4 might make sense in the long run, but it adds a lot of complexity to the code base (a whole separate code base, a task queue, and blocking/non-blocking logic within task threads), all of which can be error-prone, so I think it is best to stay away from that for now.

On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
> Hi Spark devs,
>
> I’ve posted an issue on JIRA (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using Kryo serialisation with a custom Kryo registrator to register custom classes with Kryo. This is an insidious issue that non-deterministically causes Kryo to have different ID number => class name maps on different nodes, which then causes weird exceptions (ClassCastException, ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation time. I’ve created a reliable reproduction for the issue here: https://github.com/GrahamDennis/spark-kryo-serialisation
>
> I’m happy to try to put a pull request together to address this, but the right way to solve it isn’t obvious to me, and I’d like to get feedback / ideas on how to approach it.
>
> The root cause of the problem is a “Failed to run spark.kryo.registrator” error which non-deterministically occurs in some executor processes during operation. My custom Kryo registrator is in the application jar, and it is accessible on the worker nodes. This is demonstrated by the fact that most of the time the custom Kryo registrator runs successfully.
>
> What’s happening is that most of the time Kryo serialisation/deserialisation happens on an “Executor task launch worker” thread, which has the thread's class loader set to contain the application jar. This happens in `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can tell, it is only these threads that have access to the application jar (which contains the custom Kryo registrator).
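The "different ID number => class name maps" symptom can be reproduced with a toy model. When no explicit ID is given, Kryo hands out the next available registration ID, so registration order matters. If one executor's registrator silently fails, every class registered afterwards lands on a lower ID there. The sketch below is not Kryo's API: `ToyRegistry`, the class names and the registration order are all made up for illustration. It also shows the fail-fast alternative suggested in the message, where a missing registrator throws instead of being swallowed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT Kryo's API: IDs are handed out sequentially in registration
// order, which is the property that lets a skipped registrator shift IDs.
final class ToyRegistry {
    private final Map<String, Integer> ids = new LinkedHashMap<>();
    private int nextId = 0;

    void register(String className) {
        if (!ids.containsKey(className)) {
            ids.put(className, nextId++);
        }
    }

    Integer idOf(String className) {
        return ids.get(className);
    }

    String classFor(int id) {
        for (Map.Entry<String, Integer> e : ids.entrySet()) {
            if (e.getValue() == id) {
                return e.getKey();
            }
        }
        return null;
    }

    // Hypothetical setup sequence: built-ins, then the user's registrator,
    // then a class registered afterwards. All class names are made up.
    static ToyRegistry build(boolean registratorFound, boolean swallowFailure) {
        ToyRegistry r = new ToyRegistry();
        for (String builtIn : List.of("java.lang.String", "int[]")) {
            r.register(builtIn);
        }
        try {
            if (!registratorFound) {
                // What a thread without the application jar would hit.
                throw new ClassNotFoundException("com.example.MyRegistrator");
            }
            r.register("com.example.Point");
            r.register("com.example.Line");
        } catch (ClassNotFoundException e) {
            if (!swallowFailure) {
                // Fail fast: surface the problem now, not as corrupt data later.
                throw new IllegalStateException("Failed to run registrator", e);
            }
            // Swallowed: this instance silently ends up with a different ID map.
        }
        r.register("com.example.Later");
        return r;
    }
}
```

With swallowing enabled, `build(false, true).idOf("com.example.Later")` is 2. On a healthy node built with `build(true, true)`, ID 2 belongs to `com.example.Point`. A record written by one node is therefore decoded as the wrong class by the other.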
> However, the ConnectionManager threads sometimes need to serialise/deserialise objects to satisfy “getBlock” requests when the objects haven’t previously been serialised. Because the ConnectionManager threads don’t have the application jar available from their class loader, looking up the custom Kryo registrator fails on those threads. Spark then swallows this exception, which results in a different ID number => class mapping for this Kryo instance, and this then causes deserialisation errors later on a different node.
>
> A related issue to the one reported in SPARK-2878 is that Spark probably shouldn’t swallow the ClassNotFoundException for custom Kryo registrators. The user has explicitly specified this class, and if it deterministically can’t be found, then it may cause problems at serialisation/deserialisation time. If it only sometimes can’t be found (as in this case), then it leads to a data corruption issue later on. Either way, we’re better off dying on the ClassNotFoundException early than hitting the weirder errors later on.
>
> I have some ideas on potential solutions to this issue, but I’m keen for experienced eyes to critique these approaches:
>
> 1. The simplest approach to fixing this would be to just make the application jar available to the connection manager threads, but I’m guessing it’s a design decision to isolate the application jar to just the executor task runner threads. Also, I don’t know if there are any other threads that might be interacting with Kryo serialisation/deserialisation.
> 2. Before looking up the custom Kryo registrator, change the thread’s class loader to include the application jar, then restore the class loader after the Kryo registrator has been run. I don’t know if this would have any other side-effects.
> 3. Always serialise/deserialise on the existing TaskRunner threads, rather than delaying serialisation until later, when it can be done only if needed.
>    This approach would probably have negative performance consequences.
> 4. Create a new dedicated thread pool for lazy serialisation/deserialisation that has the application jar on the class path. Serialisation/deserialisation would be the only thing these threads do, and this would minimise conflicts/interactions between the application jar and other jars.
>
> #4 sounds like the best approach to me, but I think it would require considerable knowledge of Spark internals, which is beyond me at present. Does anyone have any better (and ideally simpler) ideas?
>
> Cheers,
>
> Graham
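For option #4, the class-loader part on its own is small. A `ThreadFactory` can give every worker in a dedicated pool the application class loader as its context class loader. The sketch below shows only that piece. It deliberately leaves out the task queue and the blocking/non-blocking coordination that the reply above identifies as the real source of complexity. `SerializationPool` and the thread-name prefix are hypothetical, not Spark code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of option #4's class-loader aspect only: every thread in
// this pool carries the application class loader as its context class loader.
final class SerializationPool {
    private SerializationPool() {}

    static ExecutorService create(int threads, ClassLoader appLoader) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "lazy-serialization-" + counter.incrementAndGet());
            t.setContextClassLoader(appLoader); // user classes resolvable on this thread
            t.setDaemon(true);                  // don't keep the JVM alive
            return t;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }
}
```

Fixing the loader at thread creation avoids per-call swapping. The cost is that any thread needing lazy (de)serialisation must hand the work off to this pool and wait for it, which is exactly the coordination that makes this option heavier.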