I don't think it was a conscious design decision to not include the
application classes in the connection manager serializer. We should fix
that. Where is it deserializing data in that thread?

Option 4 might make sense in the long run, but it adds a lot of complexity
to the code base (whole separate code base, task queue, blocking/non-blocking
logic within task threads) that can be error prone, so I think it is best
to stay away from that right now.





On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com>
wrote:

> Hi Spark devs,
>
> I’ve posted an issue on JIRA (
> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
> Kryo serialisation with a custom Kryo registrator to register custom
> classes with Kryo.  This is an insidious issue that non-deterministically
> causes Kryo to have different ID number => class name maps on different
> nodes, which then causes weird exceptions (ClassCastException,
> ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
> time.  I’ve created a reliable reproduction for the issue here:
> https://github.com/GrahamDennis/spark-kryo-serialisation
>
> I’m happy to try and put a pull request together to try and address this,
> but it’s not obvious to me the right way to solve this and I’d like to get
> feedback / ideas on how to address this.
>
> The root cause of the problem is a “Failed to run spark.kryo.registrator”
> error which non-deterministically occurs in some executor processes during
> operation.  My custom Kryo registrator is in the application jar, and it is
> accessible on the worker nodes.  This is demonstrated by the fact that most
> of the time the custom kryo registrator is successfully run.
>
> What’s happening is that Kryo serialisation/deserialisation is happening
> most of the time on an “Executor task launch worker” thread, which has the
> thread's class loader set to contain the application jar.  This happens in
> `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
> tell, it is only these threads that have access to the application jar
> (that contains the custom Kryo registrator).  However, the
> ConnectionManager threads sometimes need to serialise/deserialise objects
> to satisfy “getBlock” requests when the objects haven’t previously been
> serialised.  As the ConnectionManager threads don’t have the application
> jar available from their class loader, the lookup of the custom Kryo
> registrator fails on those threads.  Spark then swallows this exception,
> which results in a different ID number => class name mapping for this Kryo
> instance, and this then causes deserialisation errors later on a different
> node.
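
To make the ID mismatch concrete, here is a minimal sketch in plain Java. It does not use Kryo or Spark: `ToyRegistry`, `IdDriftDemo` and the `com.example` class names are hypothetical, and the assumption that IDs are handed out in registration order (with further registrations after the registrator step) is mine, made only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class IdDriftDemo {
    // Builds a registry the way one serialiser instance might: a built-in class,
    // then the custom registrator's classes (only if it ran), then a later class.
    static ToyRegistry build(boolean registratorRan) {
        ToyRegistry registry = new ToyRegistry();
        registry.register("java.lang.String");
        if (registratorRan) {
            registry.register("com.example.MyRecord"); // hypothetical user class
            registry.register("com.example.MyKey");    // hypothetical user class
        }
        registry.register("scala.Tuple2"); // assumed to be registered afterwards
        return registry;
    }

    public static void main(String[] args) {
        ToyRegistry writer = build(false); // registrator lookup failed silently
        ToyRegistry reader = build(true);  // registrator ran normally
        int id = writer.idOf("scala.Tuple2");
        System.out.println("writer sends scala.Tuple2 as ID " + id);
        System.out.println("reader decodes ID " + id + " as " + reader.nameFor(id));
    }
}

// Toy stand-in for a serialiser's class registry. This is NOT Kryo's API;
// it only assumes that IDs are handed out in registration order.
final class ToyRegistry {
    private final Map<String, Integer> idByName = new HashMap<>();
    private final List<String> nameById = new ArrayList<>();

    // Registers a class name and returns its ID (the next free integer).
    int register(String className) {
        Integer existing = idByName.get(className);
        if (existing != null) {
            return existing;
        }
        int id = nameById.size();
        idByName.put(className, id);
        nameById.add(className);
        return id;
    }

    int idOf(String className) {
        return idByName.get(className);
    }

    String nameFor(int id) {
        return nameById.get(id);
    }
}
```

In this toy model the writer and the reader disagree about what the same ID means, which is the kind of mismatch that would surface as a ClassCastException or similar at deserialisation time.
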
>
> An issue related to SPARK-2878 is that Spark probably shouldn’t swallow the
> ClassNotFoundException for custom Kryo registrators.  The user has
> explicitly specified this class, and if it deterministically can’t be
> found, then it may cause problems at serialisation / deserialisation time.
> If it can’t be found only some of the time (as in this case), then it leads
> to a data corruption issue later on.  Either way, we’re better off dying
> early from the ClassNotFoundException than hitting the weirder errors later
> on.
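
A fail-fast lookup could look roughly like the sketch below. It uses only JDK reflection; `RegistratorLoader`, `loadOrFail` and `com.example.MissingRegistrator` are hypothetical names, and this is not how Spark's serializer is actually written. The point is simply that the lookup failure is rethrown with the class name attached instead of being logged and ignored.

```java
import java.util.ArrayList;

final class RegistratorLoader {
    // Hypothetical helper: resolves the configured class name against the given
    // loader and instantiates it. Any reflective failure (including
    // ClassNotFoundException) is rethrown rather than swallowed.
    static Object loadOrFail(String className, ClassLoader loader) {
        try {
            return Class.forName(className, true, loader)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Failed to load registrator " + className, e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();

        // A class that exists stands in for a registrator that is visible.
        Object found = loadOrFail(ArrayList.class.getName(), loader);
        System.out.println("loaded " + found.getClass().getName());

        // A class that does not exist stands in for the failing lookup.
        try {
            loadOrFail("com.example.MissingRegistrator", loader);
        } catch (IllegalStateException e) {
            System.out.println("failed fast: " + e.getCause());
        }
    }
}
```
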
>
> I have some ideas on potential solutions to this issue, but I’m keen for
> experienced eyes to critique these approaches:
>
> 1. The simplest approach to fixing this would be to just make the
> application jar available to the connection manager threads, but I’m
> guessing it’s a design decision to isolate the application jar to just the
> executor task runner threads.  Also, I don’t know if there are any other
> threads that might be interacting with kryo serialisation /
> deserialisation.
> 2. Before looking up the custom Kryo registrator, change the thread’s class
> loader to include the application jar, then restore the class loader after
> the kryo registrator has been run.  I don’t know if this would have any
> other side-effects.
> 3. Always serialise / deserialise on the existing TaskRunner threads,
> rather than delaying serialisation until later, when it can be done only if
> needed.  This approach would probably have negative performance
> consequences.
> 4. Create a new dedicated thread pool for lazy serialisation /
> deserialisation that has the application jar on the class path.
> Serialisation / deserialisation would be the only thing these threads do,
> and this would minimise conflicts / interactions between the application
> jar and other jars.
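
Option 2 above can be sketched with the standard `Thread.setContextClassLoader` call and a try/finally so the original loader is restored even if the body throws. `ContextLoaderSwap` and `withContextClassLoader` are hypothetical names, and the empty `URLClassLoader` is only a stand-in for a loader that would contain the application jar.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

final class ContextLoaderSwap {
    // Runs body with the given loader as the current thread's context class
    // loader, then restores the previous loader whether or not body throws.
    static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> body) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return body.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        // Stand-in for a loader that would include the application jar.
        ClassLoader appLoader = new URLClassLoader(new URL[0], original);

        boolean swapped = withContextClassLoader(appLoader,
                () -> Thread.currentThread().getContextClassLoader() == appLoader);
        boolean restored = Thread.currentThread().getContextClassLoader() == original;

        System.out.println("swapped inside body: " + swapped);
        System.out.println("restored afterwards: " + restored);
    }
}
```

The swap only affects code that resolves classes through the context class loader while the body runs, so the side effects are limited to that window on that thread.
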
>
> #4 sounds like the best approach to me, but I think it would require
> considerable knowledge of Spark internals, which is beyond me at present.
> Does anyone have any better (and ideally simpler) ideas?
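
For reference, the core of #4 is small when expressed with JDK executors: a `ThreadFactory` that sets the context class loader on every worker it creates. The sketch below assumes nothing about Spark internals; `SerialisationPool`, `create` and `loaderSeenByWorker` are hypothetical names, and the task queueing and blocking behaviour that a real integration would need are left out.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

final class SerialisationPool {
    // Creates a fixed-size pool whose worker threads all use appLoader as
    // their context class loader.
    static ExecutorService create(int threads, ClassLoader appLoader) {
        ThreadFactory factory = runnable -> {
            Thread worker = new Thread(runnable, "serialisation-worker");
            worker.setDaemon(true);
            worker.setContextClassLoader(appLoader);
            return worker;
        };
        return Executors.newFixedThreadPool(threads, factory);
    }

    // Reports which context class loader a task running on the pool sees.
    static ClassLoader loaderSeenByWorker(ExecutorService pool) {
        Callable<ClassLoader> task = () -> Thread.currentThread().getContextClassLoader();
        try {
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Stand-in for a loader that would include the application jar.
        ClassLoader appLoader =
                new URLClassLoader(new URL[0], ClassLoader.getSystemClassLoader());
        ExecutorService pool = create(2, appLoader);
        try {
            System.out.println("worker sees app loader: "
                    + (loaderSeenByWorker(pool) == appLoader));
        } finally {
            pool.shutdown();
        }
    }
}
```
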
>
> Cheers,
>
> Graham
>
