Re: Shuffle Problems in 1.2.0
I've filed a ticket for this issue here:
https://issues.apache.org/jira/browse/SPARK-5209 (this reproduces the
problem on a smaller cluster).

-Sven
Re: Shuffle Problems in 1.2.0
Could you try it on AWS using EMR? That'd give you an exact replica of the
environment that causes the error.

Sent from my iPhone
Re: Shuffle Problems in 1.2.0
Hey Sven,

I tried all of your configurations on 2 nodes with 2 executors each, but in
standalone mode, and it worked fine.

Could you try to narrow down which configuration change triggers it?

Davies
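One concrete difference between the two setups is the deploy mode, selected
via the --master argument to spark-submit. As a sketch of the contrast only
(the host name and script name are placeholders, not commands from the
thread):

    spark-submit --master spark://master-host:7077 repro.py  # standalone mode, where Davies sees no error
    spark-submit --master yarn-client repro.py               # YARN client mode on EMR, where the job fails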
Re: Shuffle Problems in 1.2.0
Hey Davies,

Here are some more details on a configuration that causes this error for me.
Launch an AWS Spark EMR cluster as follows:

    aws emr create-cluster --region us-west-1 --no-auto-terminate \
      --ec2-attributes KeyName=your-key-here,SubnetId=your-subnet-here \
      --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark,Args='["-g"]' \
      --ami-version 3.3 \
      --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
        InstanceGroupType=CORE,InstanceCount=10,InstanceType=r3.xlarge \
      --name "Spark Issue Repro" --visible-to-all-users --applications Name=Ganglia

This is a 10-node cluster (not sure if this makes a difference outside of
HDFS block locality). Then use this Gist as your spark-defaults file (it'll
configure 2 executors per job as well):
https://gist.github.com/skrasser/9b978d3d572735298d16

With that, I am seeing this again:

    2015-01-07 03:43:51,751 ERROR [Executor task launch worker-1]
    executor.Executor (Logging.scala:logError(96)) - Exception in task 13.0
    in stage 0.0 (TID 27)
    org.apache.spark.SparkException: PairwiseRDD: unexpected value:
    List([B@4cfae71c)

Thanks for the performance pointers -- the repro script is fairly unpolished
(just enough to cause the aforementioned exception).

Hope this sheds some light on the error. From what I can tell so far,
something in the spark-defaults file triggers it (with other settings the
job completes just fine).

Thanks for your help!
-Sven
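The spark-defaults file itself is only linked, not quoted, in the thread.
Based on the resource sizing Sven describes later in the thread (2 executors
with 4 cores each, 25GB of memory, 3800MB YARN overhead), a sketch of the
kind of settings involved -- these are standard Spark 1.x property names,
not the gist's actual lines:

    spark.master                        yarn-client
    spark.executor.instances            2
    spark.executor.cores                4
    spark.executor.memory               25g
    spark.yarn.executor.memoryOverhead  3800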
Re: Shuffle Problems in 1.2.0
I still cannot reproduce it with 2 nodes (4 CPUs).

Your repro.py could be faster (10 min instead of the previous 22 min):

    inpdata.map(lambda (pc, x): (x, pc=='p' and 2 or 1)) \
           .reduceByKey(lambda x, y: x|y) \
           .filter(lambda (x, pc): pc==3) \
           .collect()

(Also, no cache is needed anymore.)

Davies
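The one-liner works by tagging each record with a bit flag -- 2 if the first
field is 'p', 1 otherwise -- OR-ing the flags per key, and keeping keys
whose combined flags equal 3, i.e. keys that appeared with both tags. The
same logic in plain Python 2, on made-up records, purely for illustration:

    # Bit-flag logic from the one-liner, without Spark (hypothetical data):
    pairs = [('p', 'a'), ('q', 'a'), ('q', 'b')]  # (pc, x) records
    flags = {}
    for pc, x in pairs:
        flags[x] = flags.get(x, 0) | (2 if pc == 'p' else 1)
    print [x for x, f in flags.items() if f == 3]  # ['a'] -- seen with both tags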
Re: Shuffle Problems in 1.2.0
The issue has been sensitive to the number of executors and input data size.
I'm using 2 executors with 4 cores each, 25GB of memory, and 3800MB of
memory overhead for YARN. This will fit onto Amazon r3 instance types.
-Sven
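A quick arithmetic check on that sizing (the 30.5 GiB figure is the
published spec for r3.xlarge): 25GB of executor memory plus 3800MB of YARN
overhead comes to roughly 28.7 GiB per container, which fits on a single
r3.xlarge with a little headroom left for the OS and the node manager.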
Re: Shuffle Problems in 1.2.0
I ran your scripts on a 5-node cluster (2 CPUs, 8G mem per node) and cannot
reproduce your failure. Should I test it with big-memory nodes?
Re: Shuffle Problems in 1.2.0
Thanks for the input! I've managed to come up with a repro of the error with
test data only (and without any of the custom code in the original script);
please see here:
https://gist.github.com/skrasser/4bd7b41550988c8f6071#file-gistfile1-md

The Gist contains a data generator and the script reproducing the error
(plus driver and executor logs). If I run using full cluster capacity (32
executors with 28GB), there are no issues. If I run on only two, the error
appears again and the job fails:

    org.apache.spark.SparkException: PairwiseRDD: unexpected value:
    List([B@294b55b7)

Any thoughts, or obvious problems you can spot?

Thank you!
-Sven
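The generator and repro script live in the gist and are not quoted in the
thread. Purely to illustrate the shape of such a generator -- every name,
size, and path below is a placeholder, not the gist's contents:

    # Hypothetical sketch of a test-data generator (Python 2 / PySpark):
    from pyspark import SparkContext
    import random
    import string

    sc = SparkContext(appName="gen-test-data")

    def row(i):
        # Random key plus a 'p'/'q' tag, mirroring the (pc, x) records above.
        key = ''.join(random.choice(string.ascii_lowercase) for _ in xrange(8))
        return '%s\t%s' % ('p' if i % 2 else 'q', key)

    sc.parallelize(xrange(10 ** 7), 1000).map(row).saveAsTextFile('hdfs:///tmp/repro-input')
    sc.stop()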
Re: Shuffle Problems in 1.2.0
It doesn't seem like there are a whole lot of clues to go on here without
seeing the job code. The original "org.apache.spark.SparkException:
PairwiseRDD: unexpected value: List([B@130dc7ad)" error suggests that maybe
there's an issue with PySpark's serialization / tracking of types, but it's
hard to say from this error trace alone.
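For context on what the error means: on the Scala side, Spark 1.2's
PythonRDD.scala re-pairs the serialized byte stream coming from Python two
elements at a time (key bytes, then value bytes) and raises "PairwiseRDD:
unexpected value: ..." when a group doesn't contain exactly two elements. A
List holding a single byte array ([B@...) is therefore a key that arrived
without its value, which points at a truncated or corrupted shuffle stream
rather than a type problem in the job itself. A simplified Python rendering
of that pairing check, for illustration only:

    # Simplified rendering of the Scala-side pairing check (illustrative):
    def pair_up(stream):
        it = iter(stream)
        for key_bytes in it:
            try:
                value_bytes = next(it)
            except StopIteration:
                # An odd-length stream is what surfaces as
                # "PairwiseRDD: unexpected value: List([B@...)".
                raise ValueError("unexpected value: [%r]" % key_bytes)
            yield (key_bytes, value_bytes)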
Re: Shuffle Problems in 1.2.0
Hey Josh,

I am still trying to prune this to a minimal example, but it has been tricky
since scale seems to be a factor. The job runs over ~720GB of data (the
cluster's total RAM is around ~900GB, split across 32 executors). I've
managed to run it over a vastly smaller data set without issues. Curiously,
when I run it over a slightly smaller data set of ~230GB (using sort-based
shuffle), my job also fails, but I see no shuffle errors in the executor
logs. All I see is the error below from the driver (this is also what the
driver prints when erroring out on the large data set, but I assumed the
executor errors to be the root cause).

Any idea on where to look in the interim for more hints? I'll continue to
try to get to a minimal repro.

    2014-12-30 21:35:34,539 INFO [sparkDriver-akka.actor.default-dispatcher-14]
    spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to
    send map output locations for shuffle 0 to
    sparkexecu...@ip-10-20-80-60.us-west-1.compute.internal:39739
    2014-12-30 21:35:39,512 INFO [sparkDriver-akka.actor.default-dispatcher-17]
    spark.MapOutputTrackerMasterActor (Logging.scala:logInfo(59)) - Asked to
    send map output locations for shuffle 0 to
    sparkexecu...@ip-10-20-80-62.us-west-1.compute.internal:42277
    2014-12-30 21:35:58,893 WARN [sparkDriver-akka.actor.default-dispatcher-16]
    remote.ReliableDeliverySupervisor (Slf4jLogger.scala:apply$mcV$sp(71)) -
    Association with remote system
    [akka.tcp://sparkyar...@ip-10-20-80-64.us-west-1.compute.internal:49584]
    has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
    2014-12-30 21:35:59,044 ERROR [Yarn application state monitor]
    cluster.YarnClientSchedulerBackend (Logging.scala:logError(75)) - Yarn
    application has already exited with state FINISHED!
    2014-12-30 21:35:59,056 INFO [Yarn application state monitor]
    handler.ContextHandler (ContextHandler.java:doStop(788)) - stopped
    o.e.j.s.ServletContextHandler{/stages/stage/kill,null}

    [...]

    2014-12-30 21:35:59,111 INFO [Yarn application state monitor] ui.SparkUI
    (Logging.scala:logInfo(59)) - Stopped Spark web UI at
    http://ip-10-20-80-37.us-west-1.compute.internal:4040
    2014-12-30 21:35:59,130 INFO [Yarn application state monitor]
    scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Stopping DAGScheduler
    2014-12-30 21:35:59,131 INFO [Yarn application state monitor]
    cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Shutting
    down all executors
    2014-12-30 21:35:59,132 INFO [sparkDriver-akka.actor.default-dispatcher-14]
    cluster.YarnClientSchedulerBackend (Logging.scala:logInfo(59)) - Asking
    each executor to shut down
    2014-12-30 21:35:59,132 INFO [Thread-2] scheduler.DAGScheduler
    (Logging.scala:logInfo(59)) - Job 1 failed: collect at
    /home/hadoop/test_scripts/test.py:63, took 980.751936 s
    Traceback (most recent call last):
      File "/home/hadoop/test_scripts/test.py", line 63, in <module>
        result = j.collect()
      File "/home/hadoop/spark/python/pyspark/rdd.py", line 676, in collect
        bytesInJava = self._jrdd.collect().iterator()
      File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py",
        line 538, in __call__
      File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
        line 300, in get_return_value
    py4j.protocol.Py4JJavaError2014-12-30 21:35:59,140 INFO [Yarn application
    state monitor] cluster.YarnClientSchedulerBackend
    (Logging.scala:logInfo(59)) - Stopped
    : An error occurred while calling o117.collect.
    : org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:701)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:701)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1428)
        at akka.actor.Actor$class.aroundPostStop(Actor.scala:475)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundPostStop(DAGScheduler.scala:1375)
        at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
        at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
        at akka.actor.ActorCell.terminate(ActorCell.scala:369)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue
Re: Shuffle Problems in 1.2.0
Hi Sven,

Do you have a small example program that you can share which will allow me
to reproduce this issue? If you have a workload that runs into this, you
should be able to keep iteratively simplifying the job and reducing the data
set size until you hit a fairly minimal reproduction (assuming the issue is
deterministic, which it sounds like it is).
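The kind of minimal program being asked for would pare the job down to a
single shuffle over synthetic data. As a sketch only -- names and sizes are
placeholders, and the eventual repro Sven shares lives in his gist:

    # Hypothetical minimal shuffle job (Python 2 / Spark 1.2 style):
    from pyspark import SparkContext

    sc = SparkContext(appName="shuffle-repro")
    data = sc.parallelize(xrange(10 ** 7), 200).map(lambda i: (i % 1000, 'x' * 100))
    result = data.reduceByKey(lambda a, b: a).collect()  # reduceByKey forces a shuffle
    print len(result)
    sc.stop()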
Shuffle Problems in 1.2.0
Hey all,

Since upgrading to 1.2.0, a pyspark job that worked fine in 1.1.1 fails
during shuffle. I've tried reverting from the sort-based shuffle back to the
hash one, and that fails as well. Does anyone see similar problems or have
an idea of where to look next?

For the sort-based shuffle I get a bunch of exceptions like this in the
executor logs:

    2014-12-30 03:13:04,061 ERROR [Executor task launch worker-2]
    executor.Executor (Logging.scala:logError(96)) - Exception in task 4523.0
    in stage 1.0 (TID 4524)
    org.apache.spark.SparkException: PairwiseRDD: unexpected value:
    List([B@130dc7ad)
        at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:307)
        at org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:305)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:219)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:65)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

For the hash-based shuffle, there are now a bunch of these exceptions in the
logs:

    2014-12-30 04:14:01,688 ERROR [Executor task launch worker-0]
    executor.Executor (Logging.scala:logError(96)) - Exception in task 4479.0
    in stage 1.0 (TID 4480)
    java.io.FileNotFoundException:
    /mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1419905501183_0004/spark-local-20141230035728-8fc0/23/merged_shuffle_1_68_0
    (No such file or directory)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        at org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
        at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
        at org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Thank you!
-Sven

--
http://sites.google.com/site/krasser/?utm_source=sig
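For reference, the sort-based/hash-based toggle mentioned above is the
spark.shuffle.manager setting: sort became the default in 1.2.0, and hash
was the default before that. Either value can be set in spark-defaults.conf
(or via --conf on spark-submit):

    spark.shuffle.manager  sort   # the new 1.2.0 default
    spark.shuffle.manager  hash   # the pre-1.2 behavior Sven reverted to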