Re: Printing the RDDs in SparkPageRank
When I add parts(0).collect().foreach(println) and parts(1).collect().foreach(println) for printing parts, I get the following error:

*not enough arguments for method collect: (pf: PartialFunction[Char,B])(implicit bf: scala.collection.generic.CanBuildFrom[String,B,That])That. Unspecified value parameter pf. parts(0).collect().foreach(println)*

And when I add parts.collect().foreach(println), I get the following error:

*not enough arguments for method collect: (pf: PartialFunction[String,B])(implicit bf: scala.collection.generic.CanBuildFrom[Array[String],B,That])That. Unspecified value parameter pf. parts.collect().foreach(println)*

On Sun, Aug 24, 2014 at 8:27 PM, Jörn Franke jornfra...@gmail.com wrote: Hi, What kind of error do you receive? Best regards, Jörn

On 24 August 2014 08:29, Deep Pradhan pradhandeep1...@gmail.com wrote: Hi, I was going through the SparkPageRank code and want to see the intermediate steps, like the RDDs formed in the intermediate steps. Here is a part of the code along with the lines that I added in order to print the RDDs. I want to print *parts* in the code (denoted by the comment in bold letters). But when I try to do that, it gives an error. Can someone suggest what I should be doing? Thank You

CODE:
object SparkPageRank {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("PageRank")
    var iters = args(1).toInt
    val ctx = new SparkContext(sparkConf)
    val lines = ctx.textFile(args(0), 1)
    println("The lines RDD is")
    lines.collect().foreach(println)
    val links = lines.map { s =>
      val parts = s.split("\\s+")
      (parts(0), parts(1))   /* I want to print this parts */
    }.distinct().groupByKey().cache()
    println("The links RDD is")
    links.collect().foreach(println)
    var ranks = links.mapValues(v => 1.0)
    println("The ranks RDD is")
    ranks.collect().foreach(println)
    for (i <- 1 to iters) {
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      println("The contribs RDD is")
      contribs.collect().foreach(println)
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
    }
    println("The second ranks RDD is")
    ranks.collect().foreach(println)
    val output = ranks.collect()
    output.foreach(tup => println(tup._1 + " has rank: " + tup._2 + "."))
    ctx.stop()
  }
}
How to join two PairRDD together?
Hello everyone, I am porting a clustering algorithm to the Spark platform, and I have run into a problem that has confused me for a long time. Can someone help me? I have a PairRDD<Integer, Integer> named patternRDD, in which the key represents a number and the value stores information about that key. I want to use two of the VALUEs to calculate a Kendall number, and if the number is greater than 0.6, output the two KEYs. I have tried to transform the PairRDD into an RDD<Tuple2<Integer, Integer>>, add a common key zero to every element, and join the RDD with itself, which gives a PairRDD<0, Iterable<Tuple2<Tuple2<key1, value1>, Tuple2<key2, value2>>>>; I then tried to use the values() method and map the keys out, but it gives me an out of memory error. I think the out of memory error is related to how few entries my RDD has, but I have no idea how to solve it. Can you help me? Regards, Gefei Li
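For reference, a minimal sketch of one alternative that avoids collapsing everything onto a single key: compare values pairwise with cartesian(). This still materializes all pairs, so it only illustrates the API and is not a memory fix; kendall() here is a stand-in for the poster's own correlation function, and the value type V is left abstract:

// Sketch only: patternRDD: RDD[(Int, V)], kendall(v1, v2): Double is user-supplied
val pairs = patternRDD.cartesian(patternRDD)
  .filter { case ((k1, _), (k2, _)) => k1 < k2 }                // each unordered key pair once
val similarKeys = pairs
  .filter { case ((_, v1), (_, v2)) => kendall(v1, v2) > 0.6 }  // keep strongly correlated pairs
  .map { case ((k1, _), (k2, _)) => (k1, k2) }                  // output only the two keys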
many fetch failure in BlockManager
*Hi all:* *My job is CPU intensive, and its resource configuration is 400 workers * 1 core * 3 GB. There are many fetch failures, like:*
14-08-23 08:34:52 WARN [Result resolver thread-3] TaskSetManager: Loss was due to fetch failure from BlockManagerId(slave1:33500)
14-08-23 08:34:52 INFO [spark-akka.actor.default-dispatcher-37] DAGScheduler: Marking Stage 4 (repartition at test.scala:97) for resubmision due to a fetch failure
14-08-23 08:34:52 INFO [spark-akka.actor.default-dispatcher-37] DAGScheduler: The failed fetch was from Stage 5 (repartition at test.scala:82); marking it for resubmission
14-08-23 08:34:53 INFO [spark-akka.actor.default-dispatcher-71] DAGScheduler: Resubmitting failed stages
14-08-23 08:35:06 WARN [Result resolver thread-2] TaskSetManager: Loss was due to fetch failure from BlockManagerId(slave2:34792)
14-08-23 08:35:06 INFO [spark-akka.actor.default-dispatcher-63] DAGScheduler: Marking Stage 4 (repartition at test.scala:97) for resubmision due to a fetch failure
14-08-23 08:35:06 INFO [spark-akka.actor.default-dispatcher-63] DAGScheduler: The failed fetch was from Stage 5 (repartition at test.scala:82); marking it for resubmission
14-08-23 08:35:06 INFO [spark-akka.actor.default-dispatcher-63] DAGScheduler: Executor lost: 118 (epoch 3)
14-08-23 08:35:06 INFO [spark-akka.actor.default-dispatcher-38] BlockManagerMasterActor: Trying to remove executor 118 from BlockManagerMaster.
14-08-23 08:35:06 INFO [spark-akka.actor.default-dispatcher-63] BlockManagerMaster: Removed 118 successfully in removeExecutor
14-08-23 08:35:06 INFO [spark-akka.actor.default-dispatcher-43] DAGScheduler: Resubmitting failed stages
*Stage 4 is marked for resubmission. After a period of time, block manager slave1:33500 is registered again:*
14-08-23 08:36:16 INFO [spark-akka.actor.default-dispatcher-58] BlockManagerInfo: Registering block manager slave1:33500 with 1766.4 MB RAM
*Unfortunately, stage 4 is resubmitted again and again and keeps hitting fetch failures. After 14-08-23 09:03:37 there is no log output in the master, and logging resumes at 14-08-24 00:43:15:*
14-08-23 09:03:37 INFO [Result resolver thread-3] YarnClusterScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool
14-08-23 09:03:37 INFO [spark-akka.actor.default-dispatcher-28] DAGScheduler: Marking Stage 4 (repartition at test.scala:97) for resubmision due to a fetch failure
14-08-23 09:03:37 INFO [spark-akka.actor.default-dispatcher-28] DAGScheduler: The failed fetch was from Stage 5 (repartition at test.scala:82); marking it for resubmission
14-08-23 09:03:37 INFO [spark-akka.actor.default-dispatcher-71] DAGScheduler: Resubmitting failed stages
14-08-24 00:43:15 INFO [Thread-854] YarnAllocationHandler: Completed container container_1400565786114_133451_01_000171 (state: COMPLETE, exit status: -100)
14-08-24 00:43:15 INFO [Thread-854] YarnAllocationHandler: Container marked as failed: container_1400565786114_133451_01_000171
14-08-24 00:43:15 INFO [Thread-854] YarnAllocationHandler: Completed container container_1400565786114_133451_01_000172 (state: COMPLETE, exit status: -100)
14-08-24 00:43:15 INFO [Thread-854] YarnAllocationHandler: Container marked as failed: container_1400565786114_133451_01_000172
14-08-24 00:43:20 INFO [Thread-854] ApplicationMaster: Allocating 2 containers to make up for (potentially) lost containers
14-08-24 00:43:20 INFO [Thread-854] YarnAllocationHandler: Will Allocate 2 executor containers, each with 3456 memory
*Strangely, TaskSet 4.0 is removed because its tasks have all completed, while Stage 4 is marked for resubmission. In the executors there are many java.net.ConnectException: Connection timed out errors, like:*
14-08-23 08:19:14 WARN [pool-3-thread-1] SendingConnection: Error finishing connection to
java.net.ConnectException: Connection timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599)
at org.apache.spark.network.SendingConnection.finishConnect(Connection.scala:318)
at org.apache.spark.network.ConnectionManager$$anon$7.run(ConnectionManager.scala:203)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
*I often hit such problems, i.e. BlockManager connection failures from which Spark cannot recover effectively, so the job hangs or fails outright.*
*Any suggestions? And are there any guides on sizing resources for a job with respect to computation, cache, shuffle, etc.?*
*Thank you!*
Re: Open sourcing Spindle by Adobe Research, a web analytics processing engine in Scala, Spark, and Parquet.
\cc David Tompkins and Jim Donahue if they have anything to add. \cc My school email. Please include bamos_cmu.edu for further discussion. Hi Deb, Debasish Das wrote Looks very cool...will try it out for ad-hoc analysis of our datasets and provide more feedback... Could you please give bit more details about the differences of Spindle architecture compared to Hue + Spark integration (python stack) and Ooyala Jobserver ? Does Spindle allow sharing of spark context over multiple spark jobs like jobserver ? Great point, I think these jobservers would work well with Spindle on larger clusters. I've added the following portion to the README to mention this as an area of future work. Regards, Brandon. --- ## Future Work - Utilizing Spark job servers or resource managers. Spindle's architecture can likely be improved on larger clusters by utilizing a job server or resource manager to maintain a pool of Spark contexts for query execution. [Ooyala's spark-jobserver][spark-jobserver] provides a RESTful interface for submitting Spark jobs that Spindle could interface with instead of interfacing with Spark directly. [YARN][yarn] can also be used to manage Spark's resources on a cluster, as described in [this article][spark-yarn]. However, allocating resources on the cluster raises additional questions and engineering work that Spindle can address in future work. Spindle's current architecture coincides HDFS and Spark workers on the same nodes, minimizing the network traffic required to load data. How much will the performance degrade if the resource manager allocates some subset of Spark workers that don't coincide with any of the HDFS data being accessed? Furthermore, how would a production-ready caching policy on a pool of Spark Contexts look? What if many queries are being submitted and executed on different Spark Contexts that use the same data? Scheduling the queries on the same Spark Context and caching the data between query executions would substantially increase the performance, but how should the scheduler be informed of this information? [spark-jobserver]: https://github.com/ooyala/spark-jobserver [yarn]: http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html [spark-yarn]: http://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/ -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Open-sourcing-Spindle-by-Adobe-Research-a-web-analytics-processing-engine-in-Scala-Spark-and-Parquet-tp12203p12731.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
spark and matlab
Hi all, Has anyone tried to pipe an RDD into a MATLAB script? I'm trying to do something similar, so I'd appreciate it if one of you could point me to some hints. Best regards, Jao
Re: Printing the RDDs in SparkPageRank
On Mon, Aug 25, 2014 at 7:18 AM, Deep Pradhan pradhandeep1...@gmail.com wrote: When I add parts(0).collect().foreach(println) and parts(1).collect().foreach(println) for printing parts, I get the following error: not enough arguments for method collect: (pf: PartialFunction[Char,B])(implicit bf: scala.collection.generic.CanBuildFrom[String,B,That])That. Unspecified value parameter pf. parts(0).collect().foreach(println)

val links = lines.map { s =>
  val parts = s.split("\\s+")
  (parts(0), parts(1))   /* I want to print this parts */
}.distinct().groupByKey().cache()

Within this code, you are working in a simple Scala function. parts is an Array[String], so parts(0) is a String. You can just println(parts(0)). You are not calling RDD.collect() there, but collect() on a String, which is a sequence of Char. However, note that this will print the String on the worker that executes it, not on the driver. Maybe you want to print the result right after this map function? Then break this into two statements and print the result of the first. You are already doing that elsewhere in your code. A good formula is actually take(10) rather than collect(), in case the RDD is huge. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
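For reference, a minimal sketch of both options described above, using the variable names from the PageRank example (untested, just an illustration):

// Option 1: print on the worker that processes each line (output appears in executor logs)
val links = lines.map { s =>
  val parts = s.split("\\s+")
  println(parts(0) + " -> " + parts(1))
  (parts(0), parts(1))
}.distinct().groupByKey().cache()

// Option 2: keep the intermediate pair RDD and inspect a small sample on the driver
val pairs = lines.map { s =>
  val parts = s.split("\\s+")
  (parts(0), parts(1))
}
pairs.take(10).foreach(println)          // safer than collect() for large RDDs
val links2 = pairs.distinct().groupByKey().cache()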
apply at Option.scala:120
Hi, All When I run Spark applications, I see from the web UI that some stage descriptions are like "apply at Option.scala:120". Why does Spark label a stage with a line that is not in my Spark program but in a Scala library? Thanks Jensen
StorageLevel error.
Hi, Can someone help me with the following error:
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> rdd.persist(StorageLevel.MEMORY_ONLY)
<console>:15: error: not found: value StorageLevel
       rdd.persist(StorageLevel.MEMORY_ONLY)
                   ^
Thank you!!!
Re: StorageLevel error.
You need to import StorageLevel first: import org.apache.spark.storage.StorageLevel taoist...@gmail.com From: rapelly kartheek Date: 2014-08-25 18:22 To: user Subject: StorageLevel error. Hi, Can someone help me with the following error:
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> rdd.persist(StorageLevel.MEMORY_ONLY)
<console>:15: error: not found: value StorageLevel
       rdd.persist(StorageLevel.MEMORY_ONLY)
                   ^
Thank you!!!
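A minimal sketch of the fixed shell session, for reference:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(Array(1, 2, 3, 4))
rdd.persist(StorageLevel.MEMORY_ONLY)   // now resolves; MEMORY_ONLY is what rdd.cache() uses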
Re: Trying to run SparkSQL over Spark Streaming
Hi, Thanks for your help the other day. I had one more question regarding the same. If you want to issue an SQL statement on streaming data, you must have both the registerAsTable() and the sql() call *within* the foreachRDD(...) block, or -- as you experienced -- the table name will be unknown Since this is the case then is there any way to run join over data received from two different streams? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Trying-to-run-SparkSQL-over-Spark-Streaming-tp12530p12739.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
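One possible approach, sketched below and not from the thread itself: key both DStreams, join them batch-by-batch, and then run SQL on the joined result inside foreachRDD. The usual streaming and SQL imports are assumed, and Record, keyOf() and the field names are made up for illustration:

// assumes: import org.apache.spark.streaming.StreamingContext._  (pair DStream ops)
case class Record(id: String, a: String, b: String)

val keyed1 = stream1.map(v => (keyOf(v), v))      // keyOf() is a hypothetical key extractor
val keyed2 = stream2.map(v => (keyOf(v), v))
val joined = keyed1.join(keyed2)                  // DStream[(K, (V1, V2))], one join per micro-batch

joined.foreachRDD { rdd =>
  import sqlContext.createSchemaRDD               // implicit conversion from a case-class RDD
  val records = rdd.map { case (id, (l, r)) => Record(id, l.toString, r.toString) }
  records.registerAsTable("joined")               // registerTempTable in Spark 1.1
  sqlContext.sql("SELECT id, a, b FROM joined").collect().foreach(println)
}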
Spark - GraphX pregel like with global variables (accumulator / broadcast)
Hi, I'm working on big graph analytics, and am currently implementing a mean field inference algorithm in GraphX/Spark. I start with an arbitrary graph and keep a (sparse) probability distribution at each node, implemented as a Map[Long,Double]. At each iteration, from the current estimates of the distributions I update some global variables with two accumulators, then I gather the probability distributions of neighbors with mapReduceTriplets, and finally update the distributions using the messages plus the two accumulator values broadcast to the cluster with sc.broadcast. Unfortunately, this works well for extremely small graphs, but it becomes exponentially slow with the size of the graph and the number of iterations (it doesn't finish 20 iterations with graphs having 48000 edges). I suspect that the problem is related to the broadcast variables, so I tried to use .checkpoint() to remove them from the lineage, and to use different StorageLevels for persistence, but without success. It seems to me that a lot of things are unnecessarily recomputed at each iteration whatever I try to do. I also made multiple changes to limit the number of dependencies of each object, but it didn't change anything. Here is a sample of the code (simplified to be understandable, so not runnable); hopefully this gives you a feeling for what it is doing. Thanks !

def run(graph: Graph[Long, Long], m: Long)(implicit sc: SparkContext) = {
  var fusionMap = Map[Long, Long]().withDefault(x => x)
  // Initial values
  val tots = Map[Long, Double]().withDefaultValue(1.0)
  var totBcst = sc.broadcast(tots)
  var fusionBcst = sc.broadcast(fusionMap)
  val mC = sc.broadcast(m)
  // Initial graph
  var g = graph.mapVertices({ case (vid, deg) => VertexProp(initialDistribution(vid), deg) })
  var newVerts = g.vertices
  // Initial messages
  var msg = g.mapReduceTriplets(MFExecutor.sendMsgMF, MFExecutor.mergeMsgMF)
  var iter = 0
  while (iter < 20) {
    // MF messages
    val oldMessages = msg
    val oldVerts = newVerts
    newVerts = newVerts.innerJoin(msg)(MFExecutor.vprogMF(mC, totBcst, fusionBcst)) //.persist(StorageLevel.MEMORY_AND_DISK)
    newVerts.checkpoint()
    newVerts.count()
    val prevG = g
    g = graph.outerJoinVertices(newVerts)({ case (vid, deg, newOpt) =>
      newOpt.getOrElse(VertexProp(Map(vid -> 1.0).withDefaultValue(0.0), deg))
    }).cache()
    //g = g.outerJoinVertices(newVerts)({ case (vid, old, newOpt) => newOpt.getOrElse(old) })
    // 1st global variable
    val fusionAcc = sc.accumulable[Map[Long, Long], (Long, Long)](fusionMap)(FusionAccumulable)
    g.triplets
      .filter(tp => testEq(fusionBcst)(tp.srcId, tp.dstId) && (spd.dotPD(tp.dstAttr.prob, tp.srcAttr.prob) > 0.9))
      .foreach(tp => fusionAcc += (tp.dstId, tp.srcId))
    fusionBcst.unpersist(blocking = false)
    fusionMap = fusionAcc.value
    fusionBcst = sc.broadcast(fusionMap)
    // 2nd global variable
    val totAcc = sc.accumulator[Map[Long, Double]](Map[Long, Double]().withDefaultValue(0.0))(TotAccumulable)
    newVerts.foreach({ case (vid, vprop) => totAcc += vprop.prob.mapValues(p => p * vprop.deg).withDefaultValue(0.0) })
    totBcst.unpersist(blocking = false)
    totBcst = sc.broadcast(totAcc.value)
    // New MF messages
    msg = g.mapReduceTriplets(MFExecutor.sendMsgMF, MFExecutor.mergeMsgMF)
    // Unpersist old data
    oldMessages.unpersist(blocking = false)
    oldVerts.unpersist(blocking = false)
    prevG.unpersistVertices(blocking = false)
    iter = iter + 1
  }
}

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-GraphX-pregel-like-with-global-variables-accumulator-broadcast-tp12742.html Sent from the Apache Spark User List mailing list archive 
at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Request for help in writing to Textfile
Hi Guys, I am currently playing with huge data. I have an RDD of type RDD[List[(tuples)]], and I need only the tuples to be written to the text file output using the saveAsTextFile function. Example: val mod = modify.saveAsTextFile() currently returns

List((20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1), (20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1))
List((20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1), (20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1))

I need the following output, with only the tuple values, in a text file:

20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1
20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1

Please let me know if anybody has any idea how to do this without using the collect() function... Please help me -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Request-for-help-in-writing-to-Textfile-tp12744.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
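A minimal sketch of one way to do this, assuming modify is an RDD[List[(...)]]: flatten the lists first, then format each tuple as a comma-separated line before saving (the output path is just a placeholder):

val flattened = modify.flatMap(identity)                     // RDD of tuples
val lines = flattened.map(_.productIterator.mkString(","))   // "20140813,4,141127,..."
lines.saveAsTextFile("hdfs:///path/to/output")               // no collect() needed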
Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD
Hello all, Could someone help me with the manipulation of csv file data? I have semicolon-separated csv data including doubles and strings. I want to calculate the maximum/average of a column. When I read the file using sc.textFile("test.csv").map(_.split(";")), each field is read as a string. Could someone help me with the above manipulation and how to do it? Or maybe there is some way to take the transpose of the data and then manipulate the rows in some way? Thank you in advance, I have been struggling with this for quite some time. Regards, Vineet
Re: Development environment issues
On Thu, Aug 21, 2014 at 6:21 PM, pierred pie...@demartines.com wrote: So, what is the accepted wisdom in terms of IDE and development environment? I don't know what the accepted wisdom is. I've been getting by with the Scala IDE for Eclipse, though I am using the stable version - as you noted, this keeps me from upgrading to the latest Eclipse version. The quality of the Scala IDE is poor, but I have found it generally usable. I generate the Eclipse project files from SBT. Debugging does work (mostly) - just be aware you can't easily step into a lambda, so it's easiest to add a breakpoint inside of it. As for unit testing, both Specs2 and ScalaTest work, and I can run individual tests within Eclipse. For Specs2 there is an Eclipse plugin, and for ScalaTest you can annotate your tests with @RunWith(classOf[JUnitRunner]) and it'll work in the usual JUnit tools. I have automated tests running in Bamboo. Took a bit of wrangling to get the test output picked up, but it works. Is there a good tutorial to set things up so that one half of the libraries/tools doesn't break the other half? No idea. What do you guys use? scala 2.10 or 2.11? sbt or maven? eclipse or idea? jdk7 or 8? I'm using Java 7 and Scala 2.10.x (not every framework I use supports later versions). SBT because I use the Play Framework, but I miss Maven. I haven't tried IntelliJ's Scala support, but it's probably worth a shot. The tooling isn't nearly as solid as what Java has, but I make due. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
Re: Understanding RDD.GroupBy OutOfMemory Exceptions
Hi Patrick, For the spilling within on key work you mention might land in Spark 1.2, is that being tracked in https://issues.apache.org/jira/browse/SPARK-1823 or is there another ticket I should be following? Thanks! Andrew On Tue, Aug 5, 2014 at 3:39 PM, Patrick Wendell pwend...@gmail.com wrote: Hi Jens, Within a partition things will spill - so the current documentation is correct. This spilling can only occur *across keys* at the moment. Spilling cannot occur within a key at present. This is discussed in the video here: https://www.youtube.com/watch?v=dmL0N3qfSc8index=3list=PL-x35fyliRwj7qNxXLgMRJaOk7o9inHBZ Spilling within one key for GroupBy's is likely to end up in the next release of Spark, Spark 1.2. In most cases we see when users hit this, they are actually trying to just do aggregations which would be more efficiently implemented without the groupBy operator. If the goal is literally to just write out to disk all the values associated with each group, and the values associated with a single group are larger than fit in memory, this cannot be accomplished right now with the groupBy operator. The best way to work around this depends a bit on what you are trying to do with the data down stream. Typically approaches involve sub-dividing any very large groups, for instance, appending a hashed value in a small range (1-10) to large keys. Then your downstream code has to deal with aggregating partial values for each group. If your goal is just to lay each group out sequentially on disk on one big file, you can call `sortByKey` with a hashed suffix as well. The sort functions are externalized in Spark 1.1 (which is in pre-release). - Patrick On Tue, Aug 5, 2014 at 2:39 PM, Jens Kristian Geyti sp...@jkg.dk wrote: Patrick Wendell wrote In the latest version of Spark we've added documentation to make this distinction more clear to users: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L390 That is a very good addition to the documentation. Nice and clear about the dangers of groupBy. Patrick Wendell wrote Currently groupBy requires that all of the values for one key can fit in memory. Is that really true? Will partitions not spill to disk, hence the recommendation in the documentation to up the parallelism of groupBy et al? A better question might be: How exactly does partitioning affect groupBy with regards to memory consumption. What will **have** to fit in memory, and what may be spilled to disk, if running out of memory? And if it really is true, that Spark requires all groups' values to fit in memory, how do I do a on-disk grouping of results, similar to what I'd to in a Hadoop job by using a mapper emitting (groupId, value) key-value pairs, and having an entity reducer writing results to disk? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427p11487.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
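An illustrative sketch of the key "salting" workaround described in the quoted advice (names like bigPairs, mergeValues and mergePartials are made up): append a suffix in a small range to each key, aggregate partial values per (key, salt), then combine the partials per original key downstream:

val salted = bigPairs.map { case (k, v) => ((k, scala.util.Random.nextInt(10)), v) }
val partials = salted.reduceByKey(mergeValues)        // each task sees much smaller groups
val combined = partials
  .map { case ((k, _), partial) => (k, partial) }
  .reduceByKey(mergePartials)                         // final per-key result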
Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD
Do you want to do this on one column or all numeric columns? On Mon, Aug 25, 2014 at 7:09 AM, Hingorani, Vineet vineet.hingor...@sap.com wrote: Hello all, Could someone help me with the manipulation of csv file data. I have 'semicolon' separated csv data including doubles and strings. I want to calculate the maximum/average of a column. When I read the file using sc.textFile(test.csv).map(_.split(;), each field is read as string. Could someone help me with the above manipulation and how to do that. Or maybe if there is some way to take the transpose of the data and then manipulating the rows in some way? Thank you in advance, I am struggling with this thing for quite sometime Regards, Vineet
Re: Understanding RDD.GroupBy OutOfMemory Exceptions
Hey Andrew, We might create a new JIRA for it, but it doesn't exist yet. We'll create JIRA's for the major 1.2 issues at the beginning of September. - Patrick On Mon, Aug 25, 2014 at 8:53 AM, Andrew Ash and...@andrewash.com wrote: Hi Patrick, For the spilling within on key work you mention might land in Spark 1.2, is that being tracked in https://issues.apache.org/jira/browse/SPARK-1823 or is there another ticket I should be following? Thanks! Andrew On Tue, Aug 5, 2014 at 3:39 PM, Patrick Wendell pwend...@gmail.com wrote: Hi Jens, Within a partition things will spill - so the current documentation is correct. This spilling can only occur *across keys* at the moment. Spilling cannot occur within a key at present. This is discussed in the video here: https://www.youtube.com/watch?v=dmL0N3qfSc8index=3list=PL-x35fyliRwj7qNxXLgMRJaOk7o9inHBZ Spilling within one key for GroupBy's is likely to end up in the next release of Spark, Spark 1.2. In most cases we see when users hit this, they are actually trying to just do aggregations which would be more efficiently implemented without the groupBy operator. If the goal is literally to just write out to disk all the values associated with each group, and the values associated with a single group are larger than fit in memory, this cannot be accomplished right now with the groupBy operator. The best way to work around this depends a bit on what you are trying to do with the data down stream. Typically approaches involve sub-dividing any very large groups, for instance, appending a hashed value in a small range (1-10) to large keys. Then your downstream code has to deal with aggregating partial values for each group. If your goal is just to lay each group out sequentially on disk on one big file, you can call `sortByKey` with a hashed suffix as well. The sort functions are externalized in Spark 1.1 (which is in pre-release). - Patrick On Tue, Aug 5, 2014 at 2:39 PM, Jens Kristian Geyti sp...@jkg.dk wrote: Patrick Wendell wrote In the latest version of Spark we've added documentation to make this distinction more clear to users: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L390 That is a very good addition to the documentation. Nice and clear about the dangers of groupBy. Patrick Wendell wrote Currently groupBy requires that all of the values for one key can fit in memory. Is that really true? Will partitions not spill to disk, hence the recommendation in the documentation to up the parallelism of groupBy et al? A better question might be: How exactly does partitioning affect groupBy with regards to memory consumption. What will **have** to fit in memory, and what may be spilled to disk, if running out of memory? And if it really is true, that Spark requires all groups' values to fit in memory, how do I do a on-disk grouping of results, similar to what I'd to in a Hadoop job by using a mapper emitting (groupId, value) key-value pairs, and having an entity reducer writing results to disk? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427p11487.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD
Hello Victor, I want to do it on multiple columns. I was able to do it on one column with Sean's help, using the code below.

val matData = file.map(_.split(";"))
val stats = matData.map(_(2).toDouble).stats()
stats.mean
stats.max

Thank you Vineet From: Victor Tso-Guillen [mailto:v...@paxata.com] Sent: Monday, 25 August 2014 18:34 To: Hingorani, Vineet Cc: user@spark.apache.org Subject: Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD Do you want to do this on one column or all numeric columns? On Mon, Aug 25, 2014 at 7:09 AM, Hingorani, Vineet vineet.hingor...@sap.com wrote: Hello all, Could someone help me with the manipulation of csv file data. I have 'semicolon' separated csv data including doubles and strings. I want to calculate the maximum/average of a column. When I read the file using sc.textFile("test.csv").map(_.split(";")), each field is read as string. Could someone help me with the above manipulation and how to do that. Or maybe if there is some way to take the transpose of the data and then manipulating the rows in some way? Thank you in advance, I am struggling with this thing for quite sometime Regards, Vineet
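A small sketch extending the same idea to several numeric columns at once (the column indices are just an example for a semicolon-separated file):

val matData = file.map(_.split(";")).cache()   // cache since we scan it once per column
val numericCols = Seq(2, 4, 5)                 // hypothetical indices of the numeric columns
numericCols.foreach { i =>
  val stats = matData.map(_(i).toDouble).stats()
  println(s"column $i: mean = ${stats.mean}, max = ${stats.max}")
}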
SPARK Hive Context UDF Class Not Found Exception,
Hello All, I have added a jar from an S3 instance into the classpath. I have tried the following options: 1. sc.addJar("s3n://mybucket/lib/myUDF.jar") 2. hiveContext.sparkContext.addJar("s3n://mybucket/lib/myUDF.jar") 3. ./bin/spark-shell --jars s3n://mybucket/lib/myUDF.jar I am getting a ClassNotFoundException when trying to create a temporary function. What would be the issue here? Thanks and Regards, Sankar S.
How do you hit breakpoints using IntelliJ In functions used by an RDD
I was able to get JavaWordCount running with a local instance under IntelliJ. In order to do so I needed to use maven to package my code and call

String[] jars = { "/SparkExamples/target/word-count-examples_2.10-1.0.0.jar" };
sparkConf.setJars(jars);

After that the sample ran properly, and in the debugger I could set breakpoints in main. However, when I do something like

JavaRDD<String> words = lines.flatMap(new WordsMapFunction());

where WordsMapFunction is a separate class like

public static class WordsMapFunction implements FlatMapFunction<String, String> {
    private static final Pattern SPACE = Pattern.compile(" ");
    public Iterable<String> call(String s) {
        String[] split = SPACE.split(s);
        for (int i = 0; i < split.length; i++) {
            split[i] = toUpperCase(split[i]);
        }
        return Arrays.asList(split);
    }
}

breakpoints set in WordsMapFunction are never hit. Most of the interesting functionality in the problems I am trying to solve is in the FlatMapFunction and Function2 code, and this is the functionality I will need to examine in more detail. Has anyone figured out how to configure a project to hit breakpoints in these functions??
Re: How do you hit breakpoints using IntelliJ In functions used by an RDD
flatMap() is a transformation only. Calling it by itself does nothing, and it just describes the relationship between one RDD and another. You should see it swing into action if you invoke an action, like count(), on the words RDD. On Mon, Aug 25, 2014 at 6:32 PM, Steve Lewis lordjoe2...@gmail.com wrote: I was able to get JavaWordCount running with a local instance under IntelliJ. In order to do so I needed to use maven to package my code and call String[] jars = { /SparkExamples/target/word-count-examples_2.10-1.0.0.jar }; sparkConf.setJars(jars); After that the sample ran properly and in the debugger I could set break points in the main. However when I do something like JavaRDDString words = lines.flatMap( new WordsMapFunction()); where WordsMapFunction is a separate class like public static class WordsMapFunction implements FlatMapFunctionString, String { private static final Pattern SPACE = Pattern.compile( ); public IterableString call(String s) { String[] split = SPACE.split(s); for (int i = 0; i split.length; i++) { split[i] = toUpperCase(split[i]); } return Arrays.asList(split); } } Breakpoints set in WordsMapFunction are never hit. Most interesting functionality in the problems I am trying to solve if in the FlatMapFunction and the Function2 code and this is the functionality I will need to examine in more detail. Has anyone figured out how to configure a project to hit breakpoints in these functions?? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How do you hit breakpoints using IntelliJ In functions used by an RDD
That was not quite in English. My flatMap code is shown below. I know the code is called, since the answers are correct, but I would like to put a breakpoint in dropNonLetters to make sure that code works properly. I am running in the IntelliJ debugger but believe the code is executing on a Spark worker. I am not sure what magic IntelliJ uses to hook up a debugger to a worker but hope it is possible.

public class WordsMapFunction implements FlatMapFunction<String, String> {
    private static final Pattern SPACE = Pattern.compile(" ");
    public Iterable<String> call(String s) {
        String[] split = SPACE.split(s);
        for (int i = 0; i < split.length; i++) {
            split[i] = regularizeString(split[i]);
        }
        return Arrays.asList(split);
    }
    public static String dropNonLetters(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (Character.isLetter(c))
                sb.append(c);
        }
        return sb.toString();
    }
    public static String regularizeString(String inp) {
        inp = inp.trim();
        inp = inp.toUpperCase();
        return dropNonLetters(inp);
    }
}

On Mon, Aug 25, 2014 at 10:35 AM, Sean Owen so...@cloudera.com wrote: flatMap() is a transformation only. Calling it by itself does nothing, and it just describes the relationship between one RDD and another. You should see it swing into action if you invoke an action, like count(), on the words RDD. On Mon, Aug 25, 2014 at 6:32 PM, Steve Lewis lordjoe2...@gmail.com wrote: I was able to get JavaWordCount running with a local instance under IntelliJ. In order to do so I needed to use maven to package my code and call String[] jars = { "/SparkExamples/target/word-count-examples_2.10-1.0.0.jar" }; sparkConf.setJars(jars); After that the sample ran properly and in the debugger I could set breakpoints in main. However when I do something like JavaRDD<String> words = lines.flatMap(new WordsMapFunction()); where WordsMapFunction is a separate class like the one above, breakpoints set in WordsMapFunction are never hit. Most of the interesting functionality in the problems I am trying to solve is in the FlatMapFunction and Function2 code, and this is the functionality I will need to examine in more detail. Has anyone figured out how to configure a project to hit breakpoints in these functions?? -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Low Level Kafka Consumer for Spark
Hi Dibyendu, My colleague has taken a look at the spark kafka consumer github you have provided and started experimenting. We found that somehow, when Spark has a failure after a data checkpoint, the re-computations expected from the metadata checkpoints are not recovered, so we lose Kafka messages and RDD computations in Spark. The impression is that this code is replacing quite a bit of the Spark Kafka Streaming code, where maybe (not sure) metadata checkpoints are done every batch interval. Was it on purpose to depend solely on the Kafka commit to recover data and recomputations between data checkpoints? If so, how do we make this work? tnks Rod -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12757.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
GraphX usecases
Hi, I am exploring the GraphX library and trying to determine which use cases make the most sense for/with it. From what I initially thought, it looked like GraphX could be applied to data stored in RDBMSs, as Spark could translate the relational data into a graph representation. However, there seems to be no such conversion, and everything presented in GraphX implementations, AFAIK, works on vertices and edges. So does it mean that GraphX is only relevant when the backend is a GDBMS? Does this -- "We introduce GraphX, which combines the advantages of both data-parallel and graph-parallel systems by efficiently expressing graph computation within the Spark data-parallel framework. We leverage new ideas in distributed graph representation to efficiently distribute graphs as tabular data-structures. Similarly, we leverage advances in data-flow systems to exploit in-memory computation and fault-tolerance." -- mean that GraphX makes the typical RDBMS operations possible even when the data is persisted in a GDBMS, and not vice versa? regards Sunita
Re: Spark - GraphX pregel like with global variables (accumulator / broadcast)
At 2014-08-25 06:41:36 -0700, BertrandR bertrand.rondepierre...@gmail.com wrote: Unfortunately, this works well for extremely small graphs, but it becomes exponentially slow with the size of the graph and the number of iterations (doesn't finish 20 iterations with graphs having 48000 edges). [...] It seems to me that a lot of things are unnecessarily recomputed at each iterations whatever I try to do. I also did multiple changes to limit the number of dependency of each object, but it didn't change anything. [...] fusionBcst.unpersist(blocking = false) The problem is almost certainly because of unpersisting. If you comment out all the unpersist lines, the program should run normally. Unpersisting is very tricky because of the internal dependency structure of graphs: they maintain a vertex and an edge RDD, and each depends on both from the previous iteration. A future update to GraphX will unify them so that a graph only has one RDD, and this will make it easier to unpersist correctly. Until then, unpersisting may not be worth the trouble. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GraphX usecases
At 2014-08-25 11:23:37 -0700, Sunita Arvind sunitarv...@gmail.com wrote: Does this We introduce GraphX, which combines the advantages of both data-parallel and graph-parallel systems by efficiently expressing graph computation within the Spark data-parallel framework. We leverage new ideas in distributed graph representation to efficiently distribute graphs as tabular data-structures. Similarly, we leverage advances in data-flow systems to exploit in-memory computation and fault-tolerance. mean that GraphX makes the typical RDBMS operations possible even when the data is persisted in a GDBMS and not viceversa? This quote refers to the research idea that while previous graph-parallel systems (Pregel, GraphLab, etc.) were built as specialized systems for performance, it's actually possible to avoid the trouble of a separate system by embedding graph computation efficiently in a general data-parallel system. Here data-parallel refers generally to any system that can support the join optimizations, including Spark and, with some work on the optimizer, relational databases as well. So GraphX use data-parallel or relational operators to provide graph computation, not the other way around. From what I initially thought, it looked like GraphX could be applied to data stored in RDBMSs as Spark could translate the relational data into graphical representation. However, there seems to be no conversation and everything presented in GraphX implementations AFAIK, works on vertices and edges. So does it mean that GraphX is only relevant when the backend is a GDBMS? GraphX, the library on top of Spark, can be applied indirectly to relational data as you described: you can use Spark to load vertex and edge tables from a relational database, then process them with GraphX. This isn't discussed in the GraphX documentation because it's a concern of Spark. GraphX is only relevant once you have the vertices and edges in RDD form. GraphX, the research concept, can in theory be implemented directly in a relational database by augmenting the query optimizer to support the optimizations described in the paper and setting up the appropriate indexes on the vertex and edge tables. Ankur - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
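A rough sketch of the indirect route described above: load vertex and edge tables with Spark (plain text here; a JDBC or other relational load would feed GraphX the same way), then hand the RDDs to GraphX. File names and field layout are hypothetical:

import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

val vertices: RDD[(VertexId, String)] =
  sc.textFile("vertices.txt").map { line =>
    val f = line.split(",")
    (f(0).toLong, f(1))                          // (id, name)
  }
val edges: RDD[Edge[Int]] =
  sc.textFile("edges.txt").map { line =>
    val f = line.split(",")
    Edge(f(0).toLong, f(1).toLong, f(2).toInt)   // (srcId, dstId, weight)
  }
val graph = Graph(vertices, edges)
println(graph.numVertices + " vertices, " + graph.numEdges + " edges")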
Re: Spark QL and protobuf schema
In general all PRs should be made against master. When necessary, we can back port them to the 1.1 branch as well. However, since we are in code-freeze for that branch, we'll only do that for major bug fixes at this point. On Thu, Aug 21, 2014 at 10:58 AM, Dmitriy Lyubimov dlie...@gmail.com wrote: ok i'll try. happen to do that a lot to other tools. So I am guessing you are saying if i wanted to do it now, i'd start against https://github.com/apache/spark/tree/branch-1.1 and PR against it? On Thu, Aug 21, 2014 at 12:28 AM, Michael Armbrust mich...@databricks.com wrote: I do not know of any existing way to do this. It should be possible using the new public API for applying schema (will be available in 1.1) to an RDD. Basically you'll need to convert the proto buff records into rows, and also create a StructType that represents the schema. With this two things you can all the applySchema method on SparkContext. Would be great if you could contribute this back. On Wed, Aug 20, 2014 at 5:57 PM, Dmitriy Lyubimov dlie...@gmail.com wrote: Hello, is there any known work to adapt protobuf schema to Spark QL data sourcing? If not, would it present interest to contribute one? thanks. -d
Read timeout while running a Job on data in S3
I am running a spark job on ~ 124 GB of data in a S3 bucket. The Job runs fine but occasionally returns the following exception during the first map stage which involves reading and transforming the data from S3. Is there a config parameter I can set to increase this timeout limit? *14/08/23 04:45:46 WARN scheduler.TaskSetManager: Lost task 1379.0 in stage 1.0 (TID 1379, ip-10-237-195-11.ec2.internal): java.net.SocketTimeoutException: Read timed out* * java.net.SocketInputStream.socketRead0(Native Method)* * java.net.SocketInputStream.read(SocketInputStream.java:152)* *java.net.SocketInputStream.read(SocketInputStream.java:122)* *sun.security.ssl.InputRecord.readFully(InputRecord.java:442)* *sun.security.ssl.InputRecord.read(InputRecord.java:480)* *sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)* * sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884)* *sun.security.ssl.AppInputStream.read(AppInputStream.java:102)* *java.io.BufferedInputStream.read1(BufferedInputStream.java:273)* *java.io.BufferedInputStream.read(BufferedInputStream.java:334)* * org.apache.commons.httpclient.ContentLengthInputStream.read(ContentLengthInputStream.java:170)* *java.io.FilterInputStream.read(FilterInputStream.java:133)* * org.apache.commons.httpclient.AutoCloseInputStream.read(AutoCloseInputStream.java:108)* * org.jets3t.service.io.InterruptableInputStream.read(InterruptableInputStream.java:76)* * org.jets3t.service.impl.rest.httpclient.HttpMethodReleaseInputStream.read(HttpMethodReleaseInputStream.java:136)* * org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.read(NativeS3FileSystem.java:98)* *java.io.BufferedInputStream.read1(BufferedInputStream.java:273)* *java.io.BufferedInputStream.read(BufferedInputStream.java:334)* *java.io.DataInputStream.read(DataInputStream.java:100)* *org.apache.hadoop.util.LineReader.readLine(LineReader.java:134)* * org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:133)* * org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)* * org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:219)* * org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:188)* *org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)* * org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)* *scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)* *scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)* *scala.collection.Iterator$class.foreach(Iterator.scala:727)* *scala.collection.AbstractIterator.foreach(Iterator.scala:1157)* * org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)* * org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)* * org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)* * org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)* * org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)* * org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183) *
Re: spark and matlab
Have you tried the pipe() operator? It should work if you can launch your script from the command line. Just watch out for any environment variables needed (you can pass them to pipe() as an optional argument if there are some). On August 25, 2014 at 12:41:29 AM, Jaonary Rabarisoa (jaon...@gmail.com) wrote: Hi all, Is there someone that tried to pipe RDD into matlab script ? I'm trying to do something similiar if one of you could point some hints. Best regards, Jao
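A minimal sketch of rdd.pipe(): each partition's elements are written to the external process's stdin one per line, and its stdout lines come back as a new RDD[String]. The wrapper script name and the environment variable here are just placeholders:

val input = sc.parallelize(Seq("1 2 3", "4 5 6"))
val piped = input.pipe(
  Seq("/path/to/run_matlab.sh"),                   // wrapper script that launches the MATLAB code
  Map("MATLAB_OPTS" -> "-nodisplay -nosplash"))    // optional env vars, as mentioned above
piped.collect().foreach(println)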
Re: HiveContext ouput log file
Just like with normal Spark jobs, that command returns an RDD that contains the lineage for computing the answer but does not actually compute the answer. You'll need to run collect() on the RDD in order to get the result. On Mon, Aug 25, 2014 at 11:46 AM, S Malligarjunan smalligarju...@yahoo.com.invalid wrote: Hello All, I have executed the following UDF sql in my spark hivecontext: hiveContext.hql("select count(t1.col1) from t1 join t2 where myUDF(t1.id, t2.id) = true") Where do I find the count output? Thanks and Regards, Sankar S.
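For reference, a small sketch of the suggestion: hql() returns a SchemaRDD, and an action such as collect() triggers the query and brings the result back to the driver.

val result = hiveContext.hql(
  "select count(t1.col1) from t1 join t2 where myUDF(t1.id, t2.id) = true")
result.collect().foreach(println)   // prints the single count row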
Re: SPARK Hive Context UDF Class Not Found Exception,
Which version of Spark SQL are you using? Several issues with custom hive UDFs have been fixed in 1.1. On Mon, Aug 25, 2014 at 9:57 AM, S Malligarjunan smalligarju...@yahoo.com.invalid wrote: Hello All, I have added a jar from S3 instance into classpath, i have tried following options 1. sc.addJar(s3n://mybucket/lib/myUDF.jar) 2. hiveContext.sparkContext.addJar(s3n://mybucket/lib/myUDF.jar) 3. ./bin/spark-shell --jars s3n://mybucket/lib/myUDF.jar I am getting ClassNotException when trying to create a temporary function. What would be the issue here? Thanks and Regards, Sankar S.
Re: [Spark SQL] How to select first row in each GROUP BY group?
In our case, the ROW has about 80 columns which exceeds the case class limit. Starting with Spark 1.1 you'll be able to also use the applySchema API https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L126 .
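A short sketch of the applySchema route for such wide rows (column names, types, the tab delimiter and the rawLines RDD are made up for illustration):

import org.apache.spark.sql._

val schema = StructType(
  (1 to 80).map(i => StructField(s"col$i", StringType, nullable = true)))
val rowRDD = rawLines.map(_.split("\t")).map(fields => Row(fields: _*))
val table = sqlContext.applySchema(rowRDD, schema)
table.registerTempTable("wide_table")   // registerAsTable in earlier 1.x releases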
Re: Spark SQL: Caching nested structures extremely slow
One useful thing to do when you run into unexpected slowness is to run 'jstack' a few times on the driver and executors and see if there is any particular hotspot in the Spark SQL code. Also, it seems like a better option here might be to use the new applySchema API https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala#L126 that has been added for the 1.1 release. I'd be curious how this helps your performance. On Thu, Aug 21, 2014 at 1:31 PM, Yin Huai huaiyin@gmail.com wrote: I have not profiled this part. But, I think one possible cause is allocating an array for every inner struct for every row (every struct value is represented by a Spark SQL row). I will play with it later and see what I find. On Tue, Aug 19, 2014 at 9:01 PM, Evan Chan velvia.git...@gmail.com wrote: Hey guys, I'm using Spark 1.0.2 in AWS with 8 x c3.xlarge machines. I am working with a subset of the GDELT dataset (57 columns, 250 million rows, but my subset is only 4 million) and trying to query it with Spark SQL. Since a CSV importer isn't available, my first thought was to use nested case classes (since Scala has a limit of 22 fields, plus there are lots of repeated fields in GDELT).The case classes look like this: case class ActorInfo(Code: String, Name: String, CountryCode: String, KnownGroupCode: String, EthnicCode: String, Religion1Code: String, Religion2Code: String, Type1Code: String, Type2Code: String, Type3Code: String) case class GeoInfo(`Type`: Int, FullName: String, CountryCode: String, ADM1Code: String, Lat: Float, `Long`: Float, FeatureID: Int) case class GDeltRow(EventId: Int, Day: Int, MonthYear: Int, Year: Int, FractionDate: Float, Actor1: ActorInfo, Actor2: ActorInfo, IsRootEvent: Byte, EventCode: String, EventBaseCode: String, EventRootCode: String, QuadClass: Int, GoldsteinScale: Float, NumMentions: Int, NumSources: Int, NumArticles: Int, AvgTone: Float, Actor1Geo: GeoInfo, Actor2Geo: GeoInfo, ActionGeo: GeoInfo, DateAdded: String) Then I use sc.textFile(...) to parse the CSV into an RDD[GDeltRow]. I can query these records without caching. However, if I attempt to cache using registerAsTable() and then sqlContext.cacheTable(...), it is extremely slow (takes 1 hour !!). Any queries using them are also extremely slow. I had tested Spark SQL using a flat structure (no nesting) on a different dataset and the caching and queries were both extremely fast. Thinking that this is an issue with the case classes, I saved them to parquet files and used sqlContext.parquetFile(), but the slowness is the same. This makes sense, since the internal structure of SchemaRdds is basically the same. In both cases, for both parquet and case classes, the schema is the same. Has anybody else experienced this slowness with nested structures? Is this a known problem and being worked on? The only way to work around this issue I can think of is to convert to JSON, which is tedious, or to construct Parquet files manually (also tedious). thanks, Evan - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: apply at Option.scala:120
This should be fixed in the latest Spark. What branch are you running? 2014-08-25 1:32 GMT-07:00 Wang, Jensen jensen.w...@sap.com: Hi, All When I run spark applications, I see from the web-ui that some stage description are like “apply at Option.scala:120”. Why spark splits a stage on a line that is not in my spark program but a Scala library? Thanks Jensen
Re: Writeup on Spark SQL with GDELT
Thanks for this very thorough write-up and for continuing to update it as you progress! As I said in the other thread it would be great to do a little profiling to see if we can get to the heart of the slowness with nested case classes (very little optimization has been done in this code path). If you can come up with a simple micro benchmark that shows its much slower using the case class API than with applySchema, I'd go ahead and open a JIRA. On Thu, Aug 21, 2014 at 12:04 PM, Evan Chan velvia.git...@gmail.com wrote: I just put up a repo with a write-up on how to import the GDELT public dataset into Spark SQL and play around. Has a lot of notes on different import methods and observations about Spark SQL. Feel free to have a look and comment. http://www.github.com/velvia/spark-sql-gdelt - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Potential Thrift Server Bug on Spark SQL,perhaps with cache table?
Hi John, I tried to follow your description but failed to reproduce this issue. Would you mind to provide some more details? Especially: - Exact Git commit hash of the snapshot version you were using Mine: e0f946265b9ea5bc48849cf7794c2c03d5e29fba https://github.com/apache/spark/commit/e0f946265b9ea5bc48849cf7794c2c03d5e29fba - Compilation flags (Hadoop version, profiles enabled, etc.) Mine: ./sbt/sbt -Pyarn,kinesis-asl,hive,hadoop-2.3 -Dhadoop.version=2.3.0 clean assembly/assembly - Also, it would be great if you can provide the schema of your table plus some sample data that can help reproduce this issue. Cheng On Wed, Aug 20, 2014 at 6:11 AM, John Omernik j...@omernik.com wrote: I am working with Spark SQL and the Thrift server. I ran into an interesting bug, and I am curious on what information/testing I can provide to help narrow things down. My setup is as follows: Hive 0.12 with a table that has lots of columns (50+) stored as rcfile. Spark-1.1.0-SNAPSHOT with Hive Built in (and Thrift Server) My query is only selecting one STRING column from the data, but only returning data based on other columns . Types: col1 = STRING col2 = STRING col3 = STRING col4 = Partition Field (TYPE STRING) Queries cache table table1; --Run some other queries on other data select col1 from table1 where col2 = 'foo' and col3 = 'bar' and col4 = 'foobar' and col1 is not null limit 100 Fairly simple query. When I run this in SQL Squirrel I get no results. When I remove the and col1 is not null I get 100 rows of null When I run this in beeline (the one that is in the spark-1.1.0-SNAPSHOT) I get no results and when I remove 'and col1 is not null' I gett 100 rows of null Note: Both of these are after I ran some other queries.. .i.e. on other columns, after I ran CACHE TABLE TABLE1 first before any queries. That seemed interesting to me... So I went to the spark-shell to determine if it was a spark issue, or a thrift issue. I ran: val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext._ cacheTable(table1) Then I ran the same other queries got results, and then I ran the query above, and I got results as expected. Interestingly enough, if I don't cache the table through cache table table1 in thrift, I get results for all queries. If I uncache, I start getting results again. I hope I was clear enough here, I am happy to help however I can. John
Re: countByWindow save the count ?
You could try to use foreachRDD on the result of countByWindow with a function that performs the save operation. On Fri, Aug 22, 2014 at 1:58 AM, Josh J joshjd...@gmail.com wrote: Hi, Hopefully a simple question. Though is there an example of where to save the output of countByWindow ? I would like to save the results to external storage (kafka or redis). The examples show only stream.print() Thanks, Josh
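A small sketch of that suggestion, with the actual save left as a stub (the window/slide durations are arbitrary):

val counts = stream.countByWindow(Seconds(60), Seconds(10))   // DStream[Long]
counts.foreachRDD { (rdd, time) =>
  rdd.collect().foreach { c =>
    // write (time, c) to Kafka or Redis here with your producer/client of choice
    println(s"window ending at $time: $c events")
  }
}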
Re: Merging two Spark SQL tables?
SO I tried the above (why doesn't union or ++ have the same behavior btw?) I don't think there is a good reason for this. I'd open a JIRA. and it works, but is slow because the original Rdds are not cached and files must be read from disk. I also discovered you can recover the InMemoryCached versions of the Rdds using sqlContext.table(table1). Yeah, this is an unfortunate consequence of the way we handle caching. I've opened this JIRA for the 1.2 roadmap: https://issues.apache.org/jira/browse/SPARK-3212
Request for Help
Hi Guys, I just want to know whether there is any way to determine which file is being handled by Spark from a group of files given as input in a directory. Suppose I have 1000 files which are given as input; I want to determine which file is currently being handled by the Spark program, so that if any error creeps in at any point of time we can easily identify that particular file as the faulty one. Please let me know your thoughts. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Request-for-Help-tp12776.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark QL and protobuf schema
In general master should be a superset of what is in any of the release branches. In the particular case of Spark SQL, master and branch-1.1 should be identical (though that will likely change once Patrick cuts the first RC). On Mon, Aug 25, 2014 at 12:50 PM, Dmitriy Lyubimov dlie...@gmail.com wrote: Ok, I was just asking whether the changes you've mentioned are likely to be found on the 1.1 branch, so it would make sense for my starting point to fork off 1.1. Or perhaps master. The question of a PR is fairly far off at this point, for legal reasons if nothing else. If and by the time the work is approved for contribution, obviously the PR process will be followed. On Mon, Aug 25, 2014 at 11:57 AM, Michael Armbrust mich...@databricks.com wrote: In general all PRs should be made against master. When necessary, we can back-port them to the 1.1 branch as well. However, since we are in code freeze for that branch, we'll only do that for major bug fixes at this point. On Thu, Aug 21, 2014 at 10:58 AM, Dmitriy Lyubimov dlie...@gmail.com wrote: ok i'll try. I happen to do that a lot with other tools. So I am guessing you are saying if I wanted to do it now, I'd start against https://github.com/apache/spark/tree/branch-1.1 and PR against it? On Thu, Aug 21, 2014 at 12:28 AM, Michael Armbrust mich...@databricks.com wrote: I do not know of any existing way to do this. It should be possible using the new public API for applying a schema (will be available in 1.1) to an RDD. Basically you'll need to convert the protobuf records into rows, and also create a StructType that represents the schema. With these two things you can call the applySchema method on SQLContext. Would be great if you could contribute this back. On Wed, Aug 20, 2014 at 5:57 PM, Dmitriy Lyubimov dlie...@gmail.com wrote: Hello, is there any known work to adapt protobuf schema to Spark QL data sourcing? If not, would it present interest to contribute one? thanks. -d
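A rough sketch of that approach against Spark 1.1, assuming a generated protobuf message with getName/getId accessors (field names and types are assumptions; exact import paths for Row/StructType may differ slightly between builds):

import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)

// 1. convert each protobuf message into a Row
val rowRDD = protoRDD.map(msg => Row(msg.getName, msg.getId))   // protoRDD: RDD of generated proto objects

// 2. describe the schema that matches the Row layout
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("id", IntegerType, nullable = false)))

// 3. apply the schema and register the result for SQL
val protoTable = sqlContext.applySchema(rowRDD, schema)
protoTable.registerTempTable("protos")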
Re: How do you hit breakpoints using IntelliJ In functions used by an RDD
PS from an offline exchange -- yes, more is being called here; the rest is the standard WordCount example. The trick was to make sure the task executes locally, and calling setMaster("local") on SparkConf in the example code does that. That seems to work fine in IntelliJ for debugging this. On Mon, Aug 25, 2014 at 6:41 PM, Steve Lewis lordjoe2...@gmail.com wrote: That was not quite in English. My flatMap code is shown below. I know the code is called since the answers are correct, but I would like to put a break point in dropNonLetters to make sure that code works properly. I am running in the IntelliJ debugger but believe the code is executing on a Spark worker. I am not sure what magic IntelliJ uses to hook up a debugger to a worker but hope it is possible. public class WordsMapFunction implements FlatMapFunction<String, String> { private static final Pattern SPACE = Pattern.compile(" "); public Iterable<String> call(String s) { String[] split = SPACE.split(s); for (int i = 0; i < split.length; i++) { split[i] = regularizeString(split[i]); } return Arrays.asList(split); } public static String dropNonLetters(String s) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < s.length(); i++) { char c = s.charAt(i); if (Character.isLetter(c)) sb.append(c); } return sb.toString(); } public static String regularizeString(String inp) { inp = inp.trim(); inp = inp.toUpperCase(); return dropNonLetters(inp); } } On Mon, Aug 25, 2014 at 10:35 AM, Sean Owen so...@cloudera.com wrote: flatMap() is a transformation only. Calling it by itself does nothing, and it just describes the relationship between one RDD and another. You should see it swing into action if you invoke an action, like count(), on the words RDD. On Mon, Aug 25, 2014 at 6:32 PM, Steve Lewis lordjoe2...@gmail.com wrote: I was able to get JavaWordCount running with a local instance under IntelliJ. In order to do so I needed to use maven to package my code and call String[] jars = { "/SparkExamples/target/word-count-examples_2.10-1.0.0.jar" }; sparkConf.setJars(jars); After that the sample ran properly and in the debugger I could set break points in main. However when I do something like JavaRDD<String> words = lines.flatMap(new WordsMapFunction()); where WordsMapFunction is a separate class like public static class WordsMapFunction implements FlatMapFunction<String, String> { private static final Pattern SPACE = Pattern.compile(" "); public Iterable<String> call(String s) { String[] split = SPACE.split(s); for (int i = 0; i < split.length; i++) { split[i] = toUpperCase(split[i]); } return Arrays.asList(split); } } breakpoints set in WordsMapFunction are never hit. Most of the interesting functionality in the problems I am trying to solve is in the FlatMapFunction and the Function2 code, and this is the functionality I will need to examine in more detail. Has anyone figured out how to configure a project to hit breakpoints in these functions?? -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Storage Handlers in Spark SQL
- dev list + user list You should be able to query Spark SQL using JDBC, starting with the 1.1 release. There is some documentation in the repo https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md#running-the-thrift-jdbc-server, and we'll update the official docs once the release is out. On Thu, Aug 21, 2014 at 4:43 AM, Niranda Perera nira...@wso2.com wrote: Hi, I have been playing around with Spark for the past few days, and evaluating the possibility of migrating into Spark (Spark SQL) from Hive/Hadoop. I am working on the WSO2 Business Activity Monitor (WSO2 BAM, https://docs.wso2.com/display/BAM241/WSO2+Business+Activity+Monitor+Documentation ) which currently employs Hive. We are considering Spark as a successor for Hive, given its performance enhancements. We currently employ several custom storage handlers in Hive. Example: WSO2 JDBC and Cassandra storage handlers: https://docs.wso2.com/display/BAM241/JDBC+Storage+Handler+for+Hive https://docs.wso2.com/display/BAM241/Creating+Hive+Queries+to+Analyze+Data#CreatingHiveQueriestoAnalyzeData-cas I would like to know whether Spark SQL can work with these storage handlers (while using HiveContext, maybe)? Best regards -- *Niranda Perera* Software Engineer, WSO2 Inc. Mobile: +94-71-554-8430 Twitter: @n1r44 https://twitter.com/N1R44
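As a sketch of what the JDBC route can look like once the Thrift JDBC server is up (host, port, and table name are assumptions; the Hive JDBC driver has to be on the client classpath):

import java.sql.DriverManager

// the Thrift JDBC server speaks the HiveServer2 protocol
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "")

val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT count(*) FROM some_table")   // some_table is a placeholder
while (rs.next()) {
  println(rs.getLong(1))
}
conn.close()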
Spark Screencast doesn't show in Chrome on OS X
https://spark.apache.org/screencasts/1-first-steps-with-spark.html The embedded YouTube video shows up in Safari on OS X but not in Chrome. How come? Nick -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Screencast-doesn-t-show-in-Chrome-on-OS-X-tp12782.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
RE: Hive From Spark
Hi Du, I didn't notice the ticket was updated recently. SPARK-2848 is a sub-task of SPARK-2420, and it's already resolved in Spark 1.1.0. It looks like SPARK-2420 will be released in Spark 1.2.0 according to the current JIRA status. I'm tracking branch-1.1 instead of the master and haven't seen the results merged. Still seeing guava 14.0.1, so I don't think SPARK-2848 has been merged yet. Will be great to have someone confirm or clarify the expectation. From: l...@yahoo-inc.com.INVALID To: van...@cloudera.com; alee...@hotmail.com CC: user@spark.apache.org Subject: Re: Hive From Spark Date: Sat, 23 Aug 2014 00:08:47 + I thought the fix had been pushed to the apache master, ref. commit [SPARK-2848] Shade Guava in uber-jars by Marcelo Vanzin on 8/20. So my previous email was based on my own build of the apache master, which turned out not to be working yet. Marcelo: Please correct me if I got that commit wrong. Thanks, Du On 8/22/14, 11:41 AM, Marcelo Vanzin van...@cloudera.com wrote: SPARK-2420 is fixed. I don't think it will be in 1.1, though - might be too risky at this point. I'm not familiar with spark-sql. On Fri, Aug 22, 2014 at 11:25 AM, Andrew Lee alee...@hotmail.com wrote: Hopefully there could be some progress on SPARK-2420. It looks like shading was the favored solution over downgrading. Any idea when this will happen? Could it happen in Spark 1.1.1 or Spark 1.1.2? By the way, regarding bin/spark-sql? Is this more of a debugging tool for Spark jobs integrating with Hive? How do people use spark-sql? I'm trying to understand the rationale and motivation behind this script, any idea? Date: Thu, 21 Aug 2014 16:31:08 -0700 Subject: Re: Hive From Spark From: van...@cloudera.com To: l...@yahoo-inc.com.invalid CC: user@spark.apache.org; u...@spark.incubator.apache.org; pwend...@gmail.com Hi Du, I don't believe the Guava change has made it to the 1.1 branch. The Guava doc says hashInt was added in 12.0, so what's probably happening is that you have an old version of Guava in your classpath before the Spark jars. (Hadoop ships with Guava 11, so that may be the source of your problem.) On Thu, Aug 21, 2014 at 4:23 PM, Du Li l...@yahoo-inc.com.invalid wrote: Hi, This guava dependency conflict problem should have been fixed as of yesterday according to https://issues.apache.org/jira/browse/SPARK-2420 However, I just got java.lang.NoSuchMethodError: com.google.common.hash.HashFunction.hashInt(I)Lcom/google/common/hash/HashCode; by the following code snippet and "mvn3 test" on Mac. I built the latest version of spark (1.1.0-SNAPSHOT) and installed the jar files to the local maven repo. From my pom file I explicitly excluded guava from almost all possible dependencies, such as spark-hive_2.10-1.1.0.SNAPSHOT and hadoop-client. This snippet is abstracted from a larger project, so the pom.xml includes many dependencies although not all are required by this snippet. The pom.xml is attached. Does anybody know how to fix it?
Thanks, Du
---
package com.myself.test

import org.scalatest._
import org.apache.hadoop.io.{NullWritable, BytesWritable}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

class MyRecord(name: String) extends Serializable {
  def getWritable(): BytesWritable = {
    new BytesWritable(Option(name).getOrElse("\\N").toString.getBytes("UTF-8"))
  }
  final override def equals(that: Any): Boolean = {
    if (!that.isInstanceOf[MyRecord]) false
    else {
      val other = that.asInstanceOf[MyRecord]
      this.getWritable == other.getWritable
    }
  }
}

class MyRecordTestSuite extends FunSuite {
  // construct a MyRecord by Consumer.schema
  val rec: MyRecord = new MyRecord("James Bond")

  test("generated SequenceFile should be readable from spark") {
    val path = "./testdata/"
    val conf = new SparkConf(false).setMaster("local").setAppName("test data exchange with Hive")
    conf.set("spark.driver.host", "localhost")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(Seq(rec))
    rdd.map((x: MyRecord) => (NullWritable.get(), x.getWritable()))
      .saveAsSequenceFile(path)
    val bytes = sc.sequenceFile(path, classOf[NullWritable], classOf[BytesWritable]).first._2
    assert(rec.getWritable() == bytes)
    sc.stop()
    System.clearProperty("spark.driver.port")
  }
}

From: Andrew Lee alee...@hotmail.com Reply-To: user@spark.apache.org user@spark.apache.org Date: Monday, July 21, 2014 at 10:27 AM To: user@spark.apache.org user@spark.apache.org, u...@spark.incubator.apache.org u...@spark.incubator.apache.org Subject: RE: Hive From Spark Hi All, Currently, if you are running Spark HiveContext API with Hive 0.12, it won't work due to the following 2 libraries which are not consistent with Hive
unable to instantiate HiveMetaStoreClient on LocalHiveContext
Hi, I created an instance of LocalHiveContext and attempted to create a database. However, it failed with the message org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient. My code is as follows. Similar code worked in spark-shell and also in bin/run-example org.apache.spark.examples.sql.hive.HiveFromSpark. import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ import org.apache.spark.sql.hive.LocalHiveContext val conf = new SparkConf(false).setMaster("local").setAppName("test data exchange with Hive") conf.set("spark.driver.host", "localhost") val sc = new SparkContext(conf) val hc = new LocalHiveContext(sc) hc.hql("create database if not exists testdb") The exception was thrown out of the hql call. Did I miss any configuration? Thanks, Du
Does Spark Streaming count the number of windows processed?
Hi, Does anyone know whether Spark Streaming counts the number of windows processed? I am trying to keep a record of the results of processed windows and the corresponding timestamps, but I cannot find any related documents or examples. Thanks, -JC -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-Streaming-count-the-number-of-windows-processed-tp12787.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
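A sketch of one way to keep such a record yourself, assuming a windowed DStream named windowed (the two-argument form of foreachRDD passes in the batch Time, which gives the timestamp):

import org.apache.spark.streaming.Time

var windowsProcessed = 0L   // driver-side counter of processed windows

windowed.foreachRDD { (rdd, time: Time) =>
  windowsProcessed += 1
  val result = rdd.count()          // or whatever per-window result you need to record
  println(s"window #$windowsProcessed at ${time.milliseconds} ms: $result")
}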
Re: Low Level Kafka Consumer for Spark
I like this consumer for what it promises - better control over offsets and recovery from failures. If I understand this right, it still uses a single worker process to read from Kafka (one thread per partition) - is there a way to specify multiple worker processes (on different machines) to read from Kafka? Maybe one worker process for each partition? If there is no such option, what happens when the single machine hosting the Kafka reader worker process dies and is replaced by a different machine (like in the cloud)? Thanks, Bharat -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Low-Level-Kafka-Consumer-for-Spark-tp11258p12788.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
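For comparison, with the built-in receiver the usual way to spread the reading across workers is to create several input streams and union them, one receiver per executor (a sketch using the standard KafkaUtils API rather than this custom consumer; the ZooKeeper quorum, group, and topic values are placeholders):

import org.apache.spark.streaming.kafka.KafkaUtils

val numReceivers = 4   // e.g. one receiver per Kafka partition
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-group", Map("my-topic" -> 1))
}
val unified = ssc.union(streams)   // downstream processing sees a single DStream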
Re: Trying to run SparkSQL over Spark Streaming
Hi, On Mon, Aug 25, 2014 at 7:11 PM, praveshjain1991 praveshjain1...@gmail.com wrote: If you want to issue an SQL statement on streaming data, you must have both the registerAsTable() and the sql() call *within* the foreachRDD(...) block, or -- as you experienced -- the table name will be unknown Since this is the case then is there any way to run join over data received from two different streams? Couldn't you do dstream1.join(dstream2).foreachRDD(...)? Tobias
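For example, if both streams are keyed the same way, a minimal sketch (assuming DStreams of (id, value) pairs; the per-batch logic inside foreachRDD is just a placeholder):

// dstream1, dstream2: DStream[(String, Int)] keyed by the same id
val joined = dstream1.join(dstream2)             // DStream[(String, (Int, Int))]

joined.foreachRDD { rdd =>
  rdd.filter { case (_, (v1, v2)) => v1 > v2 }   // placeholder per-batch logic
     .take(10)
     .foreach(println)
}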
Re: Trying to run SparkSQL over Spark Streaming
Hi again, On Tue, Aug 26, 2014 at 10:13 AM, Tobias Pfeiffer t...@preferred.jp wrote: On Mon, Aug 25, 2014 at 7:11 PM, praveshjain1991 praveshjain1...@gmail.com wrote: If you want to issue an SQL statement on streaming data, you must have both the registerAsTable() and the sql() call *within* the foreachRDD(...) block, or -- as you experienced -- the table name will be unknown Since this is the case then is there any way to run join over data received from two different streams? Couldn't you do dstream1.join(dstream2).foreachRDD(...)? Ah, I guess you meant something like SELECT * FROM dstream1 JOIN dstream2 WHERE ...? I don't know if that is possible. Doesn't seem easy to me; I don't think that's doable with the current codebase... Tobias
Re: Spark webUI - application details page
Hi, I am able to access the Application details web page from the master UI page when I run Spark in standalone mode on my local machine. However, I am not able to access it when I run Spark on our private cluster. The Spark master runs on one of the nodes in the cluster. I am able to access the spark master UI at spark://master-url:8080. It shows the listing of all the running and completed apps. When I click on the completed app, and access the Application details link, the link points to: master-url/app/?appId=app-idvalue When I view the page source to view the html source, the href portion is blank (). However, on my local machine, when I click the Application detail link for a completed app, it correctly points to master-url/history/app-id and when I view the page's html source, the href portion points to /history/app-id On the cluster, I have set spark.eventLog.enabled to true in $SPARK_HOME/conf/spark-defaults.conf on the master node as well as all the slave nodes. I am using spark 1.0.1 on the cluster. I am not sure why I am able to access the application details for completed apps when the app runs on my local machine but not for the apps that run on our cluster, although in both cases I am using spark 1.0.1 in standalone mode. Do I need to do any additional configuration to enable this history on the cluster? thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-webUI-application-details-page-tp3490p12792.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
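For reference, a minimal sketch of the event-log settings that have to be in effect on the driver for the history link to work (the directory is an assumption; on a multi-node standalone cluster it needs to be a location the master can also read, e.g. a shared or HDFS path rather than a node-local /tmp directory):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.eventLog.enabled", "true")
  // a log directory the standalone master can also read,
  // so it can render the Application Detail page
  .set("spark.eventLog.dir", "hdfs://namenode:8020/spark-events")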
RE: Request for help in writing to Textfile
You can try to manipulate the string you want to output before saveAsTextFile, something like modify.flatMap(x => x).map { x => val s = x.toString; s.subSequence(1, s.length - 1) } There should be a more optimized way. Best Regards, Raymond Liu -Original Message- From: yh18190 [mailto:yh18...@gmail.com] Sent: Monday, August 25, 2014 9:57 PM To: u...@spark.incubator.apache.org Subject: Request for help in writing to Textfile Hi Guys, I am currently playing with huge data. I have an RDD which returns RDD[List[(tuples)]]. I need only the tuples to be written to the text file output using the saveAsTextFile function. example: val mod = modify.saveAsTextFile() returns List((20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1), (20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1)) List((20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1), (20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1) I need the following output with only the tuple values in a text file. 20140813,4,141127,3,HYPHLJLU,HY,KNGHWEB,USD,144.00,662.40,KY1 20140813,4,141127,3,HYPHLJLU,HY,DBLHWEB,USD,144.00,662.40,KY1 Please let me know if anybody has any idea regarding this without using the collect() function... Please help me -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Request-for-help-in-writing-to-Textfile-tp12744.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
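A sketch of an alternative that avoids the string slicing, assuming modify really is an RDD[List[TupleN[...]]] (any tuple arity works, since productIterator comes from Product; the output path is a placeholder):

// flatten away the List layer, then rebuild each tuple as a comma-separated line
val lines = modify
  .flatMap(identity)                          // RDD of tuples
  .map(_.productIterator.mkString(","))       // "20140813,4,141127,..."

lines.saveAsTextFile("hdfs:///output/path")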
Re: Spark Screencast doesn't show in Chrome on OS X
It seems to be because you went there with https:// instead of http://. That said, we'll fix it so that it works on both protocols. Matei On August 25, 2014 at 1:56:16 PM, Nick Chammas (nicholas.cham...@gmail.com) wrote: https://spark.apache.org/screencasts/1-first-steps-with-spark.html The embedded YouTube video shows up in Safari on OS X but not in Chrome. How come? Nick View this message in context: Spark Screencast doesn't show in Chrome on OS X Sent from the Apache Spark User List mailing list archive at Nabble.com.
creating a subgraph with an edge predicate
I'm currently creating a subgraph using the vertex predicate: subgraph(vpred = (vid, attr) => attr.split(",")(2) != "999") but am wondering whether a subgraph can also be created using the edge predicate; if so, a sample would be great :) thanks Dave -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/creating-a-subgraph-with-an-edge-predicate-tp12797.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
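A sketch of the edge-predicate form (epred receives an EdgeTriplet, so the edge attribute and both endpoint attributes are available; the attribute layout here mirrors the vertex example above and is an assumption):

// keep only edges whose own attribute passes the test
val sub1 = graph.subgraph(epred = triplet => triplet.attr.split(",")(2) != "999")

// edge and vertex predicates can also be combined in a single call
val sub2 = graph.subgraph(
  epred = t => t.srcAttr != t.dstAttr,
  vpred = (vid, attr) => attr.split(",")(2) != "999")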
Re: How to join two PairRDD together?
Can you paste the code? It's unclear to me how/when the out of memory is occurring without seeing the code. On Sun, Aug 24, 2014 at 11:37 PM, Gefei Li gefeili.2...@gmail.com wrote: Hello everyone, I am transplanting a clustering algorithm to spark platform, and I meet a problem confusing me for a long time, can someone help me? I have a PairRDDInteger, Integer named patternRDD, which the key represents a number and the value stores an information of the key. And I want to use two of the VALUEs to calculate a kendall number, and if the number is greater than 0.6, then output the two KEYs. I have tried to transform the PairRDD to a RDDTuple2Integer, Integer, and add a common key zero to them, and join two together then get a PairRDD0, IterableTuple2Tuple2key1, value1, Tuple2key2, value2, and tried to use values() method and map the keys out, but it gives me an out of memory error. I think the out of memory error is caused by the few entries of my RDD, but I have no idea how to solve it. Can you help me? Regards, Gefei Li
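In the meantime, a sketch of one way to express the pairwise comparison without the artificial zero key, assuming patternRDD fits a cartesian product and that kendall(v1, v2) is your existing correlation function:

// every ordered pair of keys, each pair compared exactly once
val candidates = patternRDD.cartesian(patternRDD)
  .filter { case ((k1, _), (k2, _)) => k1 < k2 }
  .flatMap { case ((k1, v1), (k2, v2)) =>
    if (kendall(v1, v2) > 0.6) Some((k1, k2)) else None   // kendall is the user's own function
  }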
Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD
Assuming the CSV is well-formed (every row has the same number of columns) and every column is a number, this is how you can do it. You can adjust it so that you pick just the columns you want, of course, by mapping each row to a new Array that contains just the column values you want. Just be sure the logic selects the same columns for every row or your stats might look funny. import org.apache.spark.util.StatCounter val rdd: RDD[Array[Double]] = ??? rdd.mapPartitions(vs => { Iterator(vs.toArray.transpose.map(StatCounter(_))) }).reduce((as, bs) => as.zipWithIndex.map { case (a, i) => a.merge(bs(i)) }) On Mon, Aug 25, 2014 at 9:50 AM, Hingorani, Vineet vineet.hingor...@sap.com wrote: Hello Victor, I want to do it on multiple columns. I was able to do it on one column with the help of Sean using the code below. val matData = file.map(_.split(";")) val stats = matData.map(_(2).toDouble).stats() stats.mean stats.max Thank you Vineet *From:* Victor Tso-Guillen [mailto:v...@paxata.com] *Sent:* Montag, 25. August 2014 18:34 *To:* Hingorani, Vineet *Cc:* user@spark.apache.org *Subject:* Re: Manipulating columns in CSV file or Transpose of Array[Array[String]] RDD Do you want to do this on one column or all numeric columns? On Mon, Aug 25, 2014 at 7:09 AM, Hingorani, Vineet vineet.hingor...@sap.com wrote: Hello all, Could someone help me with the manipulation of CSV file data? I have semicolon-separated CSV data including doubles and strings. I want to calculate the maximum/average of a column. When I read the file using sc.textFile("test.csv").map(_.split(";")), each field is read as a string. Could someone help me with the above manipulation and how to do that? Or maybe there is some way to take the transpose of the data and then manipulate the rows in some way? Thank you in advance, I have been struggling with this for quite some time. Regards, Vineet
Re: amp lab spark streaming twitter example
Hi Jonathan, Thanks for the reply. I ran other exercises (movie recommendation and GraphX) on the same cluster and did not see these errors. So I think this might not be related to the memory setting.. Thanks, Forest On Aug 24, 2014, at 10:27 AM, Jonathan Haddad j...@jonhaddad.com wrote: Could you be hitting this? https://issues.apache.org/jira/browse/SPARK-3178 On Sun, Aug 24, 2014 at 10:21 AM, Forest D dev24a...@gmail.com wrote: Hi folks, I have been trying to run the AMPLab’s twitter streaming example (http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html) in the last 2 days.I have encountered the same error messages as shown below: 14/08/24 17:14:22 ERROR client.AppClient$ClientActor: All masters are unresponsive! Giving up. 14/08/24 17:14:22 ERROR cluster.SparkDeploySchedulerBackend: Spark cluster looks dead, giving up. [error] (Thread-39) org.apache.spark.SparkException: Job aborted: Spark cluster looks down org.apache.spark.SparkException: Job aborted: Spark cluster looks down at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104) [trace] Stack trace suppressed: run last compile:run for the full output. --- Time: 1408900463000 ms --- 14/08/24 17:14:23 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory --- Time: 1408900464000 ms --- --- Time: 1408900465000 ms --- --- Time: 1408900466000 ms --- --- Time: 1408900467000 ms --- --- Time: 1408900468000 ms --- --- Time: 1408900469000 ms --- --- Time: 140890047 ms --- --- Time: 1408900471000 ms --- --- Time: 1408900472000 ms --- --- Time: 1408900473000 ms --- --- Time: 1408900474000 ms --- --- Time: 1408900475000 ms --- --- Time: 1408900476000 ms --- --- Time: 1408900477000 ms --- --- Time: 1408900478000 ms --- 14/08/24 17:14:38 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check
Re: Spark Screencast doesn't show in Chrome on OS X
https://spark.apache.org/screencasts/1-first-steps-with-spark.html The embedded YouTube video shows up in Safari on OS X but not in Chrome. I’m using Chrome 36.0.1985.143 on MacOS 10.9.4 and it works like a charm for me. Cheers, Michael -- Michael Hausenblas Ireland, Europe http://mhausenblas.info/ On 25 Aug 2014, at 21:55, Nick Chammas nicholas.cham...@gmail.com wrote: https://spark.apache.org/screencasts/1-first-steps-with-spark.html The embedded YouTube video shows up in Safari on OS X but not in Chrome. How come? Nick View this message in context: Spark Screencast doesn't show in Chrome on OS X Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org