Re: Shuffle Problems in 1.2.0

2015-01-12 Thread Sven Krasser
I've filed a ticket for this issue here:
https://issues.apache.org/jira/browse/SPARK-5209. (It reproduces the
problem at a smaller cluster size.)
-Sven

Re: Shuffle Problems in 1.2.0

2015-01-07 Thread Sven Krasser
Could you try it on AWS using EMR? That'd give you an exact replica of the 
environment that causes the error. 

Sent from my iPhone

Re: Shuffle Problems in 1.2.0

2015-01-07 Thread Davies Liu
Hey Sven,

I tried with all of your configurations, 2 nodes with 2 executors each, but
in standalone mode, and it worked fine.

Could you try to narrow down which configuration change triggers it?

Davies
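
For concreteness, the two submission modes being compared could look like
this (a sketch only; the master URL and the script name repro.py are
placeholders, and the executor sizing mirrors the numbers Sven gives later
in the thread):

  # Standalone mode (where the job completed fine):
  spark-submit --master spark://master-host:7077 \
    --executor-memory 25g repro.py

  # YARN client mode (where the failure shows up):
  spark-submit --master yarn-client \
    --num-executors 2 --executor-cores 4 --executor-memory 25g \
    --conf spark.yarn.executor.memoryOverhead=3800 repro.py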

Re: Shuffle Problems in 1.2.0

2015-01-06 Thread Sven Krasser
Hey Davies,

Here are some more details on a configuration that causes this error for
me. Launch an AWS Spark EMR cluster as follows:


aws emr create-cluster --region us-west-1 --no-auto-terminate \
   --ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \
   --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args='["-g"]' \
   --ami-version 3.3 \
   --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
     InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge \
   --name "Spark Issue Repro" \
   --visible-to-all-users --applications Name=Ganglia

This is a 10-node cluster (not sure if this makes a difference outside of
HDFS block locality). Then use this Gist as your spark-defaults file (it'll
also configure 2 executors per job):
https://gist.github.com/skrasser/9b978d3d572735298d16

With that, I am seeing this again:

2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1]
executor.Executor (Logging.scala:logError(96)) - Exception in task
13.0 in stage 0.0 (TID 27)
org.apache.spark.SparkException: PairwiseRDD: unexpected value:
List([B@4cfae71c)

Thanks for the performance pointers -- the repro script is fairly
unpolished (just enough to cause the aforementioned exception).

Hope this sheds some light on the error. From what I can tell so far,
something in the spark-defaults file triggers it (with other settings it
completes just fine).

Thanks for your help!
-Sven


Re: Shuffle Problems in 1.2.0

2015-01-06 Thread Davies Liu
I still cannot reproduce it with 2 nodes (4 CPUs).

Your repro.py could run faster (10 min instead of 22 min):

inpdata.map(lambda (pc, x): (x, pc == 'p' and 2 or 1)) \
       .reduceByKey(lambda x, y: x | y) \
       .filter(lambda (x, pc): pc == 3) \
       .collect()

(also, no cache needed anymore)

Davies
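
Spelled out with comments, the one-liner is equivalent to the following
(a sketch, assuming inpdata is an RDD of (pc, x) pairs where pc is either
'p' or some other marker):

  # Tag each x with a bit flag: 2 if seen with pc == 'p', 1 otherwise.
  flags = inpdata.map(lambda kv: (kv[1], 2 if kv[0] == 'p' else 1))
  # OR the flags together per key; a key seen under both tags becomes 2|1 == 3.
  merged = flags.reduceByKey(lambda a, b: a | b)
  # Keep only the keys that appeared with both markers.
  result = merged.filter(lambda kv: kv[1] == 3).collect()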



Re: Shuffle Problems in 1.2.0

2015-01-06 Thread Sven Krasser
The issue has been sensitive to the number of executors and the input data
size. I'm using 2 executors with 4 cores each, 25GB of memory, and 3800MB
of memory overhead for YARN. This fits onto Amazon r3 instance types.
-Sven
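
In spark-defaults.conf terms, that sizing would look roughly like this (a
sketch of the settings just described, not necessarily the exact contents
of the Gist):

  spark.executor.instances             2
  spark.executor.cores                 4
  spark.executor.memory                25g
  spark.yarn.executor.memoryOverhead   3800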

Re: Shuffle Problems in 1.2.0

2015-01-06 Thread Davies Liu
I ran your scripts on a 5-node (2 CPUs, 8G mem) cluster and cannot
reproduce your failure. Should I test it with a big-memory node?

Re: Shuffle Problems in 1.2.0

2015-01-05 Thread Sven Krasser
Thanks for the input! I've managed to come up with a repro of the error
with test data only (and without any of the custom code in the original
script); please see here:
https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md

The Gist contains a data generator and the script reproducing the error
(plus driver and executor logs). If I run using full cluster capacity (32
executors with 28GB), there are no issues. If I run on only two, the error
appears again and the job fails:

org.apache.spark.SparkException: PairwiseRDD: unexpected value:
List([B@294b55b7)


Any thoughts or any obvious problems you can spot by any chance?

Thank you!
-Sven
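
The overall shape of the repro is roughly the following (a minimal sketch
only; the actual data generator, key distribution, and record sizes are in
the Gist above):

  from pyspark import SparkContext

  sc = SparkContext(appName="shuffle-repro-sketch")

  # Generate (key, payload) pairs so the shuffle has to move sizeable
  # pickled values between the two executors.
  pairs = sc.parallelize(range(1000000), 64) \
            .map(lambda i: (i % 10000, "x" * 1024))

  # reduceByKey forces a full shuffle; on the failing configuration this
  # is where the PairwiseRDD exception surfaces.
  pairs.reduceByKey(lambda a, b: a).count()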

Re: Shuffle Problems in 1.2.0

2015-01-04 Thread Josh Rosen
There don't seem to be a lot of clues to go on here without seeing the job
code. The original "org.apache.spark.SparkException: PairwiseRDD:
unexpected value: List([B@130dc7ad)" error suggests that there may be an
issue with PySpark's serialization / tracking of types, but it's hard to
say from this error trace alone.

Re: Shuffle Problems in 1.2.0

2014-12-30 Thread Sven Krasser
Hey Josh,

I am still trying to prune this to a minimal example, but it has been
tricky since scale seems to be a factor. The job runs over ~720GB of data
(the cluster's total RAM is around ~900GB, split across 32 executors). I've
managed to run it over a vastly smaller data set without issues. Curiously,
when I run it over a slightly smaller data set of ~230GB (using sort-based
shuffle), my job also fails, but I see no shuffle errors in the executor
logs. All I see is the error below from the driver (this is also what the
driver prints when erroring out on the large data set, but I assumed the
executor errors to be the root cause).

Any idea on where to look in the interim for more hints? I'll continue to
try to get to a minimal repro.

2014-12-30 21:35:34,539 INFO
[sparkDriver-akka.actor.default-dispatcher-14]
spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to
send map output locations for shuffle 0 to
sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739
2014-12-30 21:35:39,512 INFO
[sparkDriver-akka.actor.default-dispatcher-17]
spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to
send map output locations for shuffle 0 to
sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277
2014-12-30 21:35:58,893 WARN
[sparkDriver-akka.actor.default-dispatcher-16]
remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) -
Association with remote system
[akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584]
has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
2014-12-30 21:35:59,044 ERROR [Yarn application state monitor]
cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn
application has already exited with state FINISHED!
2014-12-30 21:35:59,056 INFO  [Yarn application state monitor]
handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped
o.e.j.s.ServletContextHandler{/stages/stage/kill,null}

[...]

2014-12-30 21:35:59,111 INFO  [Yarn application state monitor] ui.SparkUI
(Logging.scala:logInfo(59)) - Stopped Spark web UI at
http://ip-10-20-80-37.us-west-1.compute.internal:4040
2014-12-30 21:35:59,130 INFO  [Yarn application state monitor]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler
2014-12-30 21:35:59,131 INFO  [Yarn application state monitor]
cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting
down all executors
2014-12-30 21:35:59,132 INFO
[sparkDriver-akka.actor.default-dispatcher-14]
cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking
each executor to shut down
2014-12-30 21:35:59,132 INFO  [Thread-2] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 1 failed: collect at
/home/hadoop/test_scripts/test.py:63, took 980.751936 s
Traceback (most recent call last):
  File "/home/hadoop/test_scripts/test.py", line 63, in 
result = j.collect()
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 676, in collect
bytesInJava = self._jrdd.collect().iterator()
  File
"/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
line 538, in __call__
  File
"/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line
300, in get_return_value
py4j.protocol.Py4JJavaError2014-12-30 21:35:59,140 INFO  [Yarn application
state monitor] cluster.YarnClientSchedulerBackend
(Logging.scala:logInfo(59)) - Stopped
: An error occurred while calling o117.collect.
: org.apache.spark.SparkException: Job cancelled because SparkContext was
shut down
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at
org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
at
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
at
akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
at akka.actor.ActorCell.terminate(ActorCell.scala:369)
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue

Re: Shuffle Problems in 1.2.0

2014-12-30 Thread Josh Rosen
Hi Sven,

Do you have a small example program that you can share which will allow me
to reproduce this issue?  If you have a workload that runs into this, you
should be able to keep iteratively simplifying the job and reducing the
data set size until you hit a fairly minimal reproduction (assuming the
issue is deterministic, which it sounds like it is).

Shuffle Problems in 1.2.0

2014-12-30 Thread Sven Krasser
Hey all,

Since upgrading to 1.2.0, a PySpark job that worked fine in 1.1.1 fails
during shuffle. I've tried reverting from the sort-based shuffle back to
the hash-based one (see the setting below), and that fails as well. Does
anyone see similar problems or have an idea where to look next?
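
For reference, the revert is a one-line setting; this is presumably how it
was done (Spark 1.2 changed the default from hash to sort):

  # in spark-defaults.conf (or via --conf on spark-submit)
  spark.shuffle.manager   hash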

For the sort-based shuffle, I get a bunch of exceptions like this in the
executor logs:

2014-12-30 03:13:04,061 ERROR [Executor task launch worker-2]
executor.Executor (Logging.scala:logError(96)) - Exception in task
4523.0 in stage 1.0 (TID 4524)
org.apache.spark.SparkException: PairwiseRDD: unexpected value:
List([B@130dc7ad)
at 
org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:307)
at 
org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:305)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

For the hash-based shuffle, there are now a bunch of these exceptions
in the logs:


2014-12-30 04:14:01,688 ERROR [Executor task launch worker-0]
executor.Executor (Logging.scala:logError(96)) - Exception in task
4479.0 in stage 1.0 (TID 4480)
java.io.FileNotFoundException:
/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1419905501183_0004/spark-local-20141230035728-8fc0/23/merged_shuffle_1_68_0
(No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at 
org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
at 
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
at 
org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
at 
org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Thank you!
-Sven



-- 
http://sites.google.com/site/krasser/?utm_source=sig