Re: Why does Spark require this object to be serializable?
The code is here: https://github.com/Earthson/sparklda/blob/master/src/main/scala/net/earthson/nlp/lda/lda.scala I've changed it from Broadcast to Serializable. Now it works :) But there are too many RDD caches; is that the problem? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5024.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Issue during Spark streaming with ZeroMQ source
Hi all, I installed spark-0.9.1 and zeromq 4.0.1, and then ran the example below:

    ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar
    ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo

No message was received on the ZeroMQWordCount side. Does anyone know what the issue is? Thanks, Francis
Re: Issue during Spark streaming with ZeroMQ source
Unfortunately zeromq 4.0.1 is not supported. https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala#L63 says which version is required. You will need that version of zeromq to see it work. Basically I have seen it working nicely with zeromq 2.2.0, and if you have the jzmq libraries installed, performance is much better. Prashant Sharma On Tue, Apr 29, 2014 at 12:29 PM, Francis.Hu francis...@reachjunction.com wrote: Hi all, I installed spark-0.9.1 and zeromq 4.0.1, and then ran the example below: ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo No message was received on the ZeroMQWordCount side. Does anyone know what the issue is? Thanks, Francis
Re: Reply: Issue during Spark streaming with ZeroMQ source
Well that is not going to be easy, simply because we depend on akka-zeromq for zeromq support. And since akka does not support the latest zeromq library yet, I doubt there is anything simple that can be done to support it. Prashant Sharma On Tue, Apr 29, 2014 at 2:44 PM, Francis.Hu francis...@reachjunction.com wrote: Thanks, Prashant Sharma. It works now after downgrading zeromq from 4.0.1 to 2.2. Do you know whether the next release of Spark will upgrade zeromq? Many of our programs use zeromq 4.0.1, so if Spark Streaming could ship with a newer zeromq in the next release, that would be better for us. Francis. From: Prashant Sharma [mailto:scrapco...@gmail.com] Sent: Tuesday, April 29, 2014 15:53 To: user@spark.apache.org Subject: Re: Issue during Spark streaming with ZeroMQ source Unfortunately zeromq 4.0.1 is not supported. https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala#L63 says which version is required. You will need that version of zeromq to see it work. Basically I have seen it working nicely with zeromq 2.2.0, and if you have the jzmq libraries installed, performance is much better. Prashant Sharma On Tue, Apr 29, 2014 at 12:29 PM, Francis.Hu francis...@reachjunction.com wrote: Hi all, I installed spark-0.9.1 and zeromq 4.0.1, and then ran the example below: ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo No message was received on the ZeroMQWordCount side. Does anyone know what the issue is? Thanks, Francis
Re: launching concurrent jobs programmatically
Very interesting. One of Spark's attractive features is being able to do things interactively via spark-shell. Is something like that still available via Ooyala's job server? Or do you use the spark-shell independently of it? If the latter, how do you manage custom jars for spark-shell? Our app has a number of jars that I don't particularly want to have to upload each time I want to run a small ad-hoc spark-shell session. Thanks, Ishaaq -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/launching-concurrent-jobs-programmatically-tp4990p5033.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Fwd: Spark RDD cache memory usage
Hi, By default a fraction of the executor memory (60%) is reserved for RDD caching, so if there's no explicit caching in the code (e.g. rdd.cache() etc.), or if we persist an RDD with StorageLevel.DISK_ONLY, is this part of memory wasted? Does Spark allocate the RDD cache memory dynamically? Or does Spark automatically cache RDDs when it can? Thanks. -- *JU Han* Data Engineer @ Botify.com +33 061960
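For context on the 60% figure above: that fraction is governed by a configuration property and can be lowered when nothing is cached. A minimal sketch, assuming the Spark 0.9-era spark.storage.memoryFraction property and a hypothetical master URL:

    import org.apache.spark.{SparkConf, SparkContext}

    // Shrink the cache region when the job never calls cache()/persist(MEMORY_*),
    // leaving more of the executor heap for shuffles and user objects.
    val conf = new SparkConf()
      .setMaster("spark://master:7077")   // hypothetical cluster URL
      .setAppName("NoCacheJob")
      .set("spark.storage.memoryFraction", "0.2")
    val sc = new SparkContext(conf)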
Re: Joining not-pair RDDs in Spark
Create a key and join on that.

    val callPricesByHour = callPrices.map(p => ((p.year, p.month, p.day, p.hour), p))
    val callsByHour = calls.map(c => ((c.year, c.month, c.day, c.hour), c))
    val bills = callPricesByHour.join(callsByHour).mapValues({
      case (p, c) => BillRow(c.customer, c.hour, c.minutes * p.basePrice)
    }).values

You should be able to expand this approach to three RDDs too. On Tue, Apr 29, 2014 at 11:55 AM, jsantos jsan...@tecsisa.com wrote: In the context of the telecom industry, let's suppose we have several existing RDDs populated from some tables in Cassandra:

    val callPrices: RDD[PriceRow]
    val calls: RDD[CallRow]
    val offersInCourse: RDD[OfferRow]

where the types are defined as follows:

    /** Represents the price per minute for a concrete hour */
    case class PriceRow(
      val year: Int,
      val month: Int,
      val day: Int,
      val hour: Int,
      val basePrice: Float)

    /** Call registries */
    case class CallRow(
      val customer: String,
      val year: Int,
      val month: Int,
      val day: Int,
      val minutes: Int)

    /** Is there any discount that could be applicable here? */
    case class OfferRow(
      val offerName: String,
      val hour: Int,       // [0..23]
      val discount: Float) // [0..1]

Assuming we cannot use `flatMap` to mix these three RDDs like this (since RDD is not really 'monadic'):

    /**
     * The final bill at a concrete hour for a call
     * is defined as {{{
     *   def billPerHour(minutes: Int, basePrice: Float, discount: Float) =
     *     minutes * basePrice * discount
     * }}}
     */
    val bills: RDD[BillRow] = for {
      price <- callPrices
      call <- calls if call.hour == price.hour
      offer <- offersInCourse if offer.hour == price.hour
    } yield BillRow(
      call.customer,
      call.hour,
      billPerHour(call.minutes, price.basePrice, offer.discount))

    case class BillRow(
      val customer: String,
      val hour: DateTime,
      val amount: Float)

what is the best practice for generating a new RDD that joins all these three RDDs and represents the bill for a concrete customer? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
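Expanding the reply above to the third RDD, a rough sketch; it assumes, as the reply does, that CallRow also carries the hour of the call and that BillRow accepts it. Since OfferRow only carries an hour, the price/call join result is re-keyed by hour alone before joining the offers:

    import org.apache.spark.SparkContext._   // pair-RDD join operations

    val pricesByHour = callPrices.map(p => ((p.year, p.month, p.day, p.hour), p))
    val callsByHour  = calls.map(c => ((c.year, c.month, c.day, c.hour), c))
    val offersByHour = offersInCourse.map(o => (o.hour, o))

    val bills = pricesByHour.join(callsByHour)
      .map { case ((_, _, _, hour), (p, c)) => (hour, (p, c)) }   // re-key by hour only
      .join(offersByHour)
      .map { case (_, ((p, c), o)) =>
        BillRow(c.customer, c.hour, c.minutes * p.basePrice * o.discount)
      }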
Re: User/Product Clustering with pySpark ALS
There's no easy way to do this currently. The pieces are there in the PySpark code for regression, which should be adaptable, but you'd have to roll your own solution. This is something I also want, so I intend to put together a pull request for it soon. -- Sent from Mailbox On Tue, Apr 29, 2014 at 4:28 PM, Laird, Benjamin benjamin.la...@capitalone.com wrote: Hi all - I'm using pySpark/MLlib ALS for user/item clustering and would like to directly access the user/product RDDs (called userFeatures/productFeatures in class MatrixFactorizationModel in mllib/recommendation/MatrixFactorizationModel.scala). This doesn't seem too complex, but it doesn't seem like the functionality is currently available. I think it requires accessing the underlying Java model like so: model = ALS.train(ratings, 1, iterations=1, blocks=5) userFeatures = RDD(model.javamodel.userFeatures, sc, ???) However, I don't know what to pass as the deserializer. I need these low-dimensional vectors as an RDD to then use in KMeans clustering. Has anyone done something similar? Ben
Re: Storage information about an RDD from the API
SparkContext.getRDDStorageInfo On Tue, Apr 29, 2014 at 12:34 PM, Andras Nemeth andras.nem...@lynxanalytics.com wrote: Hi, Is it possible to know from code whether an RDD is cached, and more precisely, how many of its partitions are cached in memory and how many are cached on disk? I know I can get the storage level, but I also want to know the current actual caching status. Knowing memory consumption would also be awesome. :) Basically what I'm looking for is the information on the storage tab of the UI, but accessible from the API. Thanks, Andras
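To make the pointer above concrete, a minimal sketch assuming a live SparkContext named sc; getRDDStorageInfo returns one entry per persisted RDD with roughly the figures shown on the storage tab of the UI:

    // Print cached-partition counts and memory/disk usage for every persisted RDD.
    sc.getRDDStorageInfo.foreach { info =>
      println(info.name + ": " +
        info.numCachedPartitions + "/" + info.numPartitions + " partitions cached, " +
        info.memSize + " bytes in memory, " + info.diskSize + " bytes on disk")
    }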
Re: How to declare Tuple return type for a function
The return type should be RDD[(Int, Int, Int)] because sc.textFile() returns an RDD. Try adding an import for the RDD type to get rid of the compile error: import org.apache.spark.rdd.RDD On Mon, Apr 28, 2014 at 6:22 PM, SK skrishna...@gmail.com wrote: Hi, I am a new user of Spark. I have a class that defines a function as follows. It returns a tuple (Int, Int, Int).

    class Sim extends VectorSim {
      override def input(master: String): (Int, Int, Int) = {
        sc = new SparkContext(master, "Test")
        val ratings = sc.textFile(INP_FILE)
          .map(line => {
            val fields = line.split("\t")
            (fields(0).toInt, fields(1).toInt, fields(2).toInt)
          })
        ratings
      }
    }

The class extends the trait VectorSim where the function input() is declared as follows:

    trait VectorSim {
      def input(s: String): (Int, Int, Int)
    }

However, when I compile, I get a type mismatch saying input() returns RDD[(Int,Int,Int)]. So I changed the return type to RDD[(Int,Int,Int)], but the compiler complains that there is no type called RDD. What is the right way to declare the return type of a function that returns a tuple (Int,Int,Int)? I am using Spark 0.9. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-declare-Tuple-return-type-for-a-function-tp4999.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
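Putting the two fixes above together, the declarations would look roughly like this (a sketch assuming the Spark 0.9-era API; INP_FILE is a placeholder for the actual input path):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    trait VectorSim {
      // the trait must declare the same RDD return type as the implementation
      def input(s: String): RDD[(Int, Int, Int)]
    }

    class Sim extends VectorSim {
      val INP_FILE = "path/to/input"   // placeholder for the actual input path

      override def input(master: String): RDD[(Int, Int, Int)] = {
        val sc = new SparkContext(master, "Test")
        sc.textFile(INP_FILE).map { line =>
          val fields = line.split("\t")
          (fields(0).toInt, fields(1).toInt, fields(2).toInt)
        }
      }
    }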
Python Spark on YARN
Hi all: Is it possible to develop Spark programs in Python and run them on YARN? From the Python SparkContext class, it doesn't seem to have such an option. Thank you, - Guanhua === Guanhua Yan, Ph.D. Information Sciences Group (CCS-3) Los Alamos National Laboratory Tel: +1-505-667-0176 Email: gh...@lanl.gov Web: http://ghyan.weebly.com/ ===
How to declare Tuple return type for a function
Hi, I am a new user of Spark. I have a class that defines a function as follows. It returns a tuple (Int, Int, Int).

    class Sim extends VectorSim {
      override def input(master: String): (Int, Int, Int) = {
        sc = new SparkContext(master, "Test")
        val ratings = sc.textFile(INP_FILE)
          .map(line => {
            val fields = line.split("\t")
            (fields(0).toInt, fields(1).toInt, fields(2).toInt)
          })
        ratings
      }
    }

The class extends the trait VectorSim where the function input() is declared as follows:

    trait VectorSim {
      def input(s: String): (Int, Int, Int)
    }

However, when I compile, I get a type mismatch saying input() returns RDD[(Int,Int,Int)]. So I changed the return type to RDD[(Int,Int,Int)], but the compiler complains that there is no type called RDD. What is the right way to declare the return type for a function that returns a tuple (Int,Int,Int)? I am using Spark 0.9. thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-declare-Tuple-return-type-for-a-function-tp5047.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
What is Seq[V] in updateStateByKey?
What is Seq[V] in updateStateByKey? Does this store the collected tuples of the RDD in a collection? Method signature:

    def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S]
    ): DStream[(K, S)]

In my case I used Seq[Double], assuming a sequence of Doubles in the RDD; the moment I switched to a different type like Seq[(String, Double)] the code didn't compile. -Adrian
packaging time
Each time I run sbt/sbt assembly to compile my program, the packaging time takes about 370 sec (about 6 min). How can I reduce this time? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/packaging-time-tp5048.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Delayed Scheduling - Setting spark.locality.wait.node parameter in interactive shell
Hi All, I have replication factor 3 in my HDFS. With 3 datanodes, I ran my experiments. Now I just added another node with no data on it. When I ran the job, Spark launched non-local tasks on it and the time taken was more than it took on the 3-node cluster. I think delayed scheduling fails here because of the parameter spark.locality.wait.node, which is 3 seconds by default: it launches ANY-level tasks on the newly added data node. I want to increase this parameter in the interactive shell. How do I do it? What variable should I set to pass it on to the SparkContext in the interactive shell? Thanks.
Re: Spark: issues with running a sbt fat jar due to akka dependencies
You need to merge the reference.conf files, and then it is no longer an issue. See the build file for Spark itself: case "reference.conf" => MergeStrategy.concat On Tue, Apr 29, 2014 at 3:32 PM, Shivani Rao raoshiv...@gmail.com wrote: Hello folks, I was going to post this question to the spark user group as well. If you have any leads on how to solve this issue please let me know: I am trying to build a basic spark project (spark depends on akka) and I am trying to create a fat jar using sbt assembly. The goal is to run the fat jar via the command line as follows: java -cp <path to my spark fatjar> <mainclassname> I encountered deduplication errors during sbt assembly in the following akka libraries: akka-remote_2.10-2.2.3.jar with akka-remote_2.10-2.2.3-shaded-protobuf.jar, and akka-actor_2.10-2.2.3.jar with akka-actor_2.10-2.2.3-shaded-protobuf.jar. I resolved them by using MergeStrategy.first, and that let the sbt assembly command complete successfully. But one or another akka configuration parameter kept throwing the following message: Exception in thread main com.typesafe.config.ConfigException$Missing: No configuration setting found for key. I then used MergeStrategy.concat for reference.conf and I started getting this repeated error: Exception in thread main com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'. I noticed that akka.version is only in the akka-actor jars and not in akka-remote. The resulting reference.conf (in my final fat jar) does not contain akka.version either, so the strategy is not working. There are several things I could try: a) Use the following dependency https://github.com/sbt/sbt-proguard b) Write a build.scala to handle merging of reference.conf https://spark-project.atlassian.net/browse/SPARK-395 http://letitcrash.com/post/21025950392/howto-sbt-assembly-vs-reference-conf c) Create a reference.conf by merging all akka configurations and then passing it in my java -cp command as shown below: java -cp <jar-name> -Dconfig.file=<config> The main issue is that if I run the spark jar via sbt run there are no errors in accessing any of the akka configuration parameters. It is only when I run it via the command line (java -cp <jar-name> <classname>) that I encounter the error. Which of these is a long-term fix to the akka issues? For now, I removed the akka dependencies and that solved the problem, but I know that is not a long-term solution. Regards, Shivani -- Software Engineer Analytics Engineering Team @ Box Mountain View, CA
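For reference, the build change described above looks roughly like this in an sbt-assembly setup of that era; the exact setting name varies between sbt-assembly versions, so treat it as an illustrative sketch rather than the exact incantation:

    import sbtassembly.Plugin._
    import AssemblyKeys._

    // Concatenate Akka's reference.conf fragments instead of picking one of them,
    // so keys like akka.version from akka-actor survive in the fat jar.
    mergeStrategy in assembly <<= (mergeStrategy in assembly) { old =>
      {
        case "reference.conf" => MergeStrategy.concat
        case x                => old(x)
      }
    }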
Re: packaging time
Tip: read the wiki -- https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools On Tue, Apr 29, 2014 at 12:48 PM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Tips from my experience. Disable scaladoc: sources in doc in Compile := List() Do not package the source: publishArtifact in packageSrc := false And most importantly, do not run sbt assembly. It creates a fat jar. Use sbt package or sbt stage (from sbt-native-packager). They create a directory full of jars, and you only need to update the one containing your code. On Tue, Apr 29, 2014 at 8:50 PM, SK skrishna...@gmail.com wrote: Each time I run sbt/sbt assembly to compile my program, the packaging takes about 370 sec (about 6 min). How can I reduce this time? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/packaging-time-tp5048.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
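Collected in one place, the two build settings suggested above would sit in build.sbt roughly like this (a sketch; key spellings can differ slightly between sbt versions):

    // Skip scaladoc generation during packaging.
    sources in (Compile, doc) := Seq.empty

    // Do not build the sources jar.
    publishArtifact in packageSrc := false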
Re: What is Seq[V] in updateStateByKey?
The original DStream is of (K,V). This function creates a DStream of (K,S). Each time slice brings one or more new V for each K. The old state S (which can be a different type from V!) for each K -- possibly non-existent -- is updated in some way by a bunch of new V, to produce a new state S -- which also might not exist anymore after the update. That's why the function is from a Seq[V] and an Option[S] to an Option[S]. If your RDD has value type V = Double then your function needs to update state based on a new Seq[Double] at each time slice, since Doubles are the new thing arriving for each key at each time slice. On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu amoc...@verticalscope.com wrote: What is Seq[V] in updateStateByKey? Does this store the collected tuples of the RDD in a collection? Method signature: def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] In my case I used Seq[Double], assuming a sequence of Doubles in the RDD; the moment I switched to a different type like Seq[(String, Double)] the code didn't compile. -Adrian
Re: What is Seq[V] in updateStateByKey?
You may have already seen it, but I will mention it anyway. This example may help: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala Here the state is essentially a running count of the words seen. So the value type (i.e., V) is Int (count of a word in each batch) and the state type (i.e., S) is also Int (running count). The update function essentially sums the running count with the new count to generate a new running count. TD On Tue, Apr 29, 2014 at 1:49 PM, Sean Owen so...@cloudera.com wrote: The original DStream is of (K,V). This function creates a DStream of (K,S). Each time slice brings one or more new V for each K. The old state S (which can be a different type from V!) for each K -- possibly non-existent -- is updated in some way by a bunch of new V, to produce a new state S -- which also might not exist anymore after the update. That's why the function is from a Seq[V] and an Option[S] to an Option[S]. If your RDD has value type V = Double then your function needs to update state based on a new Seq[Double] at each time slice, since Doubles are the new thing arriving for each key at each time slice. On Tue, Apr 29, 2014 at 7:50 PM, Adrian Mocanu amoc...@verticalscope.com wrote: What is Seq[V] in updateStateByKey? Does this store the collected tuples of the RDD in a collection? Method signature: def updateStateByKey[S: ClassTag] ( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] In my case I used Seq[Double], assuming a sequence of Doubles in the RDD; the moment I switched to a different type like Seq[(String, Double)] the code didn't compile. -Adrian
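The heart of that StatefulNetworkWordCount example is an update function along these lines, sketched from the description above; wordDstream is assumed to be the DStream of (word, 1) pairs:

    // Sum the counts arriving in this batch (Seq[Int]) with the previous
    // running count (Option[Int]) to produce the new running count.
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.sum
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)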
Re: Python Spark on YARN
This will be possible in 1.0 after this pull request: https://github.com/apache/spark/pull/30 Matei On Apr 29, 2014, at 9:51 AM, Guanhua Yan gh...@lanl.gov wrote: Hi all: Is it possible to develop Spark programs in Python and run them on YARN? From the Python SparkContext class, it doesn't seem to have such an option. Thank you, - Guanhua === Guanhua Yan, Ph.D. Information Sciences Group (CCS-3) Los Alamos National Laboratory Tel: +1-505-667-0176 Email: gh...@lanl.gov Web: http://ghyan.weebly.com/ ===
Spark's behavior
Hi TD, In my tests with Spark Streaming, I'm using JavaNetworkWordCount (modified) code and a program that I wrote that sends words to the Spark worker; I use TCP as transport. I verified that after starting Spark, it connects to my source, which actually starts sending, but the first word count is reported approximately 30 seconds after the context creation. So I'm wondering where the 30 seconds of data already sent by the source is stored. Is this normal Spark behaviour? I saw the same behaviour using the shipped JavaNetworkWordCount application. Many thanks. -- Privacy notice: http://www.unibs.it/node/8155
rdd ordering gets scrambled
Hi, I started with a text file (CSV) of data sorted by the first column and parsed it into Scala objects using a map operation. Then I used more maps to add some extra info to the data and saved it as a text file. The final text file is not sorted. What do I need to do to keep the order from the original input intact? My code looks like:

    val csvFile = sc.textFile(...)                     // file is CSV and ordered by first column
    val splitRdd = csvFile map { line => line.split(",", -1) }
    val parsedRdd = splitRdd map { parts =>
      val key = parts(0)                               // use first column as key
      val value = new MyObject(parts(0), parts(1))     // parse into Scala objects
      (key, value)
    }
    val augmentedRdd = parsedRdd map { x =>
      val key = x._1
      val value = ...                                  // add extra fields to x._2
      (key, value)
    }
    augmentedRdd.saveAsTextFile(...)                   // this file is not sorted

Mohit.
Re: Spark's behavior
Hi TD, We are not using the streaming context with master local; we have 1 Master, 8 Workers and 1 word source. The command line that we are using is: bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount spark://192.168.0.13:7077 On Apr 30, 2014, at 0:09, Tathagata Das tathagata.das1...@gmail.com wrote: Is your batch size 30 seconds by any chance? Assuming not, please check whether you are creating the streaming context with master local[n] where n > 2. With local or local[1], the system only has one processing slot, which is occupied by the receiver, leaving no room for processing the received data. It could be that after 30 seconds the server disconnects, the receiver terminates, releasing the single slot for the processing to proceed. TD On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, In my tests with Spark Streaming, I'm using JavaNetworkWordCount (modified) code and a program that I wrote that sends words to the Spark worker; I use TCP as transport. I verified that after starting Spark, it connects to my source, which actually starts sending, but the first word count is reported approximately 30 seconds after the context creation. So I'm wondering where the 30 seconds of data already sent by the source is stored. Is this normal Spark behaviour? I saw the same behaviour using the shipped JavaNetworkWordCount application. Many thanks. -- Privacy notice: http://www.unibs.it/node/8155 -- Privacy notice: http://www.unibs.it/node/8155
Re: Python Spark on YARN
Thanks, Matei. Will take a look at it. Best regards, Guanhua From: Matei Zaharia matei.zaha...@gmail.com Reply-To: user@spark.apache.org Date: Tue, 29 Apr 2014 14:19:30 -0700 To: user@spark.apache.org Subject: Re: Python Spark on YARN This will be possible in 1.0 after this pull request: https://github.com/apache/spark/pull/30 Matei On Apr 29, 2014, at 9:51 AM, Guanhua Yan gh...@lanl.gov wrote: Hi all: Is it possible to develop Spark programs in Python and run them on YARN? From the Python SparkContext class, it doesn't seem to have such an option. Thank you, - Guanhua === Guanhua Yan, Ph.D. Information Sciences Group (CCS-3) Los Alamos National Laboratory Tel: +1-505-667-0176 Email: gh...@lanl.gov Web: http://ghyan.weebly.com/ ===
Re: Spark's behavior
Strange! Can you just do lines.print() to print the raw data instead of doing the word count? Beyond that we can do two things. 1. Can you check the Spark stage UI to see whether there are stages running during the 30-second period you referred to? 2. If you upgrade to the Spark master branch (or Spark 1.0 RC3, see the separate thread by Patrick), it has a streaming UI, which shows the number of records received, the state of the receiver, etc. That may be more useful in debugging what's going on. TD On Tue, Apr 29, 2014 at 3:31 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, We are not using the streaming context with master local; we have 1 Master, 8 Workers and 1 word source. The command line that we are using is: bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount spark://192.168.0.13:7077 On Apr 30, 2014, at 0:09, Tathagata Das tathagata.das1...@gmail.com wrote: Is your batch size 30 seconds by any chance? Assuming not, please check whether you are creating the streaming context with master local[n] where n > 2. With local or local[1], the system only has one processing slot, which is occupied by the receiver, leaving no room for processing the received data. It could be that after 30 seconds the server disconnects, the receiver terminates, releasing the single slot for the processing to proceed. TD On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, In my tests with Spark Streaming, I'm using JavaNetworkWordCount (modified) code and a program that I wrote that sends words to the Spark worker; I use TCP as transport. I verified that after starting Spark, it connects to my source, which actually starts sending, but the first word count is reported approximately 30 seconds after the context creation. So I'm wondering where the 30 seconds of data already sent by the source is stored. Is this normal Spark behaviour? I saw the same behaviour using the shipped JavaNetworkWordCount application. Many thanks. -- Privacy notice: http://www.unibs.it/node/8155 Privacy notice: http://www.unibs.it/node/8155
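The first debugging suggestion above would look roughly like this, sketched in Scala for brevity and assuming a StreamingContext named ssc and the same host/port the word source uses:

    // Print the first few raw lines of every batch instead of counting words,
    // to confirm data is actually arriving within the batch interval.
    val lines = ssc.socketTextStream(host, port)
    lines.print()
    ssc.start()
    ssc.awaitTermination()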
RE: Shuffle Spill Issue
Hi Daniel, Thanks for your reply. Though I think reduceByKey will also do a map-side combine, so the result is the same: for each partition, one entry per distinct word. In my case with the JavaSerializer, the 240MB dataset yields around 70MB of shuffle data. Only the shuffle spill (memory) is abnormal, and it sounds to me like it should not trigger at all. And, by the way, this behavior only occurs on the map output side; on the reduce / shuffle fetch side this strange behavior doesn't happen. Best Regards, Raymond Liu From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com] I have no idea why the shuffle spill is so large. But this might make it smaller:

    val addition = (a: Int, b: Int) => a + b
    val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word will end up in the shuffle for each partition, instead of one entry per word occurrence. On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond raymond@intel.com wrote: Hi Patrick, I am just doing a simple word count, and the data is generated by the hadoop random text writer. This seems to me not quite related to compression: if I turn off compression on the shuffle, the metrics look something like the below for the smaller 240MB dataset.

    Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
    10          | sr437:48527 | 35 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 2.2 GB                 | 1291.2 KB
    12          | sr437:46077 | 34 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1822.6 MB              | 1073.3 KB
    13          | sr434:37896 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1099.2 MB              | 621.2 KB
    15          | sr438:52819 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1898.8 MB              | 1072.6 KB
    16          | sr434:37103 | 32 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1638.0 MB              | 1044.6 KB

And the program is pretty simple:

    val files = sc.textFile(args(1))
    val words = files.flatMap(_.split(" "))
    val wordsPair = words.map(x => (x, 1))
    val wordsCount = wordsPair.reduceByKey(_ + _)
    val count = wordsCount.count()
    println("Number of words = " + count)

Best Regards, Raymond Liu From: Patrick Wendell [mailto:pwend...@gmail.com] Could you explain more about what your job is doing and what data types you are using? These numbers alone don't necessarily indicate something is wrong. The relationship between the in-memory and on-disk shuffle amounts is definitely a bit strange; the data gets compressed when written to disk, but unless you have a weird dataset (e.g. all zeros) I wouldn't expect it to compress _that_ much. On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond raymond@intel.com wrote: Hi, I am running a simple word count program on a Spark standalone cluster. The cluster is made up of 6 nodes; each runs 4 workers and each worker owns 10G memory and 16 cores, so 96 cores and 240G memory in total. (Well, it also used to be configured as 1 worker with 40G memory on each node.) I ran a very small data set (2.4GB on HDFS in total) to confirm the problem, as below. As you can read from the part of the task metrics below, the shuffle spill part of the metrics indicates that something is wrong.
    Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
    0           | sr437:42139 | 29 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 23.6 GB                | 4.3 MB
    1           | sr433:46935 | 1.1 min   | 4           | 0            | 4               | 0.0 B        | 4.2 MB        | 19.0 GB                | 3.4 MB
    10          | sr436:53277 | 26 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.6 GB                | 4.6 MB
    11          | sr437:58872 | 32 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.0 GB                | 4.4 MB
    12          | sr435:48358 | 27 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.1 GB                | 4.4 MB

You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the actual shuffle data and Shuffle Spill (Disk), and it also seems to me that the spill should not trigger at all, since the memory is not used up. To verify that, I further reduced the data size to 240MB in total, and here is the result:

    Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
    0           | sr437:50895 | 15 s      | 4           | 0            | 4               | 0.0 B        | 703.0 KB      | 80.0 MB                | 43.2 KB
    1           | sr433:50207 | 17 s      | 4           | 0            | 4               | 0.0 B        | 704.7 KB      | 389.5 MB               | 90.2 KB
    10          | sr436:56352 | 16 s      | 4           | 0            | 4               | 0.0 B        | 700.9 KB      | 814.9 MB               | 181.6 KB
    11          | sr437:53099 | 15 s      | 4           | 0            | 4               | 0.0 B        | 689.7 KB      | 0.0 B                  | 0.0 B
    12          | sr435:48318 | 15 s      | 4           | 0            | 4               | 0.0 B        | 702.1 KB      | 427.4 MB               | 90.7 KB
    13          | sr433:59294 | 17 s      | 4           | 0            | 4               | 0.0 B        | 704.8 KB      | 779.9 MB
Re: NoSuchMethodError from Spark Java
I met with the same problem when updating to spark 0.9.1 (svn checkout https://github.com/apache/spark/): Exception in thread main java.lang.NoSuchMethodError: org.apache.spark.SparkContext$.jarOfClass(Ljava/lang/Class;)Lscala/collection/Seq; at org.apache.spark.examples.GroupByTest$.main(GroupByTest.scala:38) at org.apache.spark.examples.GroupByTest.main(GroupByTest.scala) My sbt build is:

    name := "GroupByTest"
    version := "1.0"
    scalaVersion := "2.10.4"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "0.9.1"
    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

Is there something I need to modify? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5076.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
How fast would you expect shuffle serialize to be?
Hi, I am running a WordCount program which counts words from HDFS, and I noticed that the serializer part of the code takes a lot of CPU time. On a 16-core/32-thread node, the total throughput is around 50MB/s with the JavaSerializer, and if I switch to the KryoSerializer, it doubles to around 100-150MB/s. (I have 12 disks per node and the files are scattered across the disks, so HDFS bandwidth is not a problem.) I also notice that in this case the object being written is (String, Int); if I try a case with (Int, Int), the throughput is a further 2-3x faster. So, in my WordCount case, the bottleneck is CPU (because with shuffle compression on, 150MB/s of data bandwidth on the input side usually leads to around 50MB/s of shuffle data). This serialization bandwidth looks somewhat too low to me, so I am wondering: what bandwidth do you observe in your case? Does this throughput sound reasonable to you? If not, is there anything that might need to be examined in my case? Best Regards, Raymond Liu
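For reference, the serializer switch described above is a one-line configuration change; a minimal sketch, assuming the Spark 0.9-era property name:

    import org.apache.spark.{SparkConf, SparkContext}

    // Use Kryo instead of Java serialization for shuffle data.
    val conf = new SparkConf()
      .setAppName("WordCount")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)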
Re: How fast would you expect shuffle serialize to be?
Is this the serialization throughput per task or the serialization throughput for all the tasks? On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote: Hi I am running a WordCount program which count words from HDFS, and I noticed that the serializer part of code takes a lot of CPU time. On a 16core/32thread node, the total throughput is around 50MB/s by JavaSerializer, and if I switching to KryoSerializer, it doubles to around 100-150MB/s. ( I have 12 disks per node and files scatter across disks, so HDFS BW is not a problem) And I also notice that, in this case, the object to write is (String, Int), if I try some case with (int, int), the throughput will be 2-3x faster further. So, in my Wordcount case, the bottleneck is CPU ( cause if with shuffle compress on, the 150MB/s data bandwidth in input side, will usually lead to around 50MB/s shuffle data) This serialize BW looks somehow too low , so I am wondering, what's BW you observe in your case? Does this throughput sounds reasonable to you? If not, anything might possible need to be examined in my case? Best Regards, Raymond Liu
RE: How fast would you expect shuffle serialize to be?
For all the tasks; say, 32 tasks in total. Best Regards, Raymond Liu -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Is this the serialization throughput per task or the serialization throughput for all the tasks? On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote: Hi, I am running a WordCount program which counts words from HDFS, and I noticed that the serializer part of the code takes a lot of CPU time. On a 16-core/32-thread node, the total throughput is around 50MB/s with the JavaSerializer, and if I switch to the KryoSerializer, it doubles to around 100-150MB/s. (I have 12 disks per node and the files are scattered across the disks, so HDFS bandwidth is not a problem.) I also notice that in this case the object being written is (String, Int); if I try a case with (Int, Int), the throughput is a further 2-3x faster. So, in my WordCount case, the bottleneck is CPU (because with shuffle compression on, 150MB/s of data bandwidth on the input side usually leads to around 50MB/s of shuffle data). This serialization bandwidth looks somewhat too low to me, so I am wondering: what bandwidth do you observe in your case? Does this throughput sound reasonable to you? If not, is there anything that might need to be examined in my case? Best Regards, Raymond Liu
Re: Union of 2 RDD's only returns the first one
Hi Patrick, I'm a little confused about your comment that RDDs are not ordered. As far as I know, RDDs keep a list of partitions that are ordered, and this is why I can call RDD.take() and get the same first k rows every time I call it, and why RDD.take() returns the same entries as RDD.map(...).take(), because map preserves the partition order. RDD order is also what allows me to get the top k out of an RDD by doing RDD.sort().take(). Am I misunderstanding it? Or is it just when an RDD is written to disk that the order is not well preserved? Thanks in advance! Mingyu On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote: Ah somehow after all this time I've never seen that! On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia buendia...@gmail.com wrote: On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell pwend...@gmail.com wrote: What is the ++ operator here? Is this something you defined? No, it's an alias for union defined in RDD.scala: def ++(other: RDD[T]): RDD[T] = this.union(other) Another issue is that RDDs are not ordered, so when you union two together it doesn't have a well defined ordering. If you do want to do this you could coalesce into one partition, then call mapPartitions and return an iterator that first adds your header and then the rest of the file, then call saveAsTextFile. Keep in mind this will only work if you coalesce into a single partition. Thanks! I'll give this a try.

    myRdd.coalesce(1)
      .map(_.mkString(","))
      .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
      .saveAsTextFile("out.csv")

- Patrick On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia buendia...@gmail.com wrote: Hi, I'm trying to find a way to create a csv header when using saveAsTextFile, and I came up with this:

    (sc.makeRDD(Array("col1,col2,col3"), 1) ++ myRdd.coalesce(1).map(_.mkString(",")))
      .saveAsTextFile("out.csv")

But it only saves the header part. Why is it that the union method does not return both RDDs?
RE: How fast would you expect shuffle serialize to be?
By the way, to be clear, I run repartition first to make all of the data go through the shuffle, instead of running reduceByKey etc. directly (which reduces the data that needs to be shuffled and serialized); thus all 50MB/s of data from HDFS goes to the serializer. (In fact, I also tried generating the data directly in memory instead of reading it from HDFS, with a similar throughput result.) Best Regards, Raymond Liu -Original Message- From: Liu, Raymond [mailto:raymond@intel.com] For all the tasks; say, 32 tasks in total. Best Regards, Raymond Liu -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Is this the serialization throughput per task or the serialization throughput for all the tasks? On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote: Hi, I am running a WordCount program which counts words from HDFS, and I noticed that the serializer part of the code takes a lot of CPU time. On a 16-core/32-thread node, the total throughput is around 50MB/s with the JavaSerializer, and if I switch to the KryoSerializer, it doubles to around 100-150MB/s. (I have 12 disks per node and the files are scattered across the disks, so HDFS bandwidth is not a problem.) I also notice that in this case the object being written is (String, Int); if I try a case with (Int, Int), the throughput is a further 2-3x faster. So, in my WordCount case, the bottleneck is CPU (because with shuffle compression on, 150MB/s of data bandwidth on the input side usually leads to around 50MB/s of shuffle data). This serialization bandwidth looks somewhat too low to me, so I am wondering: what bandwidth do you observe in your case? Does this throughput sound reasonable to you? If not, is there anything that might need to be examined in my case? Best Regards, Raymond Liu
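A rough sketch of the benchmark shape described above (the input path and partition count are assumptions): repartition forces every record through shuffle serialization before the count.

    // Force all input records through the shuffle so the serializer sees 100% of the data.
    val files = sc.textFile(inputPath)                       // inputPath is an assumed HDFS path
    val words = files.flatMap(_.split(" "))
    val shuffled = words.map(x => (x, 1)).repartition(32)    // every pair is serialized here
    println("Number of records = " + shuffled.count())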
Re: JavaSparkConf
This class was made to be Java-friendly so that we wouldn't have to maintain two versions. The class itself is simple. But I agree adding Java setters would be nice. On Tue, Apr 29, 2014 at 8:32 PM, Soren Macbeth so...@yieldbot.com wrote: There is a JavaSparkContext, but no JavaSparkConf object. I know SparkConf is new in 0.9.x. Is there a plan to add something like this to the Java API? It's rather a bother to have things like setAll take a Scala Traversable[(String, String)] when using SparkConf from the Java API. At a minimum, adding method signatures for Java collections where there are currently Scala collections would be a good start. TIA
RE: How fast would you expect shuffle serialize to be?
Later case, total throughput aggregated from all cores. Best Regards, Raymond Liu -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Sent: Wednesday, April 30, 2014 1:22 PM To: user@spark.apache.org Subject: Re: How fast would you expect shuffle serialize to be? Hm - I'm still not sure if you mean 100MB/s for each task = 3200MB/s across all cores -or- 3.1MB/s for each task = 100MB/s across all cores If it's the second one, that's really slow and something is wrong. If it's the first one this in the range of what I'd expect, but I'm no expert. On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond raymond@intel.com wrote: For all the tasks, say 32 task on total Best Regards, Raymond Liu -Original Message- From: Patrick Wendell [mailto:pwend...@gmail.com] Is this the serialization throughput per task or the serialization throughput for all the tasks? On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote: Hi I am running a WordCount program which count words from HDFS, and I noticed that the serializer part of code takes a lot of CPU time. On a 16core/32thread node, the total throughput is around 50MB/s by JavaSerializer, and if I switching to KryoSerializer, it doubles to around 100-150MB/s. ( I have 12 disks per node and files scatter across disks, so HDFS BW is not a problem) And I also notice that, in this case, the object to write is (String, Int), if I try some case with (int, int), the throughput will be 2-3x faster further. So, in my Wordcount case, the bottleneck is CPU ( cause if with shuffle compress on, the 150MB/s data bandwidth in input side, will usually lead to around 50MB/s shuffle data) This serialize BW looks somehow too low , so I am wondering, what's BW you observe in your case? Does this throughput sounds reasonable to you? If not, anything might possible need to be examined in my case? Best Regards, Raymond Liu
Re: Union of 2 RDD's only returns the first one
You are right: once you sort() the RDD, then yes, it has a well-defined ordering. But that ordering is lost as soon as you transform the RDD, including if you union it with another RDD. On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com wrote: Hi Patrick, I'm a little confused about your comment that RDDs are not ordered. As far as I know, RDDs keep a list of partitions that are ordered, and this is why I can call RDD.take() and get the same first k rows every time I call it, and why RDD.take() returns the same entries as RDD.map(...).take(), because map preserves the partition order. RDD order is also what allows me to get the top k out of an RDD by doing RDD.sort().take(). Am I misunderstanding it? Or is it just when an RDD is written to disk that the order is not well preserved? Thanks in advance! Mingyu On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote: Ah somehow after all this time I've never seen that! On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia buendia...@gmail.com wrote: On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell pwend...@gmail.com wrote: What is the ++ operator here? Is this something you defined? No, it's an alias for union defined in RDD.scala: def ++(other: RDD[T]): RDD[T] = this.union(other) Another issue is that RDDs are not ordered, so when you union two together it doesn't have a well defined ordering. If you do want to do this you could coalesce into one partition, then call mapPartitions and return an iterator that first adds your header and then the rest of the file, then call saveAsTextFile. Keep in mind this will only work if you coalesce into a single partition. Thanks! I'll give this a try.

    myRdd.coalesce(1)
      .map(_.mkString(","))
      .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
      .saveAsTextFile("out.csv")

- Patrick On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia buendia...@gmail.com wrote: Hi, I'm trying to find a way to create a csv header when using saveAsTextFile, and I came up with this:

    (sc.makeRDD(Array("col1,col2,col3"), 1) ++ myRdd.coalesce(1).map(_.mkString(",")))
      .saveAsTextFile("out.csv")

But it only saves the header part. Why is it that the union method does not return both RDDs?