I now have a complete pull request for this issue that I'd like to get reviewed and committed. The PR is available at https://github.com/apache/spark/pull/1890 and includes a test case for the issue I described. I've also submitted a related PR (https://github.com/apache/spark/pull/1827) that stops exceptions raised while running the custom Kryo registrator from being swallowed.
Thanks,
Graham

On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:

> I've submitted a work-in-progress pull request for this issue that I'd like feedback on. See https://github.com/apache/spark/pull/1890. I've also submitted a pull request for the related issue that exceptions hit when trying to use a custom Kryo registrator are being swallowed: https://github.com/apache/spark/pull/1827
>
> The approach in my pull request is to have the Worker processes download the application jars and add them to the Executor class path at launch time. A couple of things still need to be done before this can be merged:
>
> 1. At the moment, the first time a task runs in the executor, the application jars are downloaded again. My solution here would be to make the executor skip downloading any jar that already exists locally. Previously, the driver and executor kept track of jar file timestamps and would re-download 'updated' jars, but this never made sense: the previous version of an updated jar may already have been loaded into the executor, so the updated jar may have no effect. As my current pull request removes the timestamps for jars, simply checking whether a jar already exists is enough to avoid downloading it again.
> 2. Tests. :-)
>
> A side benefit of my pull request is that you will be able to use custom serialisers that are distributed in a user jar. Currently, the serialiser instance is created in the Executor process before the first task is received, and therefore before any user jars are downloaded. As this PR adds user jars to the Executor process at launch time, this won't be an issue.
>
> On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:
>
>> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the full stack trace, but it's in the BlockManager/BlockManagerWorker, where it's trying to fulfil a "getBlock" request for another node. The objects that would be in the block haven't yet been serialised, and that then causes the deserialisation to happen on that thread. See MemoryStore.scala:102.
>>
>> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>
>>> I don't think it was a conscious design decision to not include the application classes in the connection manager serializer. We should fix that. Where is it deserializing data in that thread?
>>>
>>> 4 might make sense in the long run, but it adds a lot of complexity to the code base (a whole separate code base, task queue, blocking/non-blocking logic within task threads) that can be error prone, so I think it is best to stay away from that right now.
>>>
>>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com> wrote:
>>>
>>>> Hi Spark devs,
>>>>
>>>> I've posted an issue on JIRA (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using Kryo serialisation with a custom Kryo registrator to register custom classes with Kryo. This is an insidious issue that non-deterministically causes Kryo to end up with different ID number => class name maps on different nodes, which then causes weird exceptions (ClassCastException, ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation time.
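>>>>
>>>> To make the failure mode concrete, here's a toy sketch (plain Kryo, not Spark code; Foo and Bar just stand in for classes from the application jar) of how the ID => class mapping diverges when the registrator runs on one node but silently fails on another:
>>>>
>>>>     import com.esotericsoftware.kryo.Kryo
>>>>
>>>>     // Stand-ins for application classes shipped in the user jar.
>>>>     class Foo
>>>>     class Bar
>>>>
>>>>     // Kryo hands out integer class IDs in registration order, so the
>>>>     // ID => class map depends on exactly which registrations ran.
>>>>     val healthy = new Kryo()
>>>>     val fooId = healthy.register(classOf[Foo]).getId // some ID, call it n
>>>>     val barId = healthy.register(classOf[Bar]).getId // n + 1
>>>>
>>>>     val broken = new Kryo() // imagine the registrator silently failed here
>>>>     val badId = broken.register(classOf[Bar]).getId  // Bar now gets ID n, Foo's ID
>>>>
>>>>     // Bytes written by `healthy` for a Foo are read back by `broken` as a Bar
>>>>     // (or the ID lookup fails outright), producing exactly the kind of
>>>>     // ClassCastException / ArrayIndexOutOfBoundsException described above.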
>>>>
>>>> I've created a reliable reproduction for the issue here: https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>
>>>> I'm happy to try to put a pull request together to address this, but the right way to solve it isn't obvious to me, and I'd like to get feedback and ideas on how best to approach it.
>>>>
>>>> The root cause of the problem is a "Failed to run spark.kryo.registrator" error which non-deterministically occurs in some executor processes during operation. My custom Kryo registrator is in the application jar, and it is accessible on the worker nodes; this is demonstrated by the fact that most of the time the custom Kryo registrator is successfully run.
>>>>
>>>> What's happening is that Kryo serialisation/deserialisation usually happens on an "Executor task launch worker" thread, whose context class loader is set to include the application jar. This is done in `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can tell, only these threads have access to the application jar (which contains the custom Kryo registrator). However, the ConnectionManager threads sometimes need to serialise/deserialise objects to satisfy "getBlock" requests when the objects haven't previously been serialised. As the ConnectionManager threads don't have the application jar available from their class loader, the lookup of the custom Kryo registrator fails on those threads. Spark then swallows this exception, which results in a different ID number => class mapping for that Kryo instance, and this then causes deserialisation errors later on a different node.
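>>>>
>>>> Roughly, the lookup that fails is something like the following (a simplified sketch, not the exact Spark code; the registrator class name is hypothetical):
>>>>
>>>>     import com.esotericsoftware.kryo.Kryo
>>>>     import org.apache.spark.serializer.KryoRegistrator
>>>>
>>>>     // The class named by spark.kryo.registrator lives in the user jar.
>>>>     val registratorClassName = "com.example.MyKryoRegistrator" // hypothetical
>>>>
>>>>     def runRegistrator(kryo: Kryo): Unit = {
>>>>       // The registrator is resolved through the calling thread's context class
>>>>       // loader. On an "Executor task launch worker" thread that loader can see
>>>>       // the application jar; on a ConnectionManager thread it cannot, so
>>>>       // forName throws ClassNotFoundException, which is currently swallowed.
>>>>       val loader = Thread.currentThread.getContextClassLoader
>>>>       val registrator = Class.forName(registratorClassName, true, loader)
>>>>         .newInstance().asInstanceOf[KryoRegistrator]
>>>>       registrator.registerClasses(kryo)
>>>>     }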
>>>>
>>>> A related issue to the one reported in SPARK-2878 is that Spark probably shouldn't swallow the ClassNotFoundException for custom Kryo registrators. The user has explicitly specified this class, and if it deterministically can't be found, then it may cause problems at serialisation/deserialisation time. If it only sometimes can't be found (as in this case), then it leads to a data corruption issue later on. Either way, we're better off dying from the ClassNotFoundException early rather than hitting the weirder errors later.
>>>>
>>>> I have some ideas on potential solutions to this issue, but I'm keen for experienced eyes to critique these approaches:
>>>>
>>>> 1. The simplest approach would be to make the application jar available to the connection manager threads, but I'm guessing it was a design decision to isolate the application jar to just the executor task-runner threads. Also, I don't know whether there are any other threads that might be interacting with Kryo serialisation/deserialisation.
>>>> 2. Before looking up the custom Kryo registrator, change the thread's class loader to include the application jar, then restore the class loader after the registrator has been run. I don't know whether this would have any other side effects.
>>>> 3. Always serialise/deserialise on the existing TaskRunner threads, rather than delaying serialisation until later, when it can be done only if needed. This approach would probably have negative performance consequences.
>>>> 4. Create a new dedicated thread pool for lazy serialisation/deserialisation that has the application jar on its class path. Serialisation/deserialisation would be the only thing these threads do, which would minimise conflicts/interactions between the application jar and other jars.
>>>>
>>>> #4 sounds like the best approach to me (a rough sketch of what I have in mind is below), but I think it would require considerable knowledge of Spark internals, which is beyond me at present. Does anyone have any better (and ideally simpler) ideas?
>>>>
>>>> Cheers,
>>>>
>>>> Graham
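>>>>
>>>> P.S. To make #4 a little more concrete, here's a rough sketch of the kind of thing I have in mind. This isn't existing Spark code: how the user-jar class loader gets built and how the pool would be wired into the ConnectionManager are assumptions.
>>>>
>>>>     import java.net.{URL, URLClassLoader}
>>>>     import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
>>>>
>>>>     // Hypothetical: a class loader over the application jars that the
>>>>     // executor has already downloaded.
>>>>     def userJarClassLoader(jarUrls: Seq[URL]): ClassLoader =
>>>>       new URLClassLoader(jarUrls.toArray, Thread.currentThread.getContextClassLoader)
>>>>
>>>>     // A small dedicated pool whose threads always carry the user-jar class
>>>>     // loader, so lazy serialisation/deserialisation running on them can
>>>>     // resolve the custom Kryo registrator.
>>>>     def serialisationPool(loader: ClassLoader): ExecutorService =
>>>>       Executors.newFixedThreadPool(2, new ThreadFactory {
>>>>         override def newThread(r: Runnable): Thread = {
>>>>           val t = new Thread(r, "lazy-serialisation")
>>>>           t.setDaemon(true)
>>>>           t.setContextClassLoader(loader) // the key part: user jar visible here
>>>>           t
>>>>         }
>>>>       })
>>>>
>>>>     // The ConnectionManager would then submit block (de)serialisation work to
>>>>     // this pool instead of doing it inline on its own threads.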