Re: Is there a reduceByKey functionality in DataFrame API?

2016-08-29 Thread Marius Soutier
Not really, a grouped DataFrame only provides SQL-like functions like sum and 
avg (at least in 1.5).

> On 29.08.2016, at 14:56, ayan guha <guha.a...@gmail.com> wrote:
> 
> If you are confused, it is because of the names of the two APIs. I think the DF API name 
> groupBy came from SQL, but it works similarly to reduceByKey.
> 
> On 29 Aug 2016 20:57, "Marius Soutier" <mps@gmail.com> wrote:
> In DataFrames (and thus in 1.5 in general) this is not possible, correct?
> 
>> On 11.08.2016, at 05:42, Holden Karau <hol...@pigscanfly.ca> wrote:
>> 
>> Hi Luis,
>> 
>> You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you 
>> can do groupBy followed by a reduce on the GroupedDataset ( 
>> http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset
>>  ) - this works on a per-key basis despite the different name. In Spark 2.0 
>> you would use groupByKey on the Dataset followed by reduceGroups ( 
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset
>>  ).
>> 
>> Cheers,
>> 
>> Holden :)
>> 
>> On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com> wrote:
>> Hi everyone,
>> 
>> Consider the following code:
>> 
>> val result = df.groupBy("col1").agg(min("col2"))
>> 
>> I know that rdd.reduceByKey(func) produces the same RDD as
>> rdd.groupByKey().mapValues(value => value.reduce(func)). However, reduceByKey
>> is more efficient as it avoids shipping each value to the reducer doing the
>> aggregation (it ships partial aggregations instead).
>> 
>> I wonder whether the DataFrame API optimizes the code doing something
>> similar to what RDD.reduceByKey does.
>> 
>> I am using Spark 1.6.2.
>> 
>> Regards,
>> Luis
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
>> 
>> 
>> 
>> -- 
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
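
A minimal sketch of the two styles discussed in this thread, assuming Spark 2.x (the Record class, column names, and sample data are illustrative, not taken from the thread). Calling explain() on the aggregated DataFrame shows a partial aggregation before the shuffle, which is the reduceByKey-style optimization Luis asks about.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.min

// Hypothetical record type, purely for illustration.
case class Record(col1: String, col2: Int)

object ReduceByKeyOnDataFrames {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("reduceByKey-vs-agg").getOrCreate()
    import spark.implicits._

    val df = Seq(Record("a", 3), Record("a", 1), Record("b", 5)).toDF()

    // DataFrame style: Catalyst plans a partial (map-side) aggregation before the shuffle.
    val viaAgg = df.groupBy("col1").agg(min("col2"))
    viaAgg.explain()

    // Dataset style (Spark 2.0): groupByKey on the Dataset followed by reduceGroups.
    val viaReduceGroups = df.as[Record]
      .groupByKey(_.col1)
      .reduceGroups((a, b) => if (a.col2 <= b.col2) a else b)

    viaAgg.show()
    viaReduceGroups.show()
    spark.stop()
  }
}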



Re: Is there a reduceByKey functionality in DataFrame API?

2016-08-29 Thread Marius Soutier
In DataFrames (and thus in 1.5 in general) this is not possible, correct?

> On 11.08.2016, at 05:42, Holden Karau  wrote:
> 
> Hi Luis,
> 
> You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you 
> can do groupBy followed by a reduce on the GroupedDataset ( 
> http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset
>  ) - this works on a per-key basis despite the different name. In Spark 2.0 
> you would use groupByKey on the Dataset followed by reduceGroups ( 
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset
>  ).
> 
> Cheers,
> 
> Holden :)
> 
> On Wed, Aug 10, 2016 at 5:15 PM, luismattor  > wrote:
> Hi everyone,
> 
> Consider the following code:
> 
> val result = df.groupBy("col1").agg(min("col2"))
> 
> I know that rdd.reduceByKey(func) produces the same RDD as
> rdd.groupByKey().mapValues(value => value.reduce(func)). However, reduceByKey
> is more efficient as it avoids shipping each value to the reducer doing the
> aggregation (it ships partial aggregations instead).
> 
> I wonder whether the DataFrame API optimizes the code doing something
> similar to what RDD.reduceByKey does.
> 
> I am using Spark 1.6.2.
> 
> Regards,
> Luis
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau 


Re: Spark Web UI port 4040 not working

2016-07-27 Thread Marius Soutier
That's to be expected - the application UI is not started by the master, but by 
the driver. So the UI will run on the machine that submits the job.
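
A minimal sketch of what that implies (app name and port value are illustrative): the application UI is a driver-side service, so in client mode it listens on the machine that ran spark-submit, and its port is the driver configuration spark.ui.port rather than anything on the master.

import org.apache.spark.{SparkConf, SparkContext}

object UiPortExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ui-port-example")
      .set("spark.ui.port", "4040") // application UI, served by the driver process

    val sc = new SparkContext(conf)
    sc.parallelize(1 to 1000000).count() // something to look at in the UI

    // The UI is reachable at http://<driver-host>:4040 only while the application runs,
    // so keep the driver alive for a moment.
    Thread.sleep(60000)
    sc.stop()
  }
}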


> On 26.07.2016, at 15:49, Jestin Ma  wrote:
> 
> I did netstat -apn | grep 4040 on machine 6, and I see
> 
> tcp0  0 :::4040 :::*
> LISTEN  30597/java
> 
> What does this mean?
> 
> On Tue, Jul 26, 2016 at 6:47 AM, Jestin Ma wrote:
> I do not deploy using cluster mode and I don't use EC2.
> 
> I just read that launching as client mode: "the driver is launched directly 
> within the spark-submit process which acts as a client to the cluster."
> 
> My current setup is that I have cluster machines 1, 2, 3, 4, 5, with 1 being 
> the master. 
> I submit from another cluster machine 6 in client mode. So I'm taking that 
> the driver is launched in my machine 6.
> 
> On Tue, Jul 26, 2016 at 6:38 AM, Jacek Laskowski wrote:
> Hi,
> 
> Do you perhaps deploy using cluster mode? Is this EC2? You'd need to
> figure out where the driver runs and use the machine's IP.
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/ 
> Mastering Apache Spark http://bit.ly/mastering-apache-spark 
> 
> Follow me at https://twitter.com/jaceklaskowski 
> 
> 
> 
> On Tue, Jul 26, 2016 at 3:36 PM, Jestin Ma wrote:
> > I tried doing that on my master node.
> > I got nothing.
> > However, I grep'd port 8080 and I got the standalone UI.
> >
> > On Tue, Jul 26, 2016 at 12:39 AM, Chanh Le wrote:
> >>
> >> You’re running in StandAlone Mode?
> >> Usually inside active task it will show the address of current job.
> >> or you can check in master node by using netstat -apn | grep 4040
> >>
> >>
> >>
> >> > On Jul 26, 2016, at 8:21 AM, Jestin Ma wrote:
> >> >
> >> > Hello, when running spark jobs, I can access the master UI (port 8080
> >> > one) no problem. However, I'm confused as to how to access the web UI to 
> >> > see
> >> > jobs/tasks/stages/etc.
> >> >
> >> > I can access the master UI at http://:8080. But port 4040
> >> > gives me a -connection cannot be reached-.
> >> >
> >> > Is the web UI http:// with a port of 4040?
> >> >
> >> > I'm running my Spark job on a cluster machine and submitting it to a
> >> > master node part of the cluster. I heard of ssh tunneling; is that 
> >> > relevant
> >> > here?
> >> >
> >> > Thank you!
> >>
> >
> 
> 



Re: Substract two DStreams

2016-06-28 Thread Marius Soutier
Sure, no problem.

> On 28.06.2016, at 08:57, Matthias Niehoff <matthias.nieh...@codecentric.de> 
> wrote:
> 
> Ah, I didn't know about this. That might actually work. I solved it by 
> implementing leftJoinWithCassandraTable myself, which is nearly as fast 
> as the normal join. That should be faster than joining and subtracting. 
> Anyway, thanks for the hint about the transformWith method!
> 
> On 27 June 2016 at 14:32, Marius Soutier <mps@gmail.com> wrote:
> `transformWith` accepts another stream, wouldn't that work?
> 
>> On 27.06.2016, at 14:04, Matthias Niehoff <matthias.nieh...@codecentric.de> wrote:
>> 
>> In transform I only have access to one stream, not to both the original 
>> and the changed stream. In foreachRDD I can change the stream and have both 
>> the original RDD and the changed RDD to do a subtract.
>> 
>> 2016-06-27 13:13 GMT+02:00 Marius Soutier <mps@gmail.com>:
>> Can't you use `transform` instead of `foreachRDD`?
>> 
>> 
>> 
>>> On 15.06.2016, at 15:18, Matthias Niehoff <matthias.nieh...@codecentric.de> wrote:
>>> 
>>> Hi,
>>> 
>>> i want to subtract 2 DStreams (based on the same Input Stream) to get all 
>>> elements that exist in the original stream, but not in the modified stream 
>>> (the modified Stream is changed using joinWithCassandraTable which does an 
>>> inner join and because of this might remove entries).
>>> 
>>> Subtract is only possible on RDDs. So I could use a foreachRDD right at the 
>>> beginning of the stream processing and work on RDDs. I think it's quite ugly 
>>> to use the output op at the beginning and then implement a lot of 
>>> transformations in the foreachRDD. So can you think of different ways to 
>>> do an efficient diff between two DStreams?
>>> 
>>> Thank you
>>> 
>>> -- 
>>> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
>>> 
>>> Sitz der Gesellschaft: Solingen | HRB 25917 | Amtsgericht Wuppertal
>>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>> 
>> 
>> 
>> 
>> -- 
>> Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
>> 
>> Sitz der Gesellschaft: Solingen | HRB 25917 | Amtsgericht Wuppertal
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
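
A minimal sketch of the transformWith approach discussed in this thread. The enrich stub stands in for the joinWithCassandraTable step and is an assumption, as are the element types; the point is that transformWith exposes both per-batch RDDs at once, so the difference can be computed with RDD.subtract without starting the pipeline with foreachRDD.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object DiffStreams {
  // Stand-in for the inner-join enrichment that may drop records.
  def enrich(events: DStream[(String, String)]): DStream[(String, String)] =
    events.filter { case (key, _) => key.nonEmpty } // placeholder for joinWithCassandraTable

  // Elements of the original stream that the enrichment step dropped.
  def droppedRecords(original: DStream[(String, String)]): DStream[(String, String)] = {
    val modified = enrich(original)
    original.transformWith(modified,
      (orig: RDD[(String, String)], mod: RDD[(String, String)]) => orig.subtract(mod))
  }
}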

Re: Substract two DStreams

2016-06-27 Thread Marius Soutier
Can't you use `transform` instead of `foreachRDD`?


> On 15.06.2016, at 15:18, Matthias Niehoff  
> wrote:
> 
> Hi,
> 
> i want to subtract 2 DStreams (based on the same Input Stream) to get all 
> elements that exist in the original stream, but not in the modified stream 
> (the modified Stream is changed using joinWithCassandraTable which does an 
> inner join and because of this might remove entries).
> 
> Subtract is only possible on RDDs. So I could use a foreachRDD right at the 
> beginning of the stream processing and work on RDDs. I think it's quite ugly 
> to use the output op at the beginning and then implement a lot of 
> transformations in the foreachRDD. So can you think of different ways to do 
> an efficient diff between two DStreams?
> 
> Thank you
> 
> -- 
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681  | fax: +49 (0) 
> 721.9595-666  | mobil: +49 (0) 
> 172.1702676 
> www.codecentric.de  | blog.codecentric.de 
>  | www.meettheexperts.de 
>  | www.more4fi.de  
> 
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
> 



Re: Use cases for kafka direct stream messageHandler

2016-03-08 Thread Marius Soutier

> On 04.03.2016, at 22:39, Cody Koeninger  wrote:
> 
> The only other valid use of messageHandler that I can think of is
> catching serialization problems on a per-message basis.  But with the
> new Kafka consumer library, that doesn't seem feasible anyway, and
> could be handled with a custom (de)serializer.

What do you mean, that doesn't seem feasible? You mean when using a custom 
deserializer? Right now I'm catching serialization problems in the message 
handler; after your proposed change I'd catch them in `map()`.
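
A minimal sketch of that pattern (the raw byte-array stream, the Event type, and the parse function are assumptions, not the actual job): deserialization failures are caught in an ordinary transformation over the stream instead of in the Kafka messageHandler.

import scala.util.{Failure, Success, Try}
import org.apache.spark.streaming.dstream.DStream

object SafeParsing {
  case class Event(id: String, value: Long) // hypothetical payload type

  def parse(bytes: Array[Byte]): Event = {
    val fields = new String(bytes, "UTF-8").split(",")
    Event(fields(0), fields(1).toLong)
  }

  def parsed(rawStream: DStream[Array[Byte]]): DStream[Event] =
    rawStream.flatMap { bytes =>
      Try(parse(bytes)) match {
        case Success(event) => Some(event)
        case Failure(_)     => None // drop (or log) records that fail to deserialize
      }
    }
}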


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark.kryo.registrationRequired: Tuple2 is not registered

2015-09-10 Thread Marius Soutier
Found an issue for this:
https://issues.apache.org/jira/browse/SPARK-10251
> On 09.09.2015, at 18:00, Marius Soutier <mps@gmail.com> wrote:
> 
> Hi all,
> 
> as indicated in the title, I’m using Kryo with a custom Kryo serializer, but 
> as soon as I enable `spark.kryo.registrationRequired`, my Spark Streaming job 
> fails to start with this exception:
> 
> Class is not registered: scala.collection.immutable.Range
> 
> When I register it, it continues with Tuple2, which I cannot serialize sanely 
> for all specialized forms. According to the documentation, this should be 
> handled by Chill. Is this a bug or what am I missing?
> 
> I’m using Spark 1.4.1.
> 
> 
> Cheers
> - Marius
> 
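
A minimal sketch of the setting under discussion (Spark 1.x SparkConf style; the class list is illustrative): with spark.kryo.registrationRequired enabled, every class that ends up being serialized has to be registered explicitly, including the specialized Tuple2 variants that SPARK-10251 is about.

import org.apache.spark.SparkConf

object KryoRegistrationExample {
  def buildConf(): SparkConf =
    new SparkConf()
      .setAppName("kryo-registration-required")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrationRequired", "true")
      // Every class reported as "not registered" has to be added here by hand.
      .registerKryoClasses(Array(
        classOf[scala.collection.immutable.Range],
        classOf[Array[Tuple2[_, _]]],
        classOf[Tuple2[_, _]]
      ))
}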



spark.kryo.registrationRequired: Tuple2 is not registered

2015-09-09 Thread Marius Soutier
Hi all,

as indicated in the title, I’m using Kryo with a custom Kryo serializer, but as 
soon as I enable `spark.kryo.registrationRequired`, my Spark Streaming job 
fails to start with this exception:

Class is not registered: scala.collection.immutable.Range

When I register it, it continues with Tuple2, which I cannot serialize sanely 
for all specialized forms. According to the documentation, this should be 
handled by Chill. Is this a bug or what am I missing?

I’m using Spark 1.4.1.


Cheers
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark-1.2.2-bin-hadoop2.4.tgz missing

2015-04-20 Thread Marius Soutier
Same problem here...

 On 20.04.2015, at 09:59, Zsolt Tóth toth.zsolt@gmail.com wrote:
 
 Hi all,
 
 it looks like the 1.2.2 pre-built version for hadoop2.4 is not available on 
 the mirror sites. Am I missing something?
 
 Regards,
 Zsolt


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Streaming problems running 24x7

2015-04-20 Thread Marius Soutier
The processing speed displayed in the UI doesn’t seem to take everything into 
account. I also had a low processing time but had to increase batch duration 
from 30 seconds to 1 minute because waiting batches kept increasing. Now it 
runs fine.
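
A minimal sketch of the knob mentioned above (app name and values are illustrative): the batch interval is fixed when the StreamingContext is created, so moving from 30-second to 1-minute batches is a change at this point. Each batch then has to finish in under a minute on average, otherwise waiting batches pile up as described.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}

object BatchIntervalExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("batch-interval-example")
    val ssc = new StreamingContext(conf, Minutes(1)) // one-minute batches

    // ... define the DStream pipeline here ...

    ssc.start()
    ssc.awaitTermination()
  }
}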

 On 17.04.2015, at 13:30, González Salgado, Miquel 
 miquel.gonza...@tecsidel.es wrote:
 
 Hi,
  
 Thank you for your response. 
 I think it is not because of the processing speed; in fact the delay is under
 1 second, while the batch interval is 10 seconds… The data volume is low (10
 lines / second).
  
 Changing to local[8] made the problem worse (CPU increased more quickly).
  
 By the way, I have seen some improvement changing to this call of KafkaUtils: 
 
 KafkaUtils.createDirectStream
  
 CPU usage is low and stable, but memory is slowly increasing… But at least 
 the process lasts longer.
  
 Best regards, 
 Miquel
  
  
 From: bit1...@163.com [mailto:bit1...@163.com] 
 Sent: Thursday, April 16, 2015 10:58
 To: González Salgado, Miquel; user
 Subject: Re: Streaming problems running 24x7
  
 From your description, it looks like the data processing speed is far behind the 
 data receiving speed.
  
 Could you try to increase the number of cores when you submit the application, 
 such as local[8]?
  
 bit1...@163.com
  
 From: Miquel miquel.gonza...@tecsidel.es
 Date: 2015-04-16 16:39
 To: user@spark.apache.org
 Subject: Streaming problems running 24x7
 Hello,
 I'm having problems running a Spark streaming job for more than a few hours
 (3 or 4). It begins working OK, but it degrades until failure. Some of the
 symptoms:
  
 - Consumed memory and CPU keep getting higher and higher, and finally an
 error is thrown (java.lang.Exception: Could not compute split, block
 input-0-1429168311800 not found) and data stops being calculated. 
  
 - The delay shown in the web UI also keeps increasing.
  
 - After some hours disk space is being consumed. There are a lot of
 directories with names like /tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c
  
 The job is basically reading information from a Kafka topic and calculating
 several topN tables for some key and value fields ("camps" in the code below)
 related to netflow data. Some of the parameters are:
 - batch interval: 10 seconds
 - window calculation: 1 minute
 - spark.cleaner.ttl: 5 minutes
  
 The execution is standalone on one machine (16GB RAM, 12 cores), and the
 options to run it are as follows:
 /opt/spark/bin/spark-submit --driver-java-options "-XX:+UseCompressedOops"
 --jars $JARS --class $APPCLASS --master local[2] $APPJAR 
  
 Does someone have any clues about the problem? I don't know if it is a
 configuration problem or some error in the code that is causing memory
 leaks.
  
 Thank you in advance!
 Miquel
  
 PS: the code is basically this:
  
 object NetflowTopn {

   var appPath = "."
   var zkQuorum = ""
   var group = ""
   var topics = ""
   var numThreads = 1

   var batch_interval = 10
   var n_window = 1
   var n_slide = 1
   var topnsize = 10

   var hm = Map[String,Int]()
   hm += ( "unix_secs"   -> 0 )
   hm += ( "unix_nsecs"  -> 1 )
   hm += ( "sysuptime"   -> 2 )
   hm += ( "exaddr"      -> 3 )
   hm += ( "dpkts"       -> 4 )
   hm += ( "doctets"     -> 5 )
   hm += ( "first"       -> 6 )
   hm += ( "last"        -> 7 )
   hm += ( "engine_type" -> 8 )
   hm += ( "engine_id"   -> 9 )
   hm += ( "srcaddr"     -> 10 )
   hm += ( "dstaddr"     -> 11 )
   hm += ( "nexthop"     -> 12 )
   hm += ( "input"       -> 13 )
   hm += ( "output"      -> 14 )
   hm += ( "srcport"     -> 15 )
   hm += ( "dstport"     -> 16 )
   hm += ( "prot"        -> 17 )
   hm += ( "tos"         -> 18 )
   hm += ( "tcp_flags"   -> 19 )
   hm += ( "src_mask"    -> 20 )
   hm += ( "dst_mask"    -> 21 )
   hm += ( "src_as"      -> 22 )
   hm += ( "dst_as"      -> 23 )

   def getKey (lcamps: Array[String], camp: String): String = {
     if (camp == "total") return "total"
     else return lcamps(hm(camp))
   }

   def getVal (lcamps: Array[String], camp: String): Long = {
     if (camp == "flows") return 1L
     else return lcamps(hm(camp)).toLong
   }

   def getKeyVal (line: String, keycamps: List[String], valcamp: String) = {
     val arr = line.split(",")
     (keycamps.map(getKey(arr, _)).mkString(","), getVal(arr, valcamp))
   }

   def writeOutput (data: Array[(Long, String)], keycamps_str: String,
                    csvheader: String, valcamp: String, prefix: String) = {

     val ts = System.currentTimeMillis
     val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
     val f1f = new File(f1);
     val ftmpf = new File(f1 + ts);
     val pw = new PrintWriter(ftmpf)
     pw.println(csvheader)
     data.foreach {
       t => pw.println(t._2 + "," + t._1)
     }
     pw.close
     ftmpf.renameTo(f1f);
   }

   def main(args: Array[String]) {

     if (args.length < 1) {
   

Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?

2015-04-14 Thread Marius Soutier
It cleans the work dir, and SPARK_LOCAL_DIRS should be cleaned automatically. 
From the source code comments:
// SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
// application finishes.


 On 13.04.2015, at 11:26, Guillaume Pitel guillaume.pi...@exensa.com wrote:
 
 Does it also cleanup spark local dirs ? I thought it was only cleaning 
 $SPARK_HOME/work/
 
 Guillaume
 I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:
 
  export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true 
  -Dspark.worker.cleanup.appDataTtl=<seconds>"
 
 On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) 
  ningjun.w...@lexisnexis.com wrote:
 
 Does anybody have an answer for this?
  
 Thanks
 Ningjun
  
 From: Wang, Ningjun (LNG-NPV) 
 Sent: Thursday, April 02, 2015 12:14 PM
 To: user@spark.apache.org mailto:user@spark.apache.org
 Subject: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
  
 I set SPARK_LOCAL_DIRS   to   C:\temp\spark-temp. When RDDs are shuffled, 
 spark writes to this folder. I found that the disk space of this folder 
 keep on increase quickly and at certain point I will run out of disk space. 
  
 I wonder does spark clean up the disk space in this folder once the shuffle 
 operation is done? If not, I need to write a job to clean it up myself. But 
 how do I know which sub folders there can be removed?
  
 Ningjun
 
 
 
 -- 
 exensa_logo_mail.png
 Guillaume PITEL, Président 
 +33(0)626 222 431
 
 eXenSa S.A.S. http://www.exensa.com/ 
 41, rue Périer - 92120 Montrouge - FRANCE 
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?

2015-04-14 Thread Marius Soutier
That’s true, spill dirs don’t get cleaned up when something goes wrong. We are 
restarting long running jobs once in a while for cleanups and have 
spark.cleaner.ttl set to a lower value than the default.
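
A minimal sketch of the application-side part of that (the 30-minute value is illustrative): spark.cleaner.ttl is set on the SparkConf, while the periodic worker cleanup is configured in spark-env.sh as shown in the quoted message below.

import org.apache.spark.{SparkConf, SparkContext}

object CleanerTtlExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cleaner-ttl-example")
      // Value is in seconds; metadata and shuffle state older than this gets cleaned up.
      .set("spark.cleaner.ttl", (30 * 60).toString)

    val sc = new SparkContext(conf)
    // ... long-running job ...
    sc.stop()
  }
}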

 On 14.04.2015, at 17:57, Guillaume Pitel guillaume.pi...@exensa.com wrote:
 
 Right, I remember now, the only problematic case is when things go bad and 
 the cleaner is not executed.
 
 Also, it can be a problem when reusing the same sparkcontext for many runs.
 
 Guillaume
 It cleans the work dir, and SPARK_LOCAL_DIRS should be cleaned 
 automatically. From the source code comments:
 // SPARK_LOCAL_DIRS environment variable, and deleted by the Worker when the
 // application finishes.
 
 
 On 13.04.2015, at 11:26, Guillaume Pitel guillaume.pi...@exensa.com 
 mailto:guillaume.pi...@exensa.com wrote:
 
 Does it also cleanup spark local dirs ? I thought it was only cleaning 
 $SPARK_HOME/work/
 
 Guillaume
 I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:
 
  export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true 
  -Dspark.worker.cleanup.appDataTtl=<seconds>"
 
 On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com mailto:ningjun.w...@lexisnexis.com wrote:
 
 Does anybody have an answer for this?
  
 Thanks
 Ningjun
  
 From: Wang, Ningjun (LNG-NPV) 
 Sent: Thursday, April 02, 2015 12:14 PM
 To: user@spark.apache.org mailto:user@spark.apache.org
 Subject: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
  
 I set SPARK_LOCAL_DIRS   to   C:\temp\spark-temp. When RDDs are shuffled, 
 spark writes to this folder. I found that the disk space of this folder 
 keep on increase quickly and at certain point I will run out of disk 
 space. 
  
 I wonder does spark clean up the disk space in this folder once the 
 shuffle operation is done? If not, I need to write a job to clean it up 
 myself. But how do I know which sub folders there can be removed?
  
 Ningjun
 
 
 
 -- 
 exensa_logo_mail.png
 Guillaume PITEL, Président 
 +33(0)626 222 431
 
 eXenSa S.A.S. http://www.exensa.com/ 
 41, rue Périer - 92120 Montrouge - FRANCE 
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705
 
 
 
 -- 
 exensa_logo_mail.png
 Guillaume PITEL, Président 
 +33(0)626 222 431
 
 eXenSa S.A.S. http://www.exensa.com/ 
 41, rue Périer - 92120 Montrouge - FRANCE 
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Re: Is the disk space in SPARK_LOCAL_DIRS cleanned up?

2015-04-13 Thread Marius Soutier
I have set SPARK_WORKER_OPTS in spark-env.sh for that. For example:

export SPARK_WORKER_OPTS="-Dspark.worker.cleanup.enabled=true 
-Dspark.worker.cleanup.appDataTtl=<seconds>"

 On 11.04.2015, at 00:01, Wang, Ningjun (LNG-NPV) 
 ningjun.w...@lexisnexis.com wrote:
 
 Does anybody have an answer for this?
  
 Thanks
 Ningjun
  
 From: Wang, Ningjun (LNG-NPV) 
 Sent: Thursday, April 02, 2015 12:14 PM
 To: user@spark.apache.org
 Subject: Is the disk space in SPARK_LOCAL_DIRS cleanned up?
  
 I set SPARK_LOCAL_DIRS to C:\temp\spark-temp. When RDDs are shuffled, 
 spark writes to this folder. I found that the disk space of this folder keeps 
 increasing quickly and at a certain point I will run out of disk space. 
  
 I wonder whether spark cleans up the disk space in this folder once the shuffle 
 operation is done? If not, I need to write a job to clean it up myself. But 
 how do I know which subfolders can be removed?
  
 Ningjun



actorStream woes

2015-03-30 Thread Marius Soutier
Hi there,

I'm using Spark Streaming 1.2.1 with actorStreams. Initially, all goes well.

15/03/30 15:37:00 INFO spark.storage.MemoryStore: Block broadcast_1 stored as 
values in memory (estimated size 3.2 KB, free 1589.8 MB)
15/03/30 15:37:00 INFO spark.storage.BlockManagerInfo: Added broadcast_1_piece0 
in memory on master:54258 (size: 1771.0 B, free: 1589.8 MB)
15/03/30 15:37:04 INFO spark.storage.BlockManagerInfo: Added 
input-1-1427722624400 in memory on worker-1:40379 (size: 997.0 B, free: 2.1 
GB)
...

this works for a while (20-40 minutes) until suddenly:

[WARN] [03/30/2015 17:16:03.497] 
[stream-aggregate-transactions-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://stream-aggregate-transactions@master-ip:7080/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40worker-5%3A49144-4]
 Association with remote system [akka.tcp://sparkExecutor@worker-5:49144] has 
failed, address is now gated for [5000] ms. Reason is: [Disassociated].
[WARN] [03/30/2015 17:16:03.501] 
[stream-aggregate-transactions-akka.remote.default-remote-dispatcher-6] 
[akka.tcp://stream-aggregate-transactions@master-ip:7080/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FsparkExecutor%40worker-2%3A47280-7]
 Association with remote system [akka.tcp://sparkExecutor@worker-2:47280] has 
failed, address is now gated for [5000] ms. Reason is: [Disassociated].

15/03/30 17:16:04 ERROR spark.scheduler.TaskSchedulerImpl: Lost executor 1 on 
worker-2: remote Akka client disassociated

[WARN] [03/30/2015 17:16:09.427] 
[stream-aggregate-transactions-akka.remote.default-remote-dispatcher-6] 
[Remoting] Tried to associate with unreachable remote address 
[akka.tcp://sparkExecutor@worker-5:49144]. Address is now gated for 5000 ms, 
all messages to this address will be delivered to dead letters. Reason: 
Connection refused: worker-5/worker-5-ip:49144

15/03/30 17:16:09 INFO scheduler.cluster.SparkDeploySchedulerBackend: 
Registered executor: 
Actor[akka.tcp://sparkExecutor@worker-5:37442/user/Executor#-2032935972] with 
ID 6
15/03/30 17:16:09 INFO spark.scheduler.TaskSetManager: Starting task 2.0 in 
stage 1983.1 (TID 118538, worker-5, NODE_LOCAL, 1393 bytes)

15/03/30 17:16:10 INFO spark.storage.BlockManagerMasterActor: Registering block 
manager worker-5:39945 with 2.1 GB RAM, BlockManagerId(6, worker-5, 39945)

15/03/30 17:16:14 INFO spark.scheduler.TaskSetManager: Starting task 15.0 in 
stage 1983.1 (TID 118541, worker-5, NODE_LOCAL, 1393 bytes)
15/03/30 17:16:14 WARN spark.scheduler.TaskSetManager: Lost task 6.0 in stage 
1983.1 (TID 118539, worker-5): java.lang.Exception: Could not compute split, 
block input-2-1427728282000 not found
  at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
  at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
  at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
  at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:56)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
  at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)


From there on it only goes downhill; foreachRDD(_.saveAsParquet) is messing up 
my Parquet files (they only contain a _temporary directory).
In the Streaming UI, oftentimes one of the receivers will stay at 0 records, and 
the executor now uses 0 memory.

Batch interval is 30 secs with 50-70 records per receiver per batch. I have 5 
actor streams (one per node) with 10 total cores assigned.
Driver has 3 GB RAM, each worker 4 GB.

There is certainly no memory pressure; "Memory Used" is around 100 KB, "Input" 
is around 10 MB.

Thanks for any pointers,
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Processing of text file in large gzip archive

2015-03-16 Thread Marius Soutier

 1. I don't think textFile is capable of unpacking a .gz file. You need to use 
 hadoopFile or newAPIHadoop file for this.

Sorry that’s incorrect, textFile works fine on .gz files. What it can’t do is 
compute splits on gz files, so if you have a single file, you'll have a single 
partition.

Processing 30 GB of gzipped data should not take that long, at least with the 
Scala API. I'm not sure about Python, especially under 1.2.1.
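
A minimal sketch of that behaviour (path and partition count are illustrative): textFile reads the .gz file directly, but since a single gzip file is not splittable it arrives as one partition, so a repartition right after the read is the usual way to spread the downstream work.

import org.apache.spark.{SparkConf, SparkContext}

object GzipRead {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("gzip-read"))

    val lines  = sc.textFile("/data/big-archive.txt.gz") // a single .gz file = a single partition
    val spread = lines.repartition(64)                   // one shuffle to regain parallelism

    println(s"partitions before: ${lines.partitions.length}, after: ${spread.partitions.length}")
    sc.stop()
  }
}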



Re: Spark Streaming recover from Checkpoint with Spark SQL

2015-03-12 Thread Marius Soutier
Thanks, the new guide did help - instantiating the SQLContext inside foreachRDD 
did the trick for me, but the SQLContext singleton works as well.

Now the only problem left is that spark.driver.port is not retained after 
starting from a checkpoint, so my Actor receivers are running on a random 
port...
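
A minimal sketch of the fix described above (the Event type and the Parquet path are illustrative, and the DataFrame writer call uses a later 1.x API than the insertInto call in the quoted job): the SQLContext is obtained lazily inside foreachRDD from the RDD's own SparkContext, so it is also available after recovering from a checkpoint instead of being captured at graph-definition time.

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.DStream

case class Event(id: String, value: Long) // hypothetical record type

object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sc: SparkContext): SQLContext = synchronized {
    if (instance == null) instance = new SQLContext(sc)
    instance
  }
}

object CheckpointSafeInsert {
  def attach(events: DStream[Event]): Unit =
    events.foreachRDD { rdd: RDD[Event] =>
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      rdd.toDF().write.mode("append").parquet("/data/events.parquet") // illustrative path
    }
}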


 On 12.03.2015, at 02:35, Tathagata Das t...@databricks.com wrote:
 
 Can you show us the code that you are using?
 
 This might help. This is the updated streaming programming guide for 1.3, 
 soon to be up, this is a quick preview. 
 http://people.apache.org/~tdas/spark-1.3.0-temp-docs/streaming-programming-guide.html#dataframe-and-sql-operations
 
 TD
 
 On Wed, Mar 11, 2015 at 12:20 PM, Marius Soutier mps@gmail.com 
 mailto:mps@gmail.com wrote:
 Forgot to mention, it works when using .foreachRDD(_.saveAsTextFile(“”)).
 
  On 11.03.2015, at 18:35, Marius Soutier mps@gmail.com 
  mailto:mps@gmail.com wrote:
 
  Hi,
 
  I’ve written a Spark Streaming Job that inserts into a Parquet, using 
  stream.foreachRDD(_insertInto(“table”, overwrite = true). Now I’ve added 
  checkpointing; everything works fine when starting from scratch. When 
  starting from a checkpoint however, the job doesn’t work and produces the 
  following exception in the foreachRDD:
 
  ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job 
  streaming job 142609383 ms.2
  org.apache.spark.SparkException: RDD transformations and actions can only 
  be invoked by the driver, not inside of other transformations; for example, 
  rdd1.map(x = rdd2.values.count() * x) is invalid because the values 
  transformation and count action cannot be performed inside of the rdd1.map 
  transformation. For more information, see SPARK-5063.
  at org.apache.spark.rdd.RDD.sc(RDD.scala:90)
at org.apache.spark.rdd.RDD.init(RDD.scala:143)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
at 
  org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:114)
at 
  MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
at 
  MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
at 
  org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
at 
  org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
at 
  org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at 
  org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at 
  org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 
 
 
 
  Cheers
  - Marius
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org
 
 



Spark Streaming recover from Checkpoint with Spark SQL

2015-03-11 Thread Marius Soutier
Hi,

I’ve written a Spark Streaming job that inserts into a Parquet table, using 
stream.foreachRDD(_.insertInto("table", overwrite = true)). Now I’ve added 
checkpointing; everything works fine when starting from scratch. When starting 
from a checkpoint however, the job doesn’t work and produces the following 
exception in the foreachRDD:

ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job 
streaming job 142609383 ms.2
org.apache.spark.SparkException: RDD transformations and actions can only be 
invoked by the driver, not inside of other transformations; for example, 
rdd1.map(x = rdd2.values.count() * x) is invalid because the values 
transformation and count action cannot be performed inside of the rdd1.map 
transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.sc(RDD.scala:90)
at org.apache.spark.rdd.RDD.init(RDD.scala:143)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:114)
at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at 
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)




Cheers
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming recover from Checkpoint with Spark SQL

2015-03-11 Thread Marius Soutier
Forgot to mention, it works when using .foreachRDD(_.saveAsTextFile(“”)).

 On 11.03.2015, at 18:35, Marius Soutier mps@gmail.com wrote:
 
 Hi,
 
 I’ve written a Spark Streaming Job that inserts into a Parquet, using 
 stream.foreachRDD(_insertInto(“table”, overwrite = true). Now I’ve added 
 checkpointing; everything works fine when starting from scratch. When 
 starting from a checkpoint however, the job doesn’t work and produces the 
 following exception in the foreachRDD:
 
 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job 
 streaming job 142609383 ms.2
 org.apache.spark.SparkException: RDD transformations and actions can only be 
 invoked by the driver, not inside of other transformations; for example, 
 rdd1.map(x = rdd2.values.count() * x) is invalid because the values 
 transformation and count action cannot be performed inside of the rdd1.map 
 transformation. For more information, see SPARK-5063.
   at org.apache.spark.rdd.RDD.sc(RDD.scala:90)
   at org.apache.spark.rdd.RDD.init(RDD.scala:143)
   at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108)
   at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:114)
   at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
   at MyStreamingJob$$anonfun$createComputation$3.apply(MyStreamingJob:167)
   at 
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
   at 
 org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:535)
   at 
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
   at 
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
   at 
 org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
 
 
 
 
 Cheers
 - Marius
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Executor lost with too many temp files

2015-02-26 Thread Marius Soutier
Yeah did that already (65k). We also disabled swapping and reduced the amount 
of memory allocated to Spark (available - 4). This seems to have resolved the 
situation.

Thanks!

 On 26.02.2015, at 05:43, Raghavendra Pandey raghavendra.pan...@gmail.com 
 wrote:
 
 Can you try increasing the ulimit -n on your machine.
 
 On Mon, Feb 23, 2015 at 10:55 PM, Marius Soutier mps@gmail.com 
 mailto:mps@gmail.com wrote:
 Hi Sameer,
 
 I’m still using Spark 1.1.1, I think the default is hash shuffle. No external 
 shuffle service.
 
 We are processing gzipped JSON files, the partitions are the amount of input 
 files. In my current data set we have ~850 files that amount to 60 GB (so 
 ~600 GB uncompressed). We have 5 workers with 8 cores and 48 GB RAM each. We 
 extract five different groups of data from this to filter, clean and 
 denormalize (i.e. join) it for easier downstream processing.
 
 By the way this code does not seem to complete at all without using 
 coalesce() at a low number, 5 or 10 work great. Everything above that make it 
 very likely it will crash, even on smaller datasets (~300 files). But I’m not 
 sure if this is related to the above issue.
 
 
 On 23.02.2015, at 18:15, Sameer Farooqui same...@databricks.com 
 mailto:same...@databricks.com wrote:
 
 Hi Marius,
 
 Are you using the sort or hash shuffle?
 
 Also, do you have the external shuffle service enabled (so that the Worker 
 JVM or NodeManager can still serve the map spill files after an Executor 
 crashes)?
 
 How many partitions are in your RDDs before and after the problematic 
 shuffle operation?
 
 
 
 On Monday, February 23, 2015, Marius Soutier mps@gmail.com 
 mailto:mps@gmail.com wrote:
 Hi guys,
 
 I keep running into a strange problem where my jobs start to fail with the 
 dreaded Resubmitted (resubmitted due to lost executor)” because of having 
 too many temp files from previous runs.
 
 Both /var/run and /spill have enough disk space left, but after a given 
 amount of jobs have run, following jobs will struggle with completion. There 
 are a lot of failures without any exception message, only the above 
 mentioned lost executor. As soon as I clear out /var/run/spark/work/ and the 
 spill disk, everything goes back to normal.
 
 Thanks for any hint,
 - Marius
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 For additional commands, e-mail: user-h...@spark.apache.org 
 
 
 



Effects of persist(XYZ_2)

2015-02-25 Thread Marius Soutier
Hi,

just a quick question about calling persist with the _2 option. Is the 2x 
replication only useful for fault tolerance, or will it also increase job speed 
by avoiding network transfers? Assuming I’m doing joins or other shuffle 
operations.

Thanks


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Effects of persist(XYZ_2)

2015-02-25 Thread Marius Soutier
Yes. Effectively, could it avoid network transfers? Or put differently, would 
an option like persist(MEMORY_ALL) improve job speed by caching an RDD on every 
worker?
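
A minimal sketch of the setting in question (data and storage level are illustrative): the built-in _2 levels keep two replicas of each cached block, and as Sean notes below, tasks can be scheduled against either copy. There is no built-in level that replicates an RDD to every worker; broadcast variables are the usual tool when the same data is needed on every node.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // pair-RDD implicits (needed on pre-1.3 Spark)
import org.apache.spark.storage.StorageLevel

object ReplicatedPersist {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("persist-2"))

    val pairs = sc.parallelize(1 to 1000000)
      .map(i => (i % 100, i.toLong))
      .persist(StorageLevel.MEMORY_AND_DISK_SER_2) // two replicas of every cached block

    // Shuffle stages reading `pairs` can run data-local to either replica.
    println(pairs.reduceByKey(_ + _).count())
    sc.stop()
  }
}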

 On 25.02.2015, at 11:42, Sean Owen so...@cloudera.com wrote:
 
 If you mean, can both copies of the blocks be used for computations?
 yes they can.
 
 On Wed, Feb 25, 2015 at 10:36 AM, Marius Soutier mps@gmail.com wrote:
 Hi,
 
 just a quick question about calling persist with the _2 option. Is the 2x 
 replication only useful for fault tolerance, or will it also increase job 
 speed by avoiding network transfers? Assuming I’m doing joins or other 
 shuffle operations.
 
 Thanks
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Executor lost with too many temp files

2015-02-23 Thread Marius Soutier
Hi Sameer,

I’m still using Spark 1.1.1, I think the default is hash shuffle. No external 
shuffle service.

We are processing gzipped JSON files, the partitions are the amount of input 
files. In my current data set we have ~850 files that amount to 60 GB (so ~600 
GB uncompressed). We have 5 workers with 8 cores and 48 GB RAM each. We extract 
five different groups of data from this to filter, clean and denormalize (i.e. 
join) it for easier downstream processing.

By the way, this code does not seem to complete at all without using coalesce() 
at a low number; 5 or 10 work great. Everything above that makes it very likely 
it will crash, even on smaller datasets (~300 files). But I’m not sure if this 
is related to the above issue.


 On 23.02.2015, at 18:15, Sameer Farooqui same...@databricks.com wrote:
 
 Hi Marius,
 
 Are you using the sort or hash shuffle?
 
 Also, do you have the external shuffle service enabled (so that the Worker 
 JVM or NodeManager can still serve the map spill files after an Executor 
 crashes)?
 
 How many partitions are in your RDDs before and after the problematic shuffle 
 operation?
 
 
 
 On Monday, February 23, 2015, Marius Soutier mps@gmail.com 
 mailto:mps@gmail.com wrote:
 Hi guys,
 
 I keep running into a strange problem where my jobs start to fail with the 
 dreaded Resubmitted (resubmitted due to lost executor)” because of having 
 too many temp files from previous runs.
 
 Both /var/run and /spill have enough disk space left, but after a given 
 amount of jobs have run, following jobs will struggle with completion. There 
 are a lot of failures without any exception message, only the above mentioned 
 lost executor. As soon as I clear out /var/run/spark/work/ and the spill 
 disk, everything goes back to normal.
 
 Thanks for any hint,
 - Marius
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org javascript:;
 For additional commands, e-mail: user-h...@spark.apache.org javascript:;
 



Executor lost with too many temp files

2015-02-23 Thread Marius Soutier
Hi guys,

I keep running into a strange problem where my jobs start to fail with the 
dreaded “Resubmitted (resubmitted due to lost executor)” because of having too 
many temp files from previous runs.

Both /var/run and /spill have enough disk space left, but after a given amount 
of jobs have run, following jobs will struggle with completion. There are a lot 
of failures without any exception message, only the above mentioned lost 
executor. As soon as I clear out /var/run/spark/work/ and the spill disk, 
everything goes back to normal.

Thanks for any hint,
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Executor Lost with StorageLevel.MEMORY_AND_DISK_SER

2015-02-10 Thread Marius Soutier
Unfortunately no. I just removed the persist statements to get the job to run, 
but now it sometimes fails with

Job aborted due to stage failure: Task 162 in stage 2.1 failed 4 times, most 
recent failure: Lost task 162.3 in stage 2.1 (TID 1105, xxx.compute.internal): 
java.io.FileNotFoundException: 
/tmp/spark-local-20150210030009-b4f1/3f/shuffle_4_655_49 (No space left on 
device)
Even though there’s plenty of disk space left.


On 10.02.2015, at 00:09, Muttineni, Vinay vmuttin...@ebay.com wrote:

 Hi Marius,
 Did you find a solution to this problem? I get the same error.
 Thanks,
 Vinay
 
 -Original Message-
 From: Marius Soutier [mailto:mps@gmail.com] 
 Sent: Monday, February 09, 2015 2:19 AM
 To: user
 Subject: Executor Lost with StorageLevel.MEMORY_AND_DISK_SER
 
 Hi there,
 
 I'm trying to improve performance on a job that has GC troubles and takes 
 longer to compute simply because it has to recompute failed tasks. After 
 deferring object creation as much as possible, I'm now trying to improve 
 memory usage with StorageLevel.MEMORY_AND_DISK_SER and a custom 
 KryoRegistrator that registers all used classes. This works fine both in unit 
 tests and on a local cluster (i.e. master and worker on my dev machine). On 
 the production cluster this fails without any error message except:
 
 Job aborted due to stage failure: Task 10 in stage 2.0 failed 4 times, most 
 recent failure: Lost task 10.3 in stage 2.0 (TID 20, xxx.compute.internal): 
 ExecutorLostFailure (executor lost) Driver stacktrace:
 
 Is there any way to understand what's going on? The logs don't show anything. 
 I'm using Spark 1.1.1.
 
 
 Thanks
 - Marius
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
 commands, e-mail: user-h...@spark.apache.org
 



Executor Lost with StorageLevel.MEMORY_AND_DISK_SER

2015-02-09 Thread Marius Soutier
Hi there,

I’m trying to improve performance on a job that has GC troubles and takes 
longer to compute simply because it has to recompute failed tasks. After 
deferring object creation as much as possible, I’m now trying to improve memory 
usage with StorageLevel.MEMORY_AND_DISK_SER and a custom KryoRegistrator that 
registers all used classes. This works fine both in unit tests and on a local 
cluster (i.e. master and worker on my dev machine). On the production cluster 
this fails without any error message except:

Job aborted due to stage failure: Task 10 in stage 2.0 failed 4 times, most 
recent failure: Lost task 10.3 in stage 2.0 (TID 20, xxx.compute.internal): 
ExecutorLostFailure (executor lost)
Driver stacktrace:

Is there any way to understand what’s going on? The logs don’t show anything. 
I’m using Spark 1.1.1.


Thanks
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Intermittent test failures

2014-12-17 Thread Marius Soutier
Using TestSQLContext from multiple tests leads to:

SparkException: : Task not serializable

ERROR ContextCleaner: Error cleaning broadcast 10
java.lang.NullPointerException
at 
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:246)
at 
org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:46)
at 
org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
at 
org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:185)
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:147)
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:138)
at scala.Option.foreach(Option.scala:236)


On 15.12.2014, at 22:36, Marius Soutier mps@gmail.com wrote:

 Ok, maybe these test versions will help me then. I’ll check it out.
 
 On 15.12.2014, at 22:33, Michael Armbrust mich...@databricks.com wrote:
 
 Using a single SparkContext should not cause this problem.  In the SQL tests 
 we use TestSQLContext and TestHive which are global singletons for all of 
 our unit testing.
 
 On Mon, Dec 15, 2014 at 1:27 PM, Marius Soutier mps@gmail.com wrote:
 Possible, yes, although I’m trying everything I can to prevent it, i.e. fork 
 in Test := true and isolated. Can you confirm that reusing a single 
 SparkContext for multiple tests poses a problem as well?
 
 Other than that, just switching from SQLContext to HiveContext also provoked 
 the error.
 
 
 On 15.12.2014, at 20:22, Michael Armbrust mich...@databricks.com wrote:
 
 Is it possible that you are starting more than one SparkContext in a single 
 JVM with out stopping previous ones?  I'd try testing with Spark 1.2, which 
 will throw an exception in this case.
 
 On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier mps@gmail.com wrote:
 Hi,
 
 I’m seeing strange, random errors when running unit tests for my Spark 
 jobs. In this particular case I’m using Spark SQL to read and write Parquet 
 files, and one error that I keep running into is this one:
 
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 
 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 
 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
 org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
 org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
 org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
 
 I can only prevent this from happening by using isolated Specs tests thats 
 always create a new SparkContext that is not shared between tests (but 
 there can also be only a single SparkContext per test), and also by using 
 standard SQLContext instead of HiveContext. It does not seem to have 
 anything to do with the actual files that I also create during the test run 
 with SQLContext.saveAsParquetFile.
 
 
 Cheers
 - Marius
 
 
 PS The full trace:
 
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 
 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 
 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
 org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
 org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
 org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
 
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
 
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
 
 org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
 
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)
 
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
 
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
 sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990

Migrating Parquet inputs

2014-12-15 Thread Marius Soutier
Hi,

is there an easy way to “migrate” parquet files or indicate optional values in 
sql statements? I added a couple of new fields that I also use in a 
schemaRDD.sql() which obviously fails for input files that don’t have the new 
fields.

Thanks
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Intermittent test failures

2014-12-15 Thread Marius Soutier
Hi,

I’m seeing strange, random errors when running unit tests for my Spark jobs. In 
this particular case I’m using Spark SQL to read and write Parquet files, and 
one error that I keep running into is this one:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in 
stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 
223, localhost): java.io.IOException: PARSING_ERROR(2)
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)

I can only prevent this from happening by using isolated Specs tests that 
always create a new SparkContext that is not shared between tests (but there 
can also be only a single SparkContext per test), and also by using standard 
SQLContext instead of HiveContext. It does not seem to have anything to do with 
the actual files that I also create during the test run with 
SQLContext.saveAsParquetFile.


Cheers
- Marius


PS The full trace:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in 
stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 6.0 (TID 
223, localhost): java.io.IOException: PARSING_ERROR(2)
org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)

org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)

org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)

org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)

org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)

org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)

org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
 ~[spark-core_2.10-1.1.1.jar:1.1.1]
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
 ~[spark-core_2.10-1.1.1.jar:1.1.1]
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
 ~[spark-core_2.10-1.1.1.jar:1.1.1]
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
~[scala-library.jar:na]
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
~[scala-library.jar:na]
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
~[spark-core_2.10-1.1.1.jar:1.1.1]
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Intermittent test failures

2014-12-15 Thread Marius Soutier
Ok, maybe these test versions will help me then. I’ll check it out.

On 15.12.2014, at 22:33, Michael Armbrust mich...@databricks.com wrote:

 Using a single SparkContext should not cause this problem.  In the SQL tests 
 we use TestSQLContext and TestHive which are global singletons for all of our 
 unit testing.
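 
 A global-singleton context along these lines can be sketched like so (this is 
 not Spark's actual TestSQLContext, just the same idea with made-up names):
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
 
 // One context for the whole test JVM, shared by every suite.
 object SharedTestContext {
   lazy val sc: SparkContext =
     new SparkContext(new SparkConf().setMaster("local[2]").setAppName("shared-test-context"))
   lazy val sqlContext: SQLContext = new SQLContext(sc)
 }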
 
 On Mon, Dec 15, 2014 at 1:27 PM, Marius Soutier mps@gmail.com wrote:
 Possible, yes, although I’m trying everything I can to prevent it, i.e. fork 
 in Test := true and isolated test execution. Can you confirm that reusing a single 
 SparkContext for multiple tests poses a problem as well?
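 
 For reference, the sbt side of that looks roughly like this (fork in Test comes 
 straight from the above; disabling parallelExecution is my assumption for what 
 “isolated” boils down to):
 
 // build.sbt
 fork in Test := true                 // run tests in a forked JVM
 parallelExecution in Test := false   // don't let two suites race for a SparkContext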
 
 Other than that, just switching from SQLContext to HiveContext also provoked 
 the error.
 
 
 On 15.12.2014, at 20:22, Michael Armbrust mich...@databricks.com wrote:
 
 Is it possible that you are starting more than one SparkContext in a single 
 JVM with out stopping previous ones?  I'd try testing with Spark 1.2, which 
 will throw an exception in this case.
 
 On Mon, Dec 15, 2014 at 8:48 AM, Marius Soutier mps@gmail.com wrote:
 Hi,
 
 I’m seeing strange, random errors when running unit tests for my Spark jobs. 
 In this particular case I’m using Spark SQL to read and write Parquet files, 
 and one error that I keep running into is this one:
 
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 
 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 
 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
 org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
 org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
 org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
 
 I can only prevent this from happening by using isolated Specs tests that 
 always create a new SparkContext which is not shared between tests (and there 
 can also be only a single SparkContext per test), and by using the standard 
 SQLContext instead of HiveContext. It does not seem to have anything to do 
 with the actual files that I also create during the test run with 
 SQLContext.saveAsParquetFile.
 
 
 Cheers
 - Marius
 
 
 PS The full trace:
 
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 
 in stage 6.0 failed 1 times, most recent failure: Lost task 19.0 in stage 
 6.0 (TID 223, localhost): java.io.IOException: PARSING_ERROR(2)
 org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:78)
 org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
 org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:545)
 
 org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:125)
 
 org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:88)
 org.xerial.snappy.SnappyInputStream.init(SnappyInputStream.java:58)
 
 org.apache.spark.io.SnappyCompressionCodec.compressedInputStream(CompressionCodec.scala:128)
 
 org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:232)
 
 org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readObject$1.apply$mcV$sp(TorrentBroadcast.scala:169)
 org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:927)
 
 org.apache.spark.broadcast.TorrentBroadcast.readObject(TorrentBroadcast.scala:155)
 sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 java.lang.reflect.Method.invoke(Method.java:606)
 
 java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 
 org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
 
 org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:160)
 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 java.lang.Thread.run(Thread.java:745)
 Driver stacktrace:
 at 
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
  ~[spark-core_2.10-1.1.1.jar:1.1.1

Re: Merging Parquet Files

2014-11-19 Thread Marius Soutier
You can also insert into existing tables via .insertInto(tableName, overwrite). 
You just have to import sqlContext._
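
A rough sketch of what that looks like with the 1.1-era SchemaRDD API (paths and 
table names are made up):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._  // brings the implicits mentioned above into scope

// register the existing Parquet data as a table
sqlContext.parquetFile("/requests_parquet").registerTempTable("requests_parquet")

// append freshly ingested JSON data into it instead of rewriting the whole table
val jsonRequests = sqlContext.jsonFile("/requests")
jsonRequests.insertInto("requests_parquet", overwrite = false)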

On 19.11.2014, at 09:41, Daniel Haviv danielru...@gmail.com wrote:

 Hello,
 I'm writing a process that ingests json files and saves them a parquet files.
 The process is as such:
 
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 val jsonRequests = sqlContext.jsonFile("/requests")
 val parquetRequests = sqlContext.parquetFile("/requests_parquet")
 
 jsonRequests.registerTempTable("jsonRequests")
 parquetRequests.registerTempTable("parquetRequests")
 
 val unified_requests = sqlContext.sql("select * from jsonRequests union select 
 * from parquetRequests")
 
 unified_requests.saveAsParquetFile("/tempdir")
 
 and then I delete /requests_parquet and rename /tempdir as /requests_parquet
 
 Is there a better way to achieve that ? 
 
 Another problem I have is that I get a lot of small json files and as a 
 result a lot of small parquet files, I'd like to merge the json files into a 
 few parquet files.. how I do that?
 
 Thank you,
 Daniel
 
 



Re: Snappy temp files not cleaned up

2014-11-06 Thread Marius Soutier
The default value is infinite, so you need to enable it. Personally I’ve set up a 
couple of cron jobs to clean up /tmp and /var/run/spark.
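
Enabling it is just a matter of setting spark.cleaner.ttl (in seconds) on your 
SparkConf, roughly like this (the app name is only an example):

val conf = new org.apache.spark.SparkConf()
  .setAppName("my-app")
  .set("spark.cleaner.ttl", "3600") // forget metadata and temp data older than one hour

val sc = new org.apache.spark.SparkContext(conf)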

On 06.11.2014, at 08:15, Romi Kuntsman r...@totango.com wrote:

 Hello,
 
 Spark has an internal cleanup mechanism
 (defined by spark.cleaner.ttl, see 
 http://spark.apache.org/docs/latest/configuration.html)
 which cleans up tasks and stages.
 
 However, in our installation, we noticed that Snappy temporary files are never 
 cleaned up.
 
 Is it a misconfiguration? Missing feature? How do you deal with build-up of 
 temp files?
 
 Thanks,
 
 Romi Kuntsman, Big Data Engineer
 http://www.totango.com



Re: SparkSQL performance

2014-11-03 Thread Marius Soutier
I did some simple experiments with Impala and Spark, and Impala came out ahead. 
But it’s also less flexible, couldn’t handle irregular schemas, didn't support 
Json, and so on.

On 01.11.2014, at 02:20, Soumya Simanta soumya.sima...@gmail.com wrote:

 I agree. My personal experience with Spark core is that it performs really 
 well once you tune it properly. 
 
 As far I understand SparkSQL under the hood performs many of these 
 optimizations (order of Spark operations) and uses a more efficient storage 
 format. Is this assumption correct? 
 
 Has anyone done any comparison of SparkSQL with Impala ? The fact that many 
 of the queries don't even finish in the benchmark is quite surprising and 
 hard to believe. 
 
 A few months ago there were a few emails about Spark not being able to handle 
 large volumes (TBs) of data. That myth was busted recently when the folks at 
 Databricks published their sorting record results. 
  
 
 Thanks
 -Soumya
 
 
 
 
  
 
 On Fri, Oct 31, 2014 at 7:35 PM, Du Li l...@yahoo-inc.com wrote:
 We have seen all kinds of results published that often contradict each other. 
 My take is that the authors often know more tricks about how to tune their 
 own/familiar products than the others. So the product on focus is tuned for 
 ideal performance while the competitors are not. The authors are not 
 necessarily biased but as a consequence the results are.
 
 Ideally it’s critical for the user community to be informed of all the 
 in-depth tuning tricks of all products. However, realistically, there is a 
 big gap in terms of documentation. Hope the Spark folks will make a 
 difference. :-)
 
 Du
 
 
 From: Soumya Simanta soumya.sima...@gmail.com
 Date: Friday, October 31, 2014 at 4:04 PM
 To: user@spark.apache.org user@spark.apache.org
 Subject: SparkSQL performance
 
 I was really surprised to see the results here, esp. SparkSQL not completing
 http://www.citusdata.com/blog/86-making-postgresql-scale-hadoop-style
 
 I was under the impression that SparkSQL performs really well because it can 
 optimize the RDD operations and load only the columns that are required. This 
 essentially means in most cases SparkSQL should be as fast as Spark is. 
 
 I would be very interested to hear what others in the group have to say about 
 this. 
 
 Thanks
 -Soumya
 
 
 



Re: Submiting Spark application through code

2014-11-02 Thread Marius Soutier
Just a wild guess, but I had to exclude “javax.servlet.servlet-api” from my 
Hadoop dependencies to run a SparkContext.

In your build.sbt:

"org.apache.hadoop" % "hadoop-common" % "..." exclude("javax.servlet", "servlet-api"),
"org.apache.hadoop" % "hadoop-hdfs" % "..." exclude("javax.servlet", "servlet-api")

(or whatever Hadoop deps you use)

If you're using Maven:

<exclusions>
  <exclusion>
    <groupId>javax.servlet</groupId>
    <artifactId>servlet-api</artifactId>
  </exclusion>
</exclusions>
...


On 31.10.2014, at 07:14, sivarani whitefeathers...@gmail.com wrote:

 I tried running it but it didn't work.
 
 public static final SparkConf batchConf = new SparkConf();
 String master = "spark://sivarani:7077";
 String spark_home = "/home/sivarani/spark-1.0.2-bin-hadoop2/";
 String jar = "/home/sivarani/build/Test.jar";
 public static final JavaSparkContext batchSparkContext = new
 JavaSparkContext(master, "SparkTest", spark_home, new String[] {jar});
 
 public static void main(String args[]){
   runSpark(0, "TestSubmit");
 }
 
 public static void runSpark(int crit, String dataFile){
   JavaRDD<String> logData = batchSparkContext.textFile(input, 10);
   // flatMap
   // mapToPair
   // reduceByKey
   List<Tuple2<String, Integer>> output1 = counts.collect();
 }
   
 
 This works fine with spark-submit, but when I tried to submit through code via
 LeadBatchProcessing.runSpark(0, "TestSubmit.csv");
 
 I get the following error:
 
 HTTP Status 500 - javax.servlet.ServletException:
 org.apache.spark.SparkException: Job aborted due to stage failure: Task
 0.0:0 failed 4 times, most recent failure: TID 29 on host 172.18.152.36
 failed for unknown reason
 Job aborted due to stage failure: Task 0.0:0 failed 4 times, most recent
 failure: TID 29 on host 172.18.152.36 failed for unknown reason Driver
 stacktrace:
 
 
 
 Any Advice on this?
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Submiting-Spark-application-through-code-tp17452p17797.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 



Re: use additional ebs volumes for hsdf storage with spark-ec2

2014-11-01 Thread Marius Soutier
Are these /vols formatted? You typically need to format and define a mount 
point in /mnt for attached EBS volumes.

I’m not using the ec2 script, so I don’t know what is installed, but there’s 
usually an HDFS info service running on port 50070. After changing 
hdfs-site.xml, you have to restart the HDFS service. The Cloudera distribution 
supports this in the UI, otherwise depending on your version and so on there 
should be scripts in /usr/local/hadoop, /usr/lib/hadoop-hdfs, or something 
similar.

On 31.10.2014, at 05:56, Daniel Mahler dmah...@gmail.com wrote:

 Thanks Akhil. I tried changing /root/ephemeral-hdfs/conf/hdfs-site.xml to have
 
   <property>
     <name>dfs.data.dir</name>
     <value>/vol,/vol0,/vol1,/vol2,/vol3,/vol4,/vol5,/vol6,/vol7,/mnt/ephemeral-hdfs/data,/mnt2/ephemeral-hdfs/data</value>
   </property>
 
 and then running
 
 /root/ephemeral-hdfs/bin/stop-all.sh
 copy-dir  /root/ephemeral-hdfs/conf/
 /root/ephemeral-hdfs/bin/start-all.sh
 
 to try and make sure the new configurations taks on the entire cluster.
 I then ran spark to write to the local hdfs.
 It failed after filling the original /mnt* mounted drives,
 without writing anything to the attached /vol* drives.
 
 I also tried completely stopping and restarting the cluster,
 but restarting resets /root/ephemeral-hdfs/conf/hdfs-site.xml to the default 
 state.
 
 thanks
 Daniel
 
 
 
 On Thu, Oct 30, 2014 at 1:56 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
 I think you can check in the core-site.xml or hdfs-site.xml file under 
 /root/ephemeral-hdfs/etc/hadoop/ where you can see data node dir property 
 which will be a comma separated list of volumes. 
 
 Thanks
 Best Regards
 
 On Thu, Oct 30, 2014 at 5:21 AM, Daniel Mahler dmah...@gmail.com wrote:
 I started my ec2 spark cluster with 
 
 ./ec2/spark-ec2 --ebs-vol-{size=100,num=8,type=gp2} -t m3.xlarge -s 10 launch 
 mycluster
 
 I see the additional volumes attached but they do not seem to be set up for 
 hdfs.
 How can I check if they are being utilized on all workers,
 and how can I get all workers to utilize the extra volumes for hdfs.
 I do not have experience using hadoop directly, only through spark.
 
 thanks
 Daniel
 
 



Re: scala.collection.mutable.ArrayOps$ofRef$.length$extension since Spark 1.1.0

2014-10-27 Thread Marius Soutier
So, apparently `wholeTextFiles` runs the job again, passing null as argument 
list, which in turn blows up my argument parsing mechanics. I never thought I 
had to check for null again in a pure Scala environment ;)

On 26.10.2014, at 11:57, Marius Soutier mps@gmail.com wrote:

 I tried that already, same exception. I also tried using an accumulator to 
 collect all filenames. The filename is not the problem.
 
 Even this crashes with the same exception:
 
 sc.parallelize(files.value).map { fileName =>
   println(s"Scanning $fileName")
   try {
     println(s"Scanning $fileName")
     sc.textFile(fileName).take(1)
     s"Successfully scanned $fileName"
   } catch {
     case t: Throwable => s"Failed to process $fileName, reason ${t.getStackTrace.head}"
   }
 }
 .saveAsTextFile(output)
 
 The output file contains “Failed to process …” for each file.
 
 
 On 26.10.2014, at 00:10, Buttler, David buttl...@llnl.gov wrote:
 
 This sounds like expected behavior to me.  The foreach call should be 
 distributed on the workers.  perhaps you want to use map instead, and then 
 collect the failed file names locally, or save the whole thing out to a file
 
 From: Marius Soutier [mps@gmail.com]
 Sent: Friday, October 24, 2014 6:35 AM
 To: user@spark.apache.org
 Subject: scala.collection.mutable.ArrayOps$ofRef$.length$extension since 
 Spark 1.1.0
 
 Hi,
 
 I’m running a job whose simple task it is to find files that cannot be read 
 (sometimes our gz files are corrupted).
 
 With 1.0.x, this worked perfectly. Since 1.1.0 however, I’m getting an 
 exception: 
 scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:114)
 
  sc.wholeTextFiles(input)
    .foreach { case (fileName, _) =>
      try {
        println(s"Scanning $fileName")
        sc.textFile(fileName).take(1)
        println(s"Successfully scanned $fileName")
      } catch {
        case t: Throwable => println(s"Failed to process $fileName, reason ${t.getStackTrace.head}")
      }
    }
 
 
 Also since 1.1.0, the printlns are no longer visible on the console, only in 
 the Spark UI worker output.
 
 
 Thanks for any help
 - Marius
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: scala.collection.mutable.ArrayOps$ofRef$.length$extension since Spark 1.1.0

2014-10-26 Thread Marius Soutier
I tried that already, same exception. I also tried using an accumulator to 
collect all filenames. The filename is not the problem.

Even this crashes with the same exception:

sc.parallelize(files.value).map { fileName =>
  println(s"Scanning $fileName")
  try {
    println(s"Scanning $fileName")
    sc.textFile(fileName).take(1)
    s"Successfully scanned $fileName"
  } catch {
    case t: Throwable => s"Failed to process $fileName, reason ${t.getStackTrace.head}"
  }
}
.saveAsTextFile(output)

The output file contains “Failed to process …” for each file.


On 26.10.2014, at 00:10, Buttler, David buttl...@llnl.gov wrote:

 This sounds like expected behavior to me.  The foreach call should be 
 distributed on the workers.  perhaps you want to use map instead, and then 
 collect the failed file names locally, or save the whole thing out to a file
 
 From: Marius Soutier [mps@gmail.com]
 Sent: Friday, October 24, 2014 6:35 AM
 To: user@spark.apache.org
 Subject: scala.collection.mutable.ArrayOps$ofRef$.length$extension since 
 Spark 1.1.0
 
 Hi,
 
 I’m running a job whose simple task it is to find files that cannot be read 
 (sometimes our gz files are corrupted).
 
 With 1.0.x, this worked perfectly. Since 1.1.0 however, I’m getting an 
 exception: 
 scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:114)
 
 sc.wholeTextFiles(input)
   .foreach { case (fileName, _) =>
     try {
       println(s"Scanning $fileName")
       sc.textFile(fileName).take(1)
       println(s"Successfully scanned $fileName")
     } catch {
       case t: Throwable => println(s"Failed to process $fileName, reason ${t.getStackTrace.head}")
     }
   }
 
 
 Also since 1.1.0, the printlns are no longer visible on the console, only in 
 the Spark UI worker output.
 
 
 Thanks for any help
 - Marius
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



scala.collection.mutable.ArrayOps$ofRef$.length$extension since Spark 1.1.0

2014-10-24 Thread Marius Soutier
Hi,

I’m running a job whose simple task it is to find files that cannot be read 
(sometimes our gz files are corrupted).

With 1.0.x, this worked perfectly. Since 1.1.0 however, I’m getting an 
exception: 
scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:114)

sc.wholeTextFiles(input)
  .foreach { case (fileName, _) =>
    try {
      println(s"Scanning $fileName")
      sc.textFile(fileName).take(1)
      println(s"Successfully scanned $fileName")
    } catch {
      case t: Throwable => println(s"Failed to process $fileName, reason ${t.getStackTrace.head}")
    }
  }


Also since 1.1.0, the printlns are no longer visible on the console, only in 
the Spark UI worker output.


Thanks for any help
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSQL and columnar data

2014-10-23 Thread Marius Soutier
Hi guys,

another question: what’s the approach to working with column-oriented data, 
i.e. data with more than 1000 columns? Using Parquet for this should be fine, 
but how well does SparkSQL handle such a large number of columns? Is there a limit? 
Should we use standard Spark instead?

Thanks for any insights,
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Python vs Scala performance

2014-10-22 Thread Marius Soutier
Hi there,

we have a small Spark cluster running and are processing around 40 GB of 
Gzip-compressed JSON data per day. I have written a couple of word count-like 
Scala jobs that essentially pull in all the data, do some joins, group bys and 
aggregations. A job takes around 40 minutes to complete.

Now one of the data scientists on the team wants to write some jobs using 
Python. To learn Spark, he rewrote one of my Scala jobs in Python. From the 
API side, everything looks more or less identical. However, his jobs take 
between 5 and 8 hours to complete! We can also see that the execution plan is quite 
different, I’m seeing writes to the output much later than in Scala.

Is Python I/O really that slow?


Thanks
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python vs Scala performance

2014-10-22 Thread Marius Soutier
We’re using 1.1.0. Yes I expected Scala to be maybe twice as fast, but not 
that...

On 22.10.2014, at 13:02, Nicholas Chammas nicholas.cham...@gmail.com wrote:

 What version of Spark are you running? Some recent changes to how PySpark 
 works relative to Scala Spark may explain things.
 
 PySpark should not be that much slower, not by a stretch.
 
 On Wed, Oct 22, 2014 at 6:11 AM, Ashic Mahtab as...@live.com wrote:
 I'm no expert, but looked into how the python bits work a while back (was 
 trying to assess what it would take to add F# support). It seems python hosts 
 a jvm inside of it, and talks to scala spark in that jvm. The python server 
 bit translates the python calls to those in the jvm. The python spark 
 context is like an adapter to the jvm spark context. If you're seeing 
 performance discrepancies, this might be the reason why. If the code can be 
 organised to require fewer interactions with the adapter, that may improve 
 things. Take this with a pinch of salt...I might be way off on this :)
 
 Cheers,
 Ashic.
 
  From: mps@gmail.com
  Subject: Python vs Scala performance
  Date: Wed, 22 Oct 2014 12:00:41 +0200
  To: user@spark.apache.org
 
  
  Hi there,
  
  we have a small Spark cluster running and are processing around 40 GB of 
  Gzip-compressed JSON data per day. I have written a couple of word 
  count-like Scala jobs that essentially pull in all the data, do some joins, 
  group bys and aggregations. A job takes around 40 minutes to complete.
  
  Now one of the data scientists on the team wants to do write some jobs 
  using Python. To learn Spark, he rewrote one of my Scala jobs in Python. 
  From the API-side, everything looks more or less identical. However his 
  jobs take between 5-8 hours to complete! We can also see that the execution 
  plan is quite different, I’m seeing writes to the output much later than in 
  Scala.
  
  Is Python I/O really that slow?
  
  
  Thanks
  - Marius
  
  
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
  
 



Re: Python vs Scala performance

2014-10-22 Thread Marius Soutier
Didn’t seem to help:

conf = SparkConf().set("spark.shuffle.spill", "false").set("spark.default.parallelism", "12")
sc = SparkContext(appName='app_name', conf=conf)

but it's still taking as much time.

On 22.10.2014, at 14:17, Nicholas Chammas nicholas.cham...@gmail.com wrote:

 Total guess without knowing anything about your code: Do either of these two 
 notes from the 1.1.0 release notes affect things at all?
 
 PySpark now performs external spilling during aggregations. Old behavior can 
 be restored by setting spark.shuffle.spill to false.
 PySpark uses a new heuristic for determining the parallelism of shuffle 
 operations. Old behavior can be restored by setting spark.default.parallelism 
 to the number of cores in the cluster.
 Nick
 



Re: Python vs Scala performance

2014-10-22 Thread Marius Soutier
Yeah we’re using Python 2.7.3.

On 22.10.2014, at 20:06, Nicholas Chammas nicholas.cham...@gmail.com wrote:

 On Wed, Oct 22, 2014 at 11:34 AM, Eustache DIEMERT eusta...@diemert.fr 
 wrote:
 
 
 
 Wild guess maybe, but do you decode the json records in Python ? it could be 
 much slower as the default lib is quite slow. 
 
 
 Oh yeah, this is a good place to look. Also, just upgrading to Python 2.7 may 
 be enough performance improvement because they merged in the fast JSON 
 deserializing from simplejson into the standard library. So you may not need 
 to use an external library like ujson, though that may help too.
 
 Nick
 
 ​



Re: Python vs Scala performance

2014-10-22 Thread Marius Soutier
Can’t install that on our cluster, but I can try locally. Is there a pre-built 
binary available?

On 22.10.2014, at 19:01, Davies Liu dav...@databricks.com wrote:

 In the master, you can easily profile you job, find the bottlenecks,
 see https://github.com/apache/spark/pull/2556
 
 Could you try it and show the stats?
 
 Davies


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



parquetFile and wilcards

2014-09-24 Thread Marius Soutier
Hello,

sc.textFile and so on support wildcards in their path, but apparently 
sqlc.parquetFile() does not. I always receive “File 
/file/to/path/*/input.parquet does not exist”. Is this normal or a bug? Is 
there a workaround?

Thanks
- Marius



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: parquetFile and wilcards

2014-09-24 Thread Marius Soutier
Thank you, that works!

On 24.09.2014, at 19:01, Michael Armbrust mich...@databricks.com wrote:

 This behavior is inherited from the parquet input format that we use.  You 
 could list the files manually and pass them as a comma separated list.
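 
 A sketch of that workaround, using the Hadoop FileSystem API to expand the glob 
 yourself and pass the comma separated list, as suggested (sc and sqlContext are 
 assumed to be in scope; the path is the one from the question):
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 val fs = FileSystem.get(sc.hadoopConfiguration)
 val inputs = fs.globStatus(new Path("/file/to/path/*/input.parquet"))
   .map(_.getPath.toString)
   .mkString(",")
 
 val requests = sqlContext.parquetFile(inputs)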
 
 On Wed, Sep 24, 2014 at 7:46 AM, Marius Soutier mps@gmail.com wrote:
 Hello,
 
 sc.textFile and so on support wildcards in their path, but apparently 
 sqlc.parquetFile() does not. I always receive “File 
 /file/to/path/*/input.parquet does not exist”. Is this normal or a bug? Is 
 there a workaround?
 
 Thanks
 - Marius
 
 
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 



Re: Serving data

2014-09-16 Thread Marius Soutier
Writing to Parquet and querying the result via SparkSQL works great (except for 
some strange SQL parser errors). However, the problem remains: how do I get that 
data back to a dashboard? So I guess I’ll have to use a database after all.
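
For reference, the Parquet-plus-SparkSQL part of this looks roughly like the 
following in 1.1 (counts is assumed to be a SchemaRDD produced by the batch job, 
sqlContext a plain SQLContext; names and paths are made up). The open question is 
only how the dashboard then gets at the query results.

// batch job: write the aggregated counts out as Parquet
counts.saveAsParquetFile("/data/word_counts.parquet")

// serving side: load the Parquet data and query it via SparkSQL
val wordCounts = sqlContext.parquetFile("/data/word_counts.parquet")
wordCounts.registerTempTable("word_counts")
val top20 = sqlContext.sql("SELECT word, cnt FROM word_counts ORDER BY cnt DESC LIMIT 20")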


You can batch up data & store into parquet partitions as well & query it using 
another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1, I believe.


Re: Serving data

2014-09-15 Thread Marius Soutier
Thank you guys, I’ll try Parquet and if that’s not quick enough I’ll go the 
usual route with either read-only or normal database.

On 13.09.2014, at 12:45, andy petrella andy.petre...@gmail.com wrote:

 however, the cache is not guaranteed to remain, if other jobs are launched in 
 the cluster and require more memory than what's left in the overall caching 
 memory, previous RDDs will be discarded.
 
 Using an off heap cache like tachyon as a dump repo can help.
 
 In general, I'd say that using a persistent sink (like Cassandra for 
 instance) is best.
 
 my .2¢
 
 
 aℕdy ℙetrella
 about.me/noootsab
 
 
 
 On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com 
 wrote:
 You can cache data in memory & query it using Spark Job Server. 
 Most folks dump data down to a queue/db for retrieval. 
 You can batch up data & store into parquet partitions as well & query it 
 using another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1, I believe. 
 -- 
 Regards,
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote:
 
 Hi there, 
 
 I’m pretty new to Spark, and so far I’ve written my jobs the same way I wrote 
 Scalding jobs - one-off, read data from HDFS, count words, write counts back 
 to HDFS. 
 
 Now I want to display these counts in a dashboard. Since Spark allows to 
 cache RDDs in-memory and you have to explicitly terminate your app (and 
 there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep an 
 app running indefinitely and query an in-memory RDD from the outside (via 
 SparkSQL for example). 
 
 Is this how others are using Spark? Or are you just dumping job results into 
 message queues or databases? 
 
 
 Thanks 
 - Marius 
 
 
 - 
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 For additional commands, e-mail: user-h...@spark.apache.org 
 
 
 
 



Re: Serving data

2014-09-15 Thread Marius Soutier
So you are living the dream of using HDFS as a database? ;)

On 15.09.2014, at 13:50, andy petrella andy.petre...@gmail.com wrote:

 I'm using Parquet in ADAM, and I can say that it works pretty fine!
 Enjoy ;-)
 
 aℕdy ℙetrella
 about.me/noootsab
 
 
 
 On Mon, Sep 15, 2014 at 1:41 PM, Marius Soutier mps@gmail.com wrote:
 Thank you guys, I’ll try Parquet and if that’s not quick enough I’ll go the 
 usual route with either read-only or normal database.
 
 On 13.09.2014, at 12:45, andy petrella andy.petre...@gmail.com wrote:
 
 however, the cache is not guaranteed to remain, if other jobs are launched 
 in the cluster and require more memory than what's left in the overall 
 caching memory, previous RDDs will be discarded.
 
 Using an off heap cache like tachyon as a dump repo can help.
 
 In general, I'd say that using a persistent sink (like Cassandra for 
 instance) is best.
 
 my .2¢
 
 
 aℕdy ℙetrella
 about.me/noootsab
 
 
 
 On Sat, Sep 13, 2014 at 9:20 AM, Mayur Rustagi mayur.rust...@gmail.com 
 wrote:
  You can cache data in memory & query it using Spark Job Server. 
  Most folks dump data down to a queue/db for retrieval. 
  You can batch up data & store into parquet partitions as well & query it 
  using another SparkSQL shell; the JDBC driver in SparkSQL is part of 1.1, I 
  believe. 
 -- 
 Regards,
 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi
 
 
 On Fri, Sep 12, 2014 at 2:54 PM, Marius Soutier mps@gmail.com wrote:
 
 Hi there, 
 
 I’m pretty new to Spark, and so far I’ve written my jobs the same way I 
 wrote Scalding jobs - one-off, read data from HDFS, count words, write 
 counts back to HDFS. 
 
 Now I want to display these counts in a dashboard. Since Spark allows to 
 cache RDDs in-memory and you have to explicitly terminate your app (and 
 there’s even a new JDBC server in 1.1), I’m assuming it’s possible to keep 
 an app running indefinitely and query an in-memory RDD from the outside (via 
 SparkSQL for example). 
 
 Is this how others are using Spark? Or are you just dumping job results into 
 message queues or databases? 
 
 
 Thanks 
 - Marius 
 
 
 - 
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 For additional commands, e-mail: user-h...@spark.apache.org 
 
 
 
 
 
 



Serving data

2014-09-12 Thread Marius Soutier
Hi there,

I’m pretty new to Spark, and so far I’ve written my jobs the same way I wrote 
Scalding jobs - one-off, read data from HDFS, count words, write counts back to 
HDFS.

Now I want to display these counts in a dashboard. Since Spark allows you to cache 
RDDs in memory and you have to explicitly terminate your app (and there’s even 
a new JDBC server in 1.1), I’m assuming it’s possible to keep an app running 
indefinitely and query an in-memory RDD from the outside (via SparkSQL for 
example).

Is this how others are using Spark? Or are you just dumping job results into 
message queues or databases?


Thanks
- Marius


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org