Is anyone using Amazon EC2?
I used Spark on EC2 a while ago
Is anyone using Amazon EC2? (second attempt!)
I used Spark on EC2 a while ago, but recent revisions seem to have broken the functionality. Is anyone actually using Spark on EC2 at the moment? The bug in question is: https://issues.apache.org/jira/browse/SPARK-5008 It makes it impossible to use persistent HDFS without a workaround on each slave node. No one seems to be interested in the bug, so I wonder if other people aren't actually hitting this problem. If that's the case, any suggestions? Joe
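For anyone hitting the same thing, this is roughly the shape of the per-slave workaround I mean (the paths here are assumptions about the spark-ec2 layout on my cluster, not taken from the ticket): the persistent-hdfs data directory ends up on the small root volume rather than the large EBS mount, so on each slave you point the expected directory at the EBS volume.

```shell
# Hypothetical sketch -- adapt paths to your own cluster.
# Make the directory persistent-hdfs expects resolve to the big EBS mount.
sudo mkdir -p /vol0/persistent-hdfs
sudo ln -sfn /vol0/persistent-hdfs /vol/persistent-hdfs
```

This has to be repeated on every slave node (e.g. over ssh), which is exactly why it's tedious enough to ask whether anyone has a proper fix.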
Re: Is anyone using Amazon EC2?
Sorry guys, my email submitted before I finished writing it. Check my other message (with the same subject)! On 23 May 2015 at 20:25, Shafaq wrote: > Yes-Spark EC2 cluster . Looking into migrating to spark emr. > Adding more ec2 is not possible afaik. > On May 23, 2015 11:22 AM, "Johan Beisser" wrote: > >> Yes. >> >> We're looking at bootstrapping in EMR... >> On Sat, May 23, 2015 at 07:21 Joe Wass wrote: >> >>> I used Spark on EC2 a while ago >>> >>
Re: Spark dramatically slow when I add "saveAsTextFile"
This may sound like an obvious question, but are you sure that the program is doing any work when you don't have a saveAsTextFile? If there are transformations but no actions to actually collect the data, there's no need for Spark to execute the transformations. As to the question of 'is this taking too long', I can't answer that. But your code was HTML-escaped and therefore difficult to read; perhaps you should post a link to a Gist. Joe

On 24 May 2015 at 10:36, allanjie wrote:
> *Problem Description*:
>
> The program is running in a stand-alone Spark cluster (1 master, 6 workers with 8g RAM and 2 cores each).
> Input: a 468MB file with 133433 records stored in HDFS.
> Output: just a 2MB file will be stored in HDFS.
> The program has two map operations and one reduceByKey operation. Finally I save the result to HDFS using "*saveAsTextFile*".
> *Problem*: if I don't add "saveAsTextFile", the program runs very fast (a few seconds), otherwise it is extremely slow, taking about 30 mins.
>
> *My program (is very simple)*
>
> public static void main(String[] args) throws IOException {
>     /** Parameter setting ***/
>     String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
>     String remoteFilePath = "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
>     String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
>     final int row = 133433;
>     final int col = 458;
>     final double dc = Double.valueOf(args[0]);
>
>     SparkConf conf = new SparkConf()
>         .setAppName("distance")
>         .set("spark.executor.memory", "4g")
>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>         .set("spark.eventLog.enabled", "true");
>     JavaSparkContext sc = new JavaSparkContext(conf);
>
>     JavaRDD<String> textFile = sc.textFile(remoteFilePath);
>
>     // Broadcast variable, the dimension of this double array: 133433*458
>     final Broadcast<double[][]> broadcastPoints =
>         sc.broadcast(createBroadcastPoints(localPointPath, row, col));
>
>     /**
>      * Compute the distance in terms of each point on each instance.
>      * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
>      */
>     JavaPairRDD<Integer, Double> distance =
>         textFile.flatMapToPair(new PairFlatMapFunction<String, Integer, Double>() {
>             public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
>                 List<String> al = Arrays.asList(v1.split(","));
>                 double[] featureVals = new double[al.size()];
>                 for (int j = 0; j < col; j++)
>                     featureVals[j] = Double.valueOf(al.get(j + 1));
>                 int jIndex = Integer.valueOf(al.get(0));
>                 double[][] allPoints = broadcastPoints.value();
>                 double sum = 0;
>                 List<Tuple2<Integer, Double>> list = new ArrayList<Tuple2<Integer, Double>>();
>                 for (int i = 0; i < row; i++) {
>                     sum = 0;
>                     for (int j = 0; j < col; j++) {
>                         sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
>                     }
>                     list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
>                 }
>                 return list;
>             }
>         });
>
>     // Create zero-one density
>     JavaPairRDD<Integer, Integer> densityZeroOne =
>         distance.mapValues(new Function<Double, Integer>() {
>             public Integer call(Double v1) throws Exception {
>                 if (v1 < dc) return 1;
>                 else return 0;
>             }
>         });
>
>     // Combine the density
>     JavaPairRDD<Integer, Integer> counts =
>         densityZeroOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
>             public Integer call(Integer v1, Integer v2) throws Exception {
>                 return v1 + v2;
>             }
>         });
>     counts.saveAsTextFile(outputPath + args[1]);  // <-- the call in question
>     sc.stop();
> }
>
> *If I comment out "saveAsTextFile", the log will be:*
> Picked up _JAVA_OPTIONS: -Xmx4g
> 15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
> 15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 15/0
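The lazy-evaluation point in the reply can be demonstrated outside Spark with plain Java streams, which follow the same model: intermediate operations (the analogue of transformations) run nothing until a terminal operation (the analogue of an action) is invoked.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // Like a Spark transformation: building the pipeline runs nothing.
        Stream<Integer> mapped = Stream.of(1, 2, 3)
                .map(x -> { calls.incrementAndGet(); return x * x; });
        System.out.println("after map: " + calls.get());      // prints 0

        // Like a Spark action: the terminal operation forces the work.
        List<Integer> result = mapped.collect(Collectors.toList());
        System.out.println("after collect: " + calls.get());  // prints 3
        System.out.println(result);                           // prints [1, 4, 9]
    }
}
```

A program with only transformations (or, as in the thread above, with the final save commented out) finishes fast for the same reason the first println here reports zero calls.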
Are failures normal / to be expected on an AWS cluster?
I have a Spark job running on about 300 GB of log files, on Amazon EC2, with 10 x Large instances (each with 600 GB disk). The job hasn't yet completed. So far, 18 stages have completed (2 of which have retries) and 3 stages have failed. In each failed stage there are ~160 successful tasks, but "CANNOT FIND ADDRESS" for half of the executors. Are these numbers normal for AWS? Should a certain number of faults be expected? I know that AWS isn't meant to be perfect, but this doesn't seem right. Cheers Joe
PermGen issues on AWS
I'm running on an AWS cluster of 10 x m1.large (64-bit, 7.5 GiB RAM). FWIW I'm using the Flambo Clojure wrapper, which uses the Java API, but I don't think that should make any difference. I'm running with the following command:

spark/bin/spark-submit --class mything.core --name "My Thing" --conf spark.yarn.executor.memoryOverhead=4096 --conf spark.executor.extraJavaOptions="-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" /root/spark/code/myjar.jar

For one of the stages I'm getting errors:

- ExecutorLostFailure (executor lost)
- Resubmitted (resubmitted due to lost executor)

And I think they're caused by slave executor JVMs dying with this error:

java.lang.OutOfMemoryError: PermGen space
java.lang.Class.getDeclaredConstructors0(Native Method)
java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
java.lang.Class.getConstructor0(Class.java:2885)
java.lang.Class.newInstance(Class.java:350)
sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
java.security.AccessController.doPrivileged(Native Method)
sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
java.security.AccessController.doPrivileged(Native Method)
java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

1 stage out of 14 (so far) is failing. My failing stage is 1768 succeeded / 1862 (940 failed). 7 tasks failed with OOM, 919 were "Resubmitted (resubmitted due to lost executor)". Now my "Aggregated Metrics by Executor" shows that 10 out of 16 executors show "CANNOT FIND ADDRESS", which I imagine means the JVM blew up and hasn't been restarted. Now the 'Executors' tab shows only 7 executors.

- Is this normal?
- Any ideas why this is happening?
- Any other measures I can take to prevent this?
- Is the rest of my app going to run on a reduced number of executors?
- Can I restart the executors mid-application? This is a long-running job, so I'd like to do what I can whilst it's running, if possible.
- Am I correct in thinking that the --conf arguments are supplied to the JVMs of the slave executors, so they will be receiving the extraJavaOptions and memoryOverhead?

Thanks very much! Joe
Re: PermGen issues on AWS
Thanks, I noticed this after posting. I'll try that. I also think that perhaps Clojure might be creating more classes than the equivalent Java would, so I'll nudge it a bit higher.

On 9 January 2015 at 11:45, Sean Owen wrote:
> It's normal for PermGen to be a bit more of an issue with Spark than
> for other JVM-based applications. You should simply increase the
> PermGen size, which I don't see in your command. -XX:MaxPermSize=256m
> allows it to grow to 256m for example. The right size depends on your
> total heap size and app.
>
> Also, Java 8 no longer has a permanent generation, so this particular
> type of problem and tuning is not needed. You might consider running
> on Java 8.
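Concretely, Sean's suggestion applied to the command from the original message would look something like this (256m is just an example value; the right size depends on the heap and the app):

```shell
spark/bin/spark-submit --class mything.core --name "My Thing" \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled" \
  /root/spark/code/myjar.jar
```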
Accidental kill in UI
So I had a Spark job with various failures, and I decided to kill it and start again. I clicked the 'kill' link in the web console, restarted the job on the command line and headed back to the web console and refreshed to see how my job was doing... the URL at the time was: /stages/stage/kill?id=1&terminate=true Which of course terminated the stage again. No loss, but if I'd waited a few hours before doing that, I would have lost data. I know to be careful next time, but isn't 'don't modify state as a result of a GET request' the first rule of HTTP? It could lead to an expensive mistake. Making this a POST would be a simple fix. Does anyone else think this is worth creating an issue for?
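For what it's worth, the principle is easy to demonstrate with a toy endpoint. This is plain JDK code, not Spark's actual UI implementation: if the mutating action requires POST, a browser refresh (which reissues a GET) can no longer kill anything.

```java
import com.sun.net.httpserver.HttpServer;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;

public class SafeKillEndpoint {
    public static void main(String[] args) throws Exception {
        // Toy server: the kill endpoint only honours POST.
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/stages/stage/kill", exchange -> {
            int code = exchange.getRequestMethod().equals("POST") ? 200 : 405;
            exchange.sendResponseHeaders(code, -1);  // -1: no response body
            exchange.close();
        });
        server.start();
        int port = server.getAddress().getPort();

        // A refresh is a GET, so it is refused; only a deliberate POST kills.
        System.out.println("GET  -> " + request(port, "GET"));   // prints 405
        System.out.println("POST -> " + request(port, "POST"));  // prints 200
        server.stop(0);
    }

    static int request(int port, String method) throws Exception {
        URL url = new URL("http://localhost:" + port + "/stages/stage/kill?id=1&terminate=true");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod(method);
        return conn.getResponseCode();
    }
}
```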
ephemeral-hdfs vs persistent-hdfs - performance
I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need to store the input in HDFS somehow. I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk. Each HDFS node reports 73 GB, and the total capacity is ~370 GB. If I want to process 800 GB of data (assuming I can't split the jobs up), I'm guessing I need to get persistent-hdfs involved. 1 - Does persistent-hdfs have noticeably different performance than ephemeral-hdfs? 2 - If so, is there a recommended configuration (like storing input and output on persistent, but persisted RDDs on ephemeral?) This seems like a common use-case, so sorry if this has already been covered. Joe
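One factor worth folding into either option: HDFS replicates blocks (the default dfs.replication is 3), so the raw disk needed is roughly three times the dataset size. A quick back-of-envelope sketch using the node sizes from the message above:

```java
public class HdfsCapacity {
    public static void main(String[] args) {
        double perNodeGb = 73.0;   // usable space each HDFS node reports
        int nodes = 5;
        double datasetGb = 800.0;
        int replication = 3;       // HDFS default (dfs.replication)

        double rawCapacity = perNodeGb * nodes;                   // 365 GB
        double rawNeeded = datasetGb * replication;               // 2400 GB
        int nodesNeeded = (int) Math.ceil(rawNeeded / perNodeGb); // 33 nodes

        System.out.printf("raw capacity: %.0f GB%n", rawCapacity);
        System.out.printf("raw needed at replication %d: %.0f GB%n", replication, rawNeeded);
        System.out.println("nodes of this size needed: " + nodesNeeded);
    }
}
```

By that arithmetic the 5-node cluster holds ~365 GB raw, i.e. only ~120 GB of data at replication 3, so either the replication factor or the node count has to change regardless of which HDFS flavour is used.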
Re: ephemeral-hdfs vs persistent-hdfs - performance
The data is coming from S3 in the first place, and the results will be uploaded back there. But even in the same availability zone, fetching 170 GB (that's gzipped) is slow. From what I understand of the pipelines, multiple transforms on the same RDD might involve re-reading the input, which very quickly adds up in comparison to having the data locally. Unless I persisted the data (which I am in fact doing), but that would involve storing approximately the same amount of data in HDFS, which wouldn't fit.

Also, I understood that S3 was unsuitable for practical use? See "Why you cannot use S3 as a replacement for HDFS"[0]. I'd love to be proved wrong, though; that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3

On 3 February 2015 at 16:45, David Rosenstrauch wrote:
> You could also just push the data to Amazon S3, which would un-link the
> size of the cluster needed to process the data from the size of the data.
>
> DR
Kyro serialization and OOM
I have about 500 MB of data and I'm trying to process it on a single `local` instance. I'm getting an Out of Memory exception. Stack trace at the end.

Spark 1.1.1
My JVM has -Xmx2g
spark.driver.memory = 1000M
spark.executor.memory = 1000M
spark.kryoserializer.buffer.mb = 256
spark.kryoserializer.buffer.max.mb = 256

The objects I'm dealing with are well constrained. Each can be no more than 500 bytes at the very most. I ran into problems with the kryo buffer being too small, but I think that 256 MB should do the job. The docs say "This must be larger than any object you attempt to serialize". No danger of that.

My input is a single file (on average each line is 500 bytes). I'm performing various filter, map, flatMap, groupByKey and reduceByKey operations. The only 'actions' I'm performing are foreach, which inserts values into a database. On input, I'm parsing the lines and then persisting with DISK_ONLY. I'm foreaching over the keys and then foreaching over the values of key-value RDDs. The docs say that groupByKey returns (K, Iterable<V>). So the values (which can be large) shouldn't be serialized as a single list. So I don't think I should be loading anything larger than 256 MB at once.

My code works for small sample toy data and I'm trying it out on a bit more. As I understand it, the way that Spark partitions data means that, in most cases, any job that will run on a cluster will also run on a single instance, given enough time. I think I've given enough memory to cover my serialization needs as I understand them. Have I misunderstood?

Joe

Stack trace:

INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 30.0 (TID 116, localhost, PROCESS_LOCAL, 993 bytes)
INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 30.0 (TID 116)
...
ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 30.0 (TID 116)
java.lang.OutOfMemoryError: Java heap space
    at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
    at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58)
    at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151)
    at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151)
    at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
...
WARN org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 30.0 (TID 116, localhost): java.lang.OutOfMemoryError: Java heap space
    com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
    org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58)
    org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151)
    org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151)
    org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
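One reading of that trace, offered as a hypothesis rather than a diagnosis: the OOM happens while constructing the Kryo Output, i.e. while pre-allocating the serializer's buffer itself. If spark.kryoserializer.buffer.mb is the *initial* buffer each serializer instance allocates up front, then at 256 MB a handful of concurrent serializer instances can exhaust a 2 GB heap on their own. Under that assumption, a small initial buffer with a large maximum avoids the problem:

```
# Sketch, assuming Spark 1.1-era property names: start small, allow growth.
spark.kryoserializer.buffer.mb        2
spark.kryoserializer.buffer.max.mb    256
```

The max setting still has to exceed the largest single object serialized, which at 500 bytes per object is no constraint here.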
Re: ephemeral-hdfs vs persistent-hdfs - performance
Thanks very much, that's good to know, I'll certainly give it a look. Can you give me a hint about how you unzip your input files on the fly? I thought that it wasn't possible to parallelize zipped inputs unless they were unzipped before passing to Spark?

Joe

On 3 February 2015 at 17:48, David Rosenstrauch wrote:
> We use S3 as a main storage for all our input data and our generated
> (output) data. (10's of terabytes of data daily.) We read gzipped data
> directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long
> as you parallelize the work well by distributing the processing across
> enough machines. (About 100 nodes, in our case.)
>
> The way we generally operate re: storage is: read input directly from
> S3, write output from Hadoop/Spark jobs to HDFS, then after the job is
> complete distcp the relevant output from HDFS back to S3. Works for us
> ... YMMV. :-)
>
> HTH,
>
> DR
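On the 'unzip on the fly' point: gzip is a non-splittable format, so Hadoop/Spark cannot split one .gz file across tasks; each file is decompressed as a single stream by a single task, and the parallelism in the setup described above comes from having many files. A plain-Java sketch of why: a GZIPInputStream can only be read sequentially from the start of the compressed data, and there is no way to seek into the middle of it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipStreamDemo {
    public static void main(String[] args) throws Exception {
        // Compress some "log lines" into a single gzip member.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write("line1\nline2\nline3\n".getBytes(StandardCharsets.UTF_8));
        }
        byte[] compressed = buf.toByteArray();

        // The only way back out is sequential decompression from byte 0.
        // A reader cannot start at an arbitrary offset, which is exactly why
        // one .gz file cannot be split into multiple input partitions.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            in.transferTo(out);
        }
        System.out.print(out.toString(StandardCharsets.UTF_8));
    }
}
```

So "unzipping on the fly" works per file: point textFile at a directory of many .gz files and each becomes its own (whole-file) task.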
How many stages in my application?
I'm sitting here looking at my application crunching gigabytes of data on a cluster and I have no idea if it's an hour away from completion or a minute. The web UI shows progress through each stage, but not how many stages remaining. How can I work out how many stages my program will take automatically? My application has a slightly interesting DAG (re-use of functions that contain Spark transformations, persistent RDDs). Not that complex, but not 'step 1, step 2, step 3'. I'm guessing that if the driver program runs sequentially sending messages to Spark, then Spark has no knowledge of the structure of the driver program. Therefore it's necessary to execute it on a small test dataset and see how many stages result? When I set spark.eventLog.enabled = true and run on (very small) test data I don't get any stage messages in my STDOUT or in the log file. This is on a `local` instance. Did I miss something obvious? Thanks! Joe
Re: How many stages in my application?
Thanks Akhil and Mark. I can of course count events (assuming I can deduce the shuffle boundaries), but like I said the program isn't simple and I'd have to do this manually every time I change the code. So I'd rather find a way of doing this automatically if possible.

On 4 February 2015 at 19:41, Mark Hamstra wrote:
> But there isn't a 1-1 mapping from operations to stages, since multiple
> operations will be pipelined into a single stage if no shuffle is
> required. To determine the number of stages in a job you really need to be
> looking for shuffle boundaries.
>
> On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das wrote:
>
>> You can easily understand the flow by looking at the number of operations
>> in your program (like map, groupBy, join etc.): first you list out
>> the number of operations happening in your application, and then from the
>> web UI you will be able to see how many operations have happened so far.
>>
>> Thanks
>> Best Regards
How do I set spark.local.dirs?
I'm running on EC2 and I want to set the directory to use on the slaves (mounted EBS volumes). I have set:

spark.local.dir /vol3/my-spark-dir

in /root/spark/conf/spark-defaults.conf and replicated it to all nodes. I have verified that the value shows up in the console's config, and checked that it is present on the nodes. But Spark is still creating temp files in the wrong (default) place: /mnt2/spark

How do I get my slaves to pick up this value? How can I verify that they have? Thanks! Joe
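One possible explanation, offered as an assumption to verify rather than a known fact about this AMI: in standalone mode the SPARK_LOCAL_DIRS environment variable (set in spark-env.sh, which the EC2 scripts populate with the /mnt*/spark directories) takes precedence over spark.local.dir. If that's what's happening here, the fix is to change the environment variable and re-sync the config rather than edit spark-defaults.conf:

```shell
# In /root/spark/conf/spark-env.sh (path per the spark-ec2 layout):
export SPARK_LOCAL_DIRS=/vol3/my-spark-dir

# Then push the config to the slaves and restart the workers so they pick
# it up (script names assumed from the spark-ec2 AMI layout):
/root/spark-ec2/copy-dir /root/spark/conf
/root/spark/sbin/stop-all.sh && /root/spark/sbin/start-all.sh
```

A quick way to verify is to check which directory fills up with spark-local-* subdirectories while a job is running.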
Has Spark 1.2.0 changed EC2 persistent-hdfs?
I've updated to Spark 1.2.0 on EC2 and the persistent-hdfs behaviour appears to have changed.

My launch script is:

spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5 --ebs-vol-size=1000 launch myproject

When I ssh into master I get:

$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1      7.9G  2.9G  5.0G  37% /
tmpfs           7.3G     0  7.3G   0% /dev/shm
/dev/xvdb        37G  1.3G   34G   4% /mnt
/dev/xvdc        37G  177M   35G   1% /mnt2
/dev/xvds      1000G   33M 1000G   1% /vol0

That /vol0 is the place I want (and assume) persistent-hdfs to go. But when I look at the size I get:

$ persistent-hdfs/bin/start-all.sh
$ persistent-hdfs/bin/hadoop dfsadmin -report
Warning: $HADOOP_HOME is deprecated.

Configured Capacity: 42275430400 (39.37 GB)
Present Capacity: 2644878 (24.63 GB)
DFS Remaining: 26448601088 (24.63 GB)
DFS Used: 143360 (140 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0

-
Datanodes available: 5 (5 total, 0 dead)

Name: 10.46.11.156:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165372416 (2.95 GB)
DFS Remaining: 5289684992 (4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015

Name: 10.41.51.155:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165364224 (2.95 GB)
DFS Remaining: 5289693184 (4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015

Name: 10.38.30.254:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165249536 (2.95 GB)
DFS Remaining: 5289807872 (4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015

Name: 10.204.134.84:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165343744 (2.95 GB)
DFS Remaining: 5289713664 (4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015

Name: 10.33.15.134:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165356032 (2.95 GB)
DFS Remaining: 5289701376 (4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015

That's tiny. My suspicions are aroused when I see:

$ ls /vol
persistent-hdfs

/vol is on the small /dev/xvda1, not the large EBS /dev/xvds.

I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change the volume:

<property>
  <name>hadoop.tmp.dir</name>
  <value>/vol0/persistent-hdfs</value>
</property>

And then:

persistent-hdfs/bin/stop-all.sh && persistent-hdfs/bin/start-all.sh

but when I do that, the persistent HDFS won't start for whatever reason. I can't run

$ persistent-hdfs/bin/hadoop dfsadmin -report

15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 0 time(s).
15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 1 time(s).
15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server: ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already tried 2 time(s).

So it looks like I can't use the EBS volume for persistent-hdfs. I was doing it before, so something must have changed in the last couple of weeks (last time I was using 1.1.0).

Is this a bug? Has the behaviour of AWS changed? Am I doing something stupid? How do I fix it?

Thanks in advance!

Joe
Re: Has Spark 1.2.0 changed EC2 persistent-hdfs?
Looks like this is caused by issue SPARK-5008: https://issues.apache.org/jira/browse/SPARK-5008

On 13 February 2015 at 19:04, Joe Wass wrote:

> I've updated to Spark 1.2.0 on EC2, and the persistent-hdfs behaviour
> appears to have changed.
>
> My launch script is
>
> spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5 --ebs-vol-size=1000 launch myproject
>
> When I ssh into the master I get:
>
> $ df -h
> Filesystem      Size  Used Avail Use% Mounted on
> /dev/xvda1      7.9G  2.9G  5.0G  37% /
> tmpfs           7.3G     0  7.3G   0% /dev/shm
> /dev/xvdb        37G  1.3G   34G   4% /mnt
> /dev/xvdc        37G  177M   35G   1% /mnt2
> /dev/xvds      1000G   33M 1000G   1% /vol0
>
> That /vol0 is the place I want (and assume) persistent-hdfs to go. But
> when I look at the size I get:
>
> $ persistent-hdfs/bin/start-all.sh
> $ persistent-hdfs/bin/hadoop dfsadmin -report
> Warning: $HADOOP_HOME is deprecated.
>
> Configured Capacity: 42275430400 (39.37 GB)
> Present Capacity: 26448744448 (24.63 GB)
> DFS Remaining: 26448601088 (24.63 GB)
> DFS Used: 143360 (140 KB)
> DFS Used%: 0%
> Under replicated blocks: 0
> Blocks with corrupt replicas: 0
> Missing blocks: 0
>
> -------------------------------------------------
> Datanodes available: 5 (5 total, 0 dead)
>
> Name: 10.46.11.156:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165372416 (2.95 GB)
> DFS Remaining: 5289684992 (4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
> Name: 10.41.51.155:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165364224 (2.95 GB)
> DFS Remaining: 5289693184 (4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
> Name: 10.38.30.254:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165249536 (2.95 GB)
> DFS Remaining: 5289807872 (4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
> Name: 10.204.134.84:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165343744 (2.95 GB)
> DFS Remaining: 5289713664 (4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
> Name: 10.33.15.134:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165356032 (2.95 GB)
> DFS Remaining: 5289701376 (4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
> That's tiny. My suspicions are aroused when I see:
>
> $ ls /vol
> persistent-hdfs
>
> /vol is on the small /dev/xvda1, not the large EBS /dev/xvds.
>
> I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change
> the volume:
>
> <property>
>   <name>hadoop.tmp.dir</name>
>   <value>/vol0/persistent-hdfs</value>
> </property>
>
> And then
>
> persistent-hdfs/bin/stop-all.sh && persistent-hdfs/bin/start-all.sh
>
> but when I do that, the persistent HDFS won't start for whatever reason. I
> can't run
>
> $ persistent-hdfs/bin/hadoop dfsadmin -report
>
> 15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server:
> ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
> Already tried 0 time(s).
> 15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server:
> ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
> Already tried 1 time(s).
> 15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server:
> ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
> Already tried 2 time(s).
>
> So it looks like I can't use the EBS volume for persistent-hdfs. I was
> doing this before, so something must have changed in the last couple of
> weeks (last time I was using 1.1.0).
>
> Is this a bug? Has the behaviour of AWS changed? Am I doing something
> stupid? How do I fix it?
>
> Thanks in advance!
>
> Joe
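For readers hitting the same thing: the per-slave workaround mentioned at the top of the thread usually takes the shape of a Hadoop config edit roughly like the following. This is only an illustrative sketch, not the fix from the JIRA ticket: the property names are the Hadoop 1.x ones, and the /vol0 paths are assumed from the df output above.

```xml
<!-- hypothetical per-slave edit to persistent-hdfs/conf/hdfs-site.xml:
     point the datanode and namenode storage at the large EBS volume.
     Property names assume Hadoop 1.x; the EBS mount is assumed to be /vol0. -->
<property>
  <name>dfs.data.dir</name>
  <value>/vol0/persistent-hdfs/data</value>
</property>
<property>
  <name>dfs.name.dir</name>
  <value>/vol0/persistent-hdfs/name</value>
</property>
```

After editing, the daemons need a stop-all.sh / start-all.sh cycle on each node, as in the message above.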
Unzipping large files and 2GB partition size.
On the advice of some recent discussions on this list, I thought I would try
to consume gz files directly. I'm reading them, doing a preliminary map, then
repartitioning, then doing normal Spark things.

As I understand it, gzip files aren't readable in partitions because of the
format, so I thought that repartitioning would be the next best thing for
parallelism. I have about 200 files, some about 1 GB compressed and some over
2 GB uncompressed.

I'm hitting the 2 GB maximum partition size. It's been discussed on this list
(topic: "2GB limit for partitions?", tickets SPARK-1476 and SPARK-1391).
Stack trace at the end. This happened 10 hours in (probably when it saw its
first file), so I can't just re-run it quickly!

Does anyone have any advice? Might I solve this by re-partitioning as the
first step after reading the file(s)? Or is it effectively impossible to read
a gz file that expands to over 2 GB? Does anyone have any experience with
this?

Thanks in advance

Joe

Stack trace:

Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
Lost task 5.3 in stage 1.0 (TID 283) on executor:
java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
[duplicate 6]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in
stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
        at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
        at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
        at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
        at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
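The limit in that trace is not a Spark configuration value but Java's signed 32-bit buffer indexing: FileChannelImpl.map() cannot map a block larger than Integer.MAX_VALUE bytes. A quick sanity check in plain Python (the 2.5 GB figure is an assumed example, not a number from the thread):

```python
# Java indexes mapped byte buffers with a signed 32-bit int, so a single
# cached block must stay under Integer.MAX_VALUE bytes (~2 GiB).
INT_MAX = 2**31 - 1
print(INT_MAX)                     # 2147483647

# A gz file that expands to ~2.5 GB, read as one partition and cached,
# exceeds that limit, and FileChannelImpl.map() raises
# "Size exceeds Integer.MAX_VALUE".
expanded_partition = 2_500_000_000
print(expanded_partition > INT_MAX)   # True
```

This is why repartitioning (or a splittable compression format) has to bring each cached partition below ~2 GiB before anything is persisted.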
Re: Unzipping large files and 2GB partition size.
Thanks for your reply Sean.

Looks like it's happening in a map:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at NativeMethodAccessorImpl.java:-2)

That's my initial 'parse' stage, done before repartitioning. It reduces the
data size significantly, so I thought it would be sensible to do it before
repartitioning, which involves moving lots of data around. That might be a
stupid idea in hindsight!

So the obvious thing would be to try repartitioning before the map, as the
first transformation. I would have done that already if I could be sure it
would succeed or fail quickly.

I'm not entirely clear on the lazy execution of transformations in the DAG.
It could be that the error is manifesting during the mapToPair but is caused
by the earlier read-from-text-file stage.

Thanks for the pointers to those compression formats. I'll give them a go
(although it's not trivial to re-encode 200 GB of data on S3, so if I can get
this working reasonably with gzip I'd like to).

Any advice on whether this error can be worked round with an early
repartition?

Cheers

Joe

On 19 February 2015 at 09:51, Sean Owen wrote:

> gzip and zip are not splittable compression formats; bzip and lzo are.
> Ideally, use a splittable compression format.
>
> Repartitioning is not a great solution since it means a shuffle, typically.
>
> This is not necessarily related to how big your partitions are. The
> question is: when does this happen? What operation?
>
> On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass wrote:
> > On the advice of some recent discussions on this list, I thought I would
> > try to consume gz files directly. I'm reading them, doing a preliminary
> > map, then repartitioning, then doing normal Spark things.
> >
> > As I understand it, gzip files aren't readable in partitions because of
> > the format, so I thought that repartitioning would be the next best thing
> > for parallelism. I have about 200 files, some about 1 GB compressed and
> > some over 2 GB uncompressed.
> >
> > I'm hitting the 2 GB maximum partition size. It's been discussed on this
> > list (topic: "2GB limit for partitions?", tickets SPARK-1476 and
> > SPARK-1391). Stack trace at the end. This happened 10 hours in (probably
> > when it saw its first file), so I can't just re-run it quickly!
> >
> > Does anyone have any advice? Might I solve this by re-partitioning as the
> > first step after reading the file(s)? Or is it effectively impossible to
> > read a gz file that expands to over 2 GB? Does anyone have any experience
> > with this?
> >
> > Thanks in advance
> >
> > Joe
> >
> > Stack trace:
> >
> > Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
> > Lost task 5.3 in stage 1.0 (TID 283) on executor:
> > java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
> > [duplicate 6]
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
> > in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
> > 1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> >         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
> >         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
> >         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
> >         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
> >         at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
> >         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
> >         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
Re: Unzipping large files and 2GB partition size.
Thanks for your detailed reply Imran.

I'm writing this in Clojure (using Flambo, which uses the Java API), but I
don't think that's relevant. So here's the pseudocode (sorry, I've not
written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles")   // NB multiple files
val parsedFiles = rawData.map(parseFunction)     // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100)      // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)
val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that
partition size would be («size of expanded file» / «number of partitions»),
in this case 100 partitions (which I picked out of the air). I wonder whether
the «size of expanded file» is actually the size of all the concatenated
input files (probably about 800 GB)? In that case, should I multiply the
partition count by the number of files? Or perhaps I'm barking up completely
the wrong tree.

Joe

On 19 February 2015 at 14:44, Imran Rashid wrote:

> Hi Joe,
>
> The issue is not that you have input partitions that are bigger than 2GB
> -- it's just that they are getting cached. You can see in the stack trace
> that the problem is when you try to read data out of the DiskStore:
>
> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>
> Also, just because you see this:
>
> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
> tasks from Stage 1 (MappedRDD[17] at mapToPair at
> NativeMethodAccessorImpl.java:-2)
>
> it doesn't *necessarily* mean that this is coming from your map. It can be
> pretty confusing how your operations on RDDs get turned into stages; it
> could be a lot more than just your map. And actually, it might not even be
> your map at all -- some of the other operations you invoke call map under
> the covers. So it's hard to say what is going on here without seeing more
> code. Anyway, maybe you've already considered all this (you did mention
> the lazy execution of the DAG), but I wanted to make sure. It might help
> to use rdd.setName() and also to look at rdd.toDebugString.
>
> As far as what you can do about this -- it could be as simple as moving
> your rdd.persist() to after you have compressed and repartitioned your
> data. E.g., I'm blindly guessing you have something like this:
>
> val rawData = sc.hadoopFile(...)
> rawData.persist(DISK)
> rawData.count()
> val compressedData = rawData.map{...}
> val repartitionedData = compressedData.repartition(N)
> ...
>
> Change it to something like:
>
> val rawData = sc.hadoopFile(...)
> val compressedData = rawData.map{...}
> val repartitionedData = compressedData.repartition(N)
> repartitionedData.persist(DISK)
> repartitionedData.count()
> ...
>
> The point is, you avoid caching any data until you have ensured that the
> partitions are small. You might have big partitions before that in
> rawData, but that is OK.
>
> Imran
>
> On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass wrote:
>
>> Thanks for your reply Sean.
>>
>> Looks like it's happening in a map:
>>
>> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
>> tasks from Stage 1 (MappedRDD[17] at mapToPair at
>> NativeMethodAccessorImpl.java:-2)
>>
>> That's my initial 'parse' stage, done before repartitioning. It reduces
>> the data size significantly, so I thought it would be sensible to do it
>> before repartitioning, which involves moving lots of data around. That
>> might be a stupid idea in hindsight!
>>
>> So the obvious thing would be to try repartitioning before the map, as
>> the first transformation. I would have done that already if I could be
>> sure it would succeed or fail quickly.
>>
>> I'm not entirely clear on the lazy execution of transformations in the
>> DAG. It could be that the error is manifesting during the mapToPair but
>> is caused by the earlier read-from-text-file stage.
>>
>> Thanks for the pointers to those compression formats. I'll give them a
>> go (although it's not trivial to re-encode 200 GB of data on S3, so if I
>> can get this working reasonably with gzip I'd like to).
>>
>> Any advice on whether this error can be worked round with an early
>> repartition?
>>
>> Cheers
>>
>> Joe
>>
>> On 19 February 2015 at 09:51, Sean Owen wrote:
>>
>>> gzip and zip are not splittable compression formats; bzip and lzo are.
>>> Ideally, use a splittable compression format.
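Imran mentions rdd.setName() and rdd.toDebugString without showing them in use. In the same pseudocode style as Joe's snippet (the `partitioned` and `filtered` names are taken from that snippet; this is a sketch, not tested code):

```
val partitioned = filtered.repartition(100)
partitioned.setName("partitioned")
println(partitioned.toDebugString)
// prints the RDD lineage: which reads, maps and shuffles feed this
// RDD, and where the stage boundaries fall -- useful for telling
// whether a failing "mapToPair" stage really is the parse step
```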
Running out of space (when there's no shortage)
I'm running a cluster of 3 Amazon EC2 machines (small number because it's
expensive when experiments keep crashing after a day!).

Today's crash looks like this (stacktrace at end of message).

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0

On my three nodes, I have plenty of space and inodes:

A $ df -i
Filesystem        Inodes  IUsed     IFree IUse% Mounted on
/dev/xvda1        524288  97937    426351   19% /
tmpfs            1909200      1   1909199    1% /dev/shm
/dev/xvdb        2457600     54   2457546    1% /mnt
/dev/xvdc        2457600     24   2457576    1% /mnt2
/dev/xvds      831869296  23844 831845452    1% /vol0

A $ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1      7.9G  3.4G  4.5G  44% /
tmpfs           7.3G     0  7.3G   0% /dev/shm
/dev/xvdb        37G  1.2G   34G   4% /mnt
/dev/xvdc        37G  177M   35G   1% /mnt2
/dev/xvds      1000G  802G  199G  81% /vol0

B $ df -i
Filesystem        Inodes  IUsed     IFree IUse% Mounted on
/dev/xvda1        524288  97947    426341   19% /
tmpfs            1906639      1   1906638    1% /dev/shm
/dev/xvdb        2457600     54   2457546    1% /mnt
/dev/xvdc        2457600     24   2457576    1% /mnt2
/dev/xvds      816200704  24223 816176481    1% /vol0

B $ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1      7.9G  3.6G  4.3G  46% /
tmpfs           7.3G     0  7.3G   0% /dev/shm
/dev/xvdb        37G  1.2G   34G   4% /mnt
/dev/xvdc        37G  177M   35G   1% /mnt2
/dev/xvds      1000G  805G  195G  81% /vol0

C $ df -i
Filesystem        Inodes  IUsed     IFree IUse% Mounted on
/dev/xvda1        524288  97938    426350   19% /
tmpfs            1906897      1   1906896    1% /dev/shm
/dev/xvdb        2457600     54   2457546    1% /mnt
/dev/xvdc        2457600     24   2457576    1% /mnt2
/dev/xvds      755218352  24024 755194328    1% /vol0

C $ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/xvda1      7.9G  3.4G  4.5G  44% /
tmpfs           7.3G     0  7.3G   0% /dev/shm
/dev/xvdb        37G  1.2G   34G   4% /mnt
/dev/xvdc        37G  177M   35G   1% /mnt2
/dev/xvds      1000G  820G  181G  82% /vol0

The devices may be ~80% full but that still leaves ~200G free on each.
My spark-env.sh has

export SPARK_LOCAL_DIRS="/vol0/spark"

I have manually verified that on each slave the only temporary files are
stored on /vol0, all looking something like this:

/vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884

So it looks like all the files are being stored on the large drives
(incidentally they're AWS EBS volumes, but that's the only way to get enough
storage). My process crashed before with a slightly different exception under
the same circumstances:

kryo.KryoException: java.io.IOException: No space left on device

These both happen after several hours and several GB of temporary files. Why
does Spark think it's run out of space?

TIA

Joe

Stack trace 1:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
        at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
        at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:109)
        at org.apache.spark.s
Re: Running out of space (when there's no shortage)
Thanks everyone.

Yiannis, do you know if there's a bug report for this regression? For some
other (possibly connected) reason I upgraded from 1.1.1 to 1.2.1, but I can't
remember what the bug was.

Joe

On 24 February 2015 at 19:26, Yiannis Gkoufas wrote:

> Hi there,
>
> I assume you are using Spark 1.2.1, right?
> I faced the exact same issue and switched to 1.1.1 with the same
> configuration, and it was solved.
>
> On 24 Feb 2015 19:22, "Ted Yu" wrote:
>
>> Here is a tool which may give you some clue:
>> http://file-leak-detector.kohsuke.org/
>>
>> Cheers
>>
>> On Tue, Feb 24, 2015 at 11:04 AM, Vladimir Rodionov
>> <vrodio...@splicemachine.com> wrote:
>>
>>> Usually it happens in Linux when an application deletes a file without
>>> double-checking that there are no open FDs (resource leak). In this
>>> case, Linux holds all the space allocated and does not release it until
>>> the application exits (crashes in your case). You check the file system
>>> and everything is normal: you have enough space, and you have no idea
>>> why the application reports "no space left on device".
>>>
>>> Just a guess.
>>>
>>> -Vladimir Rodionov
>>>
>>> On Tue, Feb 24, 2015 at 8:34 AM, Joe Wass wrote:
>>>
>>>> I'm running a cluster of 3 Amazon EC2 machines (small number because
>>>> it's expensive when experiments keep crashing after a day!).
>>>>
>>>> Today's crash looks like this (stacktrace at end of message).
>>>>
>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
>>>> output location for shuffle 0
>>>>
>>>> On my three nodes, I have plenty of space and inodes:
>>>>
>>>> A $ df -i
>>>> Filesystem        Inodes  IUsed     IFree IUse% Mounted on
>>>> /dev/xvda1        524288  97937    426351   19% /
>>>> tmpfs            1909200      1   1909199    1% /dev/shm
>>>> /dev/xvdb        2457600     54   2457546    1% /mnt
>>>> /dev/xvdc        2457600     24   2457576    1% /mnt2
>>>> /dev/xvds      831869296  23844 831845452    1% /vol0
>>>>
>>>> A $ df -h
>>>> Filesystem      Size  Used Avail Use% Mounted on
>>>> /dev/xvda1      7.9G  3.4G  4.5G  44% /
>>>> tmpfs           7.3G     0  7.3G   0% /dev/shm
>>>> /dev/xvdb        37G  1.2G   34G   4% /mnt
>>>> /dev/xvdc        37G  177M   35G   1% /mnt2
>>>> /dev/xvds      1000G  802G  199G  81% /vol0
>>>>
>>>> B $ df -i
>>>> Filesystem        Inodes  IUsed     IFree IUse% Mounted on
>>>> /dev/xvda1        524288  97947    426341   19% /
>>>> tmpfs            1906639      1   1906638    1% /dev/shm
>>>> /dev/xvdb        2457600     54   2457546    1% /mnt
>>>> /dev/xvdc        2457600     24   2457576    1% /mnt2
>>>> /dev/xvds      816200704  24223 816176481    1% /vol0
>>>>
>>>> B $ df -h
>>>> Filesystem      Size  Used Avail Use% Mounted on
>>>> /dev/xvda1      7.9G  3.6G  4.3G  46% /
>>>> tmpfs           7.3G     0  7.3G   0% /dev/shm
>>>> /dev/xvdb        37G  1.2G   34G   4% /mnt
>>>> /dev/xvdc        37G  177M   35G   1% /mnt2
>>>> /dev/xvds      1000G  805G  195G  81% /vol0
>>>>
>>>> C $ df -i
>>>> Filesystem        Inodes  IUsed     IFree IUse% Mounted on
>>>> /dev/xvda1        524288  97938    426350   19% /
>>>> tmpfs            1906897      1   1906896    1% /dev/shm
>>>> /dev/xvdb        2457600     54   2457546    1% /mnt
>>>> /dev/xvdc        2457600     24   2457576    1% /mnt2
>>>> /dev/xvds      755218352  24024 755194328    1% /vol0
>>>>
>>>> C $ df -h
>>>> Filesystem      Size  Used Avail Use% Mounted on
>>>> /dev/xvda1      7.9G  3.4G  4.5G  44% /
>>>> tmpfs           7.3G     0  7.3G   0% /dev/shm
>>>> /dev/xvdb        37G  1.2G   34G   4% /mnt
>>>> /dev/xvdc        37G  177M   35G   1% /mnt2
>>>> /dev/xvds      1000G  820G  181G  82% /vol0
>>>>
>>>> The devices may be ~80% full but that still leaves ~200G free on each.
>>>> My spark-env.sh has
>>>>
>>>> export SPARK_LOCAL_DIRS="
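Vladimir's "deleted but still open" theory is easy to demonstrate outside Spark. A plain-Python sketch (the file name and the 1024-byte size are arbitrary):

```python
import os
import tempfile

# A process deletes a file while still holding a descriptor to it: the
# name disappears from the directory tree, but the kernel keeps the
# inode (and its disk space) alive until the last descriptor is closed.
# `df` then disagrees with `du`, and "No space left on device" can
# appear despite apparently free space.
fd, path = tempfile.mkstemp()
os.write(fd, b"x" * 1024)
os.unlink(path)                   # file is gone from the namespace...
st = os.fstat(fd)
print(st.st_nlink, st.st_size)    # ...but the inode persists: 0 links, 1024 bytes
os.close(fd)                      # only now can the kernel reclaim the space
```

On a live node, `lsof` can list such deleted-but-open files to check whether a Spark executor is holding space this way.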