Preferred Executor launch path?

2014-08-27 Thread Graham Dennis
Hi all,

In the process of trying to resolve SPARK-3166 (inability to ship custom
serialisers in application jars;
https://issues.apache.org/jira/browse/SPARK-3166), I've discovered that
there's some duplicated code for building the command for launching
Executors across SparkDeploySchedulerBackend.scala,
MesosSchedulerBackend.scala, and CoarseMesosSchedulerBackend.scala.

Importantly, there is a slight difference in their behaviour:
SparkDeploySchedulerBackend doesn't launch the Executor with the
spark-class script, but instead does something similar in
CommandUtils.scala.  MesosSchedulerBackend.scala and
CoarseMesosSchedulerBackend.scala both use the spark-class script.  Is the
latter the preferred approach?  Should I refactor all of these to use
spark-class, or is there a reason for the differing behaviour?

Secondly, the goal of SPARK-3166 is to have the user jar available to the
executor process at launch time (rather than when the first task is
received).  I'd like to get some feedback on what the preferred classpath
order should be.  The items to be ordered to determine the classpath are:

* The output of the compute-classpath script
* The config option spark.executor.extraClassPath
* The application jar (and anything added via SparkContext.addJar)

Complicating the matter is that the 'deploy' backend currently supports the
"spark.files.userClassPathFirst" option, but this is not supported by the
Mesos backends (and I don't think it's supported by the YARN backend).

Ignoring the "userClassPathFirst" option, the current behaviour for the
classpath is effectively:
1. The output of compute-classpath
2. The config option spark.executor.extraClassPath
3. The application jar (and anything added via SparkContext.addJar).

What should the preferred order be if userClassPathFirst is true?
Currently the behaviour for the Deploy backend is effectively:
1. The application jar (and anything added via SparkContext.addJar)
2. The output of compute-classpath
3. The config option spark.executor.extraClassPath

To me it makes more sense for this to be in the order (application jar;
spark.executor.extraClassPath; compute-classpath).  Agree? Disagree?
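Concretely, the two orderings under discussion could be sketched as follows (the class, method name, and signature are illustrative, not actual Spark code):

```java
import java.util.ArrayList;
import java.util.List;

public class ClasspathOrder {
    // Hypothetical helper illustrating the two orderings discussed above.
    static List<String> executorClasspath(List<String> computeClasspath,
                                          List<String> extraClassPath,
                                          List<String> userJars,
                                          boolean userClassPathFirst) {
        List<String> cp = new ArrayList<>();
        if (userClassPathFirst) {
            // Proposed: application jar; spark.executor.extraClassPath;
            // compute-classpath output.
            cp.addAll(userJars);
            cp.addAll(extraClassPath);
            cp.addAll(computeClasspath);
        } else {
            // Current default behaviour.
            cp.addAll(computeClasspath);
            cp.addAll(extraClassPath);
            cp.addAll(userJars);
        }
        return cp;
    }

    public static void main(String[] args) {
        List<String> cp = executorClasspath(
                List.of("spark-core.jar"), List.of("extra.jar"),
                List.of("app.jar"), true);
        System.out.println(String.join(":", cp)); // app.jar:extra.jar:spark-core.jar
    }
}
```

The argument for putting extraClassPath between the application jar and Spark's own classpath is that it is user-supplied configuration, so it should shadow Spark's bundled classes but not the application jar itself.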

Thanks,
Graham


Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-25 Thread Graham Dennis
Hi,

Unless you manually patched Spark: if you have Reynold’s patch for SPARK-2878,
you also have the patch for SPARK-2893, which makes the underlying cause much
more obvious and explicit.  So the issue below is unlikely to be related to
SPARK-2878.

Graham

On 26 Aug 2014, at 4:13 am, npanj  wrote:

> I am running the code with @rxin's patch in standalone mode.  In my case I am
> registering "org.apache.spark.graphx.GraphKryoRegistrator" . 
> 
> Recently I started to see "com.esotericsoftware.kryo.KryoException:
> java.io.IOException: failed to uncompress the chunk: PARSING_ERROR".  Has
> anyone seen this?  Could it be related to this issue?  Here is the trace:
> --
> vids (org.apache.spark.graphx.impl.VertexAttributeBlock)
>com.esotericsoftware.kryo.io.Input.fill(Input.java:142)
>com.esotericsoftware.kryo.io.Input.require(Input.java:169)
>com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:710)
>com.esotericsoftware.kryo.io.Input.readLong(Input.java:665)
> 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.read(DefaultArraySerializers.java:127)
> 
> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$LongArraySerializer.read(DefaultArraySerializers.java:107)
>com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:699)
> 
> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:611)
> 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> 
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
> 
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> 
> org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext(BlockManager.scala:1054)
>scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>scala.collection.Iterator$class.foreach(Iterator.scala:727)
>scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 
> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
> 
> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
> 
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
> 
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> 
> org.apache.spark.graphx.EdgeRDD$$anonfun$mapEdgePartitions$1.apply(EdgeRDD.scala:87)
> 
> org.apache.spark.graphx.EdgeRDD$$anonfun$mapEdgePartitions$1.apply(EdgeRDD.scala:85)
>org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
>org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
> 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>org.apache.spark.scheduler.Task.run(Task.scala:54)
> 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:202)
> 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 
> --
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/SPARK-2878-Kryo-serialisation-with-custom-Kryo-registrator-failing-tp7719p7989.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> For additional commands, e-mail: dev-h...@spark.apache.org
> 



Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Graham Dennis
That should work, but would you also make these changes to the
JavaSerializer?  The API of these is the same so that you can select one or
the other (or, in theory, a custom serialiser).  This also wouldn't address
the problem of shipping custom *serialisers* (not Kryo registrators) in
user jars.
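For what it's worth, the shape of the fix Reynold sketches below, applied to a JavaSerializer-style class, might look like this (class and method names are hypothetical, not Spark's actual Serializer API): the serializer carries a default class loader, and every deserialisation call resolves classes through it.

```java
import java.io.*;

// Hypothetical sketch, not Spark's actual API: a serializer with a
// settable default class loader, so that setting it once (e.g. to the
// executor's class loader once user jars are available) affects all
// subsequent deserialisation calls.
public class SketchSerializer {
    private ClassLoader defaultClassLoader =
            Thread.currentThread().getContextClassLoader();

    public void setDefaultClassLoader(ClassLoader loader) {
        this.defaultClassLoader = loader;
    }

    public byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        return bos.toByteArray();
    }

    public Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        // Resolve classes through the serializer's loader rather than the
        // calling thread's context class loader.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, defaultClassLoader);
            }
        }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchSerializer s = new SketchSerializer();
        System.out.println(s.deserialize(s.serialize("hello"))); // hello
    }
}
```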

On 14 August 2014 19:23, Reynold Xin  wrote:

> Graham,
>
> SparkEnv only creates a KryoSerializer, but as I understand that
> serializer doesn't actually initializes the registrator since that is only
> called when newKryo() is called when KryoSerializerInstance is initialized.
>
> Basically I'm thinking a quick fix for 1.2:
>
> 1. Add a classLoader field to KryoSerializer; initialize new
> KryoSerializerInstance with that class loader
>
>  2. Set that classLoader to the executor's class loader when Executor is
> initialized.
>
> Then all deser calls should be using the executor's class loader.
>
>
>
>
> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis 
> wrote:
>
>> Hi Reynold,
>>
>> That would solve this specific issue, but you'd need to be careful that
>> you never created a serialiser instance before the first task is received.
>>  Currently in Executor.TaskRunner.run a closure serialiser instance is
>> created before any application jars are downloaded, but that could be
>> moved.  To me, this seems a little fragile.
>>
>> However there is a related issue where you can't ship a custom serialiser
>> in an application jar because the serialiser is instantiated when the
>> SparkEnv object is created, which is before any tasks are received by the
>> executor.  The above approach wouldn't help with this problem.
>>  Additionally, the YARN scheduler currently uses this approach of adding
>> the application jar to the Executor classpath, so it would make things a
>> bit more uniform.
>>
>> Cheers,
>> Graham
>>
>>
>> On 14 August 2014 17:37, Reynold Xin  wrote:
>>
>>> Graham,
>>>
>>> Thanks for working on this. This is an important bug to fix.
>>>
>>>  I don't have the whole context and obviously I haven't spent nearly as
>>> much time on this as you have, but I'm wondering what if we always pass the
>>> executor's ClassLoader to the Kryo serializer? Will that solve this problem?
>>>
>>>
>>>
>>>
>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis >> > wrote:
>>>
>>>> Hi Deb,
>>>>
>>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>>  Theoretically Spark supports custom serialisers, but due to a related
>>>> issue, custom serialisers currently can't live in application jars and must
>>>> be available to all executors at launch.  My PR fixes this issue as well,
>>>> allowing custom serialisers to be shipped in application jars.
>>>>
>>>> Graham
>>>>
>>>>
>>>> On 14 August 2014 16:56, Debasish Das  wrote:
>>>>
>>>>> Sorry I just saw Graham's email after sending my previous email about
>>>>> this bug...
>>>>>
>>>>> I have been seeing this same issue on our ALS runs last week but I
>>>>> thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...
>>>>>
>>>>> What's the status of this PR ? Will this fix be back-ported to 1.0.1
>>>>> as we are running 1.0.1 stable standalone cluster ?
>>>>>
>>>>> Till the PR merges does it make sense to not use Kryo ? What are the
>>>>> other recommended efficient serializers ?
>>>>>
>>>>> Thanks.
>>>>> Deb
>>>>>
>>>>>
>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <
>>>>> graham.den...@gmail.com> wrote:
>>>>>
>>>>>> I now have a complete pull request for this issue that I'd like to get
>>>>>> reviewed and committed.  The PR is available here:
>>>>>> https://github.com/apache/spark/pull/1890 and includes a testcase
>>>>>> for the
>>>>>> issue I described.  I've also submitted a related PR (
>>>>>> https://github.com/apache/spark/pull/1827) that causes exceptions
>>>>>> raised
>>>>>> while attempting to run the custom kryo registrator not to be
>>>>>> swallowed.
>>>>>>
>>>>>> Thanks,
>>>>>> Graham

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Graham Dennis
In part, my assertion was based on a comment by sryza on my PR (
https://github.com/apache/spark/pull/1890#issuecomment-51805750), however I
thought I had also seen it in the YARN code base.  However, now that I look
for it, I can't find where this happens, so perhaps I was imagining the
YARN behaviour.


On 14 August 2014 17:57, Debasish Das  wrote:

> By the way I have seen this same problem while deploying 1.1.0-SNAPSHOT on
> YARN as well...
>
> So it is a common problem in both standalone and YARN mode deployment...
>
>
> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis 
> wrote:
>
>> Hi Reynold,
>>
>> That would solve this specific issue, but you'd need to be careful that
>> you never created a serialiser instance before the first task is received.
>>  Currently in Executor.TaskRunner.run a closure serialiser instance is
>> created before any application jars are downloaded, but that could be
>> moved.  To me, this seems a little fragile.
>>
>> However there is a related issue where you can't ship a custom serialiser
>> in an application jar because the serialiser is instantiated when the
>> SparkEnv object is created, which is before any tasks are received by the
>> executor.  The above approach wouldn't help with this problem.
>>  Additionally, the YARN scheduler currently uses this approach of adding
>> the application jar to the Executor classpath, so it would make things a
>> bit more uniform.
>>
>> Cheers,
>> Graham
>>
>>
>> On 14 August 2014 17:37, Reynold Xin  wrote:
>>
>>> Graham,
>>>
>>> Thanks for working on this. This is an important bug to fix.
>>>
>>>  I don't have the whole context and obviously I haven't spent nearly as
>>> much time on this as you have, but I'm wondering what if we always pass the
>>> executor's ClassLoader to the Kryo serializer? Will that solve this problem?
>>>
>>>
>>>
>>>
>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis >> > wrote:
>>>
>>>> Hi Deb,
>>>>
>>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>>  Theoretically Spark supports custom serialisers, but due to a related
>>>> issue, custom serialisers currently can't live in application jars and must
>>>> be available to all executors at launch.  My PR fixes this issue as well,
>>>> allowing custom serialisers to be shipped in application jars.
>>>>
>>>> Graham
>>>>
>>>>
>>>> On 14 August 2014 16:56, Debasish Das  wrote:
>>>>
>>>>> Sorry I just saw Graham's email after sending my previous email about
>>>>> this bug...
>>>>>
>>>>> I have been seeing this same issue on our ALS runs last week but I
>>>>> thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...
>>>>>
>>>>> What's the status of this PR ? Will this fix be back-ported to 1.0.1
>>>>> as we are running 1.0.1 stable standalone cluster ?
>>>>>
>>>>> Till the PR merges does it make sense to not use Kryo ? What are the
>>>>> other recommended efficient serializers ?
>>>>>
>>>>> Thanks.
>>>>> Deb
>>>>>
>>>>>
>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <
>>>>> graham.den...@gmail.com> wrote:
>>>>>
>>>>>> I now have a complete pull request for this issue that I'd like to get
>>>>>> reviewed and committed.  The PR is available here:
>>>>>> https://github.com/apache/spark/pull/1890 and includes a testcase
>>>>>> for the
>>>>>> issue I described.  I've also submitted a related PR (
>>>>>> https://github.com/apache/spark/pull/1827) that causes exceptions
>>>>>> raised
>>>>>> while attempting to run the custom kryo registrator not to be
>>>>>> swallowed.
>>>>>>
>>>>>> Thanks,
>>>>>> Graham
>>>>>>
>>>>>>
>>>>>> On 12 August 2014 18:44, Graham Dennis 
>>>>>> wrote:
>>>>>>
>>>>>> > I've submitted a work-in-progress pull request for this issue that
>>>>>> I'd
>>>>>> > like feedback on.  See https://github.com/apache/spark/pull/1890 .
>>>>>> I've
>>>>>> also submitted a pull request for the related issue that the
>>>>>> exceptions hit when trying to use a custom kryo registrator are being
>>>>>> swallowed: https://github.com/apache/spark/pull/1827

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Graham Dennis
Hi Reynold,

That would solve this specific issue, but you'd need to be careful that you
never created a serialiser instance before the first task is received.
 Currently in Executor.TaskRunner.run a closure serialiser instance is
created before any application jars are downloaded, but that could be
moved.  To me, this seems a little fragile.

However there is a related issue where you can't ship a custom serialiser
in an application jar because the serialiser is instantiated when the
SparkEnv object is created, which is before any tasks are received by the
executor.  The above approach wouldn't help with this problem.
 Additionally, the YARN scheduler currently uses this approach of adding
the application jar to the Executor classpath, so it would make things a
bit more uniform.

Cheers,
Graham


On 14 August 2014 17:37, Reynold Xin  wrote:

> Graham,
>
> Thanks for working on this. This is an important bug to fix.
>
> I don't have the whole context and obviously I haven't spent nearly as
> much time on this as you have, but I'm wondering what if we always pass the
> executor's ClassLoader to the Kryo serializer? Will that solve this problem?
>
>
>
>
> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis 
> wrote:
>
>> Hi Deb,
>>
>> The only alternative serialiser is the JavaSerialiser (the default).
>>  Theoretically Spark supports custom serialisers, but due to a related
>> issue, custom serialisers currently can't live in application jars and must
>> be available to all executors at launch.  My PR fixes this issue as well,
>> allowing custom serialisers to be shipped in application jars.
>>
>> Graham
>>
>>
>> On 14 August 2014 16:56, Debasish Das  wrote:
>>
>>> Sorry I just saw Graham's email after sending my previous email about
>>> this bug...
>>>
>>> I have been seeing this same issue on our ALS runs last week but I
>>> thought it was due my hacky way to run mllib 1.1 snapshot on core 1.0...
>>>
>>> What's the status of this PR ? Will this fix be back-ported to 1.0.1 as
>>> we are running 1.0.1 stable standalone cluster ?
>>>
>>> Till the PR merges does it make sense to not use Kryo ? What are the
>>> other recommended efficient serializers ?
>>>
>>> Thanks.
>>> Deb
>>>
>>>
>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis 
>>> wrote:
>>>
>>>> I now have a complete pull request for this issue that I'd like to get
>>>> reviewed and committed.  The PR is available here:
>>>> https://github.com/apache/spark/pull/1890 and includes a testcase for
>>>> the
>>>> issue I described.  I've also submitted a related PR (
>>>> https://github.com/apache/spark/pull/1827) that causes exceptions
>>>> raised
>>>> while attempting to run the custom kryo registrator not to be swallowed.
>>>>
>>>> Thanks,
>>>> Graham
>>>>
>>>>
>>>> On 12 August 2014 18:44, Graham Dennis  wrote:
>>>>
>>>> > I've submitted a work-in-progress pull request for this issue that I'd
>>>> > like feedback on.  See https://github.com/apache/spark/pull/1890 .
>>>> I've
>>>> > also submitted a pull request for the related issue that the
>>>> exceptions hit
>>>> > when trying to use a custom kryo registrator are being swallowed:
>>>> > https://github.com/apache/spark/pull/1827
>>>> >
>>>> > The approach in my pull request is to get the Worker processes to
>>>> download
>>>> > the application jars and add them to the Executor class path at launch
>>>> > time. There are a couple of things that still need to be done before
>>>> this
>>>> > can be merged:
>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>> > application jars are downloaded again.  My solution here would be to
>>>> make
>>>> > the executor not download any jars that already exist.  Previously,
>>>> the
>>>> > driver & executor kept track of the timestamp of jar files and would
>>>> > redownload 'updated' jars, however this never made sense as the
>>>> previous
>>>> > version of the updated jar may have already been loaded into the
>>>> executor,
>>>> > so the updated jar may have no effect.  As my current pull request
>>>> removes
>>>> > the timestamp for jars, just checking whether the jar exists will
>>>> > allow us to avoid downloading the jars again.

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-14 Thread Graham Dennis
Hi Deb,

The only alternative serialiser is the JavaSerialiser (the default).
 Theoretically Spark supports custom serialisers, but due to a related
issue, custom serialisers currently can't live in application jars and must
be available to all executors at launch.  My PR fixes this issue as well,
allowing custom serialisers to be shipped in application jars.

Graham


On 14 August 2014 16:56, Debasish Das  wrote:

> Sorry I just saw Graham's email after sending my previous email about this
> bug...
>
> I have been seeing this same issue on our ALS runs last week but I thought
> it was due my hacky way to run mllib 1.1 snapshot on core 1.0...
>
> What's the status of this PR ? Will this fix be back-ported to 1.0.1 as we
> are running 1.0.1 stable standalone cluster ?
>
> Till the PR merges does it make sense to not use Kryo ? What are the other
> recommended efficient serializers ?
>
> Thanks.
> Deb
>
>
> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis 
> wrote:
>
>> I now have a complete pull request for this issue that I'd like to get
>> reviewed and committed.  The PR is available here:
>> https://github.com/apache/spark/pull/1890 and includes a testcase for the
>> issue I described.  I've also submitted a related PR (
>> https://github.com/apache/spark/pull/1827) that causes exceptions raised
>> while attempting to run the custom kryo registrator not to be swallowed.
>>
>> Thanks,
>> Graham
>>
>>
>> On 12 August 2014 18:44, Graham Dennis  wrote:
>>
>> > I've submitted a work-in-progress pull request for this issue that I'd
>> > like feedback on.  See https://github.com/apache/spark/pull/1890 . I've
>> > also submitted a pull request for the related issue that the exceptions
>> hit
>> > when trying to use a custom kryo registrator are being swallowed:
>> > https://github.com/apache/spark/pull/1827
>> >
>> > The approach in my pull request is to get the Worker processes to
>> download
>> > the application jars and add them to the Executor class path at launch
>> > time. There are a couple of things that still need to be done before
>> this
>> > can be merged:
>> > 1. At the moment, the first time a task runs in the executor, the
>> > application jars are downloaded again.  My solution here would be to
>> make
>> > the executor not download any jars that already exist.  Previously, the
>> > driver & executor kept track of the timestamp of jar files and would
>> > redownload 'updated' jars, however this never made sense as the previous
>> > version of the updated jar may have already been loaded into the
>> executor,
>> > so the updated jar may have no effect.  As my current pull request
>> removes
>> > the timestamp for jars, just checking whether the jar exists will allow
>> us
>> > to avoid downloading the jars again.
>> > 2. Tests. :-)
>> >
>> > A side-benefit of my pull request is that you will be able to use custom
>> > serialisers that are distributed in a user jar.  Currently, the
>> serialiser
>> > instance is created in the Executor process before the first task is
>> > received and therefore before any user jars are downloaded.  As this PR
>> > adds user jars to the Executor process at launch time, this won't be an
>> > issue.
>> >
>> >
>> > On 7 August 2014 12:01, Graham Dennis  wrote:
>> >
>> >> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
>> >> the full stacktrace, but it's in the BlockManager/BlockManagerWorker
>> where
>> >> it's trying to fulfil a "getBlock" request for another node.  The
>> objects
>> >> that would be in the block haven't yet been serialised, and that then
>> >> causes the deserialisation to happen on that thread.  See
>> >> MemoryStore.scala:102.
>> >>
>> >>
>> >> On 7 August 2014 11:53, Reynold Xin  wrote:
>> >>
>> >>> I don't think it was a conscious design decision to not include the
>> >>> application classes in the connection manager serializer. We should
>> fix
>> >>> that. Where is it deserializing data in that thread?
>> >>>
>> >>>  4 might make sense in the long run, but it adds a lot of complexity
>> to
>> >>> the code base (whole separate code base, task queue,
>> blocking/non-blocking
>> >>> logic within task threads) that can be error prone, 

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-13 Thread Graham Dennis
I now have a complete pull request for this issue that I'd like to get
reviewed and committed.  The PR is available here:
https://github.com/apache/spark/pull/1890 and includes a testcase for the
issue I described.  I've also submitted a related PR (
https://github.com/apache/spark/pull/1827) that causes exceptions raised
while attempting to run the custom kryo registrator not to be swallowed.

Thanks,
Graham


On 12 August 2014 18:44, Graham Dennis  wrote:

> I've submitted a work-in-progress pull request for this issue that I'd
> like feedback on.  See https://github.com/apache/spark/pull/1890 . I've
> also submitted a pull request for the related issue that the exceptions hit
> when trying to use a custom kryo registrator are being swallowed:
> https://github.com/apache/spark/pull/1827
>
> The approach in my pull request is to get the Worker processes to download
> the application jars and add them to the Executor class path at launch
> time. There are a couple of things that still need to be done before this
> can be merged:
> 1. At the moment, the first time a task runs in the executor, the
> application jars are downloaded again.  My solution here would be to make
> the executor not download any jars that already exist.  Previously, the
> driver & executor kept track of the timestamp of jar files and would
> redownload 'updated' jars, however this never made sense as the previous
> version of the updated jar may have already been loaded into the executor,
> so the updated jar may have no effect.  As my current pull request removes
> the timestamp for jars, just checking whether the jar exists will allow us
> to avoid downloading the jars again.
> 2. Tests. :-)
>
> A side-benefit of my pull request is that you will be able to use custom
> serialisers that are distributed in a user jar.  Currently, the serialiser
> instance is created in the Executor process before the first task is
> received and therefore before any user jars are downloaded.  As this PR
> adds user jars to the Executor process at launch time, this won't be an
> issue.
>
>
> On 7 August 2014 12:01, Graham Dennis  wrote:
>
>> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
>> the full stacktrace, but it's in the BlockManager/BlockManagerWorker where
>> it's trying to fulfil a "getBlock" request for another node.  The objects
>> that would be in the block haven't yet been serialised, and that then
>> causes the deserialisation to happen on that thread.  See
>> MemoryStore.scala:102.
>>
>>
>> On 7 August 2014 11:53, Reynold Xin  wrote:
>>
>>> I don't think it was a conscious design decision to not include the
>>> application classes in the connection manager serializer. We should fix
>>> that. Where is it deserializing data in that thread?
>>>
>>>  4 might make sense in the long run, but it adds a lot of complexity to
>>> the code base (whole separate code base, task queue, blocking/non-blocking
>>> logic within task threads) that can be error prone, so I think it is best
>>> to stay away from that right now.
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis 
>>> wrote:
>>>
>>>> Hi Spark devs,
>>>>
>>>> I’ve posted an issue on JIRA (
>>>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when
>>>> using
>>>> Kryo serialisation with a custom Kryo registrator to register custom
>>>> classes with Kryo.  This is an insidious issue that
>>>> non-deterministically
>>>> causes Kryo to have different ID number => class name maps on different
>>>> nodes, which then causes weird exceptions (ClassCastException,
>>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>>> deserialisation
>>>> time.  I’ve created a reliable reproduction for the issue here:
>>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>
>>>> I’m happy to try and put a pull request together to try and address
>>>> this,
>>>> but it’s not obvious to me the right way to solve this and I’d like to
>>>> get
>>>> feedback / ideas on how to address this.
>>>>
>>>> The root cause of the problem is a "Failed to run
>>>> spark.kryo.registrator”
>>>> error which non-deterministically occurs in some executor processes
>>>> during
>>>> operation.  My custom Kryo registrator is in the application jar, and
>>>> it is accessible on the worker nodes.
>

Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-12 Thread Graham Dennis
I've submitted a work-in-progress pull request for this issue that I'd like
feedback on.  See https://github.com/apache/spark/pull/1890 . I've also
submitted a pull request for the related issue that the exceptions hit when
trying to use a custom kryo registrator are being swallowed:
https://github.com/apache/spark/pull/1827

The approach in my pull request is to get the Worker processes to download
the application jars and add them to the Executor class path at launch
time. There are a couple of things that still need to be done before this
can be merged:
1. At the moment, the first time a task runs in the executor, the
application jars are downloaded again.  My solution here would be to make
the executor not download any jars that already exist.  Previously, the
driver & executor kept track of the timestamp of jar files and would
redownload 'updated' jars, however this never made sense as the previous
version of the updated jar may have already been loaded into the executor,
so the updated jar may have no effect.  As my current pull request removes
the timestamp for jars, just checking whether the jar exists will allow us
to avoid downloading the jars again.
2. Tests. :-)

A side-benefit of my pull request is that you will be able to use custom
serialisers that are distributed in a user jar.  Currently, the serialiser
instance is created in the Executor process before the first task is
received and therefore before any user jars are downloaded.  As this PR
adds user jars to the Executor process at launch time, this won't be an
issue.
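Point 1 above could be as simple as an existence check before fetching. Below is a hypothetical helper, not Spark's actual download code (which would fetch from the driver's HTTP file server rather than copy a local file):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class JarFetcher {
    // Hypothetical sketch: fetch a jar into the executor's work directory
    // only if it isn't already there. The local copy stands in for the
    // real download step.
    static boolean fetchIfMissing(Path workDir, String jarName, Path source)
            throws IOException {
        Path target = workDir.resolve(jarName);
        if (Files.exists(target)) {
            // Already fetched at executor launch. Re-downloading an
            // "updated" jar would be pointless anyway: the old version
            // may already be loaded into the running JVM.
            return false;
        }
        Files.copy(source, target);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path workDir = Files.createTempDirectory("executor-work");
        Path source = Files.createTempFile("app", ".jar");
        System.out.println(fetchIfMissing(workDir, "app.jar", source)); // true
        System.out.println(fetchIfMissing(workDir, "app.jar", source)); // false
    }
}
```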


On 7 August 2014 12:01, Graham Dennis  wrote:

> See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for
> the full stacktrace, but it's in the BlockManager/BlockManagerWorker where
> it's trying to fulfil a "getBlock" request for another node.  The objects
> that would be in the block haven't yet been serialised, and that then
> causes the deserialisation to happen on that thread.  See
> MemoryStore.scala:102.
>
>
> On 7 August 2014 11:53, Reynold Xin  wrote:
>
Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-06 Thread Graham Dennis
See my comment on https://issues.apache.org/jira/browse/SPARK-2878 for the
full stacktrace, but it's in the BlockManager/BlockManagerWorker where it's
trying to fulfil a "getBlock" request for another node.  The objects that
would be in the block haven't yet been serialised, so the serialisation
(and hence the Kryo registrator lookup) happens on that thread.  See
MemoryStore.scala:102.
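
The lazy pattern at issue can be illustrated with a toy sketch (this is not
Spark's MemoryStore; `LazyBlockStore` and `serialise` are stand-ins):

```scala
import scala.collection.mutable

// Toy sketch: blocks may be cached as live objects and only serialised
// when another node's "getBlock" request arrives, so serialisation runs
// on whatever thread services that request, not on a task thread.
class LazyBlockStore(serialise: Any => Array[Byte]) {
  private val entries = mutable.Map[String, Any]()

  // Cache the live object; no serialisation happens here.
  def put(blockId: String, value: Any): Unit = entries(blockId) = value

  // Called from a connection-handling thread when fulfilling a remote
  // request: serialisation (and any Kryo setup) runs on *that* thread.
  def getBytes(blockId: String): Option[Array[Byte]] =
    entries.get(blockId).map(serialise)
}
```

The point is that `getBytes` runs on whichever thread handles the request,
so that thread also needs the application jar visible to its class loader.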


On 7 August 2014 11:53, Reynold Xin  wrote:

> I don't think it was a conscious design decision to not include the
> application classes in the connection manager serializer. We should fix
> that. Where is it deserializing data in that thread?
>
> 4 might make sense in the long run, but it adds a lot of complexity to the
> code base (whole separate code base, task queue, blocking/non-blocking
> logic within task threads) that can be error prone, so I think it is best
> to stay away from that right now.
>
>
>
>
>
> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis 
> wrote:
>
>> Hi Spark devs,
>>
>> I’ve posted an issue on JIRA (
>> https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
>> Kryo serialisation with a custom Kryo registrator to register custom
>> classes with Kryo.  This is an insidious issue that non-deterministically
>> causes Kryo to have different ID number => class name maps on different
>> nodes, which then causes weird exceptions (ClassCastException,
>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
>> time.  I’ve created a reliable reproduction for the issue here:
>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>
>> I’m happy to put a pull request together to address this, but it’s not
>> obvious to me what the right way to solve it is, so I’d like to get
>> feedback / ideas on the best approach.
>>
>> The root cause of the problem is a "Failed to run spark.kryo.registrator”
>> error which non-deterministically occurs in some executor processes during
>> operation.  My custom Kryo registrator is in the application jar, and it
>> is
>> accessible on the worker nodes.  This is demonstrated by the fact that
>> most
>> of the time the custom kryo registrator is successfully run.
>>
>> What’s happening is that Kryo serialisation/deserialisation is happening
>> most of the time on an “Executor task launch worker” thread, which has the
>> thread's class loader set to contain the application jar.  This happens in
>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
>> tell, it is only these threads that have access to the application jar
>> (that contains the custom Kryo registrator).  However, the
>> ConnectionManager threads sometimes need to serialise/deserialise objects
>> to satisfy “getBlock” requests when the objects haven’t previously been
>> serialised.  As the ConnectionManager threads don’t have the application
>> jar available from their class loader, when it tries to look up the custom
>> Kryo registrator, this fails.  Spark then swallows this exception, which
>> results in a different ID number => class name mapping for this Kryo
>> instance,
>> and this then causes deserialisation errors later on a different node.
>>
>> A related issue to the one reported in SPARK-2878 is that Spark probably
>> shouldn’t swallow the ClassNotFoundException for custom Kryo
>> registrators.  The user has explicitly specified this class, and if it
>> deterministically can’t be found, that will cause problems at
>> serialisation / deserialisation time.  If it can only sometimes be found
>> (as in this case), it leads to a data corruption issue later on.  Either
>> way, we’re better off failing fast on the ClassNotFoundException than
>> hitting the weirder errors later.
>>
>> I have some ideas on potential solutions to this issue, but I’m keen for
>> experienced eyes to critique these approaches:
>>
>> 1. The simplest approach to fixing this would be to just make the
>> application jar available to the connection manager threads, but I’m
>> guessing it’s a design decision to isolate the application jar to just the
>> executor task runner threads.  Also, I don’t know if there are any other
>> threads that might be interacting with kryo serialisation /
>> deserialisation.
>> 2. Before looking up the custom Kryo registrator, change the thread’s
>> class
>> loader to include the application jar, then restore the class loader after
>> the kryo registrator has been run.  I don’t know if this would have any
>> other side-effects.
>> 3. Always serialise / deserialis

[SPARK-2878] Kryo serialisation with custom Kryo registrator failing

2014-08-06 Thread Graham Dennis
Hi Spark devs,

I’ve posted an issue on JIRA (
https://issues.apache.org/jira/browse/SPARK-2878) which occurs when using
Kryo serialisation with a custom Kryo registrator to register custom
classes with Kryo.  This is an insidious issue that non-deterministically
causes Kryo to have different ID number => class name maps on different
nodes, which then causes weird exceptions (ClassCastException,
ClassNotFoundException, ArrayIndexOutOfBoundsException) at deserialisation
time.  I’ve created a reliable reproduction for the issue here:
https://github.com/GrahamDennis/spark-kryo-serialisation

I’m happy to put a pull request together to address this, but it’s not
obvious to me what the right way to solve it is, so I’d like to get
feedback / ideas on the best approach.

The root cause of the problem is a "Failed to run spark.kryo.registrator”
error which non-deterministically occurs in some executor processes during
operation.  My custom Kryo registrator is in the application jar, and it is
accessible on the worker nodes; this is demonstrated by the fact that the
custom Kryo registrator runs successfully most of the time.

What’s happening is that Kryo serialisation/deserialisation is happening
most of the time on an “Executor task launch worker” thread, which has the
thread's class loader set to contain the application jar.  This happens in
`org.apache.spark.executor.Executor.TaskRunner.run`, and from what I can
tell, it is only these threads that have access to the application jar
(that contains the custom Kryo registrator).  However, the
ConnectionManager threads sometimes need to serialise/deserialise objects
to satisfy “getBlock” requests when the objects haven’t previously been
serialised.  As the ConnectionManager threads don’t have the application
jar available from their class loader, when it tries to look up the custom
Kryo registrator, this fails.  Spark then swallows this exception, which
results in a different ID number => class name mapping for this Kryo
instance,
and this then causes deserialisation errors later on a different node.
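
Assuming the registrator class is resolved via the calling thread's context
class loader, the thread-dependence can be sketched as follows
(`loadRegistrator` is a hypothetical helper, not Spark code):

```scala
// Resolving a class via the calling thread's context class loader: this
// succeeds on "Executor task launch worker" threads (whose loader includes
// the application jar, set up in Executor.TaskRunner.run) and throws
// ClassNotFoundException on ConnectionManager threads, whose loader doesn't.
def loadRegistrator(className: String): Class[_] =
  Class.forName(className, true, Thread.currentThread.getContextClassLoader)
```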

A related issue to the one reported in SPARK-2878 is that Spark probably
shouldn’t swallow the ClassNotFoundException for custom Kryo registrators.
The user has explicitly specified this class, and if it deterministically
can’t be found, that will cause problems at serialisation / deserialisation
time.  If it can only sometimes be found (as in this case), it leads to a
data corruption issue later on.  Either way, we’re better off failing fast
on the ClassNotFoundException than hitting the weirder errors later.
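
A sketch of the fail-fast behaviour argued for here (`resolveRegistrator`
is a hypothetical helper, not Spark's current behaviour):

```scala
// Fail fast: a user-specified spark.kryo.registrator that cannot be
// loaded aborts serializer setup instead of being logged and swallowed.
def resolveRegistrator(className: String): Class[_] =
  try Class.forName(className, true, Thread.currentThread.getContextClassLoader)
  catch {
    case e: ClassNotFoundException =>
      // Dying here is better than continuing with a silently
      // inconsistent ID number => class name mapping.
      throw new IllegalStateException(
        s"Failed to load spark.kryo.registrator class $className", e)
  }
```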

I have some ideas on potential solutions to this issue, but I’m keen for
experienced eyes to critique these approaches:

1. The simplest approach to fixing this would be to just make the
application jar available to the connection manager threads, but I’m
guessing it’s a design decision to isolate the application jar to just the
executor task runner threads.  Also, I don’t know if there are any other
threads that might be interacting with kryo serialisation / deserialisation.
2. Before looking up the custom Kryo registrator, change the thread’s class
loader to include the application jar, then restore the class loader after
the kryo registrator has been run.  I don’t know if this would have any
other side-effects.
3. Always serialise / deserialise on the existing TaskRunner threads,
rather than delaying serialisation until later, when it can be done only if
needed.  This approach would probably have negative performance
consequences.
4. Create a new dedicated thread pool for lazy serialisation /
deserialisation that has the application jar on the class path.
 Serialisation / deserialisation would be the only thing these threads do,
and this would minimise conflicts / interactions between the application
jar and other jars.
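
Option 2 could be sketched roughly as follows, assuming the application-jar
URLs are available to the caller (`withApplicationJar` is hypothetical):

```scala
import java.net.{URL, URLClassLoader}

// Temporarily install a class loader that can also see the application
// jar, run the lookup, and always restore the original loader afterwards.
def withApplicationJar[T](appJars: Seq[URL])(lookup: => T): T = {
  val thread   = Thread.currentThread
  val original = thread.getContextClassLoader
  thread.setContextClassLoader(new URLClassLoader(appJars.toArray, original))
  try lookup
  finally thread.setContextClassLoader(original) // restore even on failure
}
```

In the real code path this would wrap just the registrator lookup, e.g.
inside the Kryo serializer's setup.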

#4 sounds like the best approach to me, but I think it would require
considerable knowledge of Spark internals, which is beyond me at present.
Does anyone have any better (and ideally simpler) ideas?
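
For what it's worth, the skeleton of option 4's dedicated pool might look
like this (a sketch only; `serialisationContext` and the thread name are
made up, and wiring it into the lazy-serialisation paths is the hard part):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A small dedicated pool whose threads all carry a class loader that
// includes the application jar, so any lazy serialisation /
// deserialisation funnelled through it can see the custom registrator.
def serialisationContext(appLoader: ClassLoader): ExecutionContext =
  ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(2, (r: Runnable) => {
      val t = new Thread(r, "lazy-serialisation")
      t.setDaemon(true)
      t.setContextClassLoader(appLoader) // every pool thread sees the app jar
      t
    }))
```

Callers would then submit (de)serialisation work as
`Future(work)(serialisationContext(appLoader))` and block or compose on the
result.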

Cheers,

Graham