Re: Why Spark require this object to be serializerable?

2014-04-29 Thread Earthson
The code is
here:https://github.com/Earthson/sparklda/blob/master/src/main/scala/net/earthson/nlp/lda/lda.scala

I've changed it from Broadcast to Serializable. Now it works :) But there
are too many RDD caches. Is that the problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5024.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Issue during Spark streaming with ZeroMQ source

2014-04-29 Thread Francis . Hu
Hi, all

 

I installed spark-0.9.1 and zeromq 4.0.1 , and then run below example:

 

./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar

./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo

 

No messages were received on the ZeroMQWordCount side.

 

Does anyone know what the issue is ? 

 

 

Thanks,

Francis

 



Re: Issue during Spark streaming with ZeroMQ source

2014-04-29 Thread Prashant Sharma
Unfortunately zeromq 4.0.1 is not supported.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala#L63
says which version is required. You will need that version of zeromq to see it
work. Basically I have seen it working nicely with zeromq 2.2.0, and if you
have the jzmq libraries installed, performance is much better.
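
For reference, the example wires the ZeroMQ source up roughly like this (a
sketch against the 0.9 streaming-zeromq API, using the url/topic from the
command line above):

import akka.util.ByteString
import akka.zeromq.Subscribe
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.zeromq.ZeroMQUtils

val ssc = new StreamingContext("local[2]", "ZeroMQWordCount", Seconds(2))
// decode each published frame into strings
def bytesToStringIterator(x: Seq[ByteString]) = x.map(_.utf8String).iterator
// subscribe to the same url/topic the publisher was started with
val lines = ZeroMQUtils.createStream(ssc, "tcp://127.0.1.1:1234",
  Subscribe("foo"), bytesToStringIterator _)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()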

Prashant Sharma


On Tue, Apr 29, 2014 at 12:29 PM, Francis.Hu
francis...@reachjunction.comwrote:

  Hi, all



 I installed spark-0.9.1 and zeromq 4.0.1 , and then run below example:



 ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar

 ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo



 No messages were received on the ZeroMQWordCount side.



 Does anyone know what the issue is ?





 Thanks,

 Francis





Re: Reply: Issue during Spark streaming with ZeroMQ source

2014-04-29 Thread Prashant Sharma
Well that is not going to be easy, simply because we depend on akka-zeromq
for zeromq support. And since akka does not support the latest zeromq
library yet, I doubt if there is something simple that can be done to
support it.

Prashant Sharma


On Tue, Apr 29, 2014 at 2:44 PM, Francis.Hu francis...@reachjunction.comwrote:

  Thanks, Prashant Sharma





 It works now after downgrading zeromq from 4.0.1 to 2.2.

 Do you know whether the next release of Spark will upgrade zeromq?

 Many of our programs use zeromq 4.0.1, so it would be better for us if Spark
 Streaming could ship with a newer zeromq in a future release.





 Francis.



 From: Prashant Sharma [mailto:scrapco...@gmail.com]
 Sent: Tuesday, April 29, 2014 15:53
 To: user@spark.apache.org
 Subject: Re: Issue during Spark streaming with ZeroMQ source



 Unfortunately zeromq 4.0.1 is not supported.
 https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala#L63
 says which version is required. You will need that version of zeromq to see it
 work. Basically I have seen it working nicely with zeromq 2.2.0, and if you
 have the jzmq libraries installed, performance is much better.


   Prashant Sharma



 On Tue, Apr 29, 2014 at 12:29 PM, Francis.Hu francis...@reachjunction.com
 wrote:

 Hi, all



 I installed spark-0.9.1 and zeromq 4.0.1 , and then run below example:



 ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar

 ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo



 No messages were received on the ZeroMQWordCount side.



 Does anyone know what the issue is ?





 Thanks,

 Francis







Re: launching concurrent jobs programmatically

2014-04-29 Thread ishaaq
Very interesting.

One of spark's attractive features is being able to do stuff interactively
via spark-shell. Is something like that still available via Ooyala's job
server?

Or do you use the spark-shell independently of that? If the latter then how
do you manage custom jars for spark-shell? Our app has a number of jars that
I don't particularly want to have to upload each time I want to run a small
ad-hoc spark-shell session.

Thanks,
Ishaaq



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/launching-concurrent-jobs-programmatically-tp4990p5033.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Fwd: Spark RDD cache memory usage

2014-04-29 Thread Han JU
Hi,

By default a fraction of the executor memory (60%) is reserved for RDD
caching, so if there's no explicit caching in the code (e.g. rdd.cache()
etc.), or if we persist an RDD with StorageLevel.DISK_ONLY, is this part of
memory wasted? Does Spark allocate the RDD cache memory dynamically? Or
does Spark automatically cache RDDs when it can?
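
Concretely, the kind of scenario I have in mind (just a sketch; the path and
app name are placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext("local[2]", "CacheFractionQuestion")
// spark.storage.memoryFraction (default 0.6) reserves 60% of executor memory
// for the RDD cache, but this job only ever persists to disk:
val data = sc.textFile("hdfs:///some/input")   // placeholder path
data.persist(StorageLevel.DISK_ONLY)
println(data.count())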

Thanks.

-- 
*JU Han*

Data Engineer @ Botify.com

+33 061960


Re: Joining not-pair RDDs in Spark

2014-04-29 Thread Daniel Darabos
Create a key and join on that.

val callPricesByHour = callPrices.map(p => ((p.year, p.month, p.day, p.hour), p))
val callsByHour = calls.map(c => ((c.year, c.month, c.day, c.hour), c))
val bills = callPricesByHour.join(callsByHour).mapValues({ case (p, c) =>
  BillRow(c.customer, c.hour, c.minutes * p.basePrice) }).values

You should be able to expand this approach to three RDDs too.
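
For example, pulling the offers in as well could look roughly like this (a
sketch: it assumes the offers are keyed by hour alone, re-keys the price/call
pairs by hour for the second join, and treats the field types as loosely as
the snippet above does):

val offersByHour = offersInCourse.map(o => (o.hour, o))
val discountedBills = callPricesByHour.join(callsByHour)
  // drop the date part of the key and re-key by hour for the offers join
  .map { case ((_, _, _, hour), (p, c)) => (hour, (p, c)) }
  .join(offersByHour)
  .map { case (hour, ((p, c), o)) =>
    BillRow(c.customer, hour, c.minutes * p.basePrice * o.discount)
  }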


On Tue, Apr 29, 2014 at 11:55 AM, jsantos jsan...@tecsisa.com wrote:

 In the context of telecom industry, let's supose we have several existing
 RDDs populated from some tables in Cassandra:

 val callPrices: RDD[PriceRow]
 val calls: RDD[CallRow]
 val offersInCourse: RDD[OfferRow]

 where types are defined as follows,

 /** Represents the price per minute for a concrete hour */
 case class PriceRow(
 val year: Int,
 val month: Int,
 val day: Int,
 val hour: Int,
 val basePrice: Float)

 /** Call registries*/
 case class CallRow(
 val customer: String,
 val year: Int,
 val month: Int,
 val day: Int,
 val minutes: Int)

 /** Is there any discount that could be applicable here? */
 case class OfferRow(
 val offerName: String,
 val hour: Int,//[0..23]
 val discount: Float)//[0..1]

 Assuming we cannot use `flatMap` to mix these three RDDs in this way
 (since RDD is not really 'monadic'):

 /**
  * The final bill at a concrete hour for a call
  * is defined as {{{
  *def billPerHour(minutes: Int,basePrice:Float,discount:Float)
 =
  *  minutes * basePrice * discount
  * }}}
  */
 val bills: RDD[BillRow] = for {
 price <- callPrices
 call <- calls if call.hour==price.hour
 offer <- offersInCourse if offer.hour==price.hour
 } yield BillRow(
 call.customer,
 call.hour,
 billPerHour(call.minutes,price.basePrice,offer.discount))

 case class BillRow(
 val customer: String,
 val hour: DateTime,
 val amount: Float)

 which is the best practise for generating a new RDD that join all these
 three RDDs and represents the bill for a concrete customer?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: User/Product Clustering with pySpark ALS

2014-04-29 Thread Nick Pentreath
There's no easy way to do this currently. The pieces are there in the PySpark 
code for regression, which should be adaptable.


But you'd have to roll your own solution.




This is something I also want, so I intend to put together a pull request for 
it soon.
—
Sent from Mailbox

On Tue, Apr 29, 2014 at 4:28 PM, Laird, Benjamin
benjamin.la...@capitalone.com wrote:

 Hi all -
 I’m using pySpark/MLLib ALS for user/item clustering and would like to 
 directly access the user/product RDDs (called userFeatures/productFeatures in 
 class MatrixFactorizationModel in 
 mllib/recommendation/MatrixFactorizationModel.scala
 This doesn’t seem too complex, but it doesn’t seem like the functionality is 
 currently available. I think it requires accessing the underlying java model 
 like so:
 model = ALS.train(ratings,1,iterations=1,blocks=5)
 userFeatures = RDD(model.javamodel.userFeatures, sc, ???)
 However, I don’t know what to pass as the deserializer. I need these low 
 dimensional vectors as an RDD to then use in Kmeans clustering. Has anyone 
 done something similar?
 Ben
 

Re: Storage information about an RDD from the API

2014-04-29 Thread Koert Kuipers
SparkContext.getRDDStorageInfo
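
A rough sketch of reading it (field names as in the RDDInfo class around
0.9/1.0; worth double-checking against your version):

sc.getRDDStorageInfo.foreach { info =>
  // prints cached-partition counts plus memory and disk footprint per RDD
  println(info.name + ": " +
    info.numCachedPartitions + "/" + info.numPartitions + " partitions cached, " +
    info.memSize + " bytes in memory, " + info.diskSize + " bytes on disk")
}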


On Tue, Apr 29, 2014 at 12:34 PM, Andras Nemeth 
andras.nem...@lynxanalytics.com wrote:

 Hi,

 Is it possible to know from code about an RDD if it is cached, and more
 precisely, how many of its partitions are cached in memory and how many are
 cached on disk? I know I can get the storage level, but I also want to know
 the current actual caching status. Knowing memory consumption would also be
 awesome. :)

 Basically what I'm looking for is the information on the storage tab of
 the UI, but accessible from the API.

 Thanks,
 Andras



Re: How to declare Tuple return type for a function

2014-04-29 Thread Roger Hoover
The return type should be RDD[(Int, Int, Int)] because sc.textFile()
returns an RDD.  Try adding an import for the RDD type to get rid of the
compile error.

import org.apache.spark.rdd.RDD
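
Putting it together, the declarations could look roughly like this (a sketch;
the input path and app name are placeholders):

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

trait VectorSim {
  // the trait must declare the RDD return type as well
  def input(s: String): RDD[(Int, Int, Int)]
}

class Sim extends VectorSim {
  override def input(master: String): RDD[(Int, Int, Int)] = {
    val sc = new SparkContext(master, "Test")
    sc.textFile("ratings.txt").map { line =>   // placeholder input path
      val fields = line.split("\t")
      (fields(0).toInt, fields(1).toInt, fields(2).toInt)
    }
  }
}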


On Mon, Apr 28, 2014 at 6:22 PM, SK skrishna...@gmail.com wrote:

 Hi,

 I am a new user of Spark. I have a class that defines a function as
 follows.
 It returns a tuple : (Int, Int, Int).

 class Sim extends VectorSim {
   override def input(master: String): (Int, Int, Int) = {
     sc = new SparkContext(master, "Test")
     val ratings = sc.textFile(INP_FILE)
       .map(line => {
         val fields = line.split("\t")
         (fields(0).toInt, fields(1).toInt, fields(2).toInt)
       })
     ratings
   }
 }

 The class extends the trait VectorSim where the function  input() is
 declared as follows.

 trait VectorSim {
   def input (s:String): (Int, Int, Int)
 }

 However, when I compile, I get a type mismatch saying input() returns
 RDD[(Int,Int,Int)]. So I changed the return type to RDD[(Int,Int,Int)], but
 the compiler complains that there is no type called RDD. What is the right
 way to  declare when the return type of a function is  a tuple that is
 (Int,Int,Int).

 I am using spark 0.9.

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-declare-Tuple-return-type-for-a-function-tp4999.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Python Spark on YARN

2014-04-29 Thread Guanhua Yan
Hi all:

Is it possible to develop Spark programs in Python and run them on YARN?
From the Python SparkContext class, it doesn't seem to have such an option.

Thank you,
- Guanhua

===
Guanhua Yan, Ph.D.
Information Sciences Group (CCS-3)
Los Alamos National Laboratory
Tel: +1-505-667-0176
Email: gh...@lanl.gov
Web: http://ghyan.weebly.com/
===




How to declare Tuple return type for a function

2014-04-29 Thread SK
Hi,

I am a new user of Spark. I have a class that defines a function as follows.
It returns a tuple : (Int, Int, Int).

class Sim extends VectorSim {
  override def input(master: String): (Int, Int, Int) = {
    sc = new SparkContext(master, "Test")
    val ratings = sc.textFile(INP_FILE)
      .map(line => {
        val fields = line.split("\t")
        (fields(0).toInt, fields(1).toInt, fields(2).toInt)
      })
    ratings
  }
}

The class extends the trait VectorSim where the function  input() is
declared as follows.

trait VectorSim {
  def input (s:String): (Int, Int, Int)
}

However, when I compile, I get a type mismatch saying input() returns
RDD[(Int,Int,Int)]. So I changed the return type to RDD[(Int,Int,Int)], but
the compiler complains that there is no type called RDD. What is the right
way to  declare the return type for a function that returns a tuple that is
(Int,Int,Int).

I am using spark 0.9.

thanks 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-declare-Tuple-return-type-for-a-function-tp5047.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


What is Seq[V] in updateStateByKey?

2014-04-29 Thread Adrian Mocanu
What is Seq[V] in updateStateByKey?
Does this store the collected tuples of the RDD in a collection?

Method signature:
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => 
Option[S]): DStream[(K, S)]

In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the 
moment I switched to a different type like Seq[(String, Double)] the code 
didn't compile.

-Adrian



packaging time

2014-04-29 Thread SK
Each time I run sbt/sbt assembly to compile my program, the packaging time
takes about 370 sec (about 6 min). How can I reduce this time? 

thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/packaging-time-tp5048.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Delayed Scheduling - Setting spark.locality.wait.node parameter in interactive shell

2014-04-29 Thread Sai Prasanna
Hi All,

I have replication factor 3 in my HDFS.
I ran my experiments with 3 datanodes. Now I just added another node with no
data on it.
When I ran again, Spark launches non-local tasks on it and the time taken is
more than what it took on the 3-node cluster.

I think delayed scheduling fails here because of the parameter
spark.locality.wait.node, which is 3 seconds by default. It launches ANY-level
tasks on the added data node.

I want to increase this parameter in the interactive shell. How do I do it?
What variable should I set to pass it on to the SparkContext in the
interactive shell?

Thanks.


Re: Spark: issues with running a sbt fat jar due to akka dependencies

2014-04-29 Thread Koert Kuipers
You need to merge reference.conf files and then it's no longer an issue.

See the build file for Spark itself:
  case "reference.conf" => MergeStrategy.concat
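
With sbt-assembly of that era the setting looks roughly like this (a sketch;
it assumes the plugin's assemblySettings are applied and AssemblyKeys._ is in
scope, so adjust to your sbt-assembly version):

// concatenate all reference.conf files instead of keeping just one,
// and fall back to the plugin's default strategy for everything else
mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
  {
    case "reference.conf" => MergeStrategy.concat
    case x => old(x)
  }
}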


On Tue, Apr 29, 2014 at 3:32 PM, Shivani Rao raoshiv...@gmail.com wrote:

 Hello folks,

 I was going to post this question to spark user group as well. If you have
 any leads on how to solve this issue please let me know:

 I am trying to build a basic spark project (spark depends on akka) and I
 am trying to create a fatjar using sbt assembly. The goal is to run the
 fatjar via commandline as follows:
  java -cp path to my spark fatjar mainclassname

 I encountered deduplication errors in the following akka libraries during
 sbt assembly
 akka-remote_2.10-2.2.3.jar with akka-remote_2.10-2.2.3-shaded-protobuf.jar
  akka-actor_2.10-2.2.3.jar with akka-actor_2.10-2.2.3-shaded-protobuf.jar

 I resolved them by using MergeStrategy.first, and that allowed the sbt
 assembly command to compile successfully. But then one akka configuration
 parameter or another kept throwing up with the
 following message

 Exception in thread main com.typesafe.config.ConfigException$Missing:
 No configuration setting found for key

 I then used MergeStrategy.concat for reference.conf and I started
 getting this repeated error

 Exception in thread main com.typesafe.config.ConfigException$Missing: No
 configuration setting found for key 'akka.version'.

 I noticed that akka.version is only in the akka-actor jars and not in the
 akka-remote. The resulting reference.conf (in my final fat jar) does not
 contain akka.version either. So the strategy is not working.

 There are several things I could try

 a) Use the following dependency https://github.com/sbt/sbt-proguard
 b) Write a build.scala to handle merging of reference.conf

 https://spark-project.atlassian.net/browse/SPARK-395
 http://letitcrash.com/post/21025950392/howto-sbt-assembly-vs-reference-conf

 c) Create a reference.conf by merging all akka configurations and then
 passing it in my java -cp command as shown below

 java -cp jar-name -DConfig.file=config

 The main issue is that if I run the spark jar as sbt run there are no
 errors in accessing any of the akka configuration parameters. It is only
 when I run it via command line (java -cp jar-name classname) that I
 encounter the error.

 Which of these is a long term fix to akka issues? For now, I removed the
 akka dependencies and that solved the problem, but I know that is not a
 long term solution

 Regards,
 Shivani

 --
 Software Engineer
 Analytics Engineering Team@ Box
 Mountain View, CA



Re: packaging time

2014-04-29 Thread Mark Hamstra
Tip: read the wiki --
https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools


On Tue, Apr 29, 2014 at 12:48 PM, Daniel Darabos 
daniel.dara...@lynxanalytics.com wrote:

 Tips from my experience. Disable scaladoc:

 sources in doc in Compile := List()

 Do not package the source:

 publishArtifact in packageSrc := false

 And most importantly do not run sbt assembly. It creates a fat jar. Use
 sbt package or sbt stage (from sbt-native-packager). They create a
 directory full of jars, and you only need to update the one containing your
 code.



 On Tue, Apr 29, 2014 at 8:50 PM, SK skrishna...@gmail.com wrote:

 Each time I run sbt/sbt assembly to compile my program, the packaging time
 takes about 370 sec (about 6 min). How can I reduce this time?

 thanks



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/packaging-time-tp5048.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: What is Seq[V] in updateStateByKey?

2014-04-29 Thread Sean Owen
The original DStream is of (K,V). This function creates a DStream of
(K,S). Each time slice brings one or more new V for each K. The old
state S (can be different from V!) for each K -- possibly non-existent
-- is updated in some way by a bunch of new V, to produce a new state
S -- which also might not exist anymore after update. That's why the
function is from a Seq[V], and an Option[S], to an Option[S].

If your RDD has value type V = Double, then your function needs to
update state based on a new Seq[Double] at each time slice, since
Doubles are the new thing arriving for each key at each time slice.
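
For example, for V = Double a minimal update function could look like this (a
sketch; the input DStream name is illustrative, and updateStateByKey also
requires a checkpoint directory set on the streaming context):

// pairs: DStream[(String, Double)] -- an assumed keyed stream
val updateFunc = (newValues: Seq[Double], state: Option[Double]) =>
  Some(state.getOrElse(0.0) + newValues.sum)   // running sum per key
val runningTotals = pairs.updateStateByKey[Double](updateFunc)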


On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu
amoc...@verticalscope.com wrote:
 What is Seq[V] in updateStateByKey?

 Does this store the collected tuples of the RDD in a collection?



 Method signature:

 def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) =>
 Option[S]): DStream[(K, S)]



 In my case I used Seq[Double] assuming a sequence of Doubles in the RDD; the
 moment I switched to a different type like Seq[(String, Double)] the code
 didn’t compile.



 -Adrian




Re: What is Seq[V] in updateStateByKey?

2014-04-29 Thread Tathagata Das
You may have already seen it, but I will mention it anyways. This example
may help.
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala

Here the state is essentially a running count of the words seen. So the
value type (i.e. V) is Int (the count of a word in each batch) and the state
type (i.e. S) is also an Int (the running count). The update function essentially
sums up the running count with the new counts to generate a new running
count.

TD



On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen so...@cloudera.com wrote:

 The original DStream is of (K,V). This function creates a DStream of
 (K,S). Each time slice brings one or more new V for each K. The old
 state S (can be different from V!) for each K -- possibly non-existent
 -- is updated in some way by a bunch of new V, to produce a new state
 S -- which also might not exist anymore after update. That's why the
 function is from a Seq[V], and an Option[S], to an Option[S].

 If your RDD has value type V = Double, then your function needs to
 update state based on a new Seq[Double] at each time slice, since
 Doubles are the new thing arriving for each key at each time slice.


 On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu
 amoc...@verticalscope.com wrote:
  What is Seq[V] in updateStateByKey?
 
  Does this store the collected tuples of the RDD in a collection?
 
 
 
  Method signature:
 
  def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) =>
  Option[S]): DStream[(K, S)]
 
 
 
  In my case I used Seq[Double] assuming a sequence of Doubles in the RDD;
 the
  moment I switched to a different type like Seq[(String, Double)] the code
  didn’t compile.
 
 
 
  -Adrian
 
 



Re: Python Spark on YARN

2014-04-29 Thread Matei Zaharia
This will be possible in 1.0 after this pull request: 
https://github.com/apache/spark/pull/30

Matei

On Apr 29, 2014, at 9:51 AM, Guanhua Yan gh...@lanl.gov wrote:

 Hi all:
 
 Is it possible to develop Spark programs in Python and run them on YARN? From 
 the Python SparkContext class, it doesn't seem to have such an option.
 
 Thank you,
 - Guanhua
 
 ===
 Guanhua Yan, Ph.D.
 Information Sciences Group (CCS-3)
 Los Alamos National Laboratory
 Tel: +1-505-667-0176
 Email: gh...@lanl.gov
 Web: http://ghyan.weebly.com/
 ===



Spark's behavior

2014-04-29 Thread Eduardo Costa Alfaia
Hi TD,

In my tests with Spark Streaming, I'm using the JavaNetworkWordCount (modified) code 
and a program that I wrote that sends words to the Spark worker; I use TCP as 
transport. I verified that after starting Spark it connects to my source, which 
actually starts sending, but the first word count is reported approximately 
30 seconds after the context creation. So I'm wondering where the 30 seconds of 
data already sent by the source is stored. Is this normal Spark behaviour? I 
saw the same behaviour using the shipped JavaNetworkWordCount application.

Many thanks.
-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


rdd ordering gets scrambled

2014-04-29 Thread Mohit Jaggi
Hi,
I started with a text file (CSV) of data sorted by the first column and parsed it
into Scala objects using a map operation in Scala. Then I used more maps to
add some extra info to the data and saved it as a text file.
The final text file is not sorted. What do I need to do to keep the order
of the original input intact?

My code looks like:

csvFile = sc.textFile(..) // file is CSV and ordered by first column
splitRdd = csvFile map { line => line.split(",", -1) }
parsedRdd = splitRdd map { parts =>
  {
    key = parts(0) // use first column as key
    value = new MyObject(parts(0), parts(1)) // parse into scala objects
    (key, value)
  }
}

augmentedRdd = parsedRdd map { x =>
   key = x._1
   value = ... // add extra fields to x._2
   (key, value)
}
augmentedRdd.saveAsTextFile(...) // this file is not sorted

Mohit.


Re: Spark's behavior

2014-04-29 Thread Eduardo Costa Alfaia
Hi TD,
We are not using a streaming context with master local; we have 1 master and 8 
workers and 1 word source. The command line that we are using is:
bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount 
spark://192.168.0.13:7077
 
On Apr 30, 2014, at 0:09, Tathagata Das tathagata.das1...@gmail.com wrote:

 Is your batch size 30 seconds by any chance? 
 
 Assuming not, please check whether you are creating the streaming context 
 with master local[n] where n >= 2. With local or local[1], the system 
 only has one processing slot, which is occupied by the receiver, leaving no 
 room for processing the received data. It could be that after 30 seconds, the 
 server disconnects, the receiver terminates, releasing the single slot for 
 the processing to proceed. 
 
 TD
 
 
 On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia 
 e.costaalf...@unibs.it wrote:
 Hi TD,
 
 In my tests with spark streaming, I'm using JavaNetworkWordCount(modified) 
 code and a program that I wrote that sends words to the Spark worker, I use 
 TCP as transport. I verified that after starting Spark, it connects to my 
 source which actually starts sending, but the first word count is advertised 
 approximately 30 seconds after the context creation. So I'm wondering where 
 is stored the 30 seconds data already sent by the source. Is this a normal 
 spark’s behaviour? I saw the same behaviour using the shipped 
 JavaNetworkWordCount application.
 
 Many thanks.
 --
 Informativa sulla Privacy: http://www.unibs.it/node/8155
 


-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


Re: Python Spark on YARN

2014-04-29 Thread Guanhua Yan
Thanks, Matei. Will take a look at it.

Best regards,
Guanhua

From:  Matei Zaharia matei.zaha...@gmail.com
Reply-To:  user@spark.apache.org
Date:  Tue, 29 Apr 2014 14:19:30 -0700
To:  user@spark.apache.org
Subject:  Re: Python Spark on YARN

This will be possible in 1.0 after this pull request:
https://github.com/apache/spark/pull/30

Matei

On Apr 29, 2014, at 9:51 AM, Guanhua Yan gh...@lanl.gov wrote:

 Hi all:
 
 Is it possible to develop Spark programs in Python and run them on YARN? From
 the Python SparkContext class, it doesn't seem to have such an option.
 
 Thank you,
 - Guanhua
 
 ===
 Guanhua Yan, Ph.D.
 Information Sciences Group (CCS-3)
 Los Alamos National Laboratory
 Tel: +1-505-667-0176
 Email: gh...@lanl.gov
 Web: http://ghyan.weebly.com/
 ===





Re: Spark's behavior

2014-04-29 Thread Tathagata Das
Strange! Can you just do lines.print() to print the raw data instead of
doing the word count? Beyond that we can do two things.

1. Can you check the Spark stage UI to see whether there are stages running
during the 30-second period you referred to?
2. If you upgrade to the Spark master branch (or Spark 1.0 RC3, see the
different thread by Patrick), it has a streaming UI, which shows the number
of records received, the state of the receiver, etc. That may be more
useful in debugging what's going on.

TD


On Tue, Apr 29, 2014 at 3:31 PM, Eduardo Costa Alfaia 
e.costaalf...@unibs.it wrote:

 Hi TD,
 We are not using stream context with master local, we have 1 Master and 8
 Workers and 1 word source. The command line that we are using is:
 bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount
 spark://192.168.0.13:7077

 On Apr 30, 2014, at 0:09, Tathagata Das tathagata.das1...@gmail.com
 wrote:

 Is your batch size 30 seconds by any chance?

 Assuming not, please check whether you are creating the streaming context
 with master local[n] where n >= 2. With local or local[1], the system
 only has one processing slot, which is occupied by the receiver leaving no
 room for processing the received data. It could be that after 30 seconds,
 the server disconnects, the receiver terminates, releasing the single slot
 for the processing to proceed.

 TD


 On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia 
 e.costaalf...@unibs.it wrote:

 Hi TD,

 In my tests with spark streaming, I'm using
 JavaNetworkWordCount(modified) code and a program that I wrote that sends
 words to the Spark worker, I use TCP as transport. I verified that after
 starting Spark, it connects to my source which actually starts sending, but
 the first word count is advertised approximately 30 seconds after the
 context creation. So I'm wondering where is stored the 30 seconds data
 already sent by the source. Is this a normal spark’s behaviour? I saw the
 same behaviour using the shipped JavaNetworkWordCount application.

 Many thanks.
 --
 Informativa sulla Privacy: http://www.unibs.it/node/8155




 Informativa sulla Privacy: http://www.unibs.it/node/8155



RE: Shuffle Spill Issue

2014-04-29 Thread Liu, Raymond
Hi Daniel

Thanks for your reply. I think reduceByKey will also do a map-side 
combine, so the result is the same: for each partition, one entry per distinct 
word. In my case with JavaSerializer, the 240MB dataset yields around 70MB of 
shuffle data. Only the Shuffle Spill (Memory) is abnormal, and it sounds to me 
like it should not trigger at all. And, by the way, this behavior only occurs on 
the map output side; on the reduce / shuffle fetch side, this strange behavior 
doesn't happen.

Best Regards,
Raymond Liu

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com] 

I have no idea why shuffle spill is so large. But this might make it smaller:

val addition = (a: Int, b: Int) => a + b
val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word will end up in the shuffle for each 
partition, instead of one entry per word occurrence.

On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond raymond@intel.com wrote:
Hi  Patrick

        I am just doing a simple word count; the data is generated by the hadoop 
random text writer.

        This does not seem to me to be related to compression. If I turn off compress 
on shuffle, the metrics are something like below for the smaller 240MB dataset.


Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
10          | sr437:48527 | 35 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 2.2 GB                 | 1291.2 KB
12          | sr437:46077 | 34 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1822.6 MB              | 1073.3 KB
13          | sr434:37896 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1099.2 MB              | 621.2 KB
15          | sr438:52819 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1898.8 MB              | 1072.6 KB
16          | sr434:37103 | 32 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1638.0 MB              | 1044.6 KB


        And the program pretty simple:

val files = sc.textFile(args(1))
val words = files.flatMap(_.split(" "))
val wordsPair = words.map(x => (x, 1))

val wordsCount = wordsPair.reduceByKey(_ + _)
val count = wordsCount.count()

println("Number of words = " + count)


Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com]

Could you explain more what your job is doing and what data types you are 
using? These numbers alone don't necessarily indicate something is wrong. The 
relationship between the in-memory and on-disk shuffle amount is definitely a 
bit strange, the data gets compressed when written to disk, but unless you have 
a weird dataset (E.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond raymond@intel.com wrote:
Hi


        I am running a simple word count program on a spark standalone cluster. 
The cluster is made up of 6 nodes; each runs 4 workers and each worker owns 10G 
memory and 16 cores, thus 96 cores and 240G memory in total. (Well, it was also 
previously configured as 1 worker with 40G memory on each node.)

        I ran a very small data set (2.4GB on HDFS in total) to confirm the 
problem here, as below:

        As you can read from the part of the task metrics below, I noticed that 
the shuffle spill part of the metrics indicates that something is wrong.

Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0           | sr437:42139 | 29 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 23.6 GB                | 4.3 MB
1           | sr433:46935 | 1.1 min   | 4           | 0            | 4               | 0.0 B        | 4.2 MB        | 19.0 GB                | 3.4 MB
10          | sr436:53277 | 26 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.6 GB                | 4.6 MB
11          | sr437:58872 | 32 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.0 GB                | 4.4 MB
12          | sr435:48358 | 27 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.1 GB                | 4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the 
actual shuffle data and Shuffle Spill (Disk), and it also seems to me that the 
spill should not trigger at all, since the memory is not used up at all.

To verify that, I further reduced the data size to 240MB in total.

And here is the result:


Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0           | sr437:50895 | 15 s      | 4           | 0            | 4               | 0.0 B        | 703.0 KB      | 80.0 MB                | 43.2 KB
1           | sr433:50207 | 17 s      | 4           | 0            | 4               | 0.0 B        | 704.7 KB      | 389.5 MB               | 90.2 KB
10          | sr436:56352 | 16 s      | 4           | 0            | 4               | 0.0 B        | 700.9 KB      | 814.9 MB               | 181.6 KB
11          | sr437:53099 | 15 s      | 4           | 0            | 4               | 0.0 B        | 689.7 KB      | 0.0 B                  | 0.0 B
12          | sr435:48318 | 15 s      | 4           | 0            | 4               | 0.0 B        | 702.1 KB      | 427.4 MB               | 90.7 KB
13          | sr433:59294 | 17 s      | 4           | 0            | 4               | 0.0 B        | 704.8 KB      | 779.9 MB               |

Re: NoSuchMethodError from Spark Java

2014-04-29 Thread wxhsdp
I met the same issue when updating to spark 0.9.1
(svn checkout of https://github.com/apache/spark/)

Exception in thread main java.lang.NoSuchMethodError:
org.apache.spark.SparkContext$.jarOfClass(Ljava/lang/Class;)Lscala/collection/Seq;
at org.apache.spark.examples.GroupByTest$.main(GroupByTest.scala:38)
at org.apache.spark.examples.GroupByTest.main(GroupByTest.scala)

sbt build:
name := "GroupByTest"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Is there something I need to modify?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5076.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
Hi

I am running a WordCount program which counts words from HDFS, and I 
noticed that the serializer part of the code takes a lot of CPU time. On a 
16-core/32-thread node, the total throughput is around 50MB/s with JavaSerializer, 
and if I switch to KryoSerializer, it doubles to around 100-150MB/s. (I have 
12 disks per node and files scattered across disks, so HDFS BW is not a 
problem.)

And I also notice that, in this case, the object to write is (String, 
Int); if I try a case with (Int, Int), the throughput is a further 2-3x 
faster.

So, in my WordCount case, the bottleneck is CPU (because with shuffle 
compression on, 150MB/s of data bandwidth on the input side will usually lead to 
around 50MB/s of shuffle data).

This serialization BW looks somehow too low, so I am wondering: what BW 
do you observe in your case? Does this throughput sound reasonable to you? If 
not, is there anything that might need to be examined in my case?
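
For reference, the serializer switch I mention is just the spark.serializer
setting, roughly (a sketch; the app name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

// switch the shuffle/serialization path from the default JavaSerializer to Kryo
val conf = new SparkConf()
  .setAppName("WordCount")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)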



Best Regards,
Raymond Liu




Re: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Patrick Wendell
Is this the serialization throughput per task or the serialization
throughput for all the tasks?

On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote:
 Hi

 I am running a WordCount program which count words from HDFS, and I 
 noticed that the serializer part of code takes a lot of CPU time. On a 
 16core/32thread node, the total throughput is around 50MB/s by 
 JavaSerializer, and if I switching to KryoSerializer, it doubles to around 
 100-150MB/s. ( I have 12 disks per node and files scatter across disks, so 
 HDFS BW is not a problem)

 And I also notice that, in this case, the object to write is (String, 
 Int), if I try some case with (int, int), the throughput will be 2-3x faster 
 further.

 So, in my Wordcount case, the bottleneck is CPU ( cause if with 
 shuffle compress on, the 150MB/s data bandwidth in input side, will usually 
 lead to around 50MB/s shuffle data)

 This serialize BW looks somehow too low , so I am wondering, what's 
 BW you observe in your case? Does this throughput sounds reasonable to you? 
 If not, anything might possible need to be examined in my case?



 Best Regards,
 Raymond Liu




RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
For all the tasks, say 32 tasks in total.

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 

Is this the serialization throughput per task or the serialization throughput 
for all the tasks?

On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote:
 Hi

 I am running a WordCount program which count words from HDFS, 
 and I noticed that the serializer part of code takes a lot of CPU 
 time. On a 16core/32thread node, the total throughput is around 50MB/s 
 by JavaSerializer, and if I switching to KryoSerializer, it doubles to 
 around 100-150MB/s. ( I have 12 disks per node and files scatter 
 across disks, so HDFS BW is not a problem)

 And I also notice that, in this case, the object to write is (String, 
 Int), if I try some case with (int, int), the throughput will be 2-3x faster 
 further.

 So, in my Wordcount case, the bottleneck is CPU ( cause if 
 with shuffle compress on, the 150MB/s data bandwidth in input side, 
 will usually lead to around 50MB/s shuffle data)

 This serialize BW looks somehow too low , so I am wondering, what's 
 BW you observe in your case? Does this throughput sounds reasonable to you? 
 If not, anything might possible need to be examined in my case?



 Best Regards,
 Raymond Liu




Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Hi Patrick,

I'm a little confused about your comment that RDDs are not ordered. As far
as I know, RDDs keep a list of partitions that are ordered, and this is why I
can call RDD.take() and get the same first k rows every time I call it, and
RDD.take() returns the same entries as RDD.map(…).take() because map
preserves the partition order. RDD order is also what allows me to get the
top k out of an RDD by doing RDD.sort().take().

Am I misunderstanding it? Or is it just when an RDD is written to disk that
the order is not well preserved? Thanks in advance!

Mingyu




On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote:

Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia buendia...@gmail.com
wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)



 Another issue is that RDD's are not ordered, so when you union two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition, then
 call MapPartitions and return an iterator that first adds your header
 and then the rest of the file, then call saveAsTextFile. Keep in mind
 this will only work if you coalesce into a single partition.


 Thanks! I'll give this a try.



 myRdd.coalesce(1)
 .map(_.mkString(","))
 .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
 .saveAsTextFile("out.csv")

 - Patrick

 On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
 buendia...@gmail.com wrote:
  Hi,
 
  I'm trying to find a way to create a csv header when using
  saveAsTextFile,
  and I came up with this:
 
  (sc.makeRDD(Array("col1,col2,col3"), 1) ++
  myRdd.coalesce(1).map(_.mkString(",")))
    .saveAsTextFile("out.csv")
 
  But it only saves the header part. Why is that the union method does
not
  return both RDD's?






RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
By the way, to be clear, I run repartition firstly to make all data go through 
shuffle instead of run ReduceByKey etc directly ( which reduce the data need to 
be shuffle and serialized), thus say all 50MB/s data from HDFS will go to 
serializer. ( in fact, I also tried generate data in memory directly instead of 
read from HDFS, similar throughput result)

Best Regards,
Raymond Liu


-Original Message-
From: Liu, Raymond [mailto:raymond@intel.com] 

For all the tasks, say 32 task on total

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 

Is this the serialization throughput per task or the serialization throughput 
for all the tasks?

On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote:
 Hi

 I am running a WordCount program which count words from HDFS, 
 and I noticed that the serializer part of code takes a lot of CPU 
 time. On a 16core/32thread node, the total throughput is around 50MB/s 
 by JavaSerializer, and if I switching to KryoSerializer, it doubles to 
 around 100-150MB/s. ( I have 12 disks per node and files scatter 
 across disks, so HDFS BW is not a problem)

 And I also notice that, in this case, the object to write is (String, 
 Int), if I try some case with (int, int), the throughput will be 2-3x faster 
 further.

 So, in my Wordcount case, the bottleneck is CPU ( cause if 
 with shuffle compress on, the 150MB/s data bandwidth in input side, 
 will usually lead to around 50MB/s shuffle data)

 This serialize BW looks somehow too low , so I am wondering, what's 
 BW you observe in your case? Does this throughput sounds reasonable to you? 
 If not, anything might possible need to be examined in my case?



 Best Regards,
 Raymond Liu




Re: JavaSparkConf

2014-04-29 Thread Patrick Wendell
This class was made to be java friendly so that we wouldn't have to
use two versions. The class itself is simple. But I agree adding java
setters would be nice.

On Tue, Apr 29, 2014 at 8:32 PM, Soren Macbeth so...@yieldbot.com wrote:
 There is a JavaSparkContext, but no JavaSparkConf object. I know SparkConf
 is new in 0.9.x.

 Is there a plan to add something like this to the java api?

 It's rather a bother to have things like setAll take a scala
 Traversable[(String, String)] when using SparkConf from the java api.

 At a minimum adding methods signatures for java collections where there are
 currently scala collection would be a good start.

 TIA


RE: How fast would you expect shuffle serialize to be?

2014-04-29 Thread Liu, Raymond
The latter case: total throughput aggregated from all cores.

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com] 
Sent: Wednesday, April 30, 2014 1:22 PM
To: user@spark.apache.org
Subject: Re: How fast would you expect shuffle serialize to be?

Hm - I'm still not sure if you mean
100MB/s for each task = 3200MB/s across all cores
-or-
3.1MB/s for each task = 100MB/s across all cores

If it's the second one, that's really slow and something is wrong. If it's the 
first one, this is in the range of what I'd expect, but I'm no expert.

On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond raymond@intel.com wrote:
 For all the tasks, say 32 task on total

 Best Regards,
 Raymond Liu


 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]

 Is this the serialization throughput per task or the serialization throughput 
 for all the tasks?

 On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote:
 Hi

 I am running a WordCount program which count words from HDFS, 
 and I noticed that the serializer part of code takes a lot of CPU 
 time. On a 16core/32thread node, the total throughput is around 
 50MB/s by JavaSerializer, and if I switching to KryoSerializer, it 
 doubles to around 100-150MB/s. ( I have 12 disks per node and files 
 scatter across disks, so HDFS BW is not a problem)

 And I also notice that, in this case, the object to write is 
 (String, Int), if I try some case with (int, int), the throughput will be 
 2-3x faster further.

 So, in my Wordcount case, the bottleneck is CPU ( cause if 
 with shuffle compress on, the 150MB/s data bandwidth in input side, 
 will usually lead to around 50MB/s shuffle data)

 This serialize BW looks somehow too low , so I am wondering, what's 
 BW you observe in your case? Does this throughput sounds reasonable to you? 
 If not, anything might possible need to be examined in my case?



 Best Regards,
 Raymond Liu




Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Patrick Wendell
You are right, once you sort() the RDD, then yes it has a well defined ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com wrote:
 Hi Patrick,

 I'm a little confused about your comment that RDDs are not ordered. As far
 as I know, RDDs keep a list of partitions that are ordered, and this is why I
 can call RDD.take() and get the same first k rows every time I call it, and
 RDD.take() returns the same entries as RDD.map(…).take() because map
 preserves the partition order. RDD order is also what allows me to get the
 top k out of an RDD by doing RDD.sort().take().

 Am I misunderstanding it? Or is it just when an RDD is written to disk that
 the order is not well preserved? Thanks in advance!

 Mingyu




 On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote:

Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia buendia...@gmail.com
wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)



 Another issue is that RDD's are not ordered, so when you union two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition, then
 call MapPartitions and return an iterator that first adds your header
 and then the rest of the file, then call saveAsTextFile. Keep in mind
 this will only work if you coalesce into a single partition.


 Thanks! I'll give this a try.



 myRdd.coalesce(1)
 .map(_.mkString(","))
 .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
 .saveAsTextFile("out.csv")

 - Patrick

 On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
 buendia...@gmail.com wrote:
  Hi,
 
  I'm trying to find a way to create a csv header when using
  saveAsTextFile,
  and I came up with this:
 
  (sc.makeRDD(Array("col1,col2,col3"), 1) ++
  myRdd.coalesce(1).map(_.mkString(",")))
    .saveAsTextFile("out.csv")
 
  But it only saves the header part. Why is that the union method does
not
  return both RDD's?