Is anyone using Amazon EC2?

2015-05-23 Thread Joe Wass
I used Spark on EC2 a while ago


Is anyone using Amazon EC2? (second attempt!)

2015-05-23 Thread Joe Wass
I used Spark on EC2 a while ago, but recent revisions seem to have broken
the functionality.

Is anyone actually using Spark on EC2 at the moment?

The bug in question is:

https://issues.apache.org/jira/browse/SPARK-5008

It makes it impossible to use persistent HDFS without a workaround on each
slave node.

No one seems to be interested in the bug, so I wonder whether other people
are simply not hitting this problem. If that's the case, any suggestions?

Joe


Re: Is anyone using Amazon EC2?

2015-05-23 Thread Joe Wass
Sorry guys, my email got sent before I finished writing it. Check my other
message (with the same subject)!

On 23 May 2015 at 20:25, Shafaq  wrote:

> Yes - Spark EC2 cluster. Looking into migrating to Spark EMR.
> Adding more EC2 nodes is not possible AFAIK.
> On May 23, 2015 11:22 AM, "Johan Beisser"  wrote:
>
>> Yes.
>>
>> We're looking at bootstrapping in EMR...
>> On Sat, May 23, 2015 at 07:21 Joe Wass  wrote:
>>
>>> I used Spark on EC2 a while ago
>>>
>>


Re: Spark dramatically slow when I add "saveAsTextFile"

2015-05-24 Thread Joe Wass
This may sound like an obvious question, but are you sure that the program
is doing any work when you don't have a saveAsTextFile? If there are
transformations but no actions to actually collect the data, there's no
need for Spark to execute the transformations.
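
To illustrate: in a made-up Scala sketch like this (assuming sc already
exists), nothing at all runs until the final action:

import org.apache.spark.SparkContext._               // for reduceByKey on pair RDDs (pre-1.3)

val lines  = sc.textFile("hdfs://host:9000/in.csv")       // transformation: builds a plan only
val counts = lines.map(l => (l.split(",")(0), 1))          // still just a plan
                  .reduceByKey(_ + _)                      // still just a plan
counts.saveAsTextFile("hdfs://host:9000/out/")             // action: this triggers the actual work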

As to the question of 'is this taking too long', I can't answer that. But
your code was HTML-escaped and therefore difficult to read; perhaps you
should post a link to a Gist.

Joe

On 24 May 2015 at 10:36, allanjie  wrote:

> *Problem Description*:
>
> The program runs on a stand-alone Spark cluster (1 master, 6 workers with
> 8g RAM and 2 cores each).
> Input: a 468MB file with 133433 records stored in HDFS.
> Output: just a 2MB file, also stored in HDFS.
> The program has two map operations and one reduceByKey operation.
> Finally I save the result to HDFS using "*saveAsTextFile*".
> *Problem*: if I don't add "saveAsTextFile", the program runs very fast (a
> few seconds); otherwise it is extremely slow, taking about 30 mins.
>
> *My program (it is very simple)*
> public static void main(String[] args) throws IOException {
>     /** Parameter setting **/
>     String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
>     String remoteFilePath =
>         "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
>     String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
>     final int row = 133433;
>     final int col = 458;
>     final double dc = Double.valueOf(args[0]);
>
>     SparkConf conf = new SparkConf()
>         .setAppName("distance")
>         .set("spark.executor.memory", "4g")
>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>         .set("spark.eventLog.enabled", "true");
>     JavaSparkContext sc = new JavaSparkContext(conf);
>
>     JavaRDD<String> textFile = sc.textFile(remoteFilePath);
>
>     // Broadcast variable; the dimension of this double array is 133433*458
>     final Broadcast<double[][]> broadcastPoints =
>         sc.broadcast(createBroadcastPoints(localPointPath, row, col));
>
>     /**
>      * Compute the distance in terms of each point on each instance.
>      * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
>      */
>     JavaPairRDD<Integer, Double> distance =
>         textFile.flatMapToPair(new PairFlatMapFunction<String, Integer, Double>() {
>             public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
>                 List<String> al = Arrays.asList(v1.split(","));
>                 double[] featureVals = new double[al.size()];
>                 for (int j = 0; j < al.size() - 1; j++)
>                     featureVals[j] = Double.valueOf(al.get(j + 1));
>                 int jIndex = Integer.valueOf(al.get(0));
>                 double[][] allPoints = broadcastPoints.value();
>                 double sum = 0;
>                 List<Tuple2<Integer, Double>> list =
>                     new ArrayList<Tuple2<Integer, Double>>();
>                 for (int i = 0; i < row; i++) {
>                     sum = 0;
>                     for (int j = 0; j < col; j++) {
>                         sum += (allPoints[i][j] - featureVals[j])
>                                 * (allPoints[i][j] - featureVals[j]);
>                     }
>                     list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
>                 }
>                 return list;
>             }
>         });
>
>     // Create the zero-one density
>     JavaPairRDD<Integer, Integer> densityZeroOne =
>         distance.mapValues(new Function<Double, Integer>() {
>             public Integer call(Double v1) throws Exception {
>                 if (v1 < dc) return 1;
>                 else return 0;
>             }
>         });
>
>     // Combine the density
>     JavaPairRDD<Integer, Integer> counts =
>         densityZeroOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
>             public Integer call(Integer v1, Integer v2) throws Exception {
>                 return v1 + v2;
>             }
>         });
>     counts.saveAsTextFile(outputPath + args[1]);
>     sc.stop();
> }
>
> *If I comment out "saveAsTextFile", the log is:*
> Picked up _JAVA_OPTIONS: -Xmx4g
> 15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
> 15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/0

Are failures normal / to be expected on an AWS cluster?

2014-12-20 Thread Joe Wass
I have a Spark job running on about 300 GB of log files, on Amazon EC2,
with 10 x Large instances (each with 600 GB disk). The job hasn't yet
completed.

So far, 18 stages have completed (2 of which have retries) and 3 stages
have failed. In each failed stage there are ~160 successful tasks, but
"CANNOT FIND ADDRESS" for half of the executors.

Are these numbers normal for AWS? Should a certain number of faults be
expected? I know that AWS isn't meant to be perfect, but this doesn't seem
right.

Cheers

Joe


PermGen issues on AWS

2015-01-09 Thread Joe Wass
I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM). FWIW
I'm using the Flambo Clojure wrapper which uses the Java API but I don't
think that should make any difference. I'm running with the following
command:

spark/bin/spark-submit --class mything.core --name "My Thing" --conf
spark.yarn.executor.memoryOverhead=4096 --conf
spark.executor.extraJavaOptions="-XX:+CMSClassUnloadingEnabled
-XX:+CMSPermGenSweepingEnabled" /root/spark/code/myjar.jar

For one of the stages I'm getting errors:

 - ExecutorLostFailure (executor lost)
 - Resubmitted (resubmitted due to lost executor)

And I think they're caused by slave executor JVMs dying with this error:

java.lang.OutOfMemoryError: PermGen space
java.lang.Class.getDeclaredConstructors0(Native Method)
java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
java.lang.Class.getConstructor0(Class.java:2885)
java.lang.Class.newInstance(Class.java:350)

sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)

sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
java.security.AccessController.doPrivileged(Native Method)

sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)

sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)

sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)

java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
java.security.AccessController.doPrivileged(Native Method)
java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)

java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)

java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)

java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)


1 stage out of 14 (so far) is failing. My failing stage is 1768 succeeded /
1862 (940 failed). 7 tasks failed with OOM, 919 were "Resubmitted
(resubmitted due to lost executor)".

Now my "Aggregated Metrics by Executor" shows that 10 out of 16 executors
show "CANNOT FIND ADDRESS" which I imagine means the JVM blew up and hasn't
been restarted. Now the 'Executors' tab shows only 7 executors.

 - Is this normal?
 - Any ideas why this is happening?
 - Any other measures I can take to prevent this?
 - Is the rest of my app going to run on a reduced number of executors?
 - Can I re-start the executors mid-application? This is a long-running
job, so I'd like to do what I can whilst it's running, if possible.
 - Am I correct in thinking that the --conf arguments are supplied to the
JVMs of the slave executors, so they will be receiving the extraJavaOptions
and memoryOverhead?

Thanks very much!

Joe


Re: PermGen issues on AWS

2015-01-09 Thread Joe Wass
Thanks, I noticed this after posting. I'll try that.
I also think that perhaps Clojure might be creating more classes than the
equivalent Java would, so I'll nudge it a bit higher.
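
Probably something like this, then (bumping to 512m is just my own guess,
untested):

spark/bin/spark-submit --class mything.core --name "My Thing" --conf
spark.yarn.executor.memoryOverhead=4096 --conf
spark.executor.extraJavaOptions="-XX:MaxPermSize=512m
-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled"
/root/spark/code/myjar.jar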

On 9 January 2015 at 11:45, Sean Owen  wrote:

> It's normal for PermGen to be a bit more of an issue with Spark than
> for other JVM-based applications. You should simply increase the
> PermGen size, which I don't see in your command. -XX:MaxPermSize=256m
> allows it to grow to 256m for example. The right size depends on your
> total heap size and app.
>
> Also, Java 8 no longer has a permanent generation, so this particular
> type of problem and tuning is not needed. You might consider running
> on Java 8.
>
> On Fri, Jan 9, 2015 at 10:38 AM, Joe Wass  wrote:
> > I'm running on an AWS cluster of 10 x m1.large (64 bit, 7.5 GiB RAM).
> FWIW
> > I'm using the Flambo Clojure wrapper which uses the Java API but I don't
> > think that should make any difference. I'm running with the following
> > command:
> >
> > spark/bin/spark-submit --class mything.core --name "My Thing" --conf
> > spark.yarn.executor.memoryOverhead=4096 --conf
> > spark.executor.extraJavaOptions="-XX:+CMSClassUnloadingEnabled
> > -XX:+CMSPermGenSweepingEnabled" /root/spark/code/myjar.jar
> >
> > For one of the stages I'm getting errors:
> >
> >  - ExecutorLostFailure (executor lost)
> >  - Resubmitted (resubmitted due to lost executor)
> >
> > And I think they're caused by slave executor JVMs dying with this
> error:
> >
> > java.lang.OutOfMemoryError: PermGen space
> > java.lang.Class.getDeclaredConstructors0(Native Method)
> > java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
> > java.lang.Class.getConstructor0(Class.java:2885)
> > java.lang.Class.newInstance(Class.java:350)
> >
> >
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:399)
> >
> >
> sun.reflect.MethodAccessorGenerator$1.run(MethodAccessorGenerator.java:396)
> > java.security.AccessController.doPrivileged(Native Method)
> >
> >
> sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:395)
> >
> >
> sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:113)
> >
> >
> sun.reflect.ReflectionFactory.newConstructorForSerialization(ReflectionFactory.java:331)
> >
> >
> java.io.ObjectStreamClass.getSerializableConstructor(ObjectStreamClass.java:1376)
> > java.io.ObjectStreamClass.access$1500(ObjectStreamClass.java:72)
> > java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:493)
> > java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
> > java.security.AccessController.doPrivileged(Native Method)
> > java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
> > java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
> >
>  java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
> >
> > java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
> >
>  java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> >
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> >
>  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >
> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >
> > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >
>  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >
> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >
> > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >
> > java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> >
>  java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> >
> > java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> >
> > java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> >
> >
> > 1 stage out of 14 (so far) is failing. My failing stage is 1768
> succeeded /
> > 1862 (940 failed). 7 tasks failed with OOM, 919 were "Resubmitted
> > (resubmitted due to lost executor)".
> >
> > Now my "Aggregated Metrics by Executor" shows that 10 out of 16 executors
> > show "CANNOT FIND ADDRESS" which I imagin

Accidental kill in UI

2015-01-09 Thread Joe Wass
So I had a Spark job with various failures, and I decided to kill it and
start again. I clicked the 'kill' link in the web console, restarted the
job on the command line and headed back to the web console and refreshed to
see how my job was doing... the URL at the time was:

/stages/stage/kill?id=1&terminate=true

Which of course terminated the stage again. No loss, but if I'd waited a
few hours before doing that, I would have lost data.

I know to be careful next time, but isn't 'don't modify state as a result
of a GET request' the first rule of HTTP? It could lead to an expensive
mistake. Making this a POST would be a simple fix.

Does anyone else think this is worth creating an issue for?


ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
I want to process about 800 GB of data on an Amazon EC2 cluster. So, I need
to store the input in HDFS somehow.

I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
Each HDFS node reports 73 GB, and the total capacity is ~370 GB.

If I want to process 800 GB of data (assuming I can't split the jobs up),
I'm guessing I need to get persistent-hdfs involved.

1 - Does persistent-hdfs have noticeably different performance than
ephemeral-hdfs?
2 - If so, is there a recommended configuration (like storing input and
output on persistent, but persisted RDDs on ephemeral?)
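
To make question 2 concrete, this is roughly the shape I have in mind (a
sketch only; parseLine and the paths are invented):

import org.apache.spark.storage.StorageLevel

val input  = sc.textFile("hdfs://<persistent-hdfs-namenode>/input/")    // input on persistent-hdfs
val parsed = input.map(parseLine)                                       // parseLine: my own parse step
parsed.persist(StorageLevel.DISK_ONLY)            // cached blocks land in spark.local.dir, which by
                                                  // default sits on the ephemeral disks
parsed.saveAsTextFile("hdfs://<persistent-hdfs-namenode>/output/")      // results back to persistent-hdfs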

This seems like a common use-case, so sorry if this has already been
covered.

Joe


Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
The data is coming from S3 in the first place, and the results will be
uploaded back there. But even in the same availability zone, fetching 170
GB (that's gzipped) is slow. From what I understand of the pipelines,
multiple transforms on the same RDD might involve re-reading the input,
which very quickly adds up compared to having the data locally. Unless I
persist the data (which I am in fact doing), but that would involve storing
approximately the same amount of data in HDFS, which wouldn't fit.

Also, I understood that S3 was unsuitable for practical use? See "Why you
cannot use S3 as a replacement for HDFS"[0]. I'd love to be proved wrong,
though; that would make things a lot easier.

[0] http://wiki.apache.org/hadoop/AmazonS3



On 3 February 2015 at 16:45, David Rosenstrauch  wrote:

> You could also just push the data to Amazon S3, which would un-link the
> size of the cluster needed to process the data from the size of the data.
>
> DR
>
>
> On 02/03/2015 11:43 AM, Joe Wass wrote:
>
>> I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
>> need
>> to store the input in HDFS somehow.
>>
>> I currently have a cluster of 5 x m3.xlarge, each of which has 80GB disk.
>> Each HDFS node reports 73 GB, and the total capacity is ~370 GB.
>>
>> If I want to process 800 GB of data (assuming I can't split the jobs up),
>> I'm guessing I need to get persistent-hdfs involved.
>>
>> 1 - Does persistent-hdfs have noticeably different performance than
>> ephemeral-hdfs?
>> 2 - If so, is there a recommended configuration (like storing input and
>> output on persistent, but persisted RDDs on ephemeral?)
>>
>> This seems like a common use-case, so sorry if this has already been
>> covered.
>>
>> Joe
>>
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Kyro serialization and OOM

2015-02-03 Thread Joe Wass
I have about 500 MB of data and I'm trying to process it on a single
`local` instance. I'm getting an Out of Memory exception. Stack trace at
the end.

Spark 1.1.1
My JVM has -Xmx2g

spark.driver.memory = 1000M
spark.executor.memory = 1000M
spark.kryoserializer.buffer.mb = 256
spark.kryoserializer.buffer.max.mb = 256

The objects I'm dealing with are well constrained. Each can be no more than
500 bytes at the very most. I ran into problems with the kryo buffer being
too small but I think that 256 MB should do the job. The docs say "This
must be larger than any object you attempt to serialize". No danger of that.

My input is a single file (on average each line is 500 bytes). I'm
performing various filter, map, flatMap, groupByKey and reduceByKey. The
only 'actions' I'm performing are foreach, which inserts values into a
database.

On input, I'm parsing the lines and then persisting with DISK_ONLY.

I'm foreaching over the keys and then foreaching over the values of
key-value RDDs. The docs say that groupByKey returns (K, Iterable<V>). So
the values (which can be large) shouldn't be serialized as a single list.
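
i.e. roughly, in Scala terms (grouped and insertRow stand in for my real
RDD and database call):

grouped.foreach { case (key, values) =>      // grouped: RDD[(K, Iterable[V])] from groupByKey
  values.foreach(v => insertRow(key, v))     // walk the Iterable one value at a time
}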

So I don't think I should be loading anything larger than 256 MB at once.

My code works for small sample toy data and I'm trying it out on a bit
more. As I understand it, the way that Spark partitions data means that (in
most cases) any job that will run on a cluster will also run on a single
instance, given enough time.

I think I've given enough memory to cover my serialization needs as I
understand them. Have I misunderstood?

Joe

Stack trace:

INFO  org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in
stage 30.0 (TID 116, localhost, PROCESS_LOCAL, 993 bytes)
INFO  org.apache.spark.executor.Executor - Running task 0.0 in stage 30.0
(TID 116)

...

ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage
30.0 (TID 116)
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
at
org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58)
at
org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151)
at
org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151)
at
org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

...

WARN  org.apache.spark.scheduler.TaskSetManager - Lost task 0.0 in stage
30.0 (TID 116, localhost): java.lang.OutOfMemoryError: Java heap space
com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)

org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:58)

org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:151)

org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:151)

org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:155)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:188)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)


Re: ephemeral-hdfs vs persistent-hdfs - performance

2015-02-03 Thread Joe Wass
Thanks very much, that's good to know, I'll certainly give it a look.

Can you give me a hint about how you unzip your input files on the fly? I
thought that it wasn't possible to parallelize zipped inputs unless they
were unzipped before passing them to Spark?
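
In other words, is it as simple as pointing textFile at the .gz paths and
repartitioning afterwards? Roughly (bucket and paths invented):

val raw    = sc.textFile("s3n://my-bucket/logs/*.gz")   // each .gz is decompressed on read,
                                                        // but arrives as a single partition
val spread = raw.repartition(100)                       // spread the data out before the heavy work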

Joe

On 3 February 2015 at 17:48, David Rosenstrauch  wrote:

> We use S3 as a main storage for all our input data and our generated
> (output) data.  (10's of terabytes of data daily.)  We read gzipped data
> directly from S3 in our Hadoop/Spark jobs - it's not crazily slow, as long
> as you parallelize the work well by distributing the processing across
> enough machines.  (About 100 nodes, in our case.)
>
> The way we generally operate re: storage is: read input directly from
> S3, write output from Hadoop/Spark jobs to HDFS, then after the job is
> complete distcp the relevant output from HDFS back to S3.  Works for us ... YMMV.
> :-)
>
> HTH,
>
> DR
>
>
> On 02/03/2015 12:32 PM, Joe Wass wrote:
>
>> The data is coming from S3 in the first place, and the results will be
>> uploaded back there. But even in the same availability zone, fetching 170
>> GB (that's gzipped) is slow. From what I understand of the pipelines,
>> multiple transforms on the same RDD might involve re-reading the input,
>> which very quickly adds up compared to having the data locally. Unless I
>> persist the data (which I am in fact doing), but that would involve
>> storing approximately the same amount of data in HDFS, which wouldn't fit.
>>
>> Also, I understood that S3 was unsuitable for practical use? See "Why you
>> cannot use S3 as a replacement for HDFS"[0]. I'd love to be proved wrong,
>> though; that would make things a lot easier.
>>
>> [0] http://wiki.apache.org/hadoop/AmazonS3
>>
>>
>>
>> On 3 February 2015 at 16:45, David Rosenstrauch 
>> wrote:
>>
>>  You could also just push the data to Amazon S3, which would un-link the
>>> size of the cluster needed to process the data from the size of the data.
>>>
>>> DR
>>>
>>>
>>> On 02/03/2015 11:43 AM, Joe Wass wrote:
>>>
>>>  I want to process about 800 GB of data on an Amazon EC2 cluster. So, I
>>>> need
>>>> to store the input in HDFS somehow.
>>>>
>>>> I currently have a cluster of 5 x m3.xlarge, each of which has 80GB
>>>> disk.
>>>> Each HDFS node reports 73 GB, and the total capacity is ~370 GB.
>>>>
>>>> If I want to process 800 GB of data (assuming I can't split the jobs
>>>> up),
>>>> I'm guessing I need to get persistent-hdfs involved.
>>>>
>>>> 1 - Does persistent-hdfs have noticeably different performance than
>>>> ephemeral-hdfs?
>>>> 2 - If so, is there a recommended configuration (like storing input and
>>>> output on persistent, but persisted RDDs on ephemeral?)
>>>>
>>>> This seems like a common use-case, so sorry if this has already been
>>>> covered.
>>>>
>>>> Joe
>>>>
>>>>
>>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>>
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


How many stages in my application?

2015-02-04 Thread Joe Wass
I'm sitting here looking at my application crunching gigabytes of data on a
cluster and I have no idea if it's an hour away from completion or a
minute. The web UI shows progress through each stage, but not how many
stages remaining. How can I work out how many stages my program will take
automatically?

My application has a slightly interesting DAG (re-use of functions that
contain Spark transformations, persistent RDDs). Not that complex, but not
'step 1, step 2, step 3'.

I'm guessing that if the driver program runs sequentially sending messages
to Spark, then Spark has no knowledge of the structure of the driver
program. Therefore it's necessary to execute it on a small test dataset and
see how many stages result?

When I set spark.eventLog.enabled = true and run on (very small) test data
I don't get any stage messages in my STDOUT or in the log file. This is on
a `local` instance.

Did I miss something obvious?

Thanks!

Joe


Re: How many stages in my application?

2015-02-05 Thread Joe Wass
Thanks Akhil and Mark. I can of course count events (assuming I can deduce
the shuffle boundaries), but like I said the program isn't simple and I'd
have to do this manually every time I change the code. So I'd rather find a
way of doing this automatically if possible.
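
The closest I've found so far is dumping the lineage of the final RDD,
which at least shows where the shuffle boundaries (and so the stages) are,
e.g. (sketch; finalRdd is whatever my last RDD happens to be):

println(finalRdd.toDebugString)   // indentation in the output marks the shuffle boundaries

but that still needs interpreting by hand, so it's not quite the automatic
count I was after.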

On 4 February 2015 at 19:41, Mark Hamstra  wrote:

> But there isn't a 1-1 mapping from operations to stages since multiple
> operations will be pipelined into a single stage if no shuffle is
> required.  To determine the number of stages in a job you really need to be
> looking for shuffle boundaries.
>
> On Wed, Feb 4, 2015 at 11:27 AM, Akhil Das 
> wrote:
>
>> You can easily understand the flow by looking at the number of operations
>> in your program (like map, groupBy, join etc.), first of all you list out
>> the number of operations happening in your application and then from the
>> webui you will be able to see how many operations have happened so far.
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Feb 4, 2015 at 4:33 PM, Joe Wass  wrote:
>>
>>> I'm sitting here looking at my application crunching gigabytes of data
>>> on a cluster and I have no idea if it's an hour away from completion or a
>>> minute. The web UI shows progress through each stage, but not how many
>>> stages remaining. How can I work out how many stages my program will take
>>> automatically?
>>>
>>> My application has a slightly interesting DAG (re-use of functions that
>>> contain Spark transformations, persistent RDDs). Not that complex, but not
>>> 'step 1, step 2, step 3'.
>>>
>>> I'm guessing that if the driver program runs sequentially sending
>>> messages to Spark, then Spark has no knowledge of the structure of the
>>> driver program. Therefore it's necessary to execute it on a small test
>>> dataset and see how many stages result?
>>>
>>> When I set spark.eventLog.enabled = true and run on (very small) test
>>> data I don't get any stage messages in my STDOUT or in the log file. This
>>> is on a `local` instance.
>>>
>>> Did I miss something obvious?
>>>
>>> Thanks!
>>>
>>> Joe
>>>
>>
>>
>


How do I set spark.local.dirs?

2015-02-06 Thread Joe Wass
I'm running on EC2 and I want to set the directory to use on the slaves
(mounted EBS volumes).

I have set:
spark.local.dir /vol3/my-spark-dir
in
   /root/spark/conf/spark-defaults.conf

and replicated it to all nodes. I have verified in the web console that the
config shows this value, and I have checked that the value is present on
the nodes.

But it's still creating temp files in the wrong (default) place:

/mnt2/spark

How do I get my slaves to pick up this value? How can I verify that they
have?
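
On the driver I can at least read the setting back, e.g. (sketch):

println(sc.getConf.get("spark.local.dir"))   // echoes /vol3/my-spark-dir if the conf was picked up

but that doesn't tell me which directories the executors are actually
using.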

Thanks!

Joe


Has Spark 1.2.0 changed EC2 persistent-hdfs?

2015-02-13 Thread Joe Wass
I've updated to Spark 1.2.0 and the EC2 persistent-hdfs behaviour appears
to have changed.

My launch script is

spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5
--ebs-vol-size=1000 launch myproject

When I ssh into master I get:

$ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  2.9G  5.0G  37% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.3G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G   33M 1000G   1% /vol0

That /vol0 is where I want (and assume) persistent-hdfs to go. But when I
look at the size, I get:

$ persistent-hdfs/bin/start-all.sh
$ persistent-hdfs/bin/hadoop dfsadmin -report
Warning: $HADOOP_HOME is deprecated.

Configured Capacity: 42275430400 (39.37 GB)
Present Capacity: 2644878 (24.63 GB)
DFS Remaining: 26448601088 (24.63 GB)
DFS Used: 143360 (140 KB)
DFS Used%: 0%
Under replicated blocks: 0
Blocks with corrupt replicas: 0
Missing blocks: 0

-
Datanodes available: 5 (5 total, 0 dead)

Name: 10.46.11.156:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165372416 (2.95 GB)
DFS Remaining: 5289684992(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.41.51.155:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165364224 (2.95 GB)
DFS Remaining: 5289693184(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.38.30.254:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165249536 (2.95 GB)
DFS Remaining: 5289807872(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.204.134.84:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165343744 (2.95 GB)
DFS Remaining: 5289713664(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


Name: 10.33.15.134:60010
Decommission Status : Normal
Configured Capacity: 8455086080 (7.87 GB)
DFS Used: 28672 (28 KB)
Non DFS Used: 3165356032 (2.95 GB)
DFS Remaining: 5289701376(4.93 GB)
DFS Used%: 0%
DFS Remaining%: 62.56%
Last contact: Fri Feb 13 17:41:46 UTC 2015


That's tiny. My suspicions are aroused when I see:

$ ls /vol
persistent-hdfs

/vol is on the small /dev/xvda1 not the large EBS /dev/xvds

I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change
the volume:


<property>
  <name>hadoop.tmp.dir</name>
  <value>/vol0/persistent-hdfs</value>
</property>

And then

persistent-hdfs/bin/stop-all.sh && persistent-hdfs/bin/start-all.sh

but when I do that, the persistent HDFS won't start for whatever reason. I
can't run

$ persistent-hdfs/bin/hadoop dfsadmin -report

15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 0 time(s).
15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 1 time(s).
15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server:
ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010. Already
tried 2 time(s).

So, it looks like I can't use the EBS for persistent-hdfs. I was doing it
before, so something must have changed in the last couple of weeks (last
time I was using 1.1.0).

Is this a bug? Has the behaviour of AWS changed? Am I doing something
stupid? How do I fix it?

Thanks in advance!

Joe


Re: Has Spark 1.2.0 changed EC2 persistent-hdfs?

2015-02-13 Thread Joe Wass
Looks like this is caused by issue SPARK-5008:
https://issues.apache.org/jira/browse/SPARK-5008

On 13 February 2015 at 19:04, Joe Wass  wrote:

> I've updated to Spark 1.2.0 and the EC2 persistent-hdfs behaviour appears
> to have changed.
>
> My launch script is
>
> spark-1.2.0-bin-hadoop2.4/ec2/spark-ec2 --instance-type=m3.xlarge -s 5
> --ebs-vol-size=1000 launch myproject
>
> When I ssh into master I get:
>
> $ df -h
> FilesystemSize  Used Avail Use% Mounted on
> /dev/xvda17.9G  2.9G  5.0G  37% /
> tmpfs 7.3G 0  7.3G   0% /dev/shm
> /dev/xvdb  37G  1.3G   34G   4% /mnt
> /dev/xvdc  37G  177M   35G   1% /mnt2
> /dev/xvds1000G   33M 1000G   1% /vol0
>
> that /vol0 is the place I want (and assume) persistent-hdfs to go. But
> when I look at the size I get:
>
> $ persistent-hdfs/bin/start-all.sh
> $ persistent-hdfs/bin/hadoop dfsadmin -report
> Warning: $HADOOP_HOME is deprecated.
>
> Configured Capacity: 42275430400 (39.37 GB)
> Present Capacity: 2644878 (24.63 GB)
> DFS Remaining: 26448601088 (24.63 GB)
> DFS Used: 143360 (140 KB)
> DFS Used%: 0%
> Under replicated blocks: 0
> Blocks with corrupt replicas: 0
> Missing blocks: 0
>
> -
> Datanodes available: 5 (5 total, 0 dead)
>
> Name: 10.46.11.156:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165372416 (2.95 GB)
> DFS Remaining: 5289684992(4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
>
> Name: 10.41.51.155:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165364224 (2.95 GB)
> DFS Remaining: 5289693184(4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
>
> Name: 10.38.30.254:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165249536 (2.95 GB)
> DFS Remaining: 5289807872(4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
>
> Name: 10.204.134.84:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165343744 (2.95 GB)
> DFS Remaining: 5289713664(4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
>
> Name: 10.33.15.134:60010
> Decommission Status : Normal
> Configured Capacity: 8455086080 (7.87 GB)
> DFS Used: 28672 (28 KB)
> Non DFS Used: 3165356032 (2.95 GB)
> DFS Remaining: 5289701376(4.93 GB)
> DFS Used%: 0%
> DFS Remaining%: 62.56%
> Last contact: Fri Feb 13 17:41:46 UTC 2015
>
>
> That's tiny. My suspicions are aroused when I see:
>
> $ ls /vol
> persistent-hdfs
>
> /vol is on the small /dev/xvda1 not the large EBS /dev/xvds
>
> I thought I'd be able to edit persistent-hdfs/conf/core-site.xml to change
> the volume:
>
> <property>
>   <name>hadoop.tmp.dir</name>
>   <value>/vol0/persistent-hdfs</value>
> </property>
>
> And then
>
> persistent-hdfs/bin/stop-all.sh && persistent-hdfs/bin/start-all.sh
>
> but when I do that, the persistent HDFS won't start for whatever reason. I
> can't run
>
> $ persistent-hdfs/bin/hadoop dfsadmin -report
>
> 15/02/13 18:50:25 INFO ipc.Client: Retrying connect to server:
> ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
> Already tried 0 time(s).
> 15/02/13 18:50:26 INFO ipc.Client: Retrying connect to server:
> ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
> Already tried 1 time(s).
> 15/02/13 18:50:27 INFO ipc.Client: Retrying connect to server:
> ec2-54-70-252-81.us-west-2.compute.amazonaws.com/10.23.161.84:9010.
> Already tried 2 time(s).
>
> So, it looks like I can't use the EBS for persistent-hdfs. I was doing it
> before, so something must have changed in the last couple of weeks (last
> time I was using 1.1.0).
>
> Is this a bug? Has the behaviour of AWS changed? Am I doing something
> stupid? How do I fix it?
>
> Thanks in advance!
>
> Joe
>
>
>


Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
On the advice of some recent discussions on this list, I thought I would
try and consume gz files directly. I'm reading them, doing a preliminary
map, then repartitioning, then doing normal spark things.

As I understand it, gzip files aren't splittable into partitions because of
the format, so I thought that repartitioning would be the next best thing
for parallelism. I have about 200 files, some about 1GB compressed and some
over 2GB uncompressed.

I'm hitting the 2GB maximum partition size. It's been discussed on this
list (topic: "2GB limit for partitions?", tickets SPARK-1476 and
SPARK-1391). Stack trace at the end. This happened about 10 hours in
(probably when it hit its first such file). I can't just re-run it quickly!

Does anyone have any advice? Might I solve this by re-partitioning as the
first step after reading the file(s)? Or is it effectively impossible to
read a gz file that expands to over 2GB? Does anyone have any experience
with this?

Thanks in advance

Joe

Stack trace:

Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
Lost task 5.3 in stage 1.0 (TID 283) on executor:
java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
[duplicate 6]
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
at
org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
at
org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)


Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your reply Sean.

Looks like it's happening in a map:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at
NativeMethodAccessorImpl.java:-2)

That's my initial 'parse' stage, done before repartitioning. It reduces the
data size significantly so I thought it would be sensible to do before
repartitioning, which involves moving lots of data around. That might be a
stupid idea in hindsight!

So the obvious thing to try would be to try repartitioning before the map
as the first transformation. I would have done that if I could be sure that
it would succeed or fail quickly.

I'm not entirely clear about the lazy execution of transformations in the
DAG. It could be that the error is manifesting during the mapToPair, but is
caused by the earlier read-from-text-file stage.

Thanks for pointers to those compression formats. I'll give them a go
(although it's not trivial to re-encode 200 GB of data on S3, so if I can
get this working reasonably with gzip I'd like to).

Any advice about whether this error can be worked round with an early
partition?

Cheers

Joe


On 19 February 2015 at 09:51, Sean Owen  wrote:

> gzip and zip are not splittable compression formats; bzip and lzo are.
> Ideally, use a splittable compression format.
>
> Repartitioning is not a great solution since it means a shuffle, typically.
>
> This is not necessarily related to how big your partitions are. The
> question is, when does this happen? what operation?
>
> On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass  wrote:
> > On the advice of some recent discussions on this list, I thought I would
> try
> > and consume gz files directly. I'm reading them, doing a preliminary map,
> > then repartitioning, then doing normal spark things.
> >
> > As I understand it, zip files aren't readable in partitions because of
> the
> > format, so I thought that repartitioning would be the next best thing for
> > parallelism. I have about 200 files, some about 1GB compressed and some
> over
> > 2GB uncompressed.
> >
> > I'm hitting the 2GB maximum partition size. It's been discussed on this
> list
> > (topic: "2GB limit for partitions?", tickets SPARK-1476 and SPARK-1391).
> > Stack trace at the end. This happened at 10 hours in (probably when it
> saw
> > its first file). I can't just re-run it quickly!
> >
> > Does anyone have any advice? Might I solve this by re-partitioning as the
> > first step after reading the file(s)? Or is it effectively impossible to
> > read a gz file that expands to over 2GB? Does anyone have any experience
> > with this?
> >
> > Thanks in advance
> >
> > Joe
> >
> > Stack trace:
> >
> > Exception in thread "main" 15/02/18 20:44:25 INFO
> scheduler.TaskSetManager:
> > Lost task 5.3 in stage 1.0 (TID 283) on executor:
> > java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
> > [duplicate 6]
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 2 in
> > stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
> 1.0:
> > java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> > at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
> > at
> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
> > at
> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
> > at
> > org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
> > at
> > org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
> > at
> org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
> > at
> org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>


Re: Unzipping large files and 2GB partition size.

2015-02-19 Thread Joe Wass
Thanks for your detailed reply Imran. I'm writing this in Clojure (using
Flambo which uses the Java API) but I don't think that's relevant. So
here's the pseudocode (sorry I've not written Scala for a long time):

val rawData = sc.hadoopFile("/dir/to/gzfiles") // NB multiple files.
val parsedFiles = rawData.map(parseFunction)   // can return nil on failure
val filtered = parsedFiles.filter(notNil)
val partitioned = filtered.repartition(100) // guessed number
val persisted = partitioned.persist(StorageLevels.DISK_ONLY)

val resultA = stuffA(persisted)
val resultB = stuffB(persisted)
val resultC = stuffC(persisted)

So, I think I'm already doing what you suggested. I would have assumed that
partition size would be («size of expanded file» / «number of partitions»).
In this case, 100 (which I picked out of the air).

I wonder whether the «size of expanded file» is actually the size of all
concatenated input files (probably about 800 GB)? In that case should I
multiply it by the number of files? Or perhaps I'm barking up completely
the wrong tree.
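
(For what it's worth: if the divisor really is the total uncompressed input
of roughly 800 GB, then 800 GB / 100 partitions is about 8 GB per
partition, which would sail past the 2 GB limit; I'd need something north
of 400 partitions just to get under it.)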

Joe




On 19 February 2015 at 14:44, Imran Rashid  wrote:

> Hi Joe,
>
> The issue is not that you have input partitions that are bigger than 2GB
> -- its just that they are getting cached.  You can see in the stack trace,
> the problem is when you try to read data out of the DiskStore:
>
> org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
>
> Also, just because you see this:
>
> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
> tasks from Stage 1 (MappedRDD[17] at mapToPair at
> NativeMethodAccessorImpl.java:-2)
>
> it doesn't *necessarily* mean that this is coming from your map.  It can
> be pretty confusing how your operations on RDDs get turned into stages, it
> could be a lot more than just your map.  and actually, it might not even be
> your map at all -- some of the other operations you invoke call map
> underneath the covers.  So its hard to say what is going on here w/ out
> seeing more code.  Anyway, maybe you've already considered all this (you
> did mention the lazy execution of the DAG), but I wanted to make sure.  it
> might help to use rdd.setName() and also to look at rdd.toDebugString.
>
> As far as what you can do about this -- it could be as simple as moving
> your rdd.persist() to after you have compressed and repartitioned your
> data.  eg., I'm blindly guessing you have something like this:
>
> val rawData = sc.hadoopFile(...)
> rawData.persist(DISK)
> rawData.count()
> val compressedData = rawData.map{...}
> val repartitionedData = compressedData.repartition(N)
> ...
>
> change it to something like:
>
> val rawData = sc.hadoopFile(...)
> val compressedData = rawData.map{...}
> val repartitionedData = compressedData.repartition(N)
> repartitionedData.persist(DISK)
> repartitionedData.count()
> ...
>
>
> The point is, you avoid caching any data until you have ensured that the
> partitions are small.  You might have big partitions before that in
> rawData, but that is OK.
>
> Imran
>
>
> On Thu, Feb 19, 2015 at 4:43 AM, Joe Wass  wrote:
>
>> Thanks for your reply Sean.
>>
>> Looks like it's happening in a map:
>>
>> 15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing
>> tasks from Stage 1 (MappedRDD[17] at mapToPair at
>> NativeMethodAccessorImpl.java:-2)
>>
>> That's my initial 'parse' stage, done before repartitioning. It reduces
>> the data size significantly so I thought it would be sensible to do before
>> repartitioning, which involves moving lots of data around. That might be a
>> stupid idea in hindsight!
>>
>> So the obvious thing to try would be to try repartitioning before the map
>> as the first transformation. I would have done that if I could be sure that
>> it would succeed or fail quickly.
>>
>> I'm not entirely clear about the lazy execution of transformations in
>> DAG. It could be that the error is manifesting during the mapToPair, but
>> caused by the earlier read from text file stage.
>>
>> Thanks for pointers to those compression formats. I'll give them a go
>> (although it's not trivial to re-encode 200 GB of data on S3, so if I can
>> get this working reasonably with gzip I'd like to).
>>
>> Any advice about whether this error can be worked round with an early
>> partition?
>>
>> Cheers
>>
>> Joe
>>
>>
>> On 19 February 2015 at 09:51, Sean Owen  wrote:
>>
>>> gzip and zip are not splittable compression formats; bzip and lzo are.
>>> Ideally, use a splittable compression format.
>&g

Running out of space (when there's no shortage)

2015-02-24 Thread Joe Wass
I'm running a cluster of 3 Amazon EC2 machines (small number because it's
expensive when experiments keep crashing after a day!).

Today's crash looks like this (stacktrace at end of message).
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0

On my three nodes, I have plenty of space and inodes:

A $ df -i
FilesystemInodes   IUsed   IFree IUse% Mounted on
/dev/xvda1524288   97937  426351   19% /
tmpfs1909200   1 19091991% /dev/shm
/dev/xvdb2457600  54 24575461% /mnt
/dev/xvdc2457600  24 24575761% /mnt2
/dev/xvds831869296   23844 8318454521% /vol0

A $ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  3.4G  4.5G  44% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.2G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G  802G  199G  81% /vol0

B $ df -i
FilesystemInodes   IUsed   IFree IUse% Mounted on
/dev/xvda1524288   97947  426341   19% /
tmpfs1906639   1 19066381% /dev/shm
/dev/xvdb2457600  54 24575461% /mnt
/dev/xvdc2457600  24 24575761% /mnt2
/dev/xvds816200704   24223 8161764811% /vol0

B $ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  3.6G  4.3G  46% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.2G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G  805G  195G  81% /vol0

C $df -i
FilesystemInodes   IUsed   IFree IUse% Mounted on
/dev/xvda1524288   97938  426350   19% /
tmpfs1906897   1 19068961% /dev/shm
/dev/xvdb2457600  54 24575461% /mnt
/dev/xvdc2457600  24 24575761% /mnt2
/dev/xvds755218352   24024 7551943281% /vol0

C $ df -h
FilesystemSize  Used Avail Use% Mounted on
/dev/xvda17.9G  3.4G  4.5G  44% /
tmpfs 7.3G 0  7.3G   0% /dev/shm
/dev/xvdb  37G  1.2G   34G   4% /mnt
/dev/xvdc  37G  177M   35G   1% /mnt2
/dev/xvds1000G  820G  181G  82% /vol0

The devices may be ~80% full but that still leaves ~200G free on each. My
spark-env.sh has

export SPARK_LOCAL_DIRS="/vol0/spark"

I have manually verified that on each slave the only temporary files are
stored on /vol0, all looking something like this

/vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884

So it looks like all the files are being stored on the large drives
(incidentally they're AWS EBS volumes, but that's the only way to get
enough storage). My process crashed before with a slightly different
exception under the same circumstances: kryo.KryoException:
java.io.IOException: No space left on device

These both happen after several hours and several GB of temporary files.

Why does Spark think it's run out of space?

TIA

Joe

Stack trace 1:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 0
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
at
org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at
org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:380)
at
org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:176)
at
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:40)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:93)
at
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:92)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:109)
at
org.apache.spark.s

Re: Running out of space (when there's no shortage)

2015-02-24 Thread Joe Wass
Thanks everyone.

Yiannis, do you know if there's a bug report for this regression? For some
other (possibly connected) reason I upgraded from 1.1.1 to 1.2.1, but I
can't remember what the bug was.

Joe




On 24 February 2015 at 19:26, Yiannis Gkoufas  wrote:

> Hi there,
>
> I assume you are using spark 1.2.1 right?
> I faced the exact same issue and switched to 1.1.1 with the same
> configuration and it was solved.
> On 24 Feb 2015 19:22, "Ted Yu"  wrote:
>
>> Here is a tool which may give you some clue:
>> http://file-leak-detector.kohsuke.org/
>>
>> Cheers
>>
>> On Tue, Feb 24, 2015 at 11:04 AM, Vladimir Rodionov <
>> vrodio...@splicemachine.com> wrote:
>>
>>> Usually it happens in Linux when application deletes file w/o double
>>> checking that there are no open FDs (resource leak). In this case, Linux
>>> holds all space allocated and does not release it until application
>>> exits (crashes in your case). You check file system and everything is
>>> normal, you have enough space and you have no idea why does application
>>> report "no space left on device".
>>>
>>> Just a guess.
>>>
>>> -Vladimir Rodionov
>>>
>>> On Tue, Feb 24, 2015 at 8:34 AM, Joe Wass  wrote:
>>>
>>>> I'm running a cluster of 3 Amazon EC2 machines (small number because
>>>> it's expensive when experiments keep crashing after a day!).
>>>>
>>>> Today's crash looks like this (stacktrace at end of message).
>>>> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
>>>> output location for shuffle 0
>>>>
>>>> On my three nodes, I have plenty of space and inodes:
>>>>
>>>> A $ df -i
>>>> FilesystemInodes   IUsed   IFree IUse% Mounted on
>>>> /dev/xvda1524288   97937  426351   19% /
>>>> tmpfs1909200   1 19091991% /dev/shm
>>>> /dev/xvdb2457600  54 24575461% /mnt
>>>> /dev/xvdc2457600  24 24575761% /mnt2
>>>> /dev/xvds831869296   23844 8318454521% /vol0
>>>>
>>>> A $ df -h
>>>> FilesystemSize  Used Avail Use% Mounted on
>>>> /dev/xvda17.9G  3.4G  4.5G  44% /
>>>> tmpfs 7.3G 0  7.3G   0% /dev/shm
>>>> /dev/xvdb  37G  1.2G   34G   4% /mnt
>>>> /dev/xvdc  37G  177M   35G   1% /mnt2
>>>> /dev/xvds1000G  802G  199G  81% /vol0
>>>>
>>>> B $ df -i
>>>> FilesystemInodes   IUsed   IFree IUse% Mounted on
>>>> /dev/xvda1524288   97947  426341   19% /
>>>> tmpfs1906639   1 19066381% /dev/shm
>>>> /dev/xvdb2457600  54 24575461% /mnt
>>>> /dev/xvdc2457600  24 24575761% /mnt2
>>>> /dev/xvds816200704   24223 8161764811% /vol0
>>>>
>>>> B $ df -h
>>>> FilesystemSize  Used Avail Use% Mounted on
>>>> /dev/xvda17.9G  3.6G  4.3G  46% /
>>>> tmpfs 7.3G 0  7.3G   0% /dev/shm
>>>> /dev/xvdb  37G  1.2G   34G   4% /mnt
>>>> /dev/xvdc  37G  177M   35G   1% /mnt2
>>>> /dev/xvds1000G  805G  195G  81% /vol0
>>>>
>>>> C $df -i
>>>> FilesystemInodes   IUsed   IFree IUse% Mounted on
>>>> /dev/xvda1524288   97938  426350   19% /
>>>> tmpfs1906897   1 19068961% /dev/shm
>>>> /dev/xvdb2457600  54 24575461% /mnt
>>>> /dev/xvdc2457600  24 24575761% /mnt2
>>>> /dev/xvds755218352   24024 7551943281% /vol0
>>>>
>>>> C $ df -h
>>>> FilesystemSize  Used Avail Use% Mounted on
>>>> /dev/xvda17.9G  3.4G  4.5G  44% /
>>>> tmpfs 7.3G 0  7.3G   0% /dev/shm
>>>> /dev/xvdb  37G  1.2G   34G   4% /mnt
>>>> /dev/xvdc  37G  177M   35G   1% /mnt2
>>>> /dev/xvds1000G  820G  181G  82% /vol0
>>>>
>>>> The devices may be ~80% full but that still leaves ~200G free on each.
>>>> My spark-env.sh has
>>>>
>>>> export SPARK_LOCAL_DIRS=&q