I now have a complete pull request for this issue that I'd like to get
reviewed and committed.  The PR is available here:
https://github.com/apache/spark/pull/1890 and includes a test case for the
issue I described.  I've also submitted a related PR (
https://github.com/apache/spark/pull/1827) that ensures exceptions raised
while attempting to run the custom Kryo registrator are no longer swallowed.

Thanks,
Graham


On 12 August 2014 18:44, Graham Dennis <graham.den...@gmail.com> wrote:

> I've submitted a work-in-progress pull request for this issue that I'd
> like feedback on.  See https://github.com/apache/spark/pull/1890 . I've
> also submitted a pull request for the related issue that exceptions hit
> when trying to run a custom Kryo registrator are silently swallowed:
> https://github.com/apache/spark/pull/1827
>
> The approach in my pull request is to get the Worker processes to download
> the application jars and add them to the Executor class path at launch
> time. There are a couple of things that still need to be done before this
> can be merged:
> 1. At the moment, the first time a task runs in the executor, the
> application jars are downloaded again.  My solution here would be to make
> the executor skip downloading any jar that already exists locally.
> Previously, the driver & executor kept track of the timestamp of each jar
> and would re-download 'updated' jars, but this never made sense: the
> previous version of the jar may already have been loaded into the
> executor, so the updated jar could have no effect.  As my current pull
> request removes the timestamps for jars, simply checking whether the jar
> already exists lets us avoid downloading it again (see the sketch after
> this list).
> 2. Tests. :-)
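>
> To illustrate, here's a minimal sketch of the existence check I have in
> mind.  The `jarDir` and `fetchJar` names are placeholders for whatever the
> executor actually uses to locate and fetch jars, not real Spark APIs:
>
>     import java.io.File
>
>     // Skip fetching a jar the executor already has on disk.  No timestamp
>     // comparison: an "updated" jar can't be reloaded into a running
>     // executor anyway, so re-downloading it buys us nothing.
>     def ensureJar(jarName: String, jarUri: String, jarDir: File,
>                   fetchJar: (String, File) => Unit): File = {
>       val localJar = new File(jarDir, jarName)
>       if (!localJar.exists()) {
>         fetchJar(jarUri, localJar)
>       }
>       localJar
>     }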
>
> A side-benefit of my pull request is that you will be able to use custom
> serialisers that are distributed in a user jar.  Currently, the serialiser
> instance is created in the Executor process before the first task is
> received and therefore before any user jars are downloaded.  As this PR
> adds user jars to the Executor process at launch time, this won't be an
> issue.
>
>
> On 7 August 2014 12:01, Graham Dennis <graham.den...@gmail.com> wrote:
>
>> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
>> the full stacktrace, but it's in the BlockManager/BlockManagerWorker where
>> it's trying to fulfil a "getBlock" request for another node.  The objects
>> that would be in the block haven't yet been serialised, so the
>> serialisation has to happen on that thread.  See MemoryStore.scala:102.
>>
>>
>> On 7 August 2014 11:53, Reynold Xin <r...@databricks.com> wrote:
>>
>>> I don't think it was a conscious design decision to not include the
>>> application classes in the connection manager serializer. We should fix
>>> that. Where is it deserializing data in that thread?
>>>
>>> Option 4 might make sense in the long run, but it adds a lot of complexity
>>> to the code base (a whole separate code path, task queue, and
>>> blocking/non-blocking logic within task threads) that can be error-prone,
>>> so I think it is best to stay away from that right now.
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <graham.den...@gmail.com>
>>> wrote:
>>>
>>>> Hi Spark devs,
>>>>
>>>> I've posted an issue on JIRA
>>>> (https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
>>>> using Kryo serialisation with a custom Kryo registrator to register
>>>> custom classes.  This is an insidious issue that non-deterministically
>>>> causes Kryo to have different ID number => class name maps on different
>>>> nodes, which then causes weird exceptions (ClassCastException,
>>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>>> deserialisation time.  I've created a reliable reproduction for the
>>>> issue here: https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>
>>>> I'm happy to put a pull request together to address this, but the right
>>>> way to solve it isn't obvious to me, and I'd like to get feedback /
>>>> ideas first.
>>>>
>>>> The root cause of the problem is a "Failed to run spark.kryo.registrator"
>>>> error which non-deterministically occurs in some executor processes
>>>> during operation.  My custom Kryo registrator is in the application jar,
>>>> and it is accessible on the worker nodes; this is demonstrated by the
>>>> fact that most of the time the custom Kryo registrator is successfully
>>>> run.
>>>>
>>>> What's happening is that Kryo serialisation/deserialisation usually
>>>> happens on an "Executor task launch worker" thread, whose class loader
>>>> is set to contain the application jar.  This happens in
>>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
>>>> tell, it is only these threads that have access to the application jar
>>>> (which contains the custom Kryo registrator).  However, the
>>>> ConnectionManager threads sometimes need to serialise/deserialise
>>>> objects to satisfy "getBlock" requests when the objects haven't
>>>> previously been serialised.  As the ConnectionManager threads don't have
>>>> the application jar available from their class loader, when they try to
>>>> look up the custom Kryo registrator, the lookup fails.  Spark then
>>>> swallows this exception, which results in a different ID number => class
>>>> mapping for this Kryo instance, and this then causes deserialisation
>>>> errors later on a different node.
>>>>
>>>> A related issue to the one reported in SPARK-2878 is that Spark probably
>>>> shouldn't swallow the ClassNotFoundException for custom Kryo
>>>> registrators.  The user has explicitly specified this class, and if it
>>>> deterministically can't be found, then it may cause problems at
>>>> serialisation / deserialisation time.  If it only sometimes can't be
>>>> found (as in this case), then it leads to a data corruption issue later
>>>> on.  Either way, we're better off dying on the ClassNotFoundException
>>>> early rather than on the weirder errors later.
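>>>>
>>>> As a rough sketch of the fail-fast behaviour I'm arguing for (the names
>>>> `registratorClassName` and `kryo` stand for whatever is in scope where
>>>> the registrator is run; this isn't exact Spark code):
>>>>
>>>>     import org.apache.spark.SparkException
>>>>     import org.apache.spark.serializer.KryoRegistrator
>>>>
>>>>     try {
>>>>       val reg = Class.forName(registratorClassName, true,
>>>>                               Thread.currentThread().getContextClassLoader)
>>>>         .newInstance().asInstanceOf[KryoRegistrator]
>>>>       reg.registerClasses(kryo)
>>>>     } catch {
>>>>       case e: Exception =>
>>>>         // Don't log and continue: a silently skipped registrator shifts
>>>>         // Kryo's ID => class mapping and corrupts data when another node
>>>>         // tries to deserialise it.
>>>>         throw new SparkException("Failed to run spark.kryo.registrator", e)
>>>>     }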
>>>>
>>>> I have some ideas on potential solutions to this issue, but I’m keen for
>>>> experienced eyes to critique these approaches:
>>>>
>>>> 1. The simplest approach would be to just make the application jar
>>>> available to the ConnectionManager threads, but I'm guessing it's a
>>>> design decision to isolate the application jar to just the executor task
>>>> runner threads.  Also, I don't know if there are any other threads that
>>>> might be interacting with Kryo serialisation / deserialisation.
>>>> 2. Before looking up the custom Kryo registrator, change the thread's
>>>> class loader to include the application jar, then restore the class
>>>> loader after the Kryo registrator has been run (see the sketch after
>>>> this list).  I don't know if this would have any other side-effects.
>>>> 3. Always serialise / deserialise on the existing TaskRunner threads,
>>>> rather than delaying serialisation until it can be done lazily, only if
>>>> needed.  This approach would probably have negative performance
>>>> consequences.
>>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>>> deserialisation that has the application jar on the class path.
>>>> Serialisation / deserialisation would be the only thing these threads
>>>> do, and this would minimise conflicts / interactions between the
>>>> application jar and other jars.
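>>>>
>>>> For option 2, a minimal sketch of the "swap and restore" idea (the
>>>> `appLoader` argument and the `runRegistrator` call are placeholders,
>>>> not real Spark APIs):
>>>>
>>>>     // Run `body` with a class loader that can see the application jar,
>>>>     // restoring the original context class loader afterwards.
>>>>     def withAppClassLoader[T](appLoader: ClassLoader)(body: => T): T = {
>>>>       val thread = Thread.currentThread()
>>>>       val original = thread.getContextClassLoader
>>>>       thread.setContextClassLoader(appLoader)
>>>>       try body
>>>>       finally thread.setContextClassLoader(original)
>>>>     }
>>>>
>>>>     // e.g. withAppClassLoader(appLoader) { runRegistrator(kryo) }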
>>>>
>>>> #4 sounds like the best approach to me, but I think it would require
>>>> considerable knowledge of Spark internals, which is beyond me at present.
>>>> Does anyone have any better (and ideally simpler) ideas?
>>>>
>>>> Cheers,
>>>>
>>>> Graham
>>>>
>>>
>>>
>>
>
