Re: NoSuchMethodError: breeze.linalg.DenseMatrix

2014-05-15 Thread wxhsdp
Hi, DB

  i tried including the breeze library by using Spark 1.0, and it works. but how
can i call
  the native library in standalone cluster mode?

  in local mode
  1. i include the "org.scalanlp" % "breeze-natives_2.10" % "0.7" dependency in the
sbt build file
  2. i install OpenBLAS
  it works

  in standalone mode
  1. i include the "org.scalanlp" % "breeze-natives_2.10" % "0.7" dependency in the
sbt build file
  2. install OpenBLAS on the workers
  3. add separate jars using sc.addJar(). jars: breeze-natives_2.10-0.7.jar,
netlib-native_ref-linux-x86_64-1.1-natives.jar,
netlib-native_system-linux-x86_64-1.1-natives.jar
  4. i also include the classpath of the above jars
  but it does not work :(
  


DB Tsai-2 wrote
 Hi Wxhsdp,
 
 I also have some difficulties with sc.addJar(). Since we include the
 breeze library by using Spark 1.0, we don't have the problem you ran into.
 However, when we add external jars via sc.addJar(), I found that the
 executors actually fetch the jars but the classloader still doesn't honor
 it. I'm trying to figure out the problem now.
 
 
 Sincerely,
 
 DB Tsai
 ---
 My Blog: https://www.dbtsai.com
 LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
 On Wed, May 14, 2014 at 5:46 AM, wxhsdp <wxhsdp@...> wrote:
 
 Hi, DB
   i've add breeze jars to workers using sc.addJar()
   breeze jars include :
   breeze-natives_2.10-0.7.jar
   breeze-macros_2.10-0.3.jar
   breeze-macros_2.10-0.3.1.jar
   breeze_2.10-0.8-SNAPSHOT.jar
   breeze_2.10-0.7.jar

   almost all the jars about breeze i can find, but still
 NoSuchMethodError:
 breeze.linalg.DenseMatrix

  from the executor stderr, you can see the executor successfully fetches
 these jars, what's wrong
   about my method? thank you!

 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze-natives_2.10-0.7.jar with
 timestamp
 1400070957376
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze-natives_2.10-0.7.jar to
 /tmp/fetchFileTemp7468892065227766972.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-natives_2.10-0.7.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.jar with timestamp
 1400070957441
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.jar to
 /tmp/fetchFileTemp2324565598765584917.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-macros_2.10-0.3.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze_2.10-0.8-SNAPSHOT.jar with
 timestamp
 1400070957358
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze_2.10-0.8-SNAPSHOT.jar to
 /tmp/fetchFileTemp8730123100104850193.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze_2.10-0.8-SNAPSHOT.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.1.jar with
 timestamp
 1400070957414
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.1.jar to
 /tmp/fetchFileTemp3473404556989515218.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-macros_2.10-0.3.1.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/build-project_2.10-1.0.jar with timestamp
 1400070956753
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/build-project_2.10-1.0.jar to
 /tmp/fetchFileTemp1289055585501269156.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./build-project_2.10-1.0.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze_2.10-0.7.jar with timestamp
 1400070957228
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze_2.10-0.7.jar to
 /tmp/fetchFileTemp1287317286108432726.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze_2.10-0.7.jar
 to class loader


 DB Tsai-2 wrote
  Since the breeze jar is brought into Spark by the mllib package, you may
 want
  to add mllib as your dependency in Spark 1.0. To bring it in from your
  application yourself, you can either use sbt assembly in your build
 project
  to generate a flat myApp-assembly.jar which contains the breeze jar, or use
  the Spark addJar API like Yadid said.
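
  For what it's worth, a minimal sketch of the addJar route (the master URL and
  jar paths below are only placeholders, and the fat jar would come out of sbt
  assembly):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster("spark://master:7077")   // placeholder
    .setAppName("breeze-on-standalone")
  val sc = new SparkContext(conf)

  // ship the application (or assembly) jar and the breeze jars to the executors;
  // the paths must exist on the driver machine
  sc.addJar("/path/to/myApp-assembly.jar")
  sc.addJar("/path/to/breeze-natives_2.10-0.7.jar")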
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
  On Sun, May 4, 2014 at 10:24 PM, wxhsdp <

  

A new resource for getting examples of Spark RDD API calls

2014-05-15 Thread zhen
Hi Everyone,

I found it quite difficult to find good examples for Spark RDD API calls. So
my student and I decided to go through the entire API and write examples for
the vast majority of API calls (basically examples for anything that is
remotely interesting). I think these examples may be useful to other people.
Hence I have put them up on my web site. There is also a pdf version that
you can download from the web site.

http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

Please let me know if you find any errors in them, or if there are any better
examples you would like me to add.

Hope you find it useful.

Zhen



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/A-new-resource-for-getting-examples-of-Spark-RDD-API-calls-tp5529.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


same log4j slf4j error in spark 0.9.1

2014-05-15 Thread Adrian Mocanu
I recall someone from the Spark team (TD?) saying that Spark 0.9.1 would change
the logger so that the circular-loop error between slf4j and log4j wouldn't show up.

Yet on Spark 0.9.1 I still get
SLF4J: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class 
path, preempting StackOverflowError.
SLF4J: See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more 
details.

Any solutions?

-Adrian


Preferred RDD Size

2014-05-15 Thread Sai Prasanna
Hi,

Is there any lower bound on the size of an RDD for optimally utilizing the
in-memory framework of Spark?
Say creating an RDD for a very small data set of some 64 MB is not as efficient
as one of some 256 MB; then the application could be tuned accordingly.

So is there a soft lower bound related to the Hadoop block size or something
else?

Thanks in Advance !


Re: No space left on device error when pulling data from s3

2014-05-15 Thread darkjh
Setting `hadoop.tmp.dir` in `spark-env.sh` solved the problem. The Spark job no
longer writes tmp files to /tmp/hadoop-root/.

  SPARK_JAVA_OPTS+=" -Dspark.local.dir=/mnt/spark,/mnt2/spark
-Dhadoop.tmp.dir=/mnt/ephemeral-hdfs"
  export SPARK_JAVA_OPTS

I'm wondering if we need to permanently add this to the spark-ec2 script.
Writing lots of tmp files to the 8 GB `/` is not a great idea.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/No-space-left-on-device-error-when-pulling-data-from-s3-tp5450p5518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How to use Mahout VectorWritable in Spark.

2014-05-15 Thread Dmitriy Lyubimov
PS: the Spark shell with all proper imports is also supported natively in
Mahout (the mahout spark-shell command). See M-1489 for specifics. There's also
a tutorial somewhere, but I suspect it has not yet been finished/published
via a public link. Again, you need trunk to use the Spark shell there.


On Wed, May 14, 2014 at 12:43 AM, Stuti Awasthi stutiawas...@hcl.com wrote:

 Hi Xiangrui,
 Thanks for the response. I tried a few ways to include the mahout-math jar
 while launching the Spark shell, but with no success. Can you please point out
 what I am doing wrong?

 1. mahout-math.jar exported in CLASSPATH, and PATH
 2. Tried launching the Spark shell by:  MASTER=spark://HOSTNAME:PORT
 ADD_JARS=~/installations/work-space/mahout-math-0.7.jar
 spark-0.9.0/bin/spark-shell

  After launching, I checked the environment details on the WebUI: it looks
 like the mahout-math jar is included.
 spark.jars  /home/hduser/installations/work-space/mahout-math-0.7.jar

 Then I try:
 scala> import org.apache.mahout.math.VectorWritable
 <console>:10: error: object mahout is not a member of package org.apache
import org.apache.mahout.math.VectorWritable

 scala> val raw = sc.sequenceFile(path, classOf[Text],
 classOf[VectorWritable])
 <console>:12: error: not found: type Text
val data =
 sc.sequenceFile("/stuti/ML/Clustering/KMeans/HAR/KMeans_dataset_seq/part-r-0",
 classOf[Text], classOf[VectorWritable])

^
 I'm using Spark 0.9, Hadoop 1.0.4 and Mahout 0.7

 Thanks
 Stuti



 -Original Message-
 From: Xiangrui Meng [mailto:men...@gmail.com]
 Sent: Wednesday, May 14, 2014 11:56 AM
 To: user@spark.apache.org
 Subject: Re: How to use Mahout VectorWritable in Spark.

 You need

  val raw = sc.sequenceFile(path, classOf[Text],
  classOf[VectorWritable])

 to load the data. After that, you can do

  val data = raw.values.map(_.get)

 to get an RDD of Mahout's Vector. You can use `--jar mahout-math.jar` when
 you launch spark-shell to include mahout-math.
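
 For reference, a complete minimal sketch of that in the shell (the path is a
 placeholder, and it assumes mahout-math was shipped to the executors, e.g. via
 ADD_JARS as above):

 import org.apache.hadoop.io.Text
 import org.apache.mahout.math.VectorWritable

 // (Text, VectorWritable) pairs straight from the Mahout sequence file,
 // then unwrap the writables into Mahout vectors
 val raw = sc.sequenceFile("/path/to/KMeans_dataset_seq", classOf[Text], classOf[VectorWritable])
 val vectors = raw.values.map(_.get)   // RDD of org.apache.mahout.math.Vector
 vectors.count()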

 Best,
 Xiangrui

 On Tue, May 13, 2014 at 10:37 PM, Stuti Awasthi stutiawas...@hcl.com
 wrote:
  Hi All,
 
  I am very new to Spark and trying to play around with MLlib, hence
  apologies for the basic question.
 
 
 
  I am trying to run the KMeans algorithm using Mahout and Spark MLlib to
  see the performance. The initial data size was 10 GB. Mahout converts
  the data into a sequence file <Text,VectorWritable> which is used for KMeans
 clustering.
  The sequence file created was ~6 GB in size.
 
 
 
  Now I wanted to know if I can use the Mahout sequence file in
  Spark MLlib for KMeans. I have read that SparkContext.sequenceFile
  may be used here. Hence I tried to read my sequence file as below but
 am getting the error:
 
 
 
  Command on the Spark shell:

  scala> val data = sc.sequenceFile[String,VectorWritable]("/
  KMeans_dataset_seq/part-r-0",String,VectorWritable)

  <console>:12: error: not found: type VectorWritable

 val data = sc.sequenceFile[String,VectorWritable](
  "/KMeans_dataset_seq/part-r-0",String,VectorWritable)
 
 
 
  Here I have 2 questions:

  1. Mahout has “Text” as the key, but Spark is printing “not found: type: Text”,
  hence I changed it to String. Is this correct?

  2. How will VectorWritable be found in Spark? Do I need to include the
  Mahout jar in the classpath, or is there any other option?
 
 
 
  Please Suggest
 
 
 
  Regards
 
  Stuti Awasthi
 
 
 



Re: How to use Mahout VectorWritable in Spark.

2014-05-15 Thread Dmitriy Lyubimov
PPS The shell/spark tutorial i've mentioned is actually being developed in
MAHOUT-1542. As it stands, i believe it is now complete in its core.


On Wed, May 14, 2014 at 5:48 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

 PS: the Spark shell with all proper imports is also supported natively in
 Mahout (the mahout spark-shell command). See M-1489 for specifics. There's also
 a tutorial somewhere, but I suspect it has not yet been finished/published
 via a public link. Again, you need trunk to use the Spark shell there.



problem about broadcast variable in iteration

2014-05-15 Thread randylu
My code is just like the following:
 1  var rdd1 = ...
 2  var rdd2 = ...
 3  var kv = ...
 4  for (i <- 0 until n) {
 5    var kvGlobal = sc.broadcast(kv)   // broadcast kv
 6    rdd1 = rdd2.map {
 7      case t => doSomething(t, kvGlobal.value)
 8    }
 9    var tmp = rdd1.reduceByKey().collect()
10    kv = updateKV(tmp)   // update kv for each iteration
11    rdd2 = rdd1
12 }
13 rdd2.saveAsTextFile()
  In the 1st iteration, when line 9 is processed, each slave needs to read
broadcast_1;
  In the 2nd iteration, when line 9 is processed, each slave needs to read
broadcast_1 and broadcast_2;
  In the 3rd iteration, when line 9 is processed, each slave needs to read
broadcast_1, broadcast_2 and broadcast_3;
  ...
  broadcast_1 through broadcast_n all correspond to kvGlobal at different iterations.
  Why, in the nth iteration, does each slave need to read from broadcast_1 to
broadcast_n, and not just broadcast_n?
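
A sketch of one way to avoid the repeated reads, under the assumption that they
come from re-computing the uncached lineage of earlier iterations (the data and
the stand-ins for doSomething/updateKV are only illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions on older Spark APIs

object BroadcastIterationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-iteration").setMaster("local[2]"))

    var kv = Map(0 -> 0.0, 1 -> 0.0)
    var rdd2 = sc.parallelize(Seq((0, 1.0), (1, 2.0), (0, 3.0)))
    var rdd1 = rdd2

    for (i <- 0 until 3) {
      val kvGlobal = sc.broadcast(kv)
      // stand-in for doSomething(t, kvGlobal.value)
      rdd1 = rdd2.map { case (k, v) => (k, v + kvGlobal.value.getOrElse(k, 0.0)) }
                 .cache()            // materialize, so the next iteration does not re-run this map
      val tmp = rdd1.reduceByKey(_ + _).collect()
      kv = tmp.toMap                 // stand-in for updateKV(tmp)
      rdd2.unpersist()               // drop the previous iteration's cached blocks
      rdd2 = rdd1
    }
    rdd2.saveAsTextFile("/tmp/broadcast-iteration-output")
    sc.stop()
  }
}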



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-broadcast-variable-in-iteration-tp5479.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Equally weighted partitions in Spark

2014-05-15 Thread Syed A. Hashmi
I took a stab at it and wrote a partitioner
(https://github.com/syedhashmi/spark/commit/4ca94cc155aea4be36505d5f37d037e209078196)
that I intend to contribute back to the main repo some time later. The
partitioner takes a parameter which governs the minimum number of keys per
partition, and once all partitions hit that limit, it goes round robin. An
alternate strategy could be to go round robin by default. This partitioner
will guarantee equally sized partitions without tinkering with hash codes,
complex balancing computations, etc.
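
A rough sketch of that idea, just to make the discussion concrete (this is not
the linked implementation; it simplifies by switching a partition to round robin
as soon as that partition reaches the minimum, and the counters are local to
whichever task calls getPartition, so the balance is only approximate):

import org.apache.spark.Partitioner

class ThresholdRoundRobinPartitioner(numParts: Int, minKeysPerPartition: Int)
    extends Partitioner {

  private val counts = Array.fill(numParts)(0L)  // keys routed to each partition (per-task view)
  private var cursor = 0                         // round-robin cursor

  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = synchronized {
    val hashed = ((key.hashCode % numParts) + numParts) % numParts
    val target =
      if (counts(hashed) < minKeysPerPartition) hashed     // below the minimum: keep hash placement
      else { cursor = (cursor + 1) % numParts; cursor }     // otherwise spread round robin
    counts(target) += 1
    target
  }
}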

Wanted to get your thoughts on this and any critical comments suggesting
improvements.

Thanks,
Syed.


On Sat, May 3, 2014 at 6:12 PM, Chris Fregly ch...@fregly.com wrote:

 @deenar-  i like the custom partitioner strategy that you mentioned.  i
 think it's very useful.

 as a thought-exercise, is it possible to re-partition your RDD to
 more-evenly distribute the long-running tasks among the short-running tasks
  by ordering the keys differently?  this would play nicely with the existing
 RangePartitioner.

 or perhaps manipulate the key's hashCode() to more-evenly-distribute the
 tasks to play nicely with the existing HashPartitioner?

 i don't know if either of these are beneficial, but throwing them out for
 the sake of conversation...

 -chris


 On Fri, May 2, 2014 at 11:10 AM, Andrew Ash and...@andrewash.com wrote:

 Deenar,

 I haven't heard of any activity to do partitioning in that way, but it
 does seem more broadly valuable.


 On Fri, May 2, 2014 at 10:15 AM, deenar.toraskar 
 deenar.toras...@db.comwrote:

 I have equal sized partitions now, but I want the RDD to be partitioned
 such
 that the partitions are equally weighted by some attribute of each RDD
 element (e.g. size or complexity).

 I have been looking at the RangePartitioner code and I have come up with
 something like

 EquallyWeightedPartitioner(noOfPartitions, weightFunction)

 1) take a sum or (sample) of complexities of all elements and calculate
 average weight per partition
 2) take a histogram of weights
 3) assign a list of partitions to each bucket
 4)  getPartition(key: Any): Int would
   a) get the weight and then find the bucket
   b) assign a random partition from the list of partitions associated
 with
 each bucket

 Just wanted to know if someone else had come across this issue before and
 there was a better way of doing this.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Equally-weighted-partitions-in-Spark-tp5171p5212.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: little confused about SPARK_JAVA_OPTS alternatives

2014-05-15 Thread Koert Kuipers
hey patrick,
i have a SparkConf i can add them to. i was looking for a way to do this
where they are not hardwired within scala, which is what SPARK_JAVA_OPTS
used to do.
i guess if i just set -Dspark.akka.frameSize=1 on my java app launch
then it will get picked up by the SparkConf too right?
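
(For reference: a default-constructed SparkConf does pick up every JVM system
property that starts with "spark.", so a -D flag on the java launch is enough; a
minimal sketch, with placeholder master URL and app name:)

import org.apache.spark.{SparkConf, SparkContext}

// JVM launched with e.g.:  java -Dspark.akka.frameSize=... -cp myapp.jar MyApp
val conf = new SparkConf()                 // loadDefaults = true: reads spark.* system properties
  .setAppName("my-app")                    // placeholder
  .setMaster("spark://master:7077")        // placeholder
println(conf.get("spark.akka.frameSize", "<not set>"))
val sc = new SparkContext(conf)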


On Wed, May 14, 2014 at 2:54 PM, Patrick Wendell pwend...@gmail.com wrote:

 Just wondering - how are you launching your application? If you want
 to set values like this the right way is to add them to the SparkConf
 when you create a SparkContext.

  val conf = new SparkConf().set("spark.akka.frameSize",
  "1").setAppName(...).setMaster(...)
 val sc = new SparkContext(conf)

 - Patrick

 On Wed, May 14, 2014 at 9:09 AM, Koert Kuipers ko...@tresata.com wrote:
  i have some settings that i think are relevant for my application. they
 are
  spark.akka settings so i assume they are relevant for both executors and
 my
  driver program.
 
  i used to do:
  SPARK_JAVA_OPTS=-Dspark.akka.frameSize=1
 
  now this is deprecated. the alternatives mentioned are:
  * some spark-submit settings which are not relevant to me since i do not
 use
  spark-submit (i launch spark jobs from an existing application)
  * spark.executor.extraJavaOptions to set -X options. i am not sure what
 -X
  options are, but it doesn't sound like what i need, since it's only for
  executors
  * SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e.
 master,
  worker), that sounds like i should not use it since i am trying to change
  settings for an app, not a daemon.
 
  am i missing the correct setting to use?
  should i do -Dspark.akka.frameSize=1 on my application launch
 directly,
  and then also set spark.executor.extraJavaOptions? so basically repeat
 it?



Re: NoSuchMethodError: breeze.linalg.DenseMatrix

2014-05-15 Thread wxhsdp
finally i fixed it. the previous failure was caused by the lack of some jars.
i pasted the classpath from local mode to the workers by using show
compile:dependencyClasspath
and it works!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-breeze-linalg-DenseMatrix-tp5310p5732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


File present but file not found exception

2014-05-15 Thread Sai Prasanna
Hi Everyone,

I think all are pretty busy, the response time in this group has slightly
increased.

But anyway, this is a pretty silly problem that I could not get over.

I have a file in my local FS, but when I try to create an RDD out of it,
the task fails, and a file-not-found exception is thrown in the log files.

*var file = sc.textFile(file:///home/sparkcluster/spark/input.txt);*
*file.top(1);*

input.txt exists in the above folder, but Spark still couldn't find it. Do some
parameters need to be set?

Any help is really appreciated. Thanks !!


Re: run spark0.9.1 on yarn with hadoop CDH4

2014-05-15 Thread Arpit Tak
Also try this out; we have already done this, and it will help you:
http://docs.sigmoidanalytics.com/index.php/Setup_hadoop_2.0.0-cdh4.2.0_and_spark_0.9.0_on_ubuntu_12.04




On Tue, May 6, 2014 at 10:17 PM, Andrew Lee alee...@hotmail.com wrote:

 Please check JAVA_HOME. Usually it should point to /usr/java/default on
 CentOS/Linux.

 or FYI: http://stackoverflow.com/questions/1117398/java-home-directory


  Date: Tue, 6 May 2014 00:23:02 -0700
  From: sln-1...@163.com
  To: u...@spark.incubator.apache.org
  Subject: run spark0.9.1 on yarn with hadoop CDH4

 
  Hi all,
  I have made HADOOP_CONF_DIR or YARN_CONF_DIR point to the directory
 which
  contains the (client side) configuration files for the hadoop cluster.
  The command to launch the YARN Client which I run is like this:
 
  #
 
 SPARK_JAR=./~/spark-0.9.1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
  ./bin/spark-class org.apache.spark.deploy.yarn.Client\--jar
  examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.1.jar\--class
  org.apache.spark.examples.SparkPi\--args yarn-standalone \--num-workers 3
  \--master-memory 2g \--worker-memory 2g \--worker-cores 1
  ./bin/spark-class: line 152: /usr/lib/jvm/java-7-sun/bin/java: No such
 file
  or directory
  ./bin/spark-class: line 152: exec: /usr/lib/jvm/java-7-sun/bin/java:
 cannot
  execute: No such file or directory
  How can I make it run well?
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/run-spark0-9-1-on-yarn-with-hadoop-CDH4-tp5426.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



Test

2014-05-15 Thread Matei Zaharia


Re: 0.9 wont start cluster on ec2, SSH connection refused?

2014-05-15 Thread wxhsdp
Hi, mayur

i've met the same problem. the instances are on, i can see them from ec2
console, and connect to them

wxhsdp@ubuntu:~/spark/spark/tags/v1.0.0-rc3/ec2$ ssh -i wxhsdp-us-east.pem
root@54.86.181.108
The authenticity of host '54.86.181.108 (54.86.181.108)' can't be
established.
ECDSA key fingerprint is
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '54.86.181.108' (ECDSA) to the list of known
hosts.
Last login: Sun Feb  2 23:13:21 2014 from
c-50-131-222-227.hsd1.ca.comcast.net

   __|  __|_  )
   _|  ( /   Amazon Linux AMI
  ___|\___|___|

https://aws.amazon.com/amazon-linux-ami/2013.03-release-notes/
There are 42 security update(s) out of 257 total update(s) available
Run sudo yum update to apply all updates.
Amazon Linux version 2014.03 is available.

but there's /root/ephemeral-hdfs and spark directory as said in the
ec2-scripts.html




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/0-9-wont-start-cluster-on-ec2-SSH-connection-refused-tp4146p5498.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


pyspark python exceptions / py4j exceptions

2014-05-15 Thread Patrick Donovan
Hello,

I'm trying to write a python function that does something like:

def foo(line):
    try:
        return stuff(line)
    except Exception:
        raise MoreInformativeException(line)

and then use it in a map like so:

rdd.map(foo)

and have my MoreInformativeException make it back if/when something goes
wrong, but all I can get back are errors like:

Py4JJavaError(u'An error occurred while calling o50.saveAsTextFile.\n',
JavaObject id=o51)

Is there any way to recover my MoreInformativeException back in python
land? If I reach into java via py4j can I get it back somehow? If so, how?

Thanks!


Re: NoSuchMethodError: breeze.linalg.DenseMatrix

2014-05-15 Thread DB Tsai
Hi Wxhsdp,

I also have some difficulties with sc.addJar(). Since we include the
breeze library by using Spark 1.0, we don't have the problem you ran into.
However, when we add external jars via sc.addJar(), I found that the
executors actually fetch the jars but the classloader still doesn't honor
it. I'm trying to figure out the problem now.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, May 14, 2014 at 5:46 AM, wxhsdp wxh...@gmail.com wrote:

 Hi, DB
   i've add breeze jars to workers using sc.addJar()
   breeze jars include :
   breeze-natives_2.10-0.7.jar
   breeze-macros_2.10-0.3.jar
   breeze-macros_2.10-0.3.1.jar
   breeze_2.10-0.8-SNAPSHOT.jar
   breeze_2.10-0.7.jar

   almost all the jars about breeze i can find, but still NoSuchMethodError:
 breeze.linalg.DenseMatrix

   from the executor stderr, you can see the executor successfully fetches
 these jars, what's wrong
   about my method? thank you!

 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze-natives_2.10-0.7.jar with timestamp
 1400070957376
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze-natives_2.10-0.7.jar to
 /tmp/fetchFileTemp7468892065227766972.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-natives_2.10-0.7.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.jar with timestamp
 1400070957441
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.jar to
 /tmp/fetchFileTemp2324565598765584917.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-macros_2.10-0.3.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze_2.10-0.8-SNAPSHOT.jar with
 timestamp
 1400070957358
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze_2.10-0.8-SNAPSHOT.jar to
 /tmp/fetchFileTemp8730123100104850193.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze_2.10-0.8-SNAPSHOT.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.1.jar with
 timestamp
 1400070957414
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze-macros_2.10-0.3.1.jar to
 /tmp/fetchFileTemp3473404556989515218.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze-macros_2.10-0.3.1.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/build-project_2.10-1.0.jar with timestamp
 1400070956753
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/build-project_2.10-1.0.jar to
 /tmp/fetchFileTemp1289055585501269156.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./build-project_2.10-1.0.jar
 to class loader
 14/05/14 20:36:02 INFO Executor: Fetching
 http://192.168.0.106:42883/jars/breeze_2.10-0.7.jar with timestamp
 1400070957228
 14/05/14 20:36:02 INFO Utils: Fetching
 http://192.168.0.106:42883/jars/breeze_2.10-0.7.jar to
 /tmp/fetchFileTemp1287317286108432726.tmp
 14/05/14 20:36:02 INFO Executor: Adding

 file:/home/wxhsdp/spark/spark/tags/v1.0.0-rc3/work/app-20140514203557-/0/./breeze_2.10-0.7.jar
 to class loader


 DB Tsai-2 wrote
  Since the breeze jar is brought into Spark by the mllib package, you may want
  to add mllib as your dependency in Spark 1.0. To bring it in from your
  application yourself, you can either use sbt assembly in your build project
  to generate a flat myApp-assembly.jar which contains the breeze jar, or use
  the Spark addJar API like Yadid said.
 
 
  Sincerely,
 
  DB Tsai
  ---
  My Blog: https://www.dbtsai.com
  LinkedIn: https://www.linkedin.com/in/dbtsai
 
 
   On Sun, May 4, 2014 at 10:24 PM, wxhsdp <wxhsdp@...> wrote:
 
   Hi, DB, i think it's something related to sbt publishLocal

   if i remove the breeze dependency in my sbt file, breeze cannot be
  found

   [error] /home/wxhsdp/spark/example/test/src/main/scala/test.scala:5: not
   found: object breeze
   [error] import breeze.linalg._
   [error]^

   here's my sbt file:

   name := "Build Project"

   version := "1.0"

   scalaVersion := "2.10.4"

   libraryDependencies += "org.apache.spark" %% "spark-core" %
   "1.0.0-SNAPSHOT"

   resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

   i run sbt publishLocal on the Spark tree.

   but if i manually put spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar in the
   /lib
   directory, sbt package is
   ok, 

Re: Equally weighted partitions in Spark

2014-05-15 Thread deenar.toraskar
This is my first implementation. There are a few rough edges, but when I run
this I get the following exception. The class extends Partitioner which in
turn extends Serializable. Any idea what I am doing wrong?

scala> res156.partitionBy(new EqualWeightPartitioner(1000, res156,
weightFunction))
14/05/09 16:59:36 INFO SparkContext: Starting job: histogram at
<console>:197
14/05/09 16:59:36 INFO DAGScheduler: Got job 18 (histogram at <console>:197)
with 250 output partitions (allowLocal=false)
14/05/09 16:59:36 INFO DAGScheduler: Final stage: Stage 18 (histogram at
<console>:197)
14/05/09 16:59:36 INFO DAGScheduler: Parents of final stage: List()
14/05/09 16:59:36 INFO DAGScheduler: Missing parents: List()
14/05/09 16:59:36 INFO DAGScheduler: Submitting Stage 18
(MapPartitionsRDD[36] at histogram at <console>:197), which has no missing
parents
14/05/09 16:59:36 INFO DAGScheduler: Failed to run histogram at
<console>:197
org.apache.spark.SparkException: Job aborted: Task not serializable:
java.io.NotSerializableException: scala.util.Random
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
==


import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils
import org.apache.spark.Partitioner
import java.util
import scala.Array
import scala.reflect._
import scala.util.Random

/**
 * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
 * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
 */
class EqualWeightPartitioner[K: Ordering: ClassTag, V](
    partitions: Int,
    @transient rdd: RDD[_ <: Product2[K, V]],
    weightFunction: K => Double) extends Partitioner {

  private val ordering = implicitly[Ordering[K]]
  private val histogram = rdd.map(x => (x._1, weightFunction(x._1)))
    .map(x => x._2.toDouble).histogram(partitions)
  private val bucketSize = histogram._1(1) - histogram._1(0)

  // need to refine this algorithm to use single partitions

  // An array of upper bounds for the first (partitions - 1) partitions
  private val bucketToPartitionMap: Array[(Int, Int)] = {
    if (partitions == 1) {
      Array()
    } else {
      val bucketWeights = histogram._2.zipWithIndex
        .map { case (y, idx) => y * (histogram._1(idx) + histogram._1(idx + 1)) / 2 }
        .map { var s: Double = 0.0; d => { s += d; s } }   // running (cumulative) sum
      val averageWeight = bucketWeights.last / partitions
      val bucketPartition: Array[(Int, Int)] =
        bucketWeights.map(x => Math.max(0, Math.round(x / averageWeight) - 1).toInt).zipWithIndex
      bucketPartition.map(x => x._2 match {
        case 0 => (0, x._1)
        case _ => (Math.min(partitions - 1, bucketPartition(x._2 - 1)._1 + 1), x._1)
      })
    }
  }

  def numPartitions = partitions

  def getPartition(key: Any): Int = {
    val rnd = new Random
    val k = key.asInstanceOf[K]
    val weight: Double = weightFunction(k)
    val bucketIndex: Int = Math.min(1, (weight / bucketSize).toInt)
    val partitionRange: (Int, Int) = bucketToPartitionMap(bucketIndex - 1)
    partitionRange._1 + rnd.nextInt(partitionRange._2 - partitionRange._1 + 1)
  }

  override def equals(other: Any): Boolean = other match {
    case r: EqualWeightPartitioner[_, _] =>
      r.bucketToPartitionMap.sameElements(bucketToPartitionMap)
    case _ =>
      false
  }

}


Re: Unable to load native-hadoop library problem

2014-05-15 Thread Andrew Or
This seems unrelated to not being able to load the native-hadoop library. Is it
failing to connect to the ResourceManager? Have you verified that there is an
RM process listening on port 8032 at the specified IP?


On Tue, May 6, 2014 at 6:25 PM, Sophia sln-1...@163.com wrote:

 Hi, everyone,
 [root@CHBM220 spark-0.9.1]#

 SPARK_JAR=.assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar
 ./bin/spark-class org.apache.spark.deploy.yarn.Client --jar
 examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.1.jar --class
 org.apache.spark.examples.SparkPi --args yarn-standalone --num-workers 3
 --master-memory 2g --worker-memory 2g --worker-cores 1
 14/05/07 09:05:14 WARN NativeCodeLoader: Unable to load native-hadoop
 library for your platform... using builtin-java classes where applicable
 14/05/07 09:05:14 INFO RMProxy: Connecting to ResourceManager at
 CHBM220/192.168.10.220:8032
 Then it stopped. My HADOOP_CONF_DIR has been configured well; what should I
 do?
 Wishing you a happy day.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Unable-to-load-native-hadoop-library-problem-tp5469.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



filling missing values in a sequence

2014-05-15 Thread Mohit Jaggi
Hi,
I am trying to find a way to fill in missing values in an RDD. The RDD is a
sorted sequence.
For example, (1, 2, 3, 5, 8, 11, ...)
I need to fill in the missing numbers and get (1,2,3,4,5,6,7,8,9,10,11)

One way to do this is to slide and zip:
rdd1 = sc.parallelize(List(1, 2, 3, 5, 8, 11, ...))
x = rdd1.first
rdd2 = rdd1 filter (_ != x)
rdd3 = rdd2 zip rdd1
rdd4 = rdd3 flatMap { case (x, y) => generate missing elements between x and y }
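
A self-contained sketch of the zip idea, pairing each element with its successor
via zipWithIndex (this assumes a Spark version that has RDD.zipWithIndex; the
sample data is illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions (join) on older APIs

object FillMissingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fill-missing").setMaster("local[2]"))

    val rdd1 = sc.parallelize(Seq(1, 2, 3, 5, 8, 11))

    // key each element by its position, and its successor by the same position
    val indexed = rdd1.zipWithIndex().map { case (v, i) => (i, v) }
    val successors = indexed.map { case (i, v) => (i - 1, v) }

    // for each consecutive pair (x, y) emit x, x+1, ..., y-1, then append the last element
    val last = rdd1.reduce((a, b) => math.max(a, b))
    val filled = indexed.join(successors)
      .values
      .flatMap { case (x, y) => x until y }
      .union(sc.parallelize(Seq(last)))

    println(filled.collect().sorted.mkString(", "))   // 1, 2, 3, ..., 11
    sc.stop()
  }
}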

Another method which I think is more efficient is to use mapParititions()
on rdd1 to be able to iterate on elements of rdd1 in each partition.
However, that leaves the boundaries of the partitions to be unfilled. Is
there a way within the function passed to mapPartitions, to read the first
element in the next partition?
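
And on the mapPartitions question: one workaround (a sketch, assuming the RDD is
sorted and has no empty partitions, and reusing the rdd1 from above) is to first
collect the head of every partition to the driver, then let the function for
partition i see the head of partition i + 1:

// first element of each partition, gathered on the driver
val heads: Map[Int, Int] = rdd1
  .mapPartitionsWithIndex { (i, it) =>
    if (it.hasNext) Iterator((i, it.next())) else Iterator.empty
  }
  .collect()
  .toMap

// each partition can now fill gaps across its right boundary
val filled = rdd1.mapPartitionsWithIndex { (i, it) =>
  val elems = it.toList
  val nextHead = heads.get(i + 1)                       // None for the last partition
  val withNext = elems ++ nextHead.toList
  val gaps = withNext.sliding(2).collect { case List(x, y) => x until y }.flatten
  if (nextHead.isEmpty && elems.nonEmpty) gaps ++ Iterator(elems.last) else gaps
}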

The latter approach also appears to work for a general sliding window
calculation on the RDD. The former technique requires a lot of sliding and
zipping and I believe it is not efficient. If only I could read the next
partition...I have tried passing a pointer to rdd1 to the function passed
to mapPartitions but the rdd1 pointer turns out to be NULL, I guess because
Spark cannot deal with a mapper calling another mapper (since it happens on
a worker not the driver)

Mohit.


Re: sbt run with spark.ContextCleaner ERROR

2014-05-15 Thread Nan Zhu
same problem +1, 

though does not change the program result 

-- 
Nan Zhu


On Tuesday, May 6, 2014 at 11:58 PM, Tathagata Das wrote:

 Okay, this needs to be fixed. Thanks for reporting this!
 
 
 
 On Mon, May 5, 2014 at 11:00 PM, wxhsdp wxh...@gmail.com 
 (mailto:wxh...@gmail.com) wrote:
  Hi, TD
  
  i tried on v1.0.0-rc3 and still got the error
  
  
  
  --
  View this message in context: 
  http://apache-spark-user-list.1001560.n3.nabble.com/sbt-run-with-spark-ContextCleaner-ERROR-tp5304p5421.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com 
  (http://Nabble.com).
 



Re: is Mesos falling out of favor?

2014-05-15 Thread deric
I'm also using SPARK_EXECUTOR_URI right now, though I would prefer
distributing Spark as a binary package.

For running examples with `./bin/run-example ...` it works fine, however
tasks from spark-shell are getting lost.

Error: Could not find or load main class
org.apache.spark.executor.MesosExecutorBackend

which looks more like a problem with sbin/spark-executor and missing paths to the
jar. Has anyone encountered this error before?

I guess Yahoo invested quite a lot of effort into YARN and Spark integration
(moreover, with Mahout migrating to Spark there's much more interest in
Hadoop and Spark integration). If there were some Mesos company
working on Spark-Mesos integration, it could be at least on the same level.

I don't see any other reason why YARN would be better than Mesos; personally
I like the latter. However, I haven't checked YARN for a while, and maybe they've
made significant progress. I think Mesos is more universal and flexible
than YARN.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/is-Mesos-falling-out-of-favor-tp5444p5481.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Easy one

2014-05-15 Thread Laeeq Ahmed
Hi Ian,

Don't use SPARK_MEM in spark-env.sh. It will set that amount for all of your jobs.
The better way is to use only the second option,
sconf.setExecutorEnv("spark.executor.memory", "4g"), i.e. set it in the driver
program. In this way every job will have memory according to its requirement.

For example, if you have 4 GB of memory on each worker node, you won't be able to run
more than one job with 4 GB given to each. Two jobs with 2 GB (or slightly less)
each will work.
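
For instance, a minimal sketch of the per-application setting from the driver
(the master URL and sizes are illustrative; spark.executor.memory is the
standard config key):

import org.apache.spark.{SparkConf, SparkContext}

// give this application's executors 2g each, leaving room on a 4g worker
// for a second concurrently running job
val sconf = new SparkConf()
  .setMaster("spark://master:7077")        // illustrative
  .setAppName("my-app")
  .set("spark.executor.memory", "2g")
val sc = new SparkContext(sconf)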

Laeeq


On Wednesday, May 7, 2014 2:29 AM, Ian Ferreira ianferre...@hotmail.com wrote:
Hi there,

Why can’t I seem to kick the executor memory higher? See below from EC2 
deployment using m1.large


And in the spark-env.sh
export SPARK_MEM=6154m


And in the spark context
sconf.setExecutorEnv("spark.executor.memory", "4g")

Cheers
- Ian

Re: Equivalent of collect() on DStream

2014-05-15 Thread Stephen Boesch
It seems the concept I had been missing is to invoke the DStream foreach
method.  This method  takes a  function expecting an RDD and applies the
function to each RDD within the DStream.
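
A minimal sketch of that pattern (the socket source, port, and batch interval are
illustrative; on the 0.8.1 API mentioned below the method is foreach, in later
releases foreachRDD):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext("local[2]", "collect-sketch", Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)

// bring each batch's RDD back into the driver JVM as a plain collection
lines.foreachRDD { rdd =>
  val batch: Array[String] = rdd.collect()
  println("batch of " + batch.length + " lines")
}

ssc.start()
ssc.awaitTermination()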


2014-05-14 21:33 GMT-07:00 Stephen Boesch java...@gmail.com:

  Looking further, it appears the functionality I am seeking is in the
  following private[spark] class ForEachDStream

  (version 0.8.1, yes we are presently using an older release..)

  private[streaming]
  class ForEachDStream[T: ClassManifest] (
      parent: DStream[T],
      foreachFunc: (RDD[T], Time) => Unit
    ) extends DStream[Unit](parent.ssc) {

 I would like to have access to this structure - particularly the ability
  to define a foreachFunc that gets applied to each RDD within the
 DStream.  Is there a means to do so?



 2014-05-14 21:25 GMT-07:00 Stephen Boesch java...@gmail.com:


  Given that collect() does not exist on DStream, apparently my mental model
  of a streaming RDD (DStream) needs correction/refinement.  So what is the
  means to convert DStream data into a JVM in-memory representation?  All of
  the methods on DStream, i.e. filter, map, transform, reduce, etc., generate
  other DStreams, not an in-memory data structure.







Re: How to use Mahout VectorWritable in Spark.

2014-05-15 Thread Dmitriy Lyubimov
Mahout now supports doing its distributed linalg natively on Spark, so the
problem of loading sequence-file input into Spark is already solved there
 (trunk, http://mahout.apache.org/users/sparkbindings/home.html, the
drmFromHDFS() call -- and then you can get access to the underlying RDD via the
rdd matrix property if needed).

If you specifically try to ensure interoperability with MLlib, however, I did
not try that -- Mahout's linalg and its bindings to Spark work
with the Kryo serializer only, so if/when MLlib algorithms do not support the
Kryo serializer, it would not be interoperable.

-d


On Tue, May 13, 2014 at 10:37 PM, Stuti Awasthi stutiawas...@hcl.com wrote:

  Hi All,

 I am very new to Spark and trying to play around with Mllib hence
 apologies for the basic question.



  I am trying to run the KMeans algorithm using Mahout and Spark MLlib to see
  the performance. The initial data size was 10 GB. Mahout converts the data
  into a sequence file <Text,VectorWritable> which is used for KMeans
  clustering.  The sequence file created was ~6 GB in size.



  Now I wanted to know if I can use the Mahout sequence file in Spark
  MLlib for KMeans. I have read that SparkContext.sequenceFile may be used
  here. Hence I tried to read my sequence file as below but am getting the error:



  Command on the Spark shell:

  scala> val data = sc.sequenceFile[String,VectorWritable]("/
  KMeans_dataset_seq/part-r-0",String,VectorWritable)

  <console>:12: error: not found: type VectorWritable

 val data = sc.sequenceFile[String,VectorWritable](
  "/KMeans_dataset_seq/part-r-0",String,VectorWritable)



  Here I have 2 questions:

  1. Mahout has “Text” as the key, but Spark is printing “not found: type: Text”,
  hence I changed it to String. Is this correct?

  2. How will VectorWritable be found in Spark? Do I need to include the Mahout
  jar in the classpath, or is there any other option?



 Please Suggest



 Regards

 Stuti Awasthi






Re: How to run shark?

2014-05-15 Thread Mayur Rustagi
Most likely your Shark server is not started.
Are you connecting to the cluster or running in local mode?
What is the lowest error on the stack?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Mon, May 12, 2014 at 2:07 PM, Sophia sln-1...@163.com wrote:

  When I run the Shark command line, it turns out like this, and I cannot see
  something like shark. What can I do? The log:
 -
 Starting the Shark Command Line Client
 14/05/12 16:32:49 WARN conf.Configuration: mapred.max.split.size is
 deprecated. Instead, use mapreduce.input.fileinputformat.split.maxsize
 14/05/12 16:32:49 WARN conf.Configuration: mapred.min.split.size is
 deprecated. Instead, use mapreduce.input.fileinputformat.split.minsize
 14/05/12 16:32:49 WARN conf.Configuration: mapred.min.split.size.per.rack
 is
 deprecated. Instead, use
 mapreduce.input.fileinputformat.split.minsize.per.rack
 14/05/12 16:32:49 WARN conf.Configuration: mapred.min.split.size.per.node
 is
 deprecated. Instead, use
 mapreduce.input.fileinputformat.split.minsize.per.node
 14/05/12 16:32:49 WARN conf.Configuration: mapred.reduce.tasks is
 deprecated. Instead, use mapreduce.job.reduces
 14/05/12 16:32:49 WARN conf.Configuration:
 mapred.reduce.tasks.speculative.execution is deprecated. Instead, use
 mapreduce.reduce.speculative
 14/05/12 16:32:49 WARN conf.Configuration:
 org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@51f782b8:an
 attempt
 to override final parameter:
 mapreduce.job.end-notification.max.retry.interval;  Ignoring.
 14/05/12 16:32:49 WARN conf.Configuration:
 org.apache.hadoop.hive.conf.LoopingByteArrayInputStream@51f782b8:an
 attempt
 to override final parameter: mapreduce.job.end-notification.max.attempts;
 Ignoring.

 Logging initialized using configuration in

 jar:file:/root/shark-0.9.1-bin-hadoop2/lib_managed/jars/edu.berkeley.cs.shark/hive-common/hive-common-0.11.0-shark-0.9.1.jar!/hive-log4j.properties
 Hive history
 file=/tmp/root/hive_job_log_root_8413@CHBM220_201405121632_457581193.txt
 SLF4J: Class path contains multiple SLF4J bindings.
 SLF4J: Found binding in

 [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: Found binding in

 [jar:file:/root/shark-0.9.1-bin-hadoop2/lib_managed/jars/org.slf4j/slf4j-log4j12/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
 explanation.
  1.363: [GC 131072K->11100K(502464K), 0.0087470 secs]
 [ERROR] [05/12/2014 16:33:00.461] [main] [Remoting] Remoting error:
 [Startup
 timed out] [
 akka.remote.RemoteTransportException: Startup timed out
 at
 akka.remote.Remoting.akka$remote$Remoting$$notifyError(Remoting.scala:129)
 at akka.remote.Remoting.start(Remoting.scala:191)
 at
 akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
 at
 akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:579)
 at akka.actor.ActorSystemImpl._start(ActorSystem.scala:577)
 at akka.actor.ActorSystemImpl.start(ActorSystem.scala:588)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
 at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
 at
 org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:96)
 at org.apache.spark.SparkEnv$.create(SparkEnv.scala:126)
 at org.apache.spark.SparkContext.init(SparkContext.scala:139)
 at shark.SharkContext.init(SharkContext.scala:42)
 at shark.SharkContext.init(SharkContext.scala:61)
 at shark.SharkEnv$.initWithSharkContext(SharkEnv.scala:78)
 at shark.SharkEnv$.init(SharkEnv.scala:38)
 at shark.SharkCliDriver.init(SharkCliDriver.scala:278)
 at shark.SharkCliDriver$.main(SharkCliDriver.scala:162)
 at shark.SharkCliDriver.main(SharkCliDriver.scala)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [1 milliseconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at

 scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
 at scala.concurrent.Await$.result(package.scala:107)
 at akka.remote.Remoting.start(Remoting.scala:173)
 ... 16 more
 ]
 Exception in thread main java.util.concurrent.TimeoutException: Futures
 timed out after [1 milliseconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at
 scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at

 

Average of each RDD in Stream

2014-05-15 Thread Laeeq Ahmed
Hi,

I use the following code for calculating the average. The problem is that the
reduce operation returns a DStream here, and not a tuple as it normally does
without streaming. So how can we get the sum and the count from the DStream?
Can we cast it to a tuple?


val numbers = ssc.textFileStream(args(1))
    val sumandcount = numbers.map(n => (n.toDouble, 1)).reduce{ (a, b) => (a._1
+ b._1, a._2 + b._2) }
    sumandcount.print()
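
One way to get at the values (assuming that is what you are after): the DStream
holds one such (sum, count) pair per batch, so pull each batch's RDD out with
foreachRDD (foreach on older releases) and collect it:

// for each batch interval, collect the single (sum, count) pair and print the average
sumandcount.foreachRDD { rdd =>
  rdd.collect().foreach { case (sum, count) =>
    println("average for this batch: " + sum / count)
  }
}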


Regards,
Laeeq


spark+mesos: configure mesos 'callback' port?

2014-05-15 Thread Scott Clasen
Is anyone aware of a way to configure the mesos GroupProcess port on the
mesos slave/task which the mesos master calls back on?

The log line that shows this port looks like below (mesos 0.17.0)

I0507 02:37:20.893334 11638 group.cpp:310] Group process ((2)@1.2.3.4:54321)
connected to ZooKeeper.

I would really like to be able to configure the port (54321 in the example).

Anyone know how?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-mesos-configure-mesos-callback-port-tp5475.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: 1.0.0 Release Date?

2014-05-15 Thread Madhu
Spark 1.0.0 RC5 is available and open for voting.
Give it a try and vote on it on the dev mailing list.



-
Madhu
https://www.linkedin.com/in/msiddalingaiah
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/1-0-0-Release-Date-tp5664p5716.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: pySpark memory usage

2014-05-15 Thread Jim Blomo
I should add that I had to tweak the numbers a bit to stay above the swap
threshold but below the "Too many open files" error (`ulimit -n` is
32768).

On Wed, May 14, 2014 at 10:47 AM, Jim Blomo jim.bl...@gmail.com wrote:
 That worked amazingly well, thank you Matei!  Numbers that worked for
 me were 400 for the textFile()s, 1500 for the join()s.

 On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim, unfortunately external spilling is not implemented in Python right 
 now. While it would be possible to update combineByKey to do smarter stuff 
 here, one simple workaround you can try is to launch more map tasks (or more 
 reduce tasks). To set the minimum number of map tasks, you can pass it as a 
 second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 
 1000)).

 Matei

 On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Aaron, this looks like a good solution!  Will be trying it out 
 shortly.

 I noticed that the S3 exception seem to occur more frequently when the
 box is swapping.  Why is the box swapping?  combineByKey seems to make
 the assumption that it can fit an entire partition in memory when
 doing the combineLocally step.  I'm going to try to break this apart
 but will need some sort of heuristic options include looking at memory
 usage via the resource module and trying to keep below
 'spark.executor.memory', or using batchSize to limit the number of
 entries in the dictionary.  Let me know if you have any opinions.

 On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640

 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!


 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com 
 wrote:

 Hey Jim,

 This IOException thing is a general issue that we need to fix and your
 observation is spot-on. There is actually a JIRA for it here I created a
 few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579

 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.

 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think 
 there
 is a race here where the writer sees the error first before the reader 
 knows
 what is going on.

 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...

 - Patrick






 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:

 FYI, it looks like this stdin writer to Python finished early error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that 
 is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.

 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped 
 exception
 class, eg. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?

 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,

 Jim

 diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
              readerException = e
              Try(worker.shutdownOutput()) // kill Python worker process
 
 +          case e: java.net.SocketException =>
 +            // This can happen if a connection to the datasource, eg S3, resets
 +            // or is otherwise broken
 +            readerException = e
 +            Try(worker.shutdownOutput()) // kill Python worker process
 +
            case e: IOException =>
              // This can happen for legitimate reasons if the Python code stops returning

Re: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext

2014-05-15 Thread Shivani Rao
This is something that I have bumped into time and again: the object that
contains your main() should also be serializable; then you won't have this
issue.

For example


object Test extends Serializable {
  def main(args: Array[String]): Unit = {
    // set up the SparkContext
    // read your data
    // create your RDDs (grouped by key)
    // write out your RDD to HDFS
  }
}

HTH,

Thanks
Shivani


On Mon, May 12, 2014 at 2:27 AM, yh18190 yh18...@gmail.com wrote:

 Hi,

 I am facing the above exception when I am trying to apply a method (computeDwt)
 on an RDD[(Int,ArrayBuffer[(Int,Double)])] input.
 I am even using the extends Serializable option to serialize objects in
 Spark. Here is the code snippet.

 Could anyone suggest what the problem could be and what should be done
 to overcome this issue?

 input: series: RDD[(Int,ArrayBuffer[(Int,Double)])]
 DWTsample (which extends Serializable) is a class having a computeDwt function.
 sc: SparkContext

  val kk: RDD[(Int, List[Double])] = series.map(t => (t._1, new DWTsample().computeDwt(sc, t._2)))

 Error:
 org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
 org.apache.spark.SparkException: Job failed: java.io.NotSerializableException: org.apache.spark.SparkContext
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
     at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Job-failed-java-io-NotSerializableException-org-apache-spark-SparkContext-tp5585.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Re: pySpark memory usage

2014-05-15 Thread Jim Blomo
That worked amazingly well, thank you Matei!  Numbers that worked for
me were 400 for the textFile()s, 1500 for the join()s.
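 For reference, a minimal sketch of where those two numbers plug in (shown with the Scala API; the PySpark textFile()/join() calls take the same numeric arguments, and the paths and parsing here are placeholder assumptions):

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._

 object PartitionCountsSketch {
   def main(args: Array[String]): Unit = {
     val sc = new SparkContext(new SparkConf().setAppName("partition-counts-sketch"))

     // placeholder parser: assumes tab-separated key/value lines
     def parse(line: String): (String, String) = {
       val i = line.indexOf('\t')
       (line.substring(0, i), line.substring(i + 1))
     }

     val left  = sc.textFile("s3n://bucket/a.txt", 400).map(parse)  // at least 400 map partitions
     val right = sc.textFile("s3n://bucket/b.txt", 400).map(parse)

     val joined = left.join(right, 1500)  // 1500 reduce partitions for the join
     joined.saveAsTextFile("s3n://bucket/joined")

     sc.stop()
   }
 }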

On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 Hey Jim, unfortunately external spilling is not implemented in Python right 
 now. While it would be possible to update combineByKey to do smarter stuff 
 here, one simple workaround you can try is to launch more map tasks (or more 
 reduce tasks). To set the minimum number of map tasks, you can pass it as a 
 second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 
 1000)).

 Matei

 On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:

 Thanks, Aaron, this looks like a good solution!  Will be trying it out 
 shortly.

 I noticed that the S3 exception seems to occur more frequently when the
 box is swapping.  Why is the box swapping?  combineByKey seems to make
 the assumption that it can fit an entire partition in memory when
 doing the combineLocally step.  I'm going to try to break this apart,
 but will need some sort of heuristic; options include looking at memory
 usage via the resource module and trying to keep below
 'spark.executor.memory', or using batchSize to limit the number of
 entries in the dictionary.  Let me know if you have any opinions.

 On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640

 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!


 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com wrote:

 Hey Jim,

 This IOException thing is a general issue that we need to fix, and your
 observation is spot-on. There is actually a JIRA for it that I created a few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579

 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.

 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think there
 is a race here where the writer sees the error first before the reader 
 knows
 what is going on.

 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...

 - Patrick






 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:

 FYI, it looks like this "stdin writer to Python finished early" error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that 
 is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.

 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped 
 exception
 class, e.g. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?

 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,

 Jim

 diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
              readerException = e
              Try(worker.shutdownOutput()) // kill Python worker process
 
 +          case e: java.net.SocketException =>
 +            // This can happen if a connection to the datasource, eg S3, resets
 +            // or is otherwise broken
 +            readerException = e
 +            Try(worker.shutdownOutput()) // kill Python worker process
 +
            case e: IOException =>
              // This can happen for legitimate reasons if the Python code stops returning data
              // before we are done passing elements through, e.g., for take(). Just log a message to


 On Wed, Apr 9, 2014 at 7:04 PM, Jim Blomo jim.bl...@gmail.com wrote:

 This dataset is uncompressed text at ~54GB. 

Re: Equivalent of collect() on DStream

2014-05-15 Thread Stephen Boesch
Looking further, it appears the functionality I am seeking is in the
following *private[streaming]* class ForEachDStream

(version 0.8.1, yes we are presently using an older release...)

private[streaming]
class ForEachDStream[T: ClassManifest] (
    parent: DStream[T],
    *foreachFunc: (RDD[T], Time) => Unit*
  ) extends DStream[Unit](parent.ssc) {

I would like to have access to this structure - particularly the ability to
define a foreachFunc that gets applied to each RDD within the DStream.
 Is there a means to do so?
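For what it's worth, DStream does seem to expose this hook publicly: newer releases call it foreachRDD, and if I recall correctly 0.8.x had the same thing under the name foreach (worth checking the 0.8.1 scaladoc). A sketch against the newer API, with the host/port and batch interval as placeholder assumptions:

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ForeachRDDSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext("local[2]", "foreach-rdd-sketch", Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD { rdd: RDD[String] =>
      // runs on the driver once per batch; collect() pulls that batch into driver memory
      val inMemory: Array[String] = rdd.collect()
      println("batch of " + inMemory.length + " lines")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}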



2014-05-14 21:25 GMT-07:00 Stephen Boesch java...@gmail.com:


 Given that collect() does not exist on DStream, apparently my mental model
 of a streaming RDD (DStream) needs correction/refinement.  So what is the
 means to convert DStream data into a JVM in-memory representation?  All of
 the methods on DStream, i.e. filter, map, transform, reduce, etc., generate
 other DStreams, and not an in-memory data structure.






Re: pySpark memory usage

2014-05-15 Thread Matei Zaharia
Cool, that’s good to hear. We’d also like to add spilling in Python itself, or 
at least make it exit with a good message if it can’t do it.

Matei

On May 14, 2014, at 10:47 AM, Jim Blomo jim.bl...@gmail.com wrote:

 That worked amazingly well, thank you Matei!  Numbers that worked for
 me were 400 for the textFile()s, 1500 for the join()s.
 
 On Mon, May 12, 2014 at 7:58 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Hey Jim, unfortunately external spilling is not implemented in Python right 
 now. While it would be possible to update combineByKey to do smarter stuff 
 here, one simple workaround you can try is to launch more map tasks (or more 
 reduce tasks). To set the minimum number of map tasks, you can pass it as a 
 second argument to textFile and such (e.g. sc.textFile(“s3n://foo.txt”, 
 1000)).
 
 Matei
 
 On May 12, 2014, at 5:47 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 Thanks, Aaron, this looks like a good solution!  Will be trying it out 
 shortly.
 
 I noticed that the S3 exception seems to occur more frequently when the
 box is swapping.  Why is the box swapping?  combineByKey seems to make
 the assumption that it can fit an entire partition in memory when
 doing the combineLocally step.  I'm going to try to break this apart,
 but will need some sort of heuristic; options include looking at memory
 usage via the resource module and trying to keep below
 'spark.executor.memory', or using batchSize to limit the number of
 entries in the dictionary.  Let me know if you have any opinions.
 
 On Sun, May 4, 2014 at 8:02 PM, Aaron Davidson ilike...@gmail.com wrote:
 I'd just like to update this thread by pointing to the PR based on our
 initial design: https://github.com/apache/spark/pull/640
 
 This solution is a little more general and avoids catching IOException
 altogether. Long live exception propagation!
 
 
 On Mon, Apr 28, 2014 at 1:28 PM, Patrick Wendell pwend...@gmail.com 
 wrote:
 
 Hey Jim,
 
 This IOException thing is a general issue that we need to fix, and your
 observation is spot-on. There is actually a JIRA for it that I created a few
 days ago:
 https://issues.apache.org/jira/browse/SPARK-1579
 
 Aaron is assigned on that one but not actively working on it, so we'd
 welcome a PR from you on this if you are interested.
 
 The first thought we had was to set a volatile flag when the reader sees
 an exception (indicating there was a failure in the task) and avoid
 swallowing the IOException in the writer if this happens. But I think 
 there
 is a race here where the writer sees the error first before the reader 
 knows
 what is going on.
 
 Anyways maybe if you have a simpler solution you could sketch it out in
 the JIRA and we could talk over there. The current proposal in the JIRA is
 somewhat complicated...
 
 - Patrick
 
 
 
 
 
 
 On Mon, Apr 28, 2014 at 1:01 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 FYI, it looks like this "stdin writer to Python finished early" error was
 caused by a break in the connection to S3, from which the data was being
 pulled.  A recent commit to PythonRDD noted that the current exception
 catching can potentially mask an exception for the data source, and that 
 is
 indeed what I see happening.  The underlying libraries (jets3t and
 httpclient) do have retry capabilities, but I don't see a great way of
 setting them through Spark code.  Instead I added the patch below which
 kills the worker on the exception.  This allows me to completely load the
 data source after a few worker retries.
 
 Unfortunately, java.net.SocketException is the same error that is
 sometimes expected from the client when using methods like take().  One
 approach around this conflation is to create a new locally scoped 
 exception
 class, e.g. WriterException, catch java.net.SocketException during output
 writing, then re-throw the new exception.  The worker thread could then
 distinguish between the reasons java.net.SocketException might be thrown.
 Perhaps there is a more elegant way to do this in Scala, though?
 
 Let me know if I should open a ticket or discuss this on the developers
 list instead.  Best,
 
 Jim
 
 diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 index 0d71fdb..f31158c 100644
 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
 @@ -95,6 +95,12 @@ private[spark] class PythonRDD[T: ClassTag](
              readerException = e
              Try(worker.shutdownOutput()) // kill Python worker process
 
 +          case e: java.net.SocketException =>
 +            // This can happen if a connection to the datasource, eg S3, resets
 +            // or is otherwise broken
 +            readerException = e
 +            Try(worker.shutdownOutput()) // kill Python worker process
 +
            case e: IOException =>
              // This can happen for legitimate reasons if the Python

Re: little confused about SPARK_JAVA_OPTS alternatives

2014-05-15 Thread Patrick Wendell
Just wondering - how are you launching your application? If you want
to set values like this the right way is to add them to the SparkConf
when you create a SparkContext.

val conf = new SparkConf()
  .set("spark.akka.frameSize", "1")
  .setAppName(...)
  .setMaster(...)
val sc = new SparkContext(conf)

- Patrick

On Wed, May 14, 2014 at 9:09 AM, Koert Kuipers ko...@tresata.com wrote:
 i have some settings that i think are relevant for my application. they are
 spark.akka settings so i assume they are relevant for both executors and my
 driver program.

 i used to do:
 SPARK_JAVA_OPTS=-Dspark.akka.frameSize=1

 now this is deprecated. the alternatives mentioned are:
 * some spark-submit settings which are not relevant to me since i do not use
 spark-submit (i launch spark jobs from an existing application)
 * spark.executor.extraJavaOptions to set -X options. i am not sure what -X
 options are, but it doesn't sound like what i need, since it's only for
 executors
 * SPARK_DAEMON_OPTS to set java options for standalone daemons (i.e. master,
 worker), that sounds like i should not use it since i am trying to change
 settings for an app, not a daemon.

 am i missing the correct setting to use?
 should i do -Dspark.akka.frameSize=1 on my application launch directly,
 and then also set spark.executor.extraJavaOptions? so basically repeat it?


Re: problem about broadcast variable in iteration

2014-05-15 Thread Earthson
Is the RDD not cached?

Because recomputation may be required, every broadcast object is included in
the dependencies of the RDDs; this may also cause memory issues (when n and kv
are too large, as in your case).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-broadcast-variable-in-iteration-tp5479p5495.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: spark 0.9.1 textFile hdfs unknown host exception

2014-05-15 Thread Eugen Cepoi
Solved: Putting HADOOP_CONF_DIR in spark-env of the workers solved the
problem.


The difference between HadoopRDD and NewHadoopRDD is that the old one
creates the JobConf on the worker side, whereas the new one creates an instance
of JobConf on the driver side and then broadcasts it.

I tried creating the HadoopRDD myself and tweaked things a bit in order to
log the properties in the conf when it is loaded on the worker side and on the
driver side. On the worker side I see a dummy conf that looks like the default
conf to me, whereas on the driver side I get the right conf with the namenodes etc.

My guess is that HADOOP_CONF_DIR is not shared with the workers when set
only on the driver (it was not defined in spark-env)?

Also wouldn't it be more natural to create the conf on driver side and then
share it with the workers?





2014-05-09 10:51 GMT+02:00 Eugen Cepoi cepoi.eu...@gmail.com:

 Hi,

 I have some strange behaviour when using textFile to read some data from
 HDFS in spark 0.9.1.
  I get UnknownHost exceptions, where the Hadoop client tries to resolve the
  dfs.nameservices and fails.

 So far:
  - this has been tested inside the shell
  - the exact same code works with spark-0.8.1
  - the shell is launched with HADOOP_CONF_DIR pointing to our HA conf
  - if before that some other RDD is created from HDFS and succeeds, then
 this works also (might be related to the way the default hadoop
 configuration is being shared?)
  - if using the new MR API it works
sc.newAPIHadoopFile(path, classOf[TextInputFormat],
 classOf[LongWritable], classOf[Text],
 sc.hadoopConfiguration).map(_._2.toString)

 Hadoop distribution: 2.0.0-cdh4.1.2
 Spark 0.9.1 - packaged with correct version of hadoop

 Eugen



Schema view of HadoopRDD

2014-05-15 Thread Debasish Das
Hi,

For each line that we read as a text line from HDFS, we have a schema. If
there is an API that takes the schema as a List[Symbol] and maps each token
to its Symbol, it would be helpful.

Do RDDs provide a schema view of the dataset on HDFS?
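A plain RDD does not carry a schema by itself, but the List[Symbol] mapping described above is easy to hand-roll as a stopgap; a sketch (the delimiter, column names, and path are assumptions, and this is not an existing Spark API):

import org.apache.spark.{SparkConf, SparkContext}

object SchemaViewSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("schema-view-sketch"))

    // assumed schema and tab-separated input; adjust to your data
    val schema = List('id, 'name, 'score)
    def toRecord(line: String): Map[Symbol, String] =
      schema.zip(line.split("\t", -1)).toMap

    val records = sc.textFile("hdfs:///path/to/data.tsv").map(toRecord)
    val good = records.filter(r => r('score).toDouble > 0.5)  // columns addressed by Symbol
    println(good.count())

    sc.stop()
  }
}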

Thanks.
Deb


Re: problem about broadcast variable in iteration

2014-05-15 Thread randylu
rdd1 is cached, but it has no effect: 
var rdd1 = ...
var rdd2 = ...
var kv = ...
for (i <- 0 until n) {
  var kvGlobal = sc.broadcast(kv)   // broadcast kv
  rdd1 = rdd2.map {
    case t => doSomething(t, kvGlobal.value)
  }.cache()
  var tmp = rdd1.reduceByKey().collect()
  kv = updateKV(tmp)                // update kv for each iteration
  rdd2 = rdd1
}
rdd2.saveAsTextFile()



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-broadcast-variable-in-iteration-tp5479p5496.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


os buffer cache does not cache shuffle output file

2014-05-15 Thread wxhsdp
Hi, 
  patrick said "The intermediate shuffle output gets written to disk, but it
often hits the OS-buffer cache since it's not explicitly fsync'ed, so in many
cases it stays entirely in memory. The behavior of the shuffle is agnostic to
whether the base RDD is in cache or on disk."

  i do a test with one groupBy action and found the intermediate shuffle
files are written to disk even with sufficient free memory: the shuffle size
is about 500MB, there's 1.5GB of free memory, and i notice that disk usage
increases by about 500MB during the process.

  here's the log from vmstat. you can see the cache column increases when
reading from disk, but the buff column is unchanged, so the data written to
disk is not buffered

procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----
 r  b   swpd    free   buff   cache   si   so    bi    bo    in    cs us sy id wa
 2  0  10256 1616852   6664  557344    0    0     0 51380   972  2852 88  7  0  5
 1  0  10256 1592636   6664  580676    0    0     0     0   949  3777 91  9  0  0
 1  0  10256 1568228   6672  604016    0    0     0   576   923  3640 94  6  0  0
 2  0  10256 1545836   6672  627348    0    0     0     0   893  3261 95  5  0  0
 1  0  10256 1521552   6672  650668    0    0     0     0   884  3401 89 11  0  0
 2  0  10256 1497144   6672  674012    0    0     0     0   911  3275 91  9  0  0
 1  0  10256 1469260   6676  700728    0    0     4 60668  1044  3366 85 15  0  0
 1  0  10256 1453076   6684  702464    0    0     0   924   853  2596 97  3  0  0

  is the buffer cache in write-through mode? is there something i need to configure?
  my os is ubuntu 13.10, 64-bit.
  thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/os-buffer-cache-does-not-cache-shuffle-output-file-tp5478.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Schema view of HadoopRDD

2014-05-15 Thread rxin
The new Spark SQL component is designed for this!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Schema-view-of-HadoopRDD-tp5627p5723.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Is there any problem on the spark mailing list?

2014-05-15 Thread Cheney Sun
I haven't received any spark-user mail since yesterday. Can you guys receive any
new mail?

--
Cheney





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-any-problem-on-the-spark-mailing-list-tp5509.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: problem about broadcast variable in iteration

2014-05-15 Thread randylu
But when i put the broadcast variable outside of the for loop, it works well (if
not concerned about the memory issue you pointed out):
var rdd1 = ...
var rdd2 = ...
var kv = ...
var kvGlobal = sc.broadcast(kv)     // broadcast kv
for (i <- 0 until n) {
  rdd1 = rdd2.map {
    case t => doSomething(t, kvGlobal.value)
  }.cache()
  var tmp = rdd1.reduceByKey().collect()
  kv = updateKV(tmp)                // update kv for each iteration
  kvGlobal = sc.broadcast(kv)       // broadcast kv
  rdd2 = rdd1
}
rdd2.saveAsTextFile()



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/problem-about-broadcast-variable-in-iteration-tp5479p5497.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Task not serializable?

2014-05-15 Thread pedro
I'm still fairly new to this, but I found problems using classes in maps if
they used instance variables in part of the map function. It seems like for
maps and such to work correctly, the code needs to be purely functional.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Task-not-serializable-tp3507p5506.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Spark unit testing best practices

2014-05-15 Thread Andras Nemeth
Hi,

Spark's local mode is great for creating simple unit tests for our Spark
logic. The disadvantage, however, is that certain types of problems are never
exposed in local mode because things never need to be put on the wire.

E.g. if I accidentally use a closure which has something non-serializable
in it, then my test will happily succeed in local mode but go down in
flames on a real cluster.

Another example is Kryo: I'd like to use setRegistrationRequired(true) to
avoid any hidden performance problems due to forgotten registration. And of
course I'd like things to fail in tests. But it won't happen because we
never actually need to serialize the RDDs in local mode.

So, is there some good solution to the above problems? Is there some
local-like mode which simulates serializations as well? Or is there an easy
way to start up *from code* a standalone spark cluster on the machine
running the unit test?
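
One thing that may be worth trying (a sketch, not an officially documented API): Spark's own test suite uses a local-cluster[numWorkers,coresPerWorker,memoryPerWorkerMB] master URL, which spawns real worker processes so closures actually have to be serialized. It needs a Spark distribution available on the machine and its behaviour can vary by release, so treat it as something to verify rather than a guarantee:

import org.apache.spark.{SparkConf, SparkContext}

object SerializationSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local-cluster[2,1,512]")  // 2 workers, 1 core and 512 MB each
      .setAppName("serialization-smoke-test")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    try {
      // anything captured by this closure now really has to be serialized
      val result = sc.parallelize(1 to 100, 4).map(_ * 2).reduce(_ + _)
      assert(result == 10100)
    } finally {
      sc.stop()
    }
  }
}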

Thanks,
Andras


Re: is Mesos falling out of favor?

2014-05-15 Thread Scott Clasen
Curious what the bug is and what it breaks?  I have Spark 0.9.0 running on
Mesos 0.17.0 and it seems to work correctly.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/is-Mesos-falling-out-of-favor-tp5444p5483.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Not getting mails from user group

2014-05-15 Thread Laeeq Ahmed
Hi all,

There seems to be a problem. I have not been getting mails from the Spark user
group for two days.

Regards,
Laeeq


Spark to utilize HDFS's mmap caching

2014-05-15 Thread Chanwit Kaewkasi
Hi all,

Can Spark (0.9.x) utilize the caching feature in HDFS 2.3 via
sc.textFile() and other HDFS-related APIs?

http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/CentralizedCacheManagement.html

Best regards,

-chanwit

--
Chanwit Kaewkasi
linkedin.com/in/chanwit