Hi Spark devs,

I’ve posted an issue on JIRA (
https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
Kryo serialisation with a custom Kryo registrator to register custom
classes with Kryo.  This is an insidious issue that non-deterministically
causes Kryo to have different ID number => class name maps on different
nodes, which then causes weird exceptions (ClassCastException,
ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
time.  I’ve created a reliable reproduction for the issue here:
https://github.com/GrahamDennis/spark-kryo-serialisation

I’m happy to try and put a pull request together to try and address this,
but it’s not obvious to me the right way to solve this and I’d like to get
feedback / ideas on how to address this.

The root cause of the problem is a "Failed to run spark.kryo.registrator”
error which non-deterministically occurs in some executor processes during
operation.  My custom Kryo registrator is in the application jar, and it is
accessible on the worker nodes.  This is demonstrated by the fact that most
of the time the custom kryo registrator is successfully run.

What’s happening is that Kryo serialisation/deserialisation is happening
most of the time on an “Executor task launch worker” thread, which has the
thread's class loader set to contain the application jar.  This happens in
`org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
tell, it is only these threads that have access to the application jar
(that contains the custom Kryo registrator).  However, the
ConnectionManager threads sometimes need to serialise/deserialise objects
to satisfy “getBlock” requests when the objects haven’t previously been
serialised.  As the ConnectionManager threads don’t have the application
jar available from their class loader, when it tries to look up the custom
Kryo registrator, this fails.  Spark then swallows this exception, which
results in a different ID number —> class mapping for this kryo instance,
and this then causes deserialisation errors later on a different node.

A related issue to the issue reported in SPARK-2878 is that Spark probably
shouldn’t swallow the ClassNotFound exception for custom Kryo registrators.
 The user has explicitly specified this class, and if it deterministically
can’t be found, then it may cause problems at serialisation /
deserialisation time.  If only sometimes it can’t be found (as in this
case), then it leads to a data corruption issue later on.  Either way,
we’re better off dying due to the ClassNotFound exception earlier, than the
weirder errors later on.

I have some ideas on potential solutions to this issue, but I’m keen for
experienced eyes to critique these approaches:

1. The simplest approach to fixing this would be to just make the
application jar available to the connection manager threads, but I’m
guessing it’s a design decision to isolate the application jar to just the
executor task runner threads.  Also, I don’t know if there are any other
threads that might be interacting with kryo serialisation / deserialisation.
2. Before looking up the custom Kryo registrator, change the thread’s class
loader to include the application jar, then restore the class loader after
the kryo registrator has been run.  I don’t know if this would have any
other side-effects.
3. Always serialise / deserialise on the existing TaskRunner threads,
rather than delaying serialisation until later, when it can be done only if
needed.  This approach would probably have negative performance
consequences.
4. Create a new dedicated thread pool for lazy serialisation /
deserialisation that has the application jar on the class path.
 Serialisation / deserialisation would be the only thing these threads do,
and this would minimise conflicts / interactions between the application
jar and other jars.

#4 sounds like the best approach to me, but I think would require
considerable knowledge of Spark internals, which is beyond me at present.
 Does anyone have any better (and ideally simpler) ideas?

Cheers,

Graham

Reply via email to