Re: How to set environment variable for a spark job

2014-03-27 Thread santhoma
Got it finally; pasting it here so that it will be useful for others:

val conf = new SparkConf()
  .setJars(jarList)
conf.setExecutorEnv("ORACLE_HOME", myOraHome)
conf.setExecutorEnv("SPARK_JAVA_OPTS", "-Djava.library.path=/my/custom/path")




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tp3180p3323.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re:

2014-03-27 Thread Sonal Goyal
Hi Hahn,

What's the ulimit on your systems? Please check the following link for a 
discussion of the "too many open files" issue.

http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3ccangvg8qpn_wllsrcjegdb7hmza2ux7myxzhfvtz+b-sdxdk...@mail.gmail.com%3E


Sent from my iPad

> On Mar 27, 2014, at 12:15 PM, Hahn Jiang  wrote:
> 
> Hi, all
> 
> I write a spark program on yarn. When I use small size input file, my program 
> can run well. But my job will failed if input size is more than 40G.
> 
> the error log:
> java.io.FileNotFoundException (java.io.FileNotFoundException: 
> /home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890
>  (Too many open files))
> java.io.FileOutputStream.openAppend(Native Method)
> java.io.FileOutputStream.(FileOutputStream.java:192)
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
> org.apache.spark.scheduler.Task.run(Task.scala:53)
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> java.lang.Thread.run(Thread.java:662)
> 
> 
> my object:
> object Test {
> 
>   def main(args: Array[String]) {
> val sc = new SparkContext(args(0), "Test",
>   System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
> 
> val mg = sc.textFile("/user/.../part-*")
> val mct = sc.textFile("/user/.../part-*")
> 
> val pair1 = mg.map {
>   s =>
> val cols = s.split("\t")
> (cols(0), cols(1))
> }
> val pair2 = mct.map {
>   s =>
> val cols = s.split("\t")
> (cols(0), cols(1))
> }
> val merge = pair1.union(pair2)
> val result = merge.reduceByKey(_ + _)
> val outputPath = new Path("/user/xxx/temp/spark-output")
> outputPath.getFileSystem(new Configuration()).delete(outputPath, true)
> result.saveAsTextFile(outputPath.toString)
> 
> System.exit(0)
>   }
> 
> }
> 
> My spark version is 0.9 and I run my job use this command 
> "/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar 
> ./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args 
> yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g 
> --worker-cores 2"
> 


Re: Configuring distributed caching with Spark and YARN

2014-03-27 Thread santhoma
Curious to know, were you able to do distributed caching for Spark?

I have done that for Hadoop and Pig, but could not find a way to do it in
Spark.
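
For what it's worth, the closest Spark analogue I'm aware of is SparkContext.addFile
together with SparkFiles.get, which ships a file to every node; a rough sketch, with a
hypothetical file path:

  import org.apache.spark.SparkFiles

  // driver side: ship a side file to every node (hypothetical path)
  sc.addFile("/path/on/driver/lookup.dat")

  // task side: resolve the node-local copy by file name
  val sample = sc.parallelize(1 to 4).map { _ =>
    scala.io.Source.fromFile(SparkFiles.get("lookup.dat")).getLines().next()
  }.collect()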



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-distributed-caching-with-Spark-and-YARN-tp1074p3325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re:

2014-03-27 Thread Hahn Jiang
I set "ulimit -n 10" in conf/spark-env.sh, is it too small?


On Thu, Mar 27, 2014 at 3:36 PM, Sonal Goyal  wrote:

> Hi Hahn,
>
> What's the ulimit on your systems? Please check the following link for a
> discussion on the too many files open.
>
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3ccangvg8qpn_wllsrcjegdb7hmza2ux7myxzhfvtz+b-sdxdk...@mail.gmail.com%3E
>
>
> Sent from my iPad
>
> > On Mar 27, 2014, at 12:15 PM, Hahn Jiang 
> wrote:
> >
> > Hi, all
> >
> > I write a spark program on yarn. When I use small size input file, my
> program can run well. But my job will failed if input size is more than 40G.
> >
> > the error log:
> > java.io.FileNotFoundException (java.io.FileNotFoundException:
> /home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890
> (Too many open files))
> > java.io.FileOutputStream.openAppend(Native Method)
> > java.io.FileOutputStream.(FileOutputStream.java:192)
> >
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
> >
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
> >
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
> >
> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >
> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
> >
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
> > org.apache.spark.scheduler.Task.run(Task.scala:53)
> >
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
> >
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> > java.lang.Thread.run(Thread.java:662)
> >
> >
> > my object:
> > object Test {
> >
> >   def main(args: Array[String]) {
> > val sc = new SparkContext(args(0), "Test",
> >   System.getenv("SPARK_HOME"),
> SparkContext.jarOfClass(this.getClass))
> >
> > val mg = sc.textFile("/user/.../part-*")
> > val mct = sc.textFile("/user/.../part-*")
> >
> > val pair1 = mg.map {
> >   s =>
> > val cols = s.split("\t")
> > (cols(0), cols(1))
> > }
> > val pair2 = mct.map {
> >   s =>
> > val cols = s.split("\t")
> > (cols(0), cols(1))
> > }
> > val merge = pair1.union(pair2)
> > val result = merge.reduceByKey(_ + _)
> > val outputPath = new Path("/user/xxx/temp/spark-output")
> > outputPath.getFileSystem(new Configuration()).delete(outputPath,
> true)
> > result.saveAsTextFile(outputPath.toString)
> >
> > System.exit(0)
> >   }
> >
> > }
> >
> > My spark version is 0.9 and I run my job use this command
> "/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
> ./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args
> yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g
> --worker-cores 2"
> >
>


Run spark on mesos remotely

2014-03-27 Thread Wush Wu
Dear all,

We have a Spark 0.8.1 cluster on Mesos 0.15. It works if I submit the job
from the master of Mesos; that is to say, I spawn the Spark shell or launch
the Scala application on the master of Mesos.

However, when I submit the job from another machine, the job is lost. The
logs show that Mesos does not copy spark-0.8.1-incubating.tar.gz
to the temporary working directory, so the job is lost immediately. Is it
possible to submit the job from a machine that does not belong to the Mesos
cluster?

Thanks!

Wush


Re: Announcing Spark SQL

2014-03-27 Thread Pascal Voitot Dev
Hi,
Quite interesting!

Suggestion: why not go even fancier & parse SQL queries at compile-time
with a macro ? ;)

Pascal



On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust
wrote:

> Hey Everyone,
>
> This already went out to the dev list, but I wanted to put a pointer here
> as well to a new feature we are pretty excited about for Spark 1.0.
>
>
> http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
>
> Michael
>


Re: Announcing Spark SQL

2014-03-27 Thread andy petrella
I'm hijacking the thread, but my 2c is that this feature is also important to
enable ad-hoc queries, which are done at runtime. It doesn't remove the interest
of such a macro for precompiled jobs, of course, but it may not be the first
use case envisioned for Spark SQL.

Again, only my 0.2c (ok, I divided by 10 after writing my thoughts ^^)

Andy

On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev <
pascal.voitot@gmail.com> wrote:

> Hi,
> Quite interesting!
>
> Suggestion: why not go even fancier & parse SQL queries at compile-time
> with a macro ? ;)
>
> Pascal
>
>
>
> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust  > wrote:
>
>> Hey Everyone,
>>
>> This already went out to the dev list, but I wanted to put a pointer here
>> as well to a new feature we are pretty excited about for Spark 1.0.
>>
>>
>> http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
>>
>> Michael
>>
>
>


java.lang.NoClassDefFoundError: org/apache/spark/util/Vector

2014-03-27 Thread Kal El
I am getting this error when I try to run K-Means in spark-0.9.0:
"java.lang.NoClassDefFoundError: org/apache/spark/util/Vector
        at java.lang.Class.getDeclaredMethods0(Native Method)
        at java.lang.Class.privateGetDeclaredMethods(Class.java:2531)
        at java.lang.Class.getMethod0(Class.java:2774)
        at java.lang.Class.getMethod(Class.java:1663)
        at 
scala.tools.nsc.util.ScalaClassLoader$class.run(ScalaClassLoader.scala:67)
        at 
scala.tools.nsc.util.ScalaClassLoader$URLClassLoader.run(ScalaClassLoader.scala:139)
        at scala.tools.nsc.CommonRunner$class.run(ObjectRunner.scala:28)
        at scala.tools.nsc.ObjectRunner$.run(ObjectRunner.scala:45)
        at scala.tools.nsc.CommonRunner$class.runAndCatch(ObjectRunner.scala:35)
        at scala.tools.nsc.ObjectRunner$.runAndCatch(ObjectRunner.scala:45)
        at 
scala.tools.nsc.MainGenericRunner.runTarget$1(MainGenericRunner.scala:74)
        at scala.tools.nsc.MainGenericRunner.process(MainGenericRunner.scala:96)
        at scala.tools.nsc.MainGenericRunner$.main(MainGenericRunner.scala:105)
        at scala.tools.nsc.MainGenericRunner.main(MainGenericRunner.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.util.Vector
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 14 more
"

I have run the same code on different machines, but I haven't seen this error 
before and I haven't found a solution yet.

Thanks

Re: Announcing Spark SQL

2014-03-27 Thread Pascal Voitot Dev
On 27 Mar 2014 09:47, "andy petrella"  wrote:
>
> I hijack the thread, but my2c is that this feature is also important to
enable ad-hoc queries which is done at runtime. It doesn't remove interests
for such macro for precompiled jobs of course, but it may not be the first
use case envisioned with this Spark SQL.
>

I'm not sure I see what you call "ad-hoc queries"... Any sample?

> Again, only my0.2c (ok I divided by 10 after writing my thoughts ^^)
>
> Andy
>
> On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev <
pascal.voitot@gmail.com> wrote:
>>
>> Hi,
>> Quite interesting!
>>
>> Suggestion: why not go even fancier & parse SQL queries at compile-time
with a macro ? ;)
>>
>> Pascal
>>
>>
>>
>> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust <
mich...@databricks.com> wrote:
>>>
>>> Hey Everyone,
>>>
>>> This already went out to the dev list, but I wanted to put a pointer
here as well to a new feature we are pretty excited about for Spark 1.0.
>>>
>>>
http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
>>>
>>> Michael
>>
>>
>


Re: Announcing Spark SQL

2014-03-27 Thread andy petrella
I just mean queries sent at runtime ^^, like for any RDBMS.
In our project we have such a requirement: a layer to play with the
data (a custom, low-level service layer of a lambda arch), and something
like this is interesting.


On Thu, Mar 27, 2014 at 10:15 AM, Pascal Voitot Dev <
pascal.voitot@gmail.com> wrote:

>
> Le 27 mars 2014 09:47, "andy petrella"  a écrit :
>
> >
> > I hijack the thread, but my2c is that this feature is also important to
> enable ad-hoc queries which is done at runtime. It doesn't remove interests
> for such macro for precompiled jobs of course, but it may not be the first
> use case envisioned with this Spark SQL.
> >
>
> I'm not sure to see what you call "ad- hoc queries"... Any sample?
>
> > Again, only my0.2c (ok I divided by 10 after writing my thoughts ^^)
> >
> > Andy
> >
> > On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev <
> pascal.voitot@gmail.com> wrote:
> >>
> >> Hi,
> >> Quite interesting!
> >>
> >> Suggestion: why not go even fancier & parse SQL queries at compile-time
> with a macro ? ;)
> >>
> >> Pascal
> >>
> >>
> >>
> >> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
> >>>
> >>> Hey Everyone,
> >>>
> >>> This already went out to the dev list, but I wanted to put a pointer
> here as well to a new feature we are pretty excited about for Spark 1.0.
> >>>
> >>>
> http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
> >>>
> >>> Michael
> >>
> >>
> >
>


Re: Change print() in JavaNetworkWordCount

2014-03-27 Thread Eduardo Costa Alfaia

Thank you very much, Sourav

BR

On 3/26/14, 17:29, Sourav Chandra wrote:

def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
  val total = rdd.collect().toList
  println ("---")
  println ("Time: " + time)
  println ("---")
  total.foreach(println)
//  val first11 = rdd.take(11)
//  println ("---")
//  println ("Time: " + time)
//  println ("---")
//  first11.take(10).foreach(println)
//  if (first11.size > 10) println("...")
  println()
}
new ForEachDStream(this, 
context.sparkContext.clean(foreachFunc)).register()

  }



--
Privacy notice: http://www.unibs.it/node/8155


Re: Announcing Spark SQL

2014-03-27 Thread Pascal Voitot Dev
On Thu, Mar 27, 2014 at 10:22 AM, andy petrella wrote:

> I just mean queries sent at runtime ^^, like for any RDBMS.
> In our project we have such requirement to have a layer to play with the
> data (custom and low level service layer of a lambda arch), and something
> like this is interesting.
>
>
Ok that's what I thought! But for these runtime queries, is a macro useful
for you?


>
>
> On Thu, Mar 27, 2014 at 10:15 AM, Pascal Voitot Dev <
> pascal.voitot@gmail.com> wrote:
>
>>
>> Le 27 mars 2014 09:47, "andy petrella"  a écrit
>> :
>>
>> >
>> > I hijack the thread, but my2c is that this feature is also important to
>> enable ad-hoc queries which is done at runtime. It doesn't remove interests
>> for such macro for precompiled jobs of course, but it may not be the first
>> use case envisioned with this Spark SQL.
>> >
>>
>> I'm not sure to see what you call "ad- hoc queries"... Any sample?
>>
>> > Again, only my0.2c (ok I divided by 10 after writing my thoughts ^^)
>> >
>> > Andy
>> >
>> > On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev <
>> pascal.voitot@gmail.com> wrote:
>> >>
>> >> Hi,
>> >> Quite interesting!
>> >>
>> >> Suggestion: why not go even fancier & parse SQL queries at
>> compile-time with a macro ? ;)
>> >>
>> >> Pascal
>> >>
>> >>
>> >>
>> >> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>> >>>
>> >>> Hey Everyone,
>> >>>
>> >>> This already went out to the dev list, but I wanted to put a pointer
>> here as well to a new feature we are pretty excited about for Spark 1.0.
>> >>>
>> >>>
>> http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
>> >>>
>> >>> Michael
>> >>
>> >>
>> >
>>
>
>


Re: Announcing Spark SQL

2014-03-27 Thread andy petrella
nope (what I said :-P)


On Thu, Mar 27, 2014 at 11:05 AM, Pascal Voitot Dev <
pascal.voitot@gmail.com> wrote:

>
>
>
> On Thu, Mar 27, 2014 at 10:22 AM, andy petrella 
> wrote:
>
>> I just mean queries sent at runtime ^^, like for any RDBMS.
>> In our project we have such requirement to have a layer to play with the
>> data (custom and low level service layer of a lambda arch), and something
>> like this is interesting.
>>
>>
> Ok that's what I thought! But for these runtime queries, is a macro useful
> for you?
>
>
>>
>>
>> On Thu, Mar 27, 2014 at 10:15 AM, Pascal Voitot Dev <
>> pascal.voitot@gmail.com> wrote:
>>
>>>
>>> Le 27 mars 2014 09:47, "andy petrella"  a
>>> écrit :
>>>
>>> >
>>> > I hijack the thread, but my2c is that this feature is also important
>>> to enable ad-hoc queries which is done at runtime. It doesn't remove
>>> interests for such macro for precompiled jobs of course, but it may not be
>>> the first use case envisioned with this Spark SQL.
>>> >
>>>
>>> I'm not sure to see what you call "ad- hoc queries"... Any sample?
>>>
>>> > Again, only my0.2c (ok I divided by 10 after writing my thoughts ^^)
>>> >
>>> > Andy
>>> >
>>> > On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev <
>>> pascal.voitot@gmail.com> wrote:
>>> >>
>>> >> Hi,
>>> >> Quite interesting!
>>> >>
>>> >> Suggestion: why not go even fancier & parse SQL queries at
>>> compile-time with a macro ? ;)
>>> >>
>>> >> Pascal
>>> >>
>>> >>
>>> >>
>>> >> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>> >>>
>>> >>> Hey Everyone,
>>> >>>
>>> >>> This already went out to the dev list, but I wanted to put a pointer
>>> here as well to a new feature we are pretty excited about for Spark 1.0.
>>> >>>
>>> >>>
>>> http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
>>> >>>
>>> >>> Michael
>>> >>
>>> >>
>>> >
>>>
>>
>>
>


Re: Announcing Spark SQL

2014-03-27 Thread Pascal Voitot Dev
On Thu, Mar 27, 2014 at 11:08 AM, andy petrella wrote:

> nope (what I said :-P)
>

That's also my answer to my own question :D

but I didn't understand that in your sentence: "my2c is that this feature
is also important to enable ad-hoc queries which is done at runtime."


>
>
> On Thu, Mar 27, 2014 at 11:05 AM, Pascal Voitot Dev <
> pascal.voitot@gmail.com> wrote:
>
>>
>>
>>
>> On Thu, Mar 27, 2014 at 10:22 AM, andy petrella 
>> wrote:
>>
>>> I just mean queries sent at runtime ^^, like for any RDBMS.
>>> In our project we have such requirement to have a layer to play with the
>>> data (custom and low level service layer of a lambda arch), and something
>>> like this is interesting.
>>>
>>>
>> Ok that's what I thought! But for these runtime queries, is a macro
>> useful for you?
>>
>>
>>>
>>>
>>> On Thu, Mar 27, 2014 at 10:15 AM, Pascal Voitot Dev <
>>> pascal.voitot@gmail.com> wrote:
>>>

 Le 27 mars 2014 09:47, "andy petrella"  a
 écrit :

 >
 > I hijack the thread, but my2c is that this feature is also important
 to enable ad-hoc queries which is done at runtime. It doesn't remove
 interests for such macro for precompiled jobs of course, but it may not be
 the first use case envisioned with this Spark SQL.
 >

 I'm not sure to see what you call "ad- hoc queries"... Any sample?

 > Again, only my0.2c (ok I divided by 10 after writing my thoughts ^^)
 >
 > Andy
 >
 > On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev <
 pascal.voitot@gmail.com> wrote:
 >>
 >> Hi,
 >> Quite interesting!
 >>
 >> Suggestion: why not go even fancier & parse SQL queries at
 compile-time with a macro ? ;)
 >>
 >> Pascal
 >>
 >>
 >>
 >> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust <
 mich...@databricks.com> wrote:
 >>>
 >>> Hey Everyone,
 >>>
 >>> This already went out to the dev list, but I wanted to put a
 pointer here as well to a new feature we are pretty excited about for Spark
 1.0.
 >>>
 >>>
 http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
 >>>
 >>> Michael
 >>
 >>
 >

>>>
>>>
>>
>


Re: Running a task once on each executor

2014-03-27 Thread deenar.toraskar
Hi Christopher

>>which you would invoke as TaskNonce.getSingleton().doThisOnce() from
within the map closure.

Say I have a cluster with 24 workers (one thread per worker, via
SPARK_WORKER_CORES). My application would have 24 executors, each with its
own VM.

The RDDs I process have millions of rows and many partitions. I could do
rdd.mapPartitions and it would still work, as the code would only be executed
once in each VM, but I was wondering if there is a more efficient way of doing
this by using a generated RDD with one partition per executor.

Also, when I want to return some stats from each executor, rdd.mapPartitions
would return multiple results.

Deenar
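
As a sketch of the per-partition stats point: mapPartitions can be made to return
exactly one summary value per partition (rdd is a placeholder here), which can then
be combined on the driver:

  // one element per partition, here the partition's row count
  val statsPerPartition = rdd.mapPartitions(iter => Iterator(iter.size)).collect()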





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p3337.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Announcing Spark SQL

2014-03-27 Thread yana
Does Shark not suit your needs? That's what we use at the moment and it's been 
good


Sent from my Samsung Galaxy S®4

 Original message 
From: andy petrella  
Date:03/27/2014  6:08 AM  (GMT-05:00) 
To: user@spark.apache.org 
Subject: Re: Announcing Spark SQL 

nope (what I said :-P)


On Thu, Mar 27, 2014 at 11:05 AM, Pascal Voitot Dev 
 wrote:



On Thu, Mar 27, 2014 at 10:22 AM, andy petrella  wrote:
I just mean queries sent at runtime ^^, like for any RDBMS.
In our project we have such requirement to have a layer to play with the data 
(custom and low level service layer of a lambda arch), and something like this 
is interesting.


Ok that's what I thought! But for these runtime queries, is a macro useful for 
you?
 


On Thu, Mar 27, 2014 at 10:15 AM, Pascal Voitot Dev 
 wrote:

Le 27 mars 2014 09:47, "andy petrella"  a écrit :


>
> I hijack the thread, but my2c is that this feature is also important to 
> enable ad-hoc queries which is done at runtime. It doesn't remove interests 
> for such macro for precompiled jobs of course, but it may not be the first 
> use case envisioned with this Spark SQL.
>
I'm not sure to see what you call "ad- hoc queries"... Any sample?

> Again, only my0.2c (ok I divided by 10 after writing my thoughts ^^)
>
> Andy
>
> On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev 
>  wrote:
>>
>> Hi,
>> Quite interesting!
>>
>> Suggestion: why not go even fancier & parse SQL queries at compile-time with 
>> a macro ? ;)
>>
>> Pascal
>>
>>
>>
>> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust  
>> wrote:
>>>
>>> Hey Everyone,
>>>
>>> This already went out to the dev list, but I wanted to put a pointer here 
>>> as well to a new feature we are pretty excited about for Spark 1.0.
>>>
>>> http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
>>>
>>> Michael
>>
>>
>





Re: Announcing Spark SQL

2014-03-27 Thread andy petrella
Yes it could, of course. I didn't say that there is no tool to do it,
though ;-).

Andy


On Thu, Mar 27, 2014 at 12:49 PM, yana  wrote:

> Does Shark not suit your needs? That's what we use at the moment and it's
> been good
>
>
> Sent from my Samsung Galaxy S®4
>
>
>  Original message 
> From: andy petrella
> Date:03/27/2014 6:08 AM (GMT-05:00)
> To: user@spark.apache.org
> Subject: Re: Announcing Spark SQL
>
> nope (what I said :-P)
>
>
> On Thu, Mar 27, 2014 at 11:05 AM, Pascal Voitot Dev <
> pascal.voitot@gmail.com> wrote:
>
>>
>>
>>
>> On Thu, Mar 27, 2014 at 10:22 AM, andy petrella 
>> wrote:
>>
>>> I just mean queries sent at runtime ^^, like for any RDBMS.
>>> In our project we have such requirement to have a layer to play with the
>>> data (custom and low level service layer of a lambda arch), and something
>>> like this is interesting.
>>>
>>>
>> Ok that's what I thought! But for these runtime queries, is a macro
>> useful for you?
>>
>>
>>>
>>>
>>> On Thu, Mar 27, 2014 at 10:15 AM, Pascal Voitot Dev <
>>> pascal.voitot@gmail.com> wrote:
>>>

 Le 27 mars 2014 09:47, "andy petrella"  a
 écrit :

 >
 > I hijack the thread, but my2c is that this feature is also important
 to enable ad-hoc queries which is done at runtime. It doesn't remove
 interests for such macro for precompiled jobs of course, but it may not be
 the first use case envisioned with this Spark SQL.
 >

 I'm not sure to see what you call "ad- hoc queries"... Any sample?

 > Again, only my0.2c (ok I divided by 10 after writing my thoughts ^^)
 >
 > Andy
 >
 > On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev <
 pascal.voitot@gmail.com> wrote:
 >>
 >> Hi,
 >> Quite interesting!
 >>
 >> Suggestion: why not go even fancier & parse SQL queries at
 compile-time with a macro ? ;)
 >>
 >> Pascal
 >>
 >>
 >>
 >> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust <
 mich...@databricks.com> wrote:
 >>>
 >>> Hey Everyone,
 >>>
 >>> This already went out to the dev list, but I wanted to put a
 pointer here as well to a new feature we are pretty excited about for Spark
 1.0.
 >>>
 >>>
 http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
 >>>
 >>> Michael
 >>
 >>
 >

>>>
>>>
>>
>


Re: Announcing Spark SQL

2014-03-27 Thread Pascal Voitot Dev
when there is something new, it's also cool to let imagination fly far away
;)


On Thu, Mar 27, 2014 at 2:20 PM, andy petrella wrote:

> Yes it could, of course. I didn't say that there is no tool to do it,
> though ;-).
>
> Andy
>
>
> On Thu, Mar 27, 2014 at 12:49 PM, yana  wrote:
>
>> Does Shark not suit your needs? That's what we use at the moment and it's
>> been good
>>
>>
>> Sent from my Samsung Galaxy S®4
>>
>>
>>  Original message 
>> From: andy petrella
>> Date:03/27/2014 6:08 AM (GMT-05:00)
>> To: user@spark.apache.org
>> Subject: Re: Announcing Spark SQL
>>
>> nope (what I said :-P)
>>
>>
>> On Thu, Mar 27, 2014 at 11:05 AM, Pascal Voitot Dev <
>> pascal.voitot@gmail.com> wrote:
>>
>>>
>>>
>>>
>>> On Thu, Mar 27, 2014 at 10:22 AM, andy petrella >> > wrote:
>>>
 I just mean queries sent at runtime ^^, like for any RDBMS.
 In our project we have such requirement to have a layer to play with
 the data (custom and low level service layer of a lambda arch), and
 something like this is interesting.


>>> Ok that's what I thought! But for these runtime queries, is a macro
>>> useful for you?
>>>
>>>


 On Thu, Mar 27, 2014 at 10:15 AM, Pascal Voitot Dev <
 pascal.voitot@gmail.com> wrote:

>
> Le 27 mars 2014 09:47, "andy petrella"  a
> écrit :
>
> >
> > I hijack the thread, but my2c is that this feature is also important
> to enable ad-hoc queries which is done at runtime. It doesn't remove
> interests for such macro for precompiled jobs of course, but it may not be
> the first use case envisioned with this Spark SQL.
> >
>
> I'm not sure to see what you call "ad- hoc queries"... Any sample?
>
> > Again, only my0.2c (ok I divided by 10 after writing my thoughts ^^)
> >
> > Andy
> >
> > On Thu, Mar 27, 2014 at 9:16 AM, Pascal Voitot Dev <
> pascal.voitot@gmail.com> wrote:
> >>
> >> Hi,
> >> Quite interesting!
> >>
> >> Suggestion: why not go even fancier & parse SQL queries at
> compile-time with a macro ? ;)
> >>
> >> Pascal
> >>
> >>
> >>
> >> On Wed, Mar 26, 2014 at 10:58 PM, Michael Armbrust <
> mich...@databricks.com> wrote:
> >>>
> >>> Hey Everyone,
> >>>
> >>> This already went out to the dev list, but I wanted to put a
> pointer here as well to a new feature we are pretty excited about for 
> Spark
> 1.0.
> >>>
> >>>
> http://databricks.com/blog/2014/03/26/Spark-SQL-manipulating-structured-data-using-Spark.html
> >>>
> >>> Michael
> >>
> >>
> >
>


>>>
>>
>


WikipediaPageRank Data Set

2014-03-27 Thread Niko Stahl
Hello,

I would like to run the WikipediaPageRank example,
but the Wikipedia dump XML files are no longer available on
Freebase. Does anyone know an alternative source for the data?

Thanks,
Niko


spark streaming: what is awaitTermination()?

2014-03-27 Thread Diana Carroll
The API docs for ssc.awaitTermination say simply "Wait for the execution to
stop. Any exceptions that occurs during the execution will be thrown in
this thread."

Can someone help me understand what this means?  What causes execution to
stop?  Why do we need to wait for that to happen?

I tried removing it from my simple NetworkWordCount example (running
locally, not on a cluster) and nothing changed.  In both cases, I end my
program by hitting Ctrl-C.

Thanks for any insight you can give me.

Diana


function state lost when next RDD is processed

2014-03-27 Thread Adrian Mocanu
Is there a way to pass a custom function to Spark to run on the entire
stream? For example, say I have a function which sums up values in each RDD and
then across RDDs.

I've tried with map, transform, and reduce. They all apply my sum function to one
RDD; when the next RDD comes, the function starts from 0, so the sum of the
previous RDD is lost.

Does Spark support a way of passing a custom function so that its state is
preserved across RDDs and not only within an RDD?

Thanks
-Adrian
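
One way this is commonly handled in Spark Streaming is updateStateByKey, which
carries per-key state across batches; a minimal sketch, assuming the stream has been
mapped to (key, value) pairs named pairs and that a checkpoint directory has been set
on the StreamingContext:

  // running sum per key, maintained across all RDDs/batches of the stream
  val runningTotals = pairs.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
    Some(state.getOrElse(0) + newValues.sum)
  }
  runningTotals.print()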



spark streaming and the spark shell

2014-03-27 Thread Diana Carroll
I'm working with spark streaming using spark-shell, and hoping folks could
answer a few questions I have.

I'm doing WordCount on a socket stream:

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.Seconds
var ssc = new StreamingContext(sc,Seconds(5))
var mystream = ssc.socketTextStream("localhost",)
var words = mystream.flatMap(line => line.split(" "))
var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
wordCounts.print()
ssc.start()



1.  I'm assuming that using spark shell is an edge case, and that spark
streaming is really intended mostly for batch use.  True?

2.   I notice that once I start ssc.start(), my stream starts processing
and continues indefinitely...even if I close the socket on the server end
(I'm using unix command "nc" to mimic a server as explained in the
streaming programming guide .)  Can I tell my stream to detect if it's lost
a connection and therefore stop executing?  (Or even better, to attempt to
re-establish the connection?)

3.  I tried entering ssc.stop which resulted in an error:

Exception in thread "Thread-43" org.apache.spark.SparkException: Job
cancelled because SparkContext was shut down
14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
SendingConnectionManagerId not found

But it did stop the DStream execution.

4.  Then I tried restarting the ssc again (ssc.start) and got another error:
org.apache.spark.SparkException: JobScheduler already started
Is restarting an ssc supported?

5.  When I perform an operation like wordCounts.print(), that operation
will execute on each batch, every n seconds.  Is there a way I can undo
that operation?  That is, I want it to *stop* executing that print every n
seconds...without having to stop the stream.

What I'm really asking is...can I explore DStreams interactively the way I
can explore my data in regular Spark.  In regular Spark, I might perform
various operations on an RDD to see what happens.  So at first, I might
have used "split(" ") to tokenize my input text, but now I want to try
using split(",") instead, after the stream has already started running.
 Can I do that?

I did find out that if add a new operation to an existing dstream (say,
words.print()) *after *the ssc.start it works. It *will* add the second
print() call to the execution list every n seconds.

but if I try to add new dstreams, e.g.
...

ssc.start()

var testpairs = words.map(x => (x, "TEST"))
testpairs.print()


I get an error:

14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
139593227 ms
java.lang.Exception:
org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
initialized


Is this sort of interactive use just not supported?

Thanks!

Diana


Spark powered wikipedia analysis and exploration

2014-03-27 Thread Guillaume Pitel

  
  
Hi Spark users,

I don't know if it's the right place to announce it, but Spark has a
new visible use case through a demo we put online here :

http://wikinsights.org

It allows you to explore the English Wikipedia with a few added
benefits from our proprietary semantic and relations analysis
method, so that you can see similar pages (based on text content or
links), see the most relevant words for a page, and other stuff.

Spark is used for the processing of the English Wikipedia, and for
the computation. It takes about 30 minutes for three iterations of
our method on the whole 4.4M documents * 2.1M words matrix, on a
smallish cluster of 7 nodes with 4 cores and 32GB RAM each.

Any feedback is welcome (except on the aesthetic aspect, we already
know the UI is really bad)

Enjoy exploring Wikipedia in your spare time :)

Guillaume
--
Guillaume PITEL, Président
+33(0)6 25 48 86 80
eXenSa S.A.S.
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05


GC overhead limit exceeded

2014-03-27 Thread Sai Prasanna
"java.lang.OutOfMemoryError: GC overhead limit exceeded"

What is the problem? I run the same code; one instance runs in 8
seconds, and the next time it takes a really long time, say 300-500 seconds...
In the logs I see a lot of "GC overhead limit exceeded" messages. What should be
done?

Can someone please throw some light on it?



-- 
*Sai Prasanna. AN*
*II M.Tech (CS), SSSIHL*


*Entire water in the ocean can never sink a ship, Unless it gets inside.All
the pressures of life can never hurt you, Unless you let them in.*


Re: GC overhead limit exceeded

2014-03-27 Thread Sean Owen
This is another way of Java saying "you ran out of heap space". As
less and less room is available, the GC kicks in more often, freeing
less each time. Before the very last byte of memory is gone, Java may
declare defeat. That's why it's taking so long, and you simply need a
larger heap in whatever is throwing this error.
--
Sean Owen | Director, Data Science | London


On Thu, Mar 27, 2014 at 4:21 PM, Sai Prasanna  wrote:
> "java.lang.OutOfMemoryError: GC overhead limit exceeded"
>
> What is the problem. The same code, i run, one instance it runs in 8 second,
> next time it takes really long time, say 300-500 seconds...
> I see the logs a lot of GC overhead limit exceeded is seen. What should be
> done ??
>
> Please can someone throw some light on it ??
>
>
>
> --
> Sai Prasanna. AN
> II M.Tech (CS), SSSIHL
>
> Entire water in the ocean can never sink a ship, Unless it gets inside.
> All the pressures of life can never hurt you, Unless you let them in.


StreamingContext.transform on a DStream

2014-03-27 Thread Adrian Mocanu
Found this transform fn in StreamingContext which takes in a DStream[_] and a 
function which acts on each of its RDDs
Unfortunately I can't figure out how to transform my DStream[(String,Int)] into 
DStream[_]


/*** Create a new DStream in which each RDD is generated by applying a function 
on RDDs of the DStreams. */
  def transform[T: ClassTag](
  dstreams: Seq[DStream[_]],
  transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
  }

-Adrian



RE: StreamingContext.transform on a DStream

2014-03-27 Thread Adrian Mocanu
Please disregard I didn't see the Seq wrapper.

From: Adrian Mocanu [mailto:amoc...@verticalscope.com]
Sent: March-27-14 11:57 AM
To: u...@spark.incubator.apache.org
Subject: StreamingContext.transform on a DStream

Found this transform fn in StreamingContext which takes in a DStream[_] and a 
function which acts on each of its RDDs
Unfortunately I can't figure out how to transform my DStream[(String,Int)] into 
DStream[_]


/*** Create a new DStream in which each RDD is generated by applying a function 
on RDDs of the DStreams. */
  def transform[T: ClassTag](
  dstreams: Seq[DStream[_]],
  transformFunc: (Seq[RDD[_]], Time) => RDD[T]
): DStream[T] = {
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
  }

-Adrian
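
For the record, a minimal sketch of calling the Seq-based transform with a single
DStream[(String, Int)]; myPairStream is a placeholder, and the down-cast from RDD[_]
is my assumption about one way to use it:

  import org.apache.spark.rdd.RDD
  import org.apache.spark.streaming.Time

  val transformed = ssc.transform(
    Seq(myPairStream),                       // myPairStream: DStream[(String, Int)], a placeholder
    (rdds: Seq[RDD[_]], time: Time) =>
      rdds(0).asInstanceOf[RDD[(String, Int)]].map { case (k, v) => (k, v * 2) }
  )

When only one stream is involved, the transform method on the DStream itself is
usually the simpler route.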



Re: Running a task once on each executor

2014-03-27 Thread dmpour23
How exactly does rdd.mapPartitions get executed once in each VM?

I am running mapPartitions and the call function does not seem to execute the
code:

JavaPairRDD twos = input.map(new
Split()).sortByKey().partitionBy(new HashPartitioner(k));
twos.values().saveAsTextFile(args[2]);

JavaRDD ls = twos.values().mapPartitions(new
FlatMapFunction, String>() {
@Override
public Iterable call(Iterator arg0) throws Exception {
   System.out.println("Usage should call my jar once: " + arg0);
   return Lists.newArrayList();}
});



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p3353.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: GC overhead limit exceeded

2014-03-27 Thread Ognen Duzlevski
Look at the tuning guide on Spark's webpage for strategies to cope with 
this.
I have run into quite a few memory issues like these, some are resolved 
by changing the StorageLevel strategy and employing things like Kryo, 
some are solved by specifying the number of tasks to break down a given 
operation into etc.


Ognen

On 3/27/14, 10:21 AM, Sai Prasanna wrote:

"java.lang.OutOfMemoryError: GC overhead limit exceeded"

What is the problem. The same code, i run, one instance it runs in 8 
second, next time it takes really long time, say 300-500 seconds...
I see the logs a lot of GC overhead limit exceeded is seen. What 
should be done ??


Please can someone throw some light on it ??



--
*Sai Prasanna. AN*
*II M.Tech (CS), SSSIHL*
*
Entire water in the ocean can never sink a ship, Unless it gets inside.
All the pressures of life can never hurt you, Unless you let them in.*




Re: Running a task once on each executor

2014-03-27 Thread Christopher Nguyen
Deenar, dmpour is correct in that there's a many-to-many mapping between
executors and partitions (an executor can be assigned multiple partitions,
and a given partition can in principle move to a different executor).

I'm not sure why you seem to require this problem statement to be solved
with RDDs. It is fairly easy to have something executed once per JVM, using
the pattern I suggested. Is there some other requirement I have missed?

Sent while mobile. Pls excuse typos etc.
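
A minimal sketch of that once-per-JVM pattern; the TaskNonce name comes from the
earlier message, and this particular implementation is only an assumption of what it
might look like:

  import java.util.concurrent.atomic.AtomicBoolean

  object TaskNonce {
    private val done = new AtomicBoolean(false)
    def getSingleton = this
    def doThisOnce(): Unit =
      if (done.compareAndSet(false, true)) {
        // runs at most once per executor JVM, e.g. one-time native library setup
        println("per-JVM initialization")
      }
  }

  // invoked from inside any map/mapPartitions closure, e.g.:
  //   rdd.map { x => TaskNonce.getSingleton.doThisOnce(); x }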
On Mar 27, 2014 9:06 AM, "dmpour23"  wrote:

> How exactly does rdd.mapPartitions  be executed once in each VM?
>
> I am running  mapPartitions and the call function seems not to execute the
> code?
>
> JavaPairRDD twos = input.map(new
> Split()).sortByKey().partitionBy(new HashPartitioner(k));
> twos.values().saveAsTextFile(args[2]);
>
> JavaRDD ls = twos.values().mapPartitions(new
> FlatMapFunction, String>() {
> @Override
> public Iterable call(Iterator arg0) throws Exception {
>System.out.println("Usage should call my jar once: " + arg0);
>return Lists.newArrayList();}
> });
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p3353.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Spark Streaming + Kafka + Mesos/Marathon strangeness

2014-03-27 Thread Scott Clasen
I think now that this is because spark.local.dir is defaulting to /tmp, and
since the tasks are not running on the same machine, the file is not found
when the second task takes over.

How do you set spark.local.dir appropriately when running on mesos?
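
One way to set it is through the job's configuration (SparkConf in 0.9, or the
equivalent system property) before the context is created; a sketch, where the
directory path is a placeholder that must exist on every slave:

  val conf = new SparkConf()
    .set("spark.local.dir", "/mnt/spark-local")   // placeholder path, must exist on each node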



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3356.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: GC overhead limit exceeded

2014-03-27 Thread Andrew Or
Are you caching a lot of RDD's? If so, maybe you should unpersist() the
ones that you're not using. Also, if you're on 0.9, make sure
spark.shuffle.spill is enabled (which it is by default). This allows your
application to spill in-memory content to disk if necessary.

How much memory are you giving to your executors? The default,
spark.executor.memory is 512m, which is quite low. Consider raising this.
Checking the web UI is a good way to figure out your runtime memory usage.
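
A sketch of raising the executor memory via the application's SparkConf (the value is
a placeholder; spark-env.sh or system properties work as well):

  val conf = new SparkConf().set("spark.executor.memory", "4g")   // up from the 512m default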


On Thu, Mar 27, 2014 at 9:22 AM, Ognen Duzlevski <
og...@plainvanillagames.com> wrote:

>  Look at the tuning guide on Spark's webpage for strategies to cope with
> this.
> I have run into quite a few memory issues like these, some are resolved by
> changing the StorageLevel strategy and employing things like Kryo, some are
> solved by specifying the number of tasks to break down a given operation
> into etc.
>
> Ognen
>
>
> On 3/27/14, 10:21 AM, Sai Prasanna wrote:
>
> "java.lang.OutOfMemoryError: GC overhead limit exceeded"
>
>  What is the problem. The same code, i run, one instance it runs in 8
> second, next time it takes really long time, say 300-500 seconds...
> I see the logs a lot of GC overhead limit exceeded is seen. What should be
> done ??
>
>  Please can someone throw some light on it ??
>
>
>
>  --
>  *Sai Prasanna. AN*
> *II M.Tech (CS), SSSIHL*
>
>
> * Entire water in the ocean can never sink a ship, Unless it gets inside.
> All the pressures of life can never hurt you, Unless you let them in.*
>
>
>


KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Scott Clasen
I have a simple streaming job that creates a kafka input stream on a topic
with 8 partitions, and does a forEachRDD

The job and tasks are running on Mesos, and there are two tasks running, but
only one task is doing anything.

I also set spark.streaming.concurrentJobs=8  but still there is only 1 task
doing work. I would have expected that each task took a subset of the
partitions.

Is there a way to make more than one task share the work here?  Are my
expectations off here?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: YARN problem using an external jar in worker nodes Inbox x

2014-03-27 Thread Sung Hwan Chung
Yeah, it's in standalone mode, and I did use the SparkContext.addJar method and
tried setting setExecutorEnv("SPARK_CLASSPATH", ...), etc., but none of it worked.

I finally made it work by modifying the ClientBase.scala code, where I set
'appMasterOnly' to false before the addJars contents were added to
distCacheMgr. But this is not what I should be doing, right?

Is there a problem with addJar method in 0.9.0?


On Wed, Mar 26, 2014 at 1:47 PM, Sandy Ryza  wrote:

> Hi Sung,
>
> Are you using yarn-standalone mode?  Have you specified the --addJars
> option with your external jars?
>
> -Sandy
>
>
> On Wed, Mar 26, 2014 at 1:17 PM, Sung Hwan Chung  > wrote:
>
>> Hello, (this is Yarn related)
>>
>> I'm able to load an external jar and use its classes within
>> ApplicationMaster. I wish to use this jar within worker nodes, so I added
>> sc.addJar(pathToJar) and ran.
>>
>> I get the following exception:
>>
>> org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times 
>> (most recent failure: Exception failure: java.lang.NoClassDefFoundError: 
>> org/opencv/objdetect/HOGDescriptor)
>> Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception 
>> failure: java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>> scala.Option.foreach(Option.scala:236)
>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>> akka.actor.ActorCell.invoke(ActorCell.scala:456)
>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>> akka.dispatch.Mailbox.run(Mailbox.scala:219)
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>> And in worker node containers' stderr log (nothing in stdout log), I
>> don't see any reference to loading jars:
>>
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in 
>> [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in 
>> [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
>> 14/03/26 13:12:18 INFO Remoting: Starting remoting
>> 14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses 
>> :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
>> 14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses: 
>> [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
>> 14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend: Connecting to 
>> driver: 
>> akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedScheduler
>> 14/03/26 13:12:18 ERROR executor.CoarseGrainedExecutorBackend: Driver 
>> Disassociated [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] 
>> -> [akka.tcp://spark@alpinenode5.alpinenow.local:10314] disassociated! 
>> Shutting down.
>>
>>
>>
>>
>>
>>
>> Any idea what's going on?
>>
>>
>


Re: GC overhead limit exceeded

2014-03-27 Thread Sai Prasanna
No, I am running on 0.8.1.
Yes, I am caching a lot. I am benchmarking simple code in Spark where
512MB, 1GB and 2GB text files are taken, some basic intermediate operations
are done, and the intermediate results that will be used in subsequent
operations are cached.

I thought that we need not manually unpersist: if I need to cache
something and the cache is full, space will automatically be created
by evicting earlier entries. Do I need to unpersist?

Moreover, if I run several times, will the previously cached RDDs still
remain in the cache? If so, can I flush them out manually before the next
run? [something like a complete cache flush]


On Thu, Mar 27, 2014 at 11:16 PM, Andrew Or  wrote:

> Are you caching a lot of RDD's? If so, maybe you should unpersist() the
> ones that you're not using. Also, if you're on 0.9, make sure
> spark.shuffle.spill is enabled (which it is by default). This allows your
> application to spill in-memory content to disk if necessary.
>
> How much memory are you giving to your executors? The default,
> spark.executor.memory is 512m, which is quite low. Consider raising this.
> Checking the web UI is a good way to figure out your runtime memory usage.
>
>
> On Thu, Mar 27, 2014 at 9:22 AM, Ognen Duzlevski <
> og...@plainvanillagames.com> wrote:
>
>>  Look at the tuning guide on Spark's webpage for strategies to cope with
>> this.
>> I have run into quite a few memory issues like these, some are resolved
>> by changing the StorageLevel strategy and employing things like Kryo, some
>> are solved by specifying the number of tasks to break down a given
>> operation into etc.
>>
>> Ognen
>>
>>
>> On 3/27/14, 10:21 AM, Sai Prasanna wrote:
>>
>> "java.lang.OutOfMemoryError: GC overhead limit exceeded"
>>
>>  What is the problem. The same code, i run, one instance it runs in 8
>> second, next time it takes really long time, say 300-500 seconds...
>> I see the logs a lot of GC overhead limit exceeded is seen. What should
>> be done ??
>>
>>  Please can someone throw some light on it ??
>>
>>
>>
>>  --
>>  *Sai Prasanna. AN*
>> *II M.Tech (CS), SSSIHL*
>>
>>
>> * Entire water in the ocean can never sink a ship, Unless it gets inside.
>> All the pressures of life can never hurt you, Unless you let them in.*
>>
>>
>>
>


-- 
*Sai Prasanna. AN*
*II M.Tech (CS), SSSIHL*


*Entire water in the ocean can never sink a ship, Unless it gets inside.All
the pressures of life can never hurt you, Unless you let them in.*


Re: Announcing Spark SQL

2014-03-27 Thread Patrick Wendell
Hey Rohit,

I think external tables based on Cassandra or other datastores will work
out of the box if you build Catalyst with Hive support.

Michael may have feelings about this but I'd guess the longer term design
for having schema support for Cassandra/HBase etc likely wouldn't rely on
hive external tables because it's an unnecessary layer of indirection.

Spark should be able to directly load an SchemaRDD from Cassandra by just
letting the user give relevant information about the Cassandra schema. And
it should let you write-back to Cassandra by giving a mapping of fields to
the respective cassandra columns. I think all of this would be fairly easy
to implement on SchemaRDD and likely will make it into Spark 1.1

- Patrick


On Wed, Mar 26, 2014 at 10:59 PM, Rohit Rai  wrote:

> Great work guys! Have been looking forward to this . . .
>
> In the blog it mentions support for reading from Hbase/Avro... What will
> be the recommended approach for this? Will it be writing custom wrappers
> for SQLContext like in HiveContext or using Hive's "EXTERNAL TABLE" support?
>
> I ask this because a few days back (based on your pull request in github)
> I started analyzing what it would take to support Spark SQL on Cassandra.
> One obvious approach will be to use Hive External Table support with our
> cassandra-hive handler. But second approach sounds tempting as it will give
> more fidelity.
>
> Regards,
> Rohit
>
> *Founder & CEO, **Tuplejump, Inc.*
> 
> www.tuplejump.com
> *The Data Engineering Platform*
>
>
> On Thu, Mar 27, 2014 at 9:12 AM, Michael Armbrust 
> wrote:
>
>> Any plans to make the SQL typesafe using something like Slick (
>>> http://slick.typesafe.com/)
>>>
>>
>> I would really like to do something like that, and maybe we will in a
>> couple of months. However, in the near term, I think the top priorities are
>> going to be performance and stability.
>>
>> Michael
>>
>
>


Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Patrick Wendell
If you call repartition() on the original stream you can set the level of
parallelism after it's ingested from Kafka. I'm not sure how it maps Kafka
topic partitions to tasks for the ingest, though.


On Thu, Mar 27, 2014 at 11:09 AM, Scott Clasen wrote:

> I have a simple streaming job that creates a kafka input stream on a topic
> with 8 partitions, and does a forEachRDD
>
> The job and tasks are running on mesos, and there are two tasks running,
> but
> only 1 task doing anything.
>
> I also set spark.streaming.concurrentJobs=8  but still there is only 1 task
> doing work. I would have expected that each task took a subset of the
> partitions.
>
> Is there a way to make more than one task share the work here?  Are my
> expectations off here?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: spark streaming: what is awaitTermination()?

2014-03-27 Thread Tathagata Das
The execution of Spark Streaming (started with
StreamingContext.start()) can stop in two ways:
1. streamingContext.stop() is called (possibly from a different thread)
2. some exception occurs in the processing of data.

awaitTermination is the right way for the main thread that started the
context to stay blocked, so that processing continues in the
background threads. The reason removing awaitTermination makes
no difference is that there is a bug in 0.9.0 that causes the main
function to not terminate even though the main thread has terminated
(one of the background threads is non-daemon). Also, without
awaitTermination, it is very hard to catch and print exceptions that
occur during the background data processing.

TD
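
A minimal sketch of the intended pattern (the error handling shown is just an
illustration):

  ssc.start()
  try {
    // blocks the main thread and rethrows exceptions from the background processing
    ssc.awaitTermination()
  } catch {
    case e: Exception =>
      println("Streaming computation failed: " + e)
      ssc.stop()
  }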


On Thu, Mar 27, 2014 at 7:02 AM, Diana Carroll  wrote:
> The API docs for ssc.awaitTermination say simply "Wait for the execution to
> stop. Any exceptions that occurs during the execution will be thrown in this
> thread."
>
> Can someone help me understand what this means?  What causes execution to
> stop?  Why do we need to wait for that to happen?
>
> I tried removing it from my simple NetworkWordCount example (running
> locally, not on a cluster) and nothing changed.  In both cases, I end my
> program by hitting Ctrl-C.
>
> Thanks for any insight you can give me.
>
> Diana


Re: Run spark on mesos remotely

2014-03-27 Thread Mayur Rustagi
Yes, but you have to maintain that machine's connection to the Mesos
cluster, as the driver with the DAG scheduler runs on that machine.
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, Mar 27, 2014 at 4:09 AM, Wush Wu  wrote:

> Dear all,
>
> We have a spark 0.8.1 cluster on mesos 0.15. It works if I submit the job
> from the master of mesos. That is to say, I spawn the spark shell or launch
> the scala application on the master of mesos.
>
> However, when I submit the job from another machine, the job will lost.
> The logs shows that the mesos does not copy the
> spark-0.8.1-incubating.tar.gz to the temporal working directory, so the job
> lost immediately. Is it possible to submit the job from the machine not
> belong to mesos cluster?
>
> Thanks!
>
> Wush
>


Re:

2014-03-27 Thread Mayur Rustagi
You have to raise the global limit as root. Also you have to do that on the
whole cluster.
Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, Mar 27, 2014 at 4:07 AM, Hahn Jiang wrote:

> I set "ulimit -n 10" in conf/spark-env.sh, is it too small?
>
>
> On Thu, Mar 27, 2014 at 3:36 PM, Sonal Goyal wrote:
>
>> Hi Hahn,
>>
>> What's the ulimit on your systems? Please check the following link for a
>> discussion on the too many files open.
>>
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201402.mbox/%3ccangvg8qpn_wllsrcjegdb7hmza2ux7myxzhfvtz+b-sdxdk...@mail.gmail.com%3E
>>
>>
>> Sent from my iPad
>>
>> > On Mar 27, 2014, at 12:15 PM, Hahn Jiang 
>> wrote:
>> >
>> > Hi, all
>> >
>> > I write a spark program on yarn. When I use small size input file, my
>> program can run well. But my job will failed if input size is more than 40G.
>> >
>> > the error log:
>> > java.io.FileNotFoundException (java.io.FileNotFoundException:
>> /home/work/data12/yarn/nodemanager/usercache/appcache/application_1392894597330_86813/spark-local-20140327144433-716b/24/shuffle_0_22_890
>> (Too many open files))
>> > java.io.FileOutputStream.openAppend(Native Method)
>> > java.io.FileOutputStream.(FileOutputStream.java:192)
>> >
>> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:113)
>> >
>> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:174)
>> >
>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:164)
>> >
>> org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
>> > scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$ExternalIterator.foreach(ExternalAppendOnlyMap.scala:239)
>> >
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:161)
>> >
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:102)
>> > org.apache.spark.scheduler.Task.run(Task.scala:53)
>> >
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>> >
>> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>> >
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>> > java.lang.Thread.run(Thread.java:662)
>> >
>> >
>> > my object:
>> > object Test {
>> >
>> >   def main(args: Array[String]) {
>> > val sc = new SparkContext(args(0), "Test",
>> >   System.getenv("SPARK_HOME"),
>> SparkContext.jarOfClass(this.getClass))
>> >
>> > val mg = sc.textFile("/user/.../part-*")
>> > val mct = sc.textFile("/user/.../part-*")
>> >
>> > val pair1 = mg.map {
>> >   s =>
>> > val cols = s.split("\t")
>> > (cols(0), cols(1))
>> > }
>> > val pair2 = mct.map {
>> >   s =>
>> > val cols = s.split("\t")
>> > (cols(0), cols(1))
>> > }
>> > val merge = pair1.union(pair2)
>> > val result = merge.reduceByKey(_ + _)
>> > val outputPath = new Path("/user/xxx/temp/spark-output")
>> > outputPath.getFileSystem(new Configuration()).delete(outputPath,
>> true)
>> > result.saveAsTextFile(outputPath.toString)
>> >
>> > System.exit(0)
>> >   }
>> >
>> > }
>> >
>> > My spark version is 0.9 and I run my job use this command
>> "/opt/soft/spark/bin/spark-class org.apache.spark.deploy.yarn.Client --jar
>> ./spark-example_2.10-0.1-SNAPSHOT.jar --class Test --queue default --args
>> yarn-standalone --num-workers 500 --master-memory 7g --worker-memory 7g
>> --worker-cores 2"
>> >
>>
>
>


Re: Configuring distributed caching with Spark and YARN

2014-03-27 Thread Mayur Rustagi
is this equivalent to addjar?


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, Mar 27, 2014 at 3:58 AM, santhoma  wrote:

> Curious to know, were you able to do distributed caching for spark?
>
> I have done that for hadoop and pig, but could not find a way to do it in
> spark
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-distributed-caching-with-Spark-and-YARN-tp1074p3325.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: GC overhead limit exceeded

2014-03-27 Thread Syed A. Hashmi
Which storage level are you using? I am guessing it is MEMORY_ONLY. For
large datasets, MEMORY_AND_DISK or MEMORY_AND_DISK_SER work better.

You can call unpersist on an RDD to remove it from the cache though.
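
For example (a rough sketch only; the input path is made up and sc is an
existing SparkContext):

import org.apache.spark.storage.StorageLevel

// Cache the intermediate result, letting partitions that don't fit in
// memory spill to disk instead of being dropped and recomputed.
val cached = sc.textFile("/user/someuser/input").persist(StorageLevel.MEMORY_AND_DISK_SER)
cached.count()

// Free the cached blocks explicitly once the result is no longer needed.
cached.unpersist()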


On Thu, Mar 27, 2014 at 11:57 AM, Sai Prasanna wrote:

> No i am running on 0.8.1.
> Yes i am caching a lot, i am benchmarking a simple code in spark where in
> 512mb, 1g and 2g text files are taken, some basic intermediate operations
> are done while the intermediate result which will be used in subsequent
> operations are cached.
>
> I thought that, we need not manually unpersist, if i need to cache
> something and if cache is found full, automatically space will be created
> by evacuating the earlier. Do i need to unpersist?
>
> Moreover, if i run several times, will the previously cached RDDs still
> remain in the cache? If so can i flush them manually out before the next
> run? [something like complete cache flush]
>
>
> On Thu, Mar 27, 2014 at 11:16 PM, Andrew Or  wrote:
>
>> Are you caching a lot of RDD's? If so, maybe you should unpersist() the
>> ones that you're not using. Also, if you're on 0.9, make sure
>> spark.shuffle.spill is enabled (which it is by default). This allows your
>> application to spill in-memory content to disk if necessary.
>>
>> How much memory are you giving to your executors? The default,
>> spark.executor.memory is 512m, which is quite low. Consider raising this.
>> Checking the web UI is a good way to figure out your runtime memory usage.
>>
>>
>> On Thu, Mar 27, 2014 at 9:22 AM, Ognen Duzlevski <
>> og...@plainvanillagames.com> wrote:
>>
>>>  Look at the tuning guide on Spark's webpage for strategies to cope with
>>> this.
>>> I have run into quite a few memory issues like these, some are resolved
>>> by changing the StorageLevel strategy and employing things like Kryo, some
>>> are solved by specifying the number of tasks to break down a given
>>> operation into etc.
>>>
>>> Ognen
>>>
>>>
>>> On 3/27/14, 10:21 AM, Sai Prasanna wrote:
>>>
>>> "java.lang.OutOfMemoryError: GC overhead limit exceeded"
>>>
>>>  What is the problem. The same code, i run, one instance it runs in 8
>>> second, next time it takes really long time, say 300-500 seconds...
>>> I see the logs a lot of GC overhead limit exceeded is seen. What should
>>> be done ??
>>>
>>>  Please can someone throw some light on it ??
>>>
>>>
>>>
>>>  --
>>>  *Sai Prasanna. AN*
>>> *II M.Tech (CS), SSSIHL*
>>>
>>>
>>> * Entire water in the ocean can never sink a ship, Unless it gets
>>> inside. All the pressures of life can never hurt you, Unless you let them
>>> in.*
>>>
>>>
>>>
>>
>
>
> --
> *Sai Prasanna. AN*
> *II M.Tech (CS), SSSIHL*
>
>
> *Entire water in the ocean can never sink a ship, Unless it gets inside.
> All the pressures of life can never hurt you, Unless you let them in.*
>


how to create a DStream from bunch of RDDs

2014-03-27 Thread Adrian Mocanu
I create several RDDs by merging several consecutive RDDs from a DStream. Is 
there a way to add these new RDDs to a DStream?

-Adrian



Re: spark streaming and the spark shell

2014-03-27 Thread Tathagata Das
Very good questions! Responses inline.

TD

On Thu, Mar 27, 2014 at 8:02 AM, Diana Carroll  wrote:
> I'm working with spark streaming using spark-shell, and hoping folks could
> answer a few questions I have.
>
> I'm doing WordCount on a socket stream:
>
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.StreamingContext._
> import org.apache.spark.streaming.Seconds
> var ssc = new StreamingContext(sc,Seconds(5))
> var mystream = ssc.socketTextStream("localhost",)
> var words = mystream.flatMap(line => line.split(" "))
> var wordCounts = words.map(x => (x, 1)).reduceByKey((x,y) => x+y)
> wordCounts.print()
> ssc.start()
>
>
>
> 1.  I'm assuming that using spark shell is an edge case, and that spark
> streaming is really intended mostly for batch use.  True?
>

Yes. Currently the spark-shell is not the intended execution mode for
Spark Streaming, even though it can be done for quick testing.

> 2.   I notice that once I start ssc.start(), my stream starts processing and
> continues indefinitely...even if I close the socket on the server end (I'm
> using unix command "nc" to mimic a server as explained in the streaming
> programming guide .)  Can I tell my stream to detect if it's lost a
> connection and therefore stop executing?  (Or even better, to attempt to
> re-establish the connection?)
>


Currently, not yet. But I am aware of this and this behavior will be
improved in the future.

> 3.  I tried entering ssc.stop which resulted in an error:
>
> Exception in thread "Thread-43" org.apache.spark.SparkException: Job
> cancelled because SparkContext was shut down
> 14/03/27 07:36:13 ERROR ConnectionManager: Corresponding
> SendingConnectionManagerId not found
>
> But it did stop the DStream execution.
>


Ah, that happens sometimes. The existing behavior of ssc.stop() is
that it will stop everything immediately.
I just opened a pull request for a more graceful shutting down of the
Spark streaming program.
https://github.com/apache/spark/pull/247

> 4.  Then I tried restarting the ssc again (ssc.start) and got another error:
> org.apache.spark.SparkException: JobScheduler already started
> Is restarting an ssc supported?
>


Restarting is ideally not supported. However, the behavior was not
explicitly checked. The above pull request
makes the behavior more explicit by throwing the right warnings and
exceptions.

> 5.  When I perform an operation like wordCounts.print(), that operation will
> execution on each batch, ever n seconds.  Is there a way I can undo that
> operation?  That is, I want it to *stop* executing that print ever n
> seconds...without having to stop the stream.
>
> What I'm really asking is...can I explore DStreams interactively the way I
> can explore my data in regular Spark.  In regular Spark, I might perform
> various operations on an RDD to see what happens.  So at first, I might have
> used "split(" ") to tokenize my input text, but now I want to try using
> split(",") instead, after the stream has already started running.  Can I do
> that?
>
> I did find out that if add a new operation to an existing dstream (say,
> words.print()) after the ssc.start it works. It *will* add the second
> print() call to the execution list every n seconds.
>
> but if I try to add new dstreams, e.g.
> ...
>
> ssc.start()
>
> var testpairs = words.map(x => (x, "TEST"))
> testpairs.print()
>
>
> I get an error:
>
> 14/03/27 07:57:50 ERROR JobScheduler: Error generating jobs for time
> 139593227 ms
> java.lang.Exception:
> org.apache.spark.streaming.dstream.MappedDStream@84f0f92 has not been
> initialized
>
>
> Is this sort of interactive use just not supported?


Modifying the DStream operations after the context has started is not
officially supported. However, dynamically changing the computation can
be done using DStream.transform() or DStream.foreachRDD().
Both these operations allow you to do arbitrary RDD operations on each
RDD. So you can dynamically modify what RDD operations are used within
the DStream transform / foreachRDD (so you are not changing the
DStream operations, only what's inside the DStream operation). But to
use this really interactively, you have to write a bit of additional
code that allows the user to interactively specify the function
applied on each RDD.
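
As a rough illustration of that idea (an untested sketch; it reuses the ssc
and mystream names from the example above and assumes checkpointing is not
enabled):

import org.apache.spark.rdd.RDD

// Keep the per-batch function in a driver-side variable.
var perBatch: RDD[String] => RDD[String] = rdd => rdd.flatMap(_.split(" "))

// transform() is evaluated on the driver for every batch, so it picks up
// whatever function perBatch currently points to.
val transformed = mystream.transform(rdd => perBatch(rdd))
transformed.print()
ssc.start()

// Later, from the shell, switch the tokenizer without touching the DStream graph:
perBatch = rdd => rdd.flatMap(_.split(","))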



>
> Thanks!
>
> Diana


Re: Spark Streaming + Kafka + Mesos/Marathon strangeness

2014-03-27 Thread Tathagata Das
spark.local.dir should be specified in the same way as other configuration
parameters.  
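
For example, with the 0.9 SparkConf API (the master URL and directory below
are placeholders, not values from this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("mesos://host:5050")             // placeholder Mesos master
  .setAppName("StreamingOnMesos")
  .set("spark.local.dir", "/mnt/spark-local") // must exist on every slave

val ssc = new StreamingContext(conf, Seconds(10))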


On Thu, Mar 27, 2014 at 10:32 AM, Scott Clasen wrote:

> I think now that this is because spark.local.dir is defaulting to /tmp, and
> since the tasks are not running on the same machine, the file is not found
> when the second task takes over.
>
> How do you set spark.local.dir appropriately when running on mesos?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3356.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Spark Streaming + Kafka + Mesos/Marathon strangeness

2014-03-27 Thread Scott Clasen
Heh sorry that wasn't a clear question, I know 'how' to set it but don't know
what value to use in a mesos cluster; since the processes are running in lxc
containers they won't be sharing a filesystem (or machine for that matter).

I can't use an s3n:// url for local dir, can I?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3373.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Scott Clasen
Actually looking closer it is stranger than I thought,

in the spark UI, one executor has executed 4 tasks, and one has executed
1928

Can anyone explain the workings of a KafkaInputStream wrt kafka partitions
and mapping to spark executors and tasks?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3374.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark streaming and the spark shell

2014-03-27 Thread Evgeny Shishkin
> 
>> 2.   I notice that once I start ssc.start(), my stream starts processing and
>> continues indefinitely...even if I close the socket on the server end (I'm
>> using unix command "nc" to mimic a server as explained in the streaming
>> programming guide .)  Can I tell my stream to detect if it's lost a
>> connection and therefore stop executing?  (Or even better, to attempt to
>> re-establish the connection?)
>> 
> 
> 
> Currently, not yet. But I am aware of this and this behavior will be
> improved in the future.

Now i understand why our spark streaming job starts to generate zero sized rdds 
from kafkainput, 
when one worker gets OOM or crashes.

And we can’t detect it! Great. So spark streaming just isn’t suited yet for 
24/7 operation =\

Re: YARN problem using an external jar in worker nodes Inbox x

2014-03-27 Thread Sandy Ryza
I just tried this in CDH (only a few patches ahead of 0.9.0) and was able
to include a dependency with --addJars successfully.

Can you share how you're invoking SparkContext.addJar?  Anything
interesting in the application master logs?

-Sandy




On Thu, Mar 27, 2014 at 11:35 AM, Sung Hwan Chung
wrote:

> Yea it's in a standalone mode and I did use SparkContext.addJar method and
> tried setting setExecutorEnv "SPARK_CLASSPATH", etc. but none of it worked.
>
> I finally made it work by modifying the ClientBase.scala code where I set
> 'appMasterOnly' to false before the addJars contents were added to
> distCacheMgr. But this is not what I should be doing, right?
>
> Is there a problem with addJar method in 0.9.0?
>
>
> On Wed, Mar 26, 2014 at 1:47 PM, Sandy Ryza wrote:
>
>> Hi Sung,
>>
>> Are you using yarn-standalone mode?  Have you specified the --addJars
>> option with your external jars?
>>
>> -Sandy
>>
>>
>> On Wed, Mar 26, 2014 at 1:17 PM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> Hello, (this is Yarn related)
>>>
>>> I'm able to load an external jar and use its classes within
>>> ApplicationMaster. I wish to use this jar within worker nodes, so I added
>>> sc.addJar(pathToJar) and ran.
>>>
>>> I get the following exception:
>>>
>>> org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times 
>>> (most recent failure: Exception failure: java.lang.NoClassDefFoundError: 
>>> org/opencv/objdetect/HOGDescriptor)
>>> Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception 
>>> failure: java.lang.NoClassDefFoundError: org/opencv/objdetect/HOGDescriptor)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>>> scala.Option.foreach(Option.scala:236)
>>> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>> akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>> akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>> akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>>
>>> And in worker node containers' stderr log (nothing in stdout log), I
>>> don't see any reference to loading jars:
>>>
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in 
>>> [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in 
>>> [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
>>> explanation.
>>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>>> 14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
>>> 14/03/26 13:12:18 INFO Remoting: Starting remoting
>>> 14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses 
>>> :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
>>> 14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses: 
>>> [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
>>> 14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend: Connecting to 
>>> driver: 
>>> akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedScheduler
>>> 14/03/26 13:12:18 ERROR executor.CoarseGrainedExecutorBackend: Driver 
>>> Disassociated [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006] 
>>> -> [akka.tcp://spark@alpinenode5.alpinenow.local:10314] disassociated! 
>>> Shutting down.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> Any idea what's going on?
>>>
>>>
>>
>


Re: YARN problem using an external jar in worker nodes Inbox x

2014-03-27 Thread Sung Hwan Chung
Well, it says that the jar was successfully added but can't reference
classes from it. Does this have anything to do with this bug?

http://stackoverflow.com/questions/22457645/when-to-use-spark-classpath-or-sparkcontext-addjar


On Thu, Mar 27, 2014 at 2:57 PM, Sandy Ryza  wrote:

> I just tried this in CDH (only a few patches ahead of 0.9.0) and was able
> to include a dependency with --addJars successfully.
>
> Can you share how you're invoking SparkContext.addJar?  Anything
> interesting in the application master logs?
>
> -Sandy
>
>
>
>
> On Thu, Mar 27, 2014 at 11:35 AM, Sung Hwan Chung <
> coded...@cs.stanford.edu> wrote:
>
>> Yea it's in a standalone mode and I did use SparkContext.addJar method
>> and tried setting setExecutorEnv "SPARK_CLASSPATH", etc. but none of it
>> worked.
>>
>> I finally made it work by modifying the ClientBase.scala code where I set
>> 'appMasterOnly' to false before the addJars contents were added to
>> distCacheMgr. But this is not what I should be doing, right?
>>
>> Is there a problem with addJar method in 0.9.0?
>>
>>
>> On Wed, Mar 26, 2014 at 1:47 PM, Sandy Ryza wrote:
>>
>>> Hi Sung,
>>>
>>> Are you using yarn-standalone mode?  Have you specified the --addJars
>>> option with your external jars?
>>>
>>> -Sandy
>>>
>>>
>>> On Wed, Mar 26, 2014 at 1:17 PM, Sung Hwan Chung <
>>> coded...@cs.stanford.edu> wrote:
>>>
 Hello, (this is Yarn related)

 I'm able to load an external jar and use its classes within
 ApplicationMaster. I wish to use this jar within worker nodes, so I added
 sc.addJar(pathToJar) and ran.

 I get the following exception:

 org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times 
 (most recent failure: Exception failure: java.lang.NoClassDefFoundError: 
 org/opencv/objdetect/HOGDescriptor)
 Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception 
 failure: java.lang.NoClassDefFoundError: 
 org/opencv/objdetect/HOGDescriptor)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
 scala.Option.foreach(Option.scala:236)
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
 akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 akka.actor.ActorCell.invoke(ActorCell.scala:456)
 akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 akka.dispatch.Mailbox.run(Mailbox.scala:219)
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



 And in worker node containers' stderr log (nothing in stdout log), I
 don't see any reference to loading jars:

 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in 
 [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in 
 [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
 explanation.
 SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
 14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
 14/03/26 13:12:18 INFO Remoting: Starting remoting
 14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses 
 :[akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
 14/03/26 13:12:18 INFO Remoting: Remoting now listens on addresses: 
 [akka.tcp://sparkExecutor@alpinenode6.alpinenow.local:44006]
 14/03/26 13:12:18 INFO executor.CoarseGrainedExecutorBackend: Connecting 
 to driver: 
 akka.tcp://spark@alpinenode5.alpinenow.local:10314/user/CoarseGrainedSch

Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Evgeny Shishkin

On 28 Mar 2014, at 00:34, Scott Clasen  wrote:
> Actually looking closer it is stranger than I thought,
> 
> in the spark UI, one executor has executed 4 tasks, and one has executed
> 1928
> 
> Can anyone explain the workings of a KafkaInputStream wrt kafka partitions
> and mapping to spark executors and tasks?


Well, there are some issues with kafkainput now.
When you do KafkaUtils.createStream, it creates a kafka high level consumer on 
one node!
I don’t really know how many rdds it will generate during a batch window.
But when these rdds are created, spark schedules consecutive transformations on 
that one node,
because of locality.

You can try to repartition() those rdds. Sometimes it helps.

To try to consume from kafka on multiple machines you can do (1 to 
N).map(KafkaUtils.createStream)
But then an issue arises with the kafka high-level consumer! 
Those consumers operate in one consumer group, and they try to decide which 
consumer consumes which partitions.
And it may just fail to do syncpartitionrebalance, and then you have only a few 
consumers really consuming. 
To mitigate this problem, you can set rebalance retries very high, and pray it 
helps.
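
A hedged sketch of that union + repartition workaround (the zookeeper quorum,
group, topic and counts are made up; assumes Spark 0.9's external Kafka module,
DStream.repartition, and an existing StreamingContext ssc):

import org.apache.spark.streaming.kafka.KafkaUtils

val zkQuorum = "zk1:2181,zk2:2181"
val numReceivers = 4  // one high-level consumer (receiver) per stream

val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, "my-consumer-group", Map("my-topic" -> 1))
}

// Union the per-receiver streams and spread the records over more partitions
// so downstream stages are not pinned to the receiver nodes.
val unified = ssc.union(streams).repartition(16)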

Then arises yet another feature — if your receiver dies (OOM, hardware failure), 
you just stop receiving from kafka!
Brilliant.
And another feature — if you ask spark’s kafkainput to begin with 
auto.offset.reset = smallest, it will reset your offsets every time you run the 
application!
It does not comply with the documentation (reset to earliest offsets if it does not 
find offsets on zookeeper),
it just erases your offsets and starts again from zero!

And remember that you should restart your streaming app when there is any 
failure on receiver!

So, at the bottom — kafka input stream just does not work.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3374.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Scott Clasen
Evgeniy Shishkin wrote
> So, at the bottom — kafka input stream just does not work.


That was the conclusion I was coming to as well.  Are there open tickets
around fixing this up?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3379.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: YARN problem using an external jar in worker nodes Inbox x

2014-03-27 Thread Sandy Ryza
That bug only appears to apply to spark-shell.

Do things work in yarn-client mode or on a standalone cluster?  Are you
passing a path with parent directories to addJar?


On Thu, Mar 27, 2014 at 3:01 PM, Sung Hwan Chung
wrote:

> Well, it says that the jar was successfully added but can't reference
> classes from it. Does this have anything to do with this bug?
>
>
> http://stackoverflow.com/questions/22457645/when-to-use-spark-classpath-or-sparkcontext-addjar
>
>
> On Thu, Mar 27, 2014 at 2:57 PM, Sandy Ryza wrote:
>
>> I just tried this in CDH (only a few patches ahead of 0.9.0) and was able
>> to include a dependency with --addJars successfully.
>>
>> Can you share how you're invoking SparkContext.addJar?  Anything
>> interesting in the application master logs?
>>
>> -Sandy
>>
>>
>>
>>
>> On Thu, Mar 27, 2014 at 11:35 AM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> Yea it's in a standalone mode and I did use SparkContext.addJar method
>>> and tried setting setExecutorEnv "SPARK_CLASSPATH", etc. but none of it
>>> worked.
>>>
>>> I finally made it work by modifying the ClientBase.scala code where I
>>> set 'appMasterOnly' to false before the addJars contents were added to
>>> distCacheMgr. But this is not what I should be doing, right?
>>>
>>> Is there a problem with addJar method in 0.9.0?
>>>
>>>
>>> On Wed, Mar 26, 2014 at 1:47 PM, Sandy Ryza wrote:
>>>
 Hi Sung,

 Are you using yarn-standalone mode?  Have you specified the --addJars
 option with your external jars?

 -Sandy


 On Wed, Mar 26, 2014 at 1:17 PM, Sung Hwan Chung <
 coded...@cs.stanford.edu> wrote:

> Hello, (this is Yarn related)
>
> I'm able to load an external jar and use its classes within
> ApplicationMaster. I wish to use this jar within worker nodes, so I added
> sc.addJar(pathToJar) and ran.
>
> I get the following exception:
>
> org.apache.spark.SparkException: Job aborted: Task 0.0:1 failed 4 times 
> (most recent failure: Exception failure: java.lang.NoClassDefFoundError: 
> org/opencv/objdetect/HOGDescriptor)
> Job aborted: Task 0.0:1 failed 4 times (most recent failure: Exception 
> failure: java.lang.NoClassDefFoundError: 
> org/opencv/objdetect/HOGDescriptor)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
> scala.Option.foreach(Option.scala:236)
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> akka.actor.ActorCell.invoke(ActorCell.scala:456)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> akka.dispatch.Mailbox.run(Mailbox.scala:219)
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> And in worker node containers' stderr log (nothing in stdout log), I
> don't see any reference to loading jars:
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/gpphddata/1/yarn/nm-local-dir/usercache/yarn/filecache/7394400996676014282/spark-assembly-0.9.0-incubating-hadoop2.0.2-alpha-gphd-2.0.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/usr/lib/gphd/hadoop-2.0.2_alpha_gphd_2_0_1_0/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 14/03/26 13:12:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 14/03/26 13:12:18 INFO Remoting: Starting remoting
> 14/03/26 13:12:18 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkExecutor@alpinenode6.alpine

Re: spark streaming and the spark shell

2014-03-27 Thread Tathagata Das
Seems like the configuration of the Spark worker is not right. Either the
worker has not been given enough memory or the allocation of the memory to
the RDD storage needs to be fixed. If configured correctly, the Spark
workers should not get OOMs.



On Thu, Mar 27, 2014 at 2:52 PM, Evgeny Shishkin wrote:

>
> 2.   I notice that once I start ssc.start(), my stream starts processing
> and
> continues indefinitely...even if I close the socket on the server end (I'm
> using unix command "nc" to mimic a server as explained in the streaming
> programming guide .)  Can I tell my stream to detect if it's lost a
> connection and therefore stop executing?  (Or even better, to attempt to
> re-establish the connection?)
>
>
>
> Currently, not yet. But I am aware of this and this behavior will be
> improved in the future.
>
>
> Now i understand why out spark streaming job starts to generate zero sized
> rdds from kafkainput,
> when one worker get OOM or crashes.
>
> And we can't detect it! Great. So spark streaming just doesn't suite yet
> for 24/7 operation =\
>


Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Evgeny Shishkin

On 28 Mar 2014, at 01:11, Scott Clasen  wrote:

> Evgeniy Shishkin wrote
>> So, at the bottom — kafka input stream just does not work.
> 
> 
> That was the conclusion I was coming to as well.  Are there open tickets
> around fixing this up?
> 

I am not aware of such. Actually nobody complained on spark+kafka before.
So i thought it just works, and then we tried to build something on it and 
almost failed.

I think that it is possible to steal/replicate how twitter storm works with 
kafka.
They do manual partition assignment, at least this would help to balance load.

There is another issue.
ssc batch creates new rdds every batch duration, always, even if the previous 
computation did not finish.

But with kafka, we can consume more rdds later, after we finish previous rdds.
That way it would be much much simpler to not get OOM’ed when starting from 
the beginning,
because we can consume a lot of data from kafka during the batch duration and then 
get oom.

But we just can not start slow, can not limit how many to consume during batch. 
 


> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3379.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: spark streaming and the spark shell

2014-03-27 Thread Evgeny Shishkin

On 28 Mar 2014, at 01:13, Tathagata Das  wrote:

> Seems like the configuration of the Spark worker is not right. Either the 
> worker has not been given enough memory or the allocation of the memory to 
> the RDD storage needs to be fixed. If configured correctly, the Spark workers 
> should not get OOMs.


Yes, it is easy to start with latest offsets, get steady configuration and 
everything is nice.

Then your machine fails. And you stop receiving anything from kafka.

Then you notice this and restart your app hoping it would continue from offsets 
on zookeeper.
BUT NO
YOUR DEFAULT STREAM CONSUMERS JUST ERASED OFFSETS FROM ZOOKEEPER

After we fixed erasing offsets, we start from Some Offsets in the past.
And during batch duration we can’t limit how many messages we get from Kafka.
AND HERE WE OOM

And it's just a pain. Complete pain.

And you remember, only some machines consume. Usually two or three. Because of 
the broken high-level consumer in kafka.

Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Tathagata Das
Yes, no one has reported this issue before. I just opened a JIRA on what I
think is the main problem here
https://spark-project.atlassian.net/browse/SPARK-1340
Some of the receivers don't get restarted.
I have a bunch of refactoring in the NetworkReceiver ready to be posted as a
PR that should fix this.

Regarding the second problem, I have been thinking of adding flow control
(i.e. limiting the rate of receiving) for a while, just haven't gotten
around to it.
I added another JIRA for tracking this issue.
https://spark-project.atlassian.net/browse/SPARK-1341


TD


On Thu, Mar 27, 2014 at 3:23 PM, Evgeny Shishkin wrote:

>
> On 28 Mar 2014, at 01:11, Scott Clasen  wrote:
>
> > Evgeniy Shishkin wrote
> >> So, at the bottom -- kafka input stream just does not work.
> >
> >
> > That was the conclusion I was coming to as well.  Are there open tickets
> > around fixing this up?
> >
>
> I am not aware of such. Actually nobody complained on spark+kafka before.
> So i thought it just works, and then we tried to build something on it and
> almost failed.
>
> I think that it is possible to steal/replicate how twitter storm works
> with kafka.
> They do manual partition assignment, at least this would help to balance
> load.
>
> There is another issue.
> ssc batch creates new rdds every batch duration, always, even it previous
> computation did not finish.
>
> But with kafka, we can consume more rdds later, after we finish previous
> rdds.
> That way it would be much much simpler to not get OOM'ed when starting
> from beginning,
> because we can consume many data from kafka during batch duration and then
> get oom.
>
> But we just can not start slow, can not limit how many to consume during
> batch.
>
>
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3379.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>


Re: spark streaming and the spark shell

2014-03-27 Thread Tathagata Das
I see! As I said in the other thread, no one reported these issues until
now! A good and not-too-hard fix is to add the functionality of
limiting the data rate that the receivers receive at. I have opened a
JIRA.

TD


On Thu, Mar 27, 2014 at 3:28 PM, Evgeny Shishkin wrote:

>
> On 28 Mar 2014, at 01:13, Tathagata Das 
> wrote:
>
> Seems like the configuration of the Spark worker is not right. Either the
> worker has not been given enough memory or the allocation of the memory to
> the RDD storage needs to be fixed. If configured correctly, the Spark
> workers should not get OOMs.
>
>
>
> Yes, it is easy to start with latest offsets, get steady configuration and
> everything is nice.
>
> Then your machine failes. And you stop receiving from kafka anything.
>
> Then you notice this and restart your app hoping it would continue from
> offsets on zookeeper.
> BUT NO
> YOUR DEFAULT STREAM CONSUMERS JUST ERASED OFFSETS FROM ZOOKEEPER
>
> After we fixed erasing offsets, we start from Some Offsets in the past.
> And during batch duration we can't limit how many messages we get from
> Kafka.
> AND HERE WE OOM
>
> And it's just a pain. Complete pain.
>
> And you remember, only some machines consumes. Usually two or three.
> Because of broken high-level consumer in kafka.
>


Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Evgeny Shishkin

On 28 Mar 2014, at 01:32, Tathagata Das  wrote:

> Yes, no one has reported this issue before. I just opened a JIRA on what I 
> think is the main problem here
> https://spark-project.atlassian.net/browse/SPARK-1340
> Some of the receivers dont get restarted. 
> I have a bunch refactoring in the NetworkReceiver ready to be posted as a PR 
> that should fix this. 
> 
> Regarding the second problem, I have been thinking of adding flow control 
> (i.e. limiting the rate of receiving) for a while, just havent gotten around 
> to it. 
> I added another JIRA for that for tracking this issue.
> https://spark-project.atlassian.net/browse/SPARK-1341
> 
> 

Thank you, i will participate and can provide testing of new code.
Sorry for capslock, i just debugged this whole day, literally. 


> TD
> 
> 
> On Thu, Mar 27, 2014 at 3:23 PM, Evgeny Shishkin  wrote:
> 
> On 28 Mar 2014, at 01:11, Scott Clasen  wrote:
> 
> > Evgeniy Shishkin wrote
> >> So, at the bottom — kafka input stream just does not work.
> >
> >
> > That was the conclusion I was coming to as well.  Are there open tickets
> > around fixing this up?
> >
> 
> I am not aware of such. Actually nobody complained on spark+kafka before.
> So i thought it just works, and then we tried to build something on it and 
> almost failed.
> 
> I think that it is possible to steal/replicate how twitter storm works with 
> kafka.
> They do manual partition assignment, at least this would help to balance load.
> 
> There is another issue.
> ssc batch creates new rdds every batch duration, always, even it previous 
> computation did not finish.
> 
> But with kafka, we can consume more rdds later, after we finish previous rdds.
> That way it would be much much simpler to not get OOM’ed when starting from 
> beginning,
> because we can consume many data from kafka during batch duration and then 
> get oom.
> 
> But we just can not start slow, can not limit how many to consume during 
> batch.
> 
> 
> >
> > --
> > View this message in context: 
> > http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3379.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 



Re: spark streaming and the spark shell

2014-03-27 Thread Evgeny Shishkin

On 28 Mar 2014, at 01:37, Tathagata Das  wrote:

> I see! As I said in the other thread, no one reported these issues until now! 
> A good and not-too-hard fix is to add the functionality of the limiting the 
> data rate that the receivers receives at. I have opened a JIRA. 
> 

Yes, actually you should have another Jira on this
https://github.com/apache/incubator-spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala#L106

This just erases offsets from zookeeper. But auto.offset.reset has another 
meaning.

What to do when there is no initial offset in Zookeeper or if an offset is out 
of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer. If this is set to largest, 
the consumer may lose some messages when the number of partitions, for the 
topics it subscribes to, changes on the broker. To prevent data loss during 
partition addition, set auto.offset.reset to smallest

i will stress it — WHEN THERE IS NO INITIAL OFFSET OR IT IS OUT OF RANGE
not “hey! i’ll just reset your position because you restarted app"

> TD
> 
> 
> On Thu, Mar 27, 2014 at 3:28 PM, Evgeny Shishkin  wrote:
> 
> On 28 Mar 2014, at 01:13, Tathagata Das  wrote:
> 
>> Seems like the configuration of the Spark worker is not right. Either the 
>> worker has not been given enough memory or the allocation of the memory to 
>> the RDD storage needs to be fixed. If configured correctly, the Spark 
>> workers should not get OOMs.
> 
> 
> Yes, it is easy to start with latest offsets, get steady configuration and 
> everything is nice.
> 
> Then your machine failes. And you stop receiving from kafka anything.
> 
> Then you notice this and restart your app hoping it would continue from 
> offsets on zookeeper.
> BUT NO
> YOUR DEFAULT STREAM CONSUMERS JUST ERASED OFFSETS FROM ZOOKEEPER
> 
> After we fixed erasing offsets, we start from Some Offsets in the past.
> And during batch duration we can’t limit how many messages we get from Kafka.
> AND HERE WE OOM
> 
> And it's just a pain. Complete pain.
> 
> And you remember, only some machines consumes. Usually two or three. Because 
> of broken high-level consumer in kafka.
> 



Re: Spark Streaming + Kafka + Mesos/Marathon strangeness

2014-03-27 Thread Tathagata Das
The more I think about it the problem is not about /tmp, it's more about the
workers not having enough memory. Blocks of received data could be falling
out of memory before it is getting processed.
BTW, what is the storage level that you are using for your input stream? If
you are using MEMORY_ONLY, then try MEMORY_AND_DISK. That is safer because
it ensures that if received data falls out of memory it will be at least
saved to disk.
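
For example, with the Kafka input used in this thread (a sketch with made-up
zookeeper, group and topic names and an assumed StreamingContext ssc; the only
point is passing the storage level explicitly):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createStream(
  ssc, "zk1:2181", "my-consumer-group", Map("my-topic" -> 1),
  StorageLevel.MEMORY_AND_DISK)  // received blocks spill to disk instead of being dropped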

TD


On Thu, Mar 27, 2014 at 2:29 PM, Scott Clasen wrote:

> Heh sorry that wasnt a clear question, I know 'how' to set it but dont know
> what value to use in a mesos cluster, since the processes are running in
> lxc
> containers they wont be sharing a filesystem (or machine for that matter)
>
> I cant use an s3n:// url for local dir can I?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3373.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Spark Streaming + Kafka + Mesos/Marathon strangeness

2014-03-27 Thread Evgeny Shishkin

On 28 Mar 2014, at 01:44, Tathagata Das  wrote:

> The more I think about it the problem is not about /tmp, its more about the 
> workers not having enough memory. Blocks of received data could be falling 
> out of memory before it is getting processed. 
> BTW, what is the storage level that you are using for your input stream? If 
> you are using MEMORY_ONLY, then try MEMORY_AND_DISK. That is safer because it 
> ensure that if received data falls out of memory it will be at least saved to 
> disk.
> 
> TD
> 

And i saw such errors because of cleaner.ttl.
Which erases everything. Even needed rdds.



> 
> On Thu, Mar 27, 2014 at 2:29 PM, Scott Clasen  wrote:
> Heh sorry that wasnt a clear question, I know 'how' to set it but dont know
> what value to use in a mesos cluster, since the processes are running in lxc
> containers they wont be sharing a filesystem (or machine for that matter)
> 
> I cant use an s3n:// url for local dir can I?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Kafka-Mesos-Marathon-strangeness-tp3285p3373.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 



Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Evgeny Shishkin

On 28 Mar 2014, at 01:38, Evgeny Shishkin  wrote:

> 
> On 28 Mar 2014, at 01:32, Tathagata Das  wrote:
> 
>> Yes, no one has reported this issue before. I just opened a JIRA on what I 
>> think is the main problem here
>> https://spark-project.atlassian.net/browse/SPARK-1340
>> Some of the receivers dont get restarted. 
>> I have a bunch refactoring in the NetworkReceiver ready to be posted as a PR 
>> that should fix this. 
>> 

Regarding this Jira
by default spark commits offsets to zookeeper every so often.
Even if you fix reconnect to kafka, we do not know from which offsets it will 
begin to consume.
So it would not recompute rdd as it should. It will receive arbitrary data. 
From the past, or from the future.
With high-level consumer we just do not have control over this.

High-level consumer should not be used in production with spark. Period.
Spark should use low-level consumer, control offsets and partition assignment 
deterministically. 
Like storm does.

>> Regarding the second problem, I have been thinking of adding flow control 
>> (i.e. limiting the rate of receiving) for a while, just havent gotten around 
>> to it. 
>> I added another JIRA for that for tracking this issue.
>> https://spark-project.atlassian.net/browse/SPARK-1341
>> 
>> 

I think if we have fixed kafka input like above, we can control such window 
automatically — like tcp window, slow start, and such.
But it will be great to have some fix available now anyway.


> 
> Thank you, i will participate and can provide testing of new code.
> Sorry for capslock, i just debugged this whole day, literally. 
> 
> 
>> TD
>> 
>> 
>> On Thu, Mar 27, 2014 at 3:23 PM, Evgeny Shishkin  
>> wrote:
>> 
>> On 28 Mar 2014, at 01:11, Scott Clasen  wrote:
>> 
>> > Evgeniy Shishkin wrote
>> >> So, at the bottom — kafka input stream just does not work.
>> >
>> >
>> > That was the conclusion I was coming to as well.  Are there open tickets
>> > around fixing this up?
>> >
>> 
>> I am not aware of such. Actually nobody complained on spark+kafka before.
>> So i thought it just works, and then we tried to build something on it and 
>> almost failed.
>> 
>> I think that it is possible to steal/replicate how twitter storm works with 
>> kafka.
>> They do manual partition assignment, at least this would help to balance 
>> load.
>> 
>> There is another issue.
>> ssc batch creates new rdds every batch duration, always, even it previous 
>> computation did not finish.
>> 
>> But with kafka, we can consume more rdds later, after we finish previous 
>> rdds.
>> That way it would be much much simpler to not get OOM’ed when starting from 
>> beginning,
>> because we can consume many data from kafka during batch duration and then 
>> get oom.
>> 
>> But we just can not start slow, can not limit how many to consume during 
>> batch.
>> 
>> 
>> >
>> > --
>> > View this message in context: 
>> > http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3379.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> 
> 



Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Scott Clasen
Thanks everyone for the discussion.

Just to note, I restarted the job yet again, and this time there are indeed
tasks being executed by both worker nodes. So the behavior does seem
inconsistent/broken atm.

Then I added a third node to the cluster, and a third executor came up, and
everything broke :|



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3391.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RDD[Array] question

2014-03-27 Thread Walrus theCat
Sup y'all,

If I have an RDD[Array] and I do some operation on the RDD, then each Array
is going to get instantiated on some individual machine, correct (or does
it spread it out)?

Thanks


Re: Running a task once on each executor

2014-03-27 Thread deenar.toraskar
Christopher

Sorry I might be missing the obvious, but how do I get my function called on
all Executors used by the app? I don't want to use RDDs unless necessary.

once I start my shell or app, how do I get
TaskNonce.getSingleton().doThisOnce() executed on each executor?

@dmpour 
>>rdd.mapPartitions and it would still work as code would only be executed
once in each VM, but was wondering if there is more efficient way of doing
this by using a generated RDD with one partition per executor. 
This remark was misleading, what I meant was that in conjunction with the
TaskNonce pattern, my function would be called only once per executor as
long as the RDD had at least one partition on each executor.
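
For what it's worth, a rough sketch of how that TaskNonce + mapPartitions idea
reads (only the TaskNonce method names come from this thread; the body and the
probe RDD are hypothetical, and sc is an existing SparkContext):

object TaskNonce {
  @volatile private var done = false
  def getSingleton() = this
  def doThisOnce(): Unit = synchronized {
    if (!done) {
      // one-time, per-JVM (i.e. per-executor) initialisation goes here
      done = true
    }
  }
}

// Trigger it through any RDD that has at least one partition on every executor.
val probe = sc.parallelize(1 to 1000, 100) // enough partitions to touch all executors
probe.mapPartitions { iter =>
  TaskNonce.getSingleton().doThisOnce()
  iter
}.count()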

Deenar





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p3393.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: KafkaInputDStream mapping of partitions to tasks

2014-03-27 Thread Evgeny Shishkin

On 28 Mar 2014, at 02:10, Scott Clasen  wrote:

> Thanks everyone for the discussion.
> 
> Just to note, I restarted the job yet again, and this time there are indeed
> tasks being executed by both worker nodes. So the behavior does seem
> inconsistent/broken atm.
> 
> Then I added a third node to the cluster, and a third executor came up, and
> everything broke :|
> 
> 

This is kafka’s high-level consumer. Try to raise rebalance retries.

Also, as this consumer is threaded, it has some protection against this 
failure - first it waits some time, and then rebalances.
But for a spark cluster i think this time is not enough.
If there was a way to wait for every spark executor to start, rebalance, and only 
then start to consume, this issue would be less visible.
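
A sketch of raising those retries via kafkaParams (the property names are the
standard Kafka 0.8 high-level consumer settings; the quorum, group, topic and
values are made up, and ssc is an existing StreamingContext):

import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map(
  "zookeeper.connect"     -> "zk1:2181",
  "group.id"              -> "my-consumer-group",
  "rebalance.max.retries" -> "20",    // default is 4
  "rebalance.backoff.ms"  -> "5000")  // default is 2000

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)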



> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/KafkaInputDStream-mapping-of-partitions-to-tasks-tp3360p3391.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Run spark on mesos remotely

2014-03-27 Thread Wush Wu
Dear Rustagi,

Thanks for your response.

As far as I know, the DAG scheduler should be a part of spark. Therefore,
should I do something not mentioned in
http://spark.incubator.apache.org/docs/0.8.1/running-on-mesos.html to
launch the DAG scheduler?

By the way, I also notice that the user of the submission becomes the
account on my machine. I suspect that there is a permission issue, so the
executor cannot copy the file from spark.execute.uri. After changing the
account to the same username on the mesos cluster, the executor
successfully copies the file from spark.execute.uri, but it reports the
following error message:

log4j:WARN No appenders could be found for logger
(org.apache.spark.executor.MesosExecutorBackend).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
org.apache.spark.SparkException: Error sending message to
BlockManagerMaster [message =
RegisterBlockManager(BlockManagerId(201403250945-3657629962-5050-10180-16,
pc104, 42356, 0),339585269,Actor[akka://spark/user/BlockManagerActor1])]
 at
org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:174)
at
org.apache.spark.storage.BlockManagerMaster.tell(BlockManagerMaster.scala:139)
 at
org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:57)
at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:127)
 at org.apache.spark.storage.BlockManager.(BlockManager.scala:105)
at org.apache.spark.storage.BlockManager.(BlockManager.scala:119)
 at
org.apache.spark.SparkEnv$.createFromSystemProperties(SparkEnv.scala:171)
at org.apache.spark.executor.Executor.(Executor.scala:111)
 at
org.apache.spark.executor.MesosExecutorBackend.registered(MesosExecutorBackend.scala:58)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[1] milliseconds
 at akka.dispatch.DefaultPromise.ready(Future.scala:870)
at akka.dispatch.DefaultPromise.result(Future.scala:874)
 at akka.dispatch.Await$.result(Future.scala:74)
at
org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:160)
 ... 8 more
Exception in thread "Thread-0"

Is there any suggestion to handle the error above?

Thanks,
Wush


2014-03-28 4:09 GMT+08:00 Mayur Rustagi :

> Yes but you have to maintain connection of that machine to the master
> cluster as the driver with DAG scheduler runs on that machine.
> Regards
> Mayur
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Thu, Mar 27, 2014 at 4:09 AM, Wush Wu  wrote:
>
>> Dear all,
>>
>> We have a spark 0.8.1 cluster on mesos 0.15. It works if I submit the job
>> from the master of mesos. That is to say, I spawn the spark shell or launch
>> the scala application on the master of mesos.
>>
>> However, when I submit the job from another machine, the job will lost.
>> The logs shows that the mesos does not copy the
>> spark-0.8.1-incubating.tar.gz to the temporal working directory, so the job
>> lost immediately. Is it possible to submit the job from the machine not
>> belong to mesos cluster?
>>
>> Thanks!
>>
>> Wush
>>
>
>


Using ProtoBuf 2.5 for messages with Spark Streaming

2014-03-27 Thread Kanwaldeep
We are using Protocol Buffer 2.5 to send messages to Spark Streaming 0.9 with
a Kafka stream setup. I have Protocol Buffer 2.5 as part of the uber jar deployed
on each of the spark worker nodes.
The message is compiled using 2.5 but then on runtime it is being
de-serialized by 2.4.1 as I'm getting the following exception

java.lang.VerifyError (java.lang.VerifyError: class
com.snc.sinet.messages.XServerMessage$XServer overrides final method
getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;)
java.lang.ClassLoader.defineClass1(Native Method)
java.lang.ClassLoader.defineClassCond(ClassLoader.java:631)
java.lang.ClassLoader.defineClass(ClassLoader.java:615)
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:141)

Any suggestions on how I could still use ProtoBuf 2.5? Based on the ticket
https://spark-project.atlassian.net/browse/SPARK-995 we should be able to
use a different version of protobuf in the application.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-ProtoBuf-2-5-for-messages-with-Spark-Streaming-tp3396.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pySpark memory usage

2014-03-27 Thread Jim Blomo
Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
1.0.4" from GitHub on 2014-03-18.

I tried batchSizes of 512, 10, and 1 and each got me further but none
have succeeded.

I can get this to work -- with manual interventions -- if I omit
`parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
of the 175 executors hung, and I had to kill the python process to get
things going again.  The only indication of this in the logs was `INFO
python.PythonRDD: stdin writer to Python finished early`.

With batchSize=1 and persist, a new memory error came up in several
tasks before the app failed:

14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
thread Thread[stdin writer for python,5,main]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.(String.java:203)
at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
at java.nio.CharBuffer.toString(CharBuffer.java:1201)
at org.apache.hadoop.io.Text.decode(Text.java:350)
at org.apache.hadoop.io.Text.decode(Text.java:327)
at org.apache.hadoop.io.Text.toString(Text.java:254)
at 
org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at 
org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)

There are other exceptions, but I think they all stem from the above,
eg. org.apache.spark.SparkException: Error sending message to
BlockManagerMaster

Let me know if there are other settings I should try, or if I should
try a newer snapshot.

Thanks again!


On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia  wrote:
> Hey Jim,
>
> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group 
> multiple objects together before passing them between Java and Python, but 
> this may be too high by default. Try passing batchSize=10 to your 
> SparkContext constructor to lower it (the default is 1024). Or even 
> batchSize=1 to match earlier versions.
>
> Matei
>
> On Mar 21, 2014, at 6:18 PM, Jim Blomo  wrote:
>
>> Hi all, I'm wondering if there's any settings I can use to reduce the
>> memory needed by the PythonRDD when computing simple stats.  I am
>> getting OutOfMemoryError exceptions while calculating count() on big,
>> but not absurd, records.  It seems like PythonRDD is trying to keep
>> too many of these records in memory, when all that is needed is to
>> stream through them and count.  Any tips for getting through this
>> workload?
>>
>>
>> Code:
>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>
>> # the biggest individual text line is ~3MB
>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>> (loads(y), loads(s)))
>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>
>> parsed.count()
>> # will never finish: executor.Executor: Uncaught exception will FAIL
>> all executors
>>
>> Incidentally the whole app appears to be killed, but this error is not
>> propagated to the shell.
>>
>> Cluster:
>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>
>> Exception:
>> java.lang.OutOfMemoryError: Java heap space
>>at 
>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>at 
>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>at 
>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>at 
>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>at 
>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>at 
>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>at 
>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>


Re: GC overhead limit exceeded

2014-03-27 Thread Sai Prasanna
I didn't mention anything, so by default it should be MEMORY_AND_DISK, right?

My doubt was: between two different experiments, do the RDDs cached in
memory need to be unpersisted?
Or does it not matter?
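
To make the question concrete, a minimal sketch of what I mean by unpersisting
between two experiments (assuming the shell's SparkContext `sc`; the input path
is a placeholder):

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("hdfs:///path/to/input")   // placeholder path
val cached = lines.map(_.split("\t")).persist(StorageLevel.MEMORY_ONLY)
cached.count()                                     // first experiment

cached.unpersist()                                 // drop the cached blocks before the next experiment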


On Fri, Mar 28, 2014 at 1:43 AM, Syed A. Hashmi wrote:

> Which storage scheme are you using? I am guessing it is MEMORY_ONLY. In
> large datasets, MEMORY_AND_DISK or MEMORY_AND_DISK_SER work better.
>
> You can call unpersist on an RDD to remove it from Cache though.
>
>
> On Thu, Mar 27, 2014 at 11:57 AM, Sai Prasanna wrote:
>
>> No, I am running on 0.8.1.
>> Yes, I am caching a lot: I am benchmarking simple code in Spark where
>> 512MB, 1GB and 2GB text files are taken and some basic intermediate operations
>> are done, while the intermediate results which will be used in subsequent
>> operations are cached.
>>
>> I thought that we need not manually unpersist: if I need to cache
>> something and the cache is found full, space will automatically be created
>> by evicting earlier entries. Do I need to unpersist?
>>
>> Moreover, if I run several times, will the previously cached RDDs still
>> remain in the cache? If so, can I flush them out manually before the next
>> run? [something like a complete cache flush]
>>
>>
>> On Thu, Mar 27, 2014 at 11:16 PM, Andrew Or wrote:
>>
>>> Are you caching a lot of RDD's? If so, maybe you should unpersist() the
>>> ones that you're not using. Also, if you're on 0.9, make sure
>>> spark.shuffle.spill is enabled (which it is by default). This allows your
>>> application to spill in-memory content to disk if necessary.
>>>
>>> How much memory are you giving to your executors? The default,
>>> spark.executor.memory is 512m, which is quite low. Consider raising this.
>>> Checking the web UI is a good way to figure out your runtime memory usage.
>>>
>>>
>>> On Thu, Mar 27, 2014 at 9:22 AM, Ognen Duzlevski <
>>> og...@plainvanillagames.com> wrote:
>>>
  Look at the tuning guide on Spark's webpage for strategies to cope
 with this.
 I have run into quite a few memory issues like these, some are resolved
 by changing the StorageLevel strategy and employing things like Kryo, some
 are solved by specifying the number of tasks to break down a given
 operation into etc.

 Ognen


 On 3/27/14, 10:21 AM, Sai Prasanna wrote:

 "java.lang.OutOfMemoryError: GC overhead limit exceeded"

  What is the problem? When I run the same code, one instance runs in 8
 seconds; the next time it takes a really long time, say 300-500 seconds...
 In the logs, a lot of "GC overhead limit exceeded" is seen. What should
 be done?

  Please, can someone throw some light on it?



  --
  *Sai Prasanna. AN*
 *II M.Tech (CS), SSSIHL*


 * Entire water in the ocean can never sink a ship, Unless it gets
 inside. All the pressures of life can never hurt you, Unless you let them
 in.*



>>>
>>
>>
>> --
>> *Sai Prasanna. AN*
>> *II M.Tech (CS), SSSIHL*
>>
>>
>> *Entire water in the ocean can never sink a ship, Unless it gets inside.
>> All the pressures of life can never hurt you, Unless you let them in.*
>>
>
>


-- 
*Sai Prasanna. AN*
*II M.Tech (CS), SSSIHL*


*Entire water in the ocean can never sink a ship, Unless it gets inside.All
the pressures of life can never hurt you, Unless you let them in.*


ArrayIndexOutOfBoundsException in ALS.implicit

2014-03-27 Thread bearrito
Use of negative product IDs causes the above exception.

The cause is the use of the product IDs as a mechanism to index into the
in- and out-block structures.

Specifically, on 0.9 it occurs at
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$makeInLinkBlock$2.apply(ALS.scala:262)

It seems reasonable to expect that product IDs are positive, if a bit
opinionated. I ran across this because the hash function I was using on my
product IDs includes negative values in its range.
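
A rough workaround sketch (assuming MLlib's ALS and Rating API; `ratings` stands
in for an existing RDD[Rating], and the bitmask simply folds a hashed id into the
non-negative int range):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// fold a possibly-negative hashed id into [0, Int.MaxValue]
def toNonNegative(id: Int): Int = id & Int.MaxValue

val fixed = ratings.map(r => Rating(r.user, toNonNegative(r.product), r.rating))
val model = ALS.trainImplicit(fixed, 10, 10)   // rank = 10, iterations = 10

Note the masking is not injective (ids differing only in the sign bit collide),
so assigning non-negative ids upstream is the safer fix.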





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ArrayIndexOutOfBoundsException-in-ALS-implicit-tp3400.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pySpark memory usage

2014-03-27 Thread Matei Zaharia
I see, did this also fail with previous versions of Spark (0.9 or 0.8)? We’ll 
try to look into these, seems like a serious error.

Matei

On Mar 27, 2014, at 7:27 PM, Jim Blomo  wrote:

> Thanks, Matei.  I am running "Spark 1.0.0-SNAPSHOT built for Hadoop
> 1.0.4" from GitHub on 2014-03-18.
> 
> I tried batchSizes of 512, 10, and 1 and each got me further but none
> have succeeded.
> 
> I can get this to work -- with manual interventions -- if I omit
> `parsed.persist(StorageLevel.MEMORY_AND_DISK)` and set batchSize=1.  5
> of the 175 executors hung, and I had to kill the python process to get
> things going again.  The only indication of this in the logs was `INFO
> python.PythonRDD: stdin writer to Python finished early`.
> 
> With batchSize=1 and persist, a new memory error came up in several
> tasks, before the app was failed:
> 
> 14/03/28 01:51:15 ERROR executor.Executor: Uncaught exception in
> thread Thread[stdin writer for python,5,main]
> java.lang.OutOfMemoryError: Java heap space
>at java.util.Arrays.copyOfRange(Arrays.java:2694)
>at java.lang.String.<init>(String.java:203)
>at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561)
>at java.nio.CharBuffer.toString(CharBuffer.java:1201)
>at org.apache.hadoop.io.Text.decode(Text.java:350)
>at org.apache.hadoop.io.Text.decode(Text.java:327)
>at org.apache.hadoop.io.Text.toString(Text.java:254)
>at 
> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>at 
> org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:349)
>at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>at scala.collection.Iterator$$anon$12.next(Iterator.scala:357)
>at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>at 
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:242)
>at 
> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
> 
> There are other exceptions, but I think they all stem from the above,
> eg. org.apache.spark.SparkException: Error sending message to
> BlockManagerMaster
> 
> Let me know if there are other settings I should try, or if I should
> try a newer snapshot.
> 
> Thanks again!
> 
> 
> On Mon, Mar 24, 2014 at 9:35 AM, Matei Zaharia  
> wrote:
>> Hey Jim,
>> 
>> In Spark 0.9 we added a "batchSize" parameter to PySpark that makes it group 
>> multiple objects together before passing them between Java and Python, but 
>> this may be too high by default. Try passing batchSize=10 to your 
>> SparkContext constructor to lower it (the default is 1024). Or even 
>> batchSize=1 to match earlier versions.
>> 
>> Matei
>> 
>> On Mar 21, 2014, at 6:18 PM, Jim Blomo  wrote:
>> 
>>> Hi all, I'm wondering if there's any settings I can use to reduce the
>>> memory needed by the PythonRDD when computing simple stats.  I am
>>> getting OutOfMemoryError exceptions while calculating count() on big,
>>> but not absurd, records.  It seems like PythonRDD is trying to keep
>>> too many of these records in memory, when all that is needed is to
>>> stream through them and count.  Any tips for getting through this
>>> workload?
>>> 
>>> 
>>> Code:
>>> session = sc.textFile('s3://...json.gz') # ~54GB of compressed data
>>> 
>>> # the biggest individual text line is ~3MB
>>> parsed = session.map(lambda l: l.split("\t",1)).map(lambda (y,s):
>>> (loads(y), loads(s)))
>>> parsed.persist(StorageLevel.MEMORY_AND_DISK)
>>> 
>>> parsed.count()
>>> # will never finish: executor.Executor: Uncaught exception will FAIL
>>> all executors
>>> 
>>> Incidentally the whole app appears to be killed, but this error is not
>>> propagated to the shell.
>>> 
>>> Cluster:
>>> 15 m2.xlarges (17GB memory, 17GB swap, spark.executor.memory=10GB)
>>> 
>>> Exception:
>>> java.lang.OutOfMemoryError: Java heap space
>>>   at 
>>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:132)
>>>   at 
>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:120)
>>>   at 
>>> org.apache.spark.api.python.PythonRDD$$anon$1.next(PythonRDD.scala:113)
>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>   at 
>>> org.apache.spark.api.python.PythonRDD$$anon$1.foreach(PythonRDD.scala:113)
>>>   at 
>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>   at 
>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>   at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:94)
>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:220)
>>>   at 
>>> org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:85)
>> 



Replicating RDD elements

2014-03-27 Thread David Thomas
How can we replicate RDD elements? Say I have 1 element and 100 nodes in
the cluster. I need to replicate this one item on all the nodes, i.e.
effectively create an RDD of 100 elements.
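
Concretely, something like the following is what I mean (a rough sketch, assuming
a SparkContext `sc`; `item` is a placeholder for the single element):

val item = "some-element"                          // placeholder
// one partition per copy, so the copies can spread across the cluster
val replicated = sc.parallelize(Seq.fill(100)(item), 100)

// if the goal is only to make the value available on every node, a broadcast
// variable may be the better fit: val shared = sc.broadcast(item)

Note that Spark decides where partitions are scheduled, so 100 partitions does
not strictly guarantee exactly one copy per node.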


Re: Configuring shuffle write directory

2014-03-27 Thread Tsai Li Ming
Can anyone help?

How can I configure a different spark.local.dir for each executor?


On 23 Mar, 2014, at 12:11 am, Tsai Li Ming  wrote:

> Hi,
> 
> Each of my worker nodes has its own unique spark.local.dir.
> 
> However, when I run spark-shell, the shuffle writes are always written to 
> /tmp despite spark.local.dir being set when the worker node is started.
> 
> By specifying spark.local.dir for the driver program, it seems to 
> override the executor's setting. Is there a way to properly define it on the 
> worker node?
> 
> Thanks!



Re: Configuring shuffle write directory

2014-03-27 Thread Matei Zaharia
Yes, the problem is that the driver program is overriding it. Have you set it 
manually in the driver? Or how did you try setting it in workers? You should 
set it by adding

export SPARK_JAVA_OPTS=“-Dspark.local.dir=whatever”

to conf/spark-env.sh on those workers.

Matei

On Mar 27, 2014, at 9:04 PM, Tsai Li Ming  wrote:

> Anyone can help?
> 
> How can I configure a different spark.local.dir for each executor?
> 
> 
> On 23 Mar, 2014, at 12:11 am, Tsai Li Ming  wrote:
> 
>> Hi,
>> 
>> Each of my worker node has its own unique spark.local.dir.
>> 
>> However, when I run spark-shell, the shuffle writes are always written to 
>> /tmp despite being set when the worker node is started.
>> 
>> By specifying the spark.local.dir for the driver program, it seems to 
>> override the executor? Is there a way to properly define it in the worker 
>> node?
>> 
>> Thanks!
> 



Re: Configuring shuffle write directory

2014-03-27 Thread Tsai Li Ming
Yes, I have tried that by adding it to the Worker. I can see the 
“app-20140328124540-000” directory in the local spark directory of the worker.

But the “spark-local” directories are always written to /tmp, since the 
default spark.local.dir is taken from java.io.tmpdir?



On 28 Mar, 2014, at 12:42 pm, Matei Zaharia  wrote:

> Yes, the problem is that the driver program is overriding it. Have you set it 
> manually in the driver? Or how did you try setting it in workers? You should 
> set it by adding
> 
> export SPARK_JAVA_OPTS=“-Dspark.local.dir=whatever”
> 
> to conf/spark-env.sh on those workers.
> 
> Matei
> 
> On Mar 27, 2014, at 9:04 PM, Tsai Li Ming  wrote:
> 
>> Anyone can help?
>> 
>> How can I configure a different spark.local.dir for each executor?
>> 
>> 
>> On 23 Mar, 2014, at 12:11 am, Tsai Li Ming  wrote:
>> 
>>> Hi,
>>> 
>>> Each of my worker node has its own unique spark.local.dir.
>>> 
>>> However, when I run spark-shell, the shuffle writes are always written to 
>>> /tmp despite being set when the worker node is started.
>>> 
>>> By specifying the spark.local.dir for the driver program, it seems to 
>>> override the executor? Is there a way to properly define it in the worker 
>>> node?
>>> 
>>> Thanks!
>> 
> 



Re: Configuring shuffle write directory

2014-03-27 Thread Matei Zaharia
I see, are you sure that was in spark-env.sh instead of spark-env.sh.template? 
You need to copy it to just a .sh file. Also make sure the file is executable.

Try doing println(sc.getConf.toDebugString) in your driver program and seeing 
what properties it prints. As far as I can tell, spark.local.dir should *not* 
be set there, so workers should get it from their spark-env.sh. It’s true that 
if you set spark.local.dir in the driver it would pass that on to the workers 
for that job.
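
For example (a minimal sketch against 0.9's SparkConf; the master URL and app
name are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")   // placeholder
  .setAppName("local-dir-check")
  // deliberately do NOT set spark.local.dir here, so each worker keeps the
  // value from its own spark-env.sh
val sc = new SparkContext(conf)
println(sc.getConf.toDebugString)     // spark.local.dir should not appear here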

Matei

On Mar 27, 2014, at 9:57 PM, Tsai Li Ming  wrote:

> Yes, I have tried that by adding it to the Worker. I can see the 
> "app-20140328124540-000” in the local spark directory of the worker.
> 
> But the “spark-local” directories are always written to /tmp since is the 
> default spark.local.dir is taken from java.io.tempdir?
> 
> 
> 
> On 28 Mar, 2014, at 12:42 pm, Matei Zaharia  wrote:
> 
>> Yes, the problem is that the driver program is overriding it. Have you set 
>> it manually in the driver? Or how did you try setting it in workers? You 
>> should set it by adding
>> 
>> export SPARK_JAVA_OPTS=“-Dspark.local.dir=whatever”
>> 
>> to conf/spark-env.sh on those workers.
>> 
>> Matei
>> 
>> On Mar 27, 2014, at 9:04 PM, Tsai Li Ming  wrote:
>> 
>>> Anyone can help?
>>> 
>>> How can I configure a different spark.local.dir for each executor?
>>> 
>>> 
>>> On 23 Mar, 2014, at 12:11 am, Tsai Li Ming  wrote:
>>> 
 Hi,
 
 Each of my worker node has its own unique spark.local.dir.
 
 However, when I run spark-shell, the shuffle writes are always written to 
 /tmp despite being set when the worker node is started.
 
 By specifying the spark.local.dir for the driver program, it seems to 
 override the executor? Is there a way to properly define it in the worker 
 node?
 
 Thanks!
>>> 
>> 
> 



Setting SPARK_MEM higher than available memory in driver

2014-03-27 Thread Tsai Li Ming
Hi,

My worker nodes have more memory than the host from which I’m submitting my driver 
program, but it seems that SPARK_MEM also sets the -Xmx of the spark shell?

$ SPARK_MEM=100g MASTER=spark://XXX:7077 bin/spark-shell

Java HotSpot(TM) 64-Bit Server VM warning: INFO: 
os::commit_memory(0x7f736e13, 205634994176, 0) failed; error='Cannot 
allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 205634994176 bytes for 
committing reserved memory.

I want to allocate at least 100GB of memory per executor. The allocated memory 
on the executor seems to depend on the -Xmx heap size of the driver?

Thanks!





Re: Running a task once on each executor

2014-03-27 Thread Christopher Nguyen
Deenar, yes, you may indeed be overthinking it a bit, about how Spark
executes maps/filters etc. I'll focus on the high-order bits so it's clear.

Let's assume you're doing this in Java. Then you'd pass some *MyMapper*
instance to *JavaRDD#map(myMapper)*.

So you'd have a class *MyMapper extends Function*. The
*call()* method of that class is effectively the function that will be
executed by the workers on your RDD's rows.

Within that *MyMapper#call()*, you can access static members and methods of
*MyMapper* itself. You could implement your *runOnce()* there.
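
The same idea as a rough Scala sketch (assuming a SparkContext `sc`; the RDD and
partition count are only there to touch every executor at least once):

import java.util.concurrent.atomic.AtomicBoolean

object TaskNonce {
  private val done = new AtomicBoolean(false)
  // runs the body at most once per executor JVM
  def doThisOnce(body: => Unit): Unit =
    if (done.compareAndSet(false, true)) body
}

sc.parallelize(1 to 1000, 100).mapPartitions { iter =>
  TaskNonce.doThisOnce { /* one-time per-executor setup goes here */ }
  iter
}.count()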



--
Christopher T. Nguyen
Co-founder & CEO, Adatao 
linkedin.com/in/ctnguyen



On Thu, Mar 27, 2014 at 4:20 PM, deenar.toraskar wrote:

> Christopher
>
> Sorry, I might be missing the obvious, but how do I get my function called
> on all Executors used by the app? I don't want to use RDDs unless necessary.
>
> Once I start my shell or app, how do I get
> TaskNonce.getSingleton().doThisOnce() executed on each executor?
>
> @dmpour
> >> rdd.mapPartitions and it would still work as code would only be executed
> once in each VM, but I was wondering if there is a more efficient way of doing
> this by using a generated RDD with one partition per executor.
> This remark was misleading; what I meant was that, in conjunction with the
> TaskNonce pattern, my function would be called only once per executor as
> long as the RDD had at least one partition on each executor.
>
> Deenar
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Running-a-task-once-on-each-executor-tp3203p3393.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Setting SPARK_MEM higher than available memory in driver

2014-03-27 Thread Aaron Davidson
Assuming you're using a new enough version of Spark, you should use
spark.executor.memory to set the memory for your executors, without
changing the driver memory. See the docs for your version of Spark.
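
For example (a rough sketch using SparkConf; the master URL and app name are
placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://XXX:7077")            // placeholder
  .setAppName("big-executors")
  .set("spark.executor.memory", "100g")     // per-executor heap, independent of the driver's -Xmx
val sc = new SparkContext(conf)

For spark-shell, setting the same property through SPARK_JAVA_OPTS
(-Dspark.executor.memory=100g) should have the same effect without raising SPARK_MEM.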


On Thu, Mar 27, 2014 at 10:48 PM, Tsai Li Ming wrote:

> Hi,
>
> My worker nodes have more memory than the host that I'm submitting my
> driver program, but it seems that SPARK_MEM is also setting the Xmx of the
> spark shell?
>
> $ SPARK_MEM=100g MASTER=spark://XXX:7077 bin/spark-shell
>
> Java HotSpot(TM) 64-Bit Server VM warning: INFO:
> os::commit_memory(0x7f736e13, 205634994176, 0) failed;
> error='Cannot allocate memory' (errno=12)
> #
> # There is insufficient memory for the Java Runtime Environment to
> continue.
> # Native memory allocation (malloc) failed to allocate 205634994176 bytes
> for committing reserved memory.
>
> I want to allocate at least 100GB of memory per executor. The allocated
> memory on the executor seems to depend on the -Xmx heap size of the driver?
>
> Thanks!
>
>
>
>


Re: Setting SPARK_MEM higher than available memory in driver

2014-03-27 Thread Tsai Li Ming
Thanks. Got it working.

On 28 Mar, 2014, at 2:02 pm, Aaron Davidson  wrote:

> Assuming you're using a new enough version of Spark, you should use 
> spark.executor.memory to set the memory for your executors, without changing 
> the driver memory. See the docs for your version of Spark.
> 
> 
> On Thu, Mar 27, 2014 at 10:48 PM, Tsai Li Ming  wrote:
> Hi,
> 
> My worker nodes have more memory than the host that I’m submitting my driver 
> program, but it seems that SPARK_MEM is also setting the Xmx of the spark 
> shell?
> 
> $ SPARK_MEM=100g MASTER=spark://XXX:7077 bin/spark-shell
> 
> Java HotSpot(TM) 64-Bit Server VM warning: INFO: 
> os::commit_memory(0x7f736e13, 205634994176, 0) failed; error='Cannot 
> allocate memory' (errno=12)
> #
> # There is insufficient memory for the Java Runtime Environment to continue.
> # Native memory allocation (malloc) failed to allocate 205634994176 bytes for 
> committing reserved memory.
> 
> I want to allocate at least 100GB of memory per executor. The allocated 
> memory on the executor seems to depend on the -Xmx heap size of the driver?
> 
> Thanks!
> 
> 
> 
> 



Re: GC overhead limit exceeded

2014-03-27 Thread Sai Prasanna
Oh sorry, that was a mistake; the default level is MEMORY_ONLY!
My doubt was: between two different experiments, do the RDDs cached in
memory need to be unpersisted?
Or does it not matter?


Re: Configuring shuffle write directory

2014-03-27 Thread Tsai Li Ming

Hi,

Thanks! I found out that I wasn’t setting SPARK_JAVA_OPTS correctly.

I took a look at the process table and saw that the 
“org.apache.spark.executor.CoarseGrainedExecutorBackend” didn’t have the 
-Dspark.local.dir set.




On 28 Mar, 2014, at 1:05 pm, Matei Zaharia  wrote:

> I see, are you sure that was in spark-env.sh instead of 
> spark-env.sh.template? You need to copy it to just a .sh file. Also make sure 
> the file is executable.
> 
> Try doing println(sc.getConf.toDebugString) in your driver program and seeing 
> what properties it prints. As far as I can tell, spark.local.dir should *not* 
> be set there, so workers should get it from their spark-env.sh. It’s true 
> that if you set spark.local.dir in the driver it would pass that on to the 
> workers for that job.
> 
> Matei
> 
> On Mar 27, 2014, at 9:57 PM, Tsai Li Ming  wrote:
> 
>> Yes, I have tried that by adding it to the Worker. I can see the 
>> "app-20140328124540-000” in the local spark directory of the worker.
>> 
>> But the “spark-local” directories are always written to /tmp since is the 
>> default spark.local.dir is taken from java.io.tempdir?
>> 
>> 
>> 
>> On 28 Mar, 2014, at 12:42 pm, Matei Zaharia  wrote:
>> 
>>> Yes, the problem is that the driver program is overriding it. Have you set 
>>> it manually in the driver? Or how did you try setting it in workers? You 
>>> should set it by adding
>>> 
>>> export SPARK_JAVA_OPTS=“-Dspark.local.dir=whatever”
>>> 
>>> to conf/spark-env.sh on those workers.
>>> 
>>> Matei
>>> 
>>> On Mar 27, 2014, at 9:04 PM, Tsai Li Ming  wrote:
>>> 
 Anyone can help?
 
 How can I configure a different spark.local.dir for each executor?
 
 
 On 23 Mar, 2014, at 12:11 am, Tsai Li Ming  wrote:
 
> Hi,
> 
> Each of my worker node has its own unique spark.local.dir.
> 
> However, when I run spark-shell, the shuffle writes are always written to 
> /tmp despite being set when the worker node is started.
> 
> By specifying the spark.local.dir for the driver program, it seems to 
> override the executor? Is there a way to properly define it in the worker 
> node?
> 
> Thanks!
 
>>> 
>> 
>