DStream flatMap "swallows" records
Hi all, I've got a problem with Spark Streaming (both 1.3.1 and 1.5). The situation is as follows: there is a component which gets a DStream of URLs. For each of these URLs, it should access the URL, retrieve several data elements and pass those on for further processing. The relevant code looks like this:

...
val urls: DStream[HttpRequest] = ...
val documents = urls.flatMap { url =>
  val docs: Seq[(Label, Document)] = fetcher.retrieveContent(url)
  System.err.println("D1: " + docs.size + " " + docs.map(_._2.source.timestamp))
  docs
}

documents.count().foreachRDD { rdd =>
  System.err.println("D2: " + rdd.collect().toList)
}

// write content to kafka
documents.foreachRDD { rdd =>
  rdd.foreachPartition { rddPartition =>
    val docs = rddPartition.toList
    System.err.println("D3: " + docs.map { _._2.source.timestamp })
    val messages = docs.map { t => ("raw", t._1, t._2) }
    Kafka.getSink(zkConfig).accept(messages)
  }
}
...

I see the following output when I run this in Spark's local mode (irrelevant parts cut; "timestamp" is a unique sequence number to track documents):

D2: List(0)
D3: List()
D1: 10 List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
D2: List(10)
D1: 10 List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
D3: List(11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
D1: 10 List(21, 22, 23, 24, 25, 26, 27, 28, 29, 30)
D1: 10 List(31, 32, 33, 34, 35, 36, 37, 38, 39, 40)
D1: 10 List(41, 42, 43, 44, 45, 46, 47, 48, 49, 50)
D1: 10 List(51, 52, 53, 54, 55, 56, 57, 58, 59, 60)
D2: List(30)
D1: 10 List(61, 62, 63, 64, 65, 66, 67, 68, 69, 70)
D1: 10 List(71, 72, 73, 74, 75, 76, 77, 78, 79, 80)
D1: 10 List(81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
D3: List(61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90)
D1: 10 List(91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
D1: 0 List()
D2: List(0)
D1: 0 List()
D3: List()
D1: 0 List()
D2: List(0)
D3: List()

When I look at the D1 lines (printed inside the flatMap function), I count 10 batches of 10 documents, which is exactly as expected. When I count the D2 lines though (after the flatMap function), they only account for 40 documents. A document in my case is a key/value tuple, the key objects being the same for all records. Does anyone have an idea what might be happening to my other 60 documents? Thank you so much in advance! Regards, Jeffrey
Re: Spark as a service
Hi Ashish, this might be what you're looking for: https://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server Regards, Jeff 2015-03-24 11:28 GMT+01:00 Ashish Mukherjee ashish.mukher...@gmail.com: Hello, As of now, if I have to execute a Spark job, I need to create a jar and deploy it. If I need to run a dynamically formed SQL from a Web application, is there any way of using SparkSQL in this manner? Perhaps, through a Web Service or something similar. Regards, Ashish
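For illustration, a minimal sketch of how a web application could query the Thrift JDBC/ODBC server once it is started (host, port, database and query are placeholders - adapt them to your setup):

  import java.sql.DriverManager

  // the HiveServer2 JDBC driver is used to talk to the Spark Thrift server
  Class.forName("org.apache.hive.jdbc.HiveDriver")

  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
  val stmt = conn.createStatement()
  // the dynamically formed SQL from the web application goes here
  val rs = stmt.executeQuery("SELECT count(*) FROM some_table")
  while (rs.next()) {
    println(rs.getLong(1))
  }
  rs.close(); stmt.close(); conn.close()

Any JDBC-capable language or tool can connect the same way, which is what makes the Thrift server convenient for this use case.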
Re: Spark as a service
I don't think there's a general approach to that - the use cases are just too different. If you really need it, you will probably have to implement it yourself in the driver of your application. PS: Make sure to use the reply-to-all button so that the mailing list is included in your reply. Otherwise only I will get your mail. Regards, Jeff 2015-03-24 12:01 GMT+01:00 Ashish Mukherjee ashish.mukher...@gmail.com: Hi Jeffrey, Thanks. Yes, this resolves the SQL problem. My bad - I was looking for something which would work for Spark Streaming and other Spark jobs too, not just SQL. Regards, Ashish On Tue, Mar 24, 2015 at 4:07 PM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Ashish, this might be what you're looking for: https://spark.apache.org/docs/latest/sql-programming-guide.html#running-the-thrift-jdbcodbc-server Regards, Jeff 2015-03-24 11:28 GMT+01:00 Ashish Mukherjee ashish.mukher...@gmail.com: Hello, As of now, if I have to execute a Spark job, I need to create a jar and deploy it. If I need to run a dynamically formed SQL from a Web application, is there any way of using SparkSQL in this manner? Perhaps, through a Web Service or something similar. Regards, Ashish
Re: RDD storage in Spark streaming
Hey Abhi, many of StreamingContext's methods for creating input streams take a StorageLevel parameter to configure this behavior. As far as I know, RDD partitions are generally stored in the in-memory cache of the worker nodes. You can also configure replication and spilling to disk if needed. Regards, Jeff 2015-03-23 15:26 GMT+01:00 abhi abhishek...@gmail.com: Hi, I have a simple question about creating RDDs. When an RDD is created in Spark Streaming for a particular time window, where does the RDD get stored? 1. Does it get stored on the driver machine, or does it get stored on all the machines in the cluster? 2. Does the data get stored in memory by default? Can it be stored in memory and on disk? How can this be configured? Thanks, Abhi
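For illustration, a minimal sketch of passing a storage level when creating an input stream (host, port and batch interval are placeholders):

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val ssc = new StreamingContext(conf, Seconds(2))

  // keep received blocks serialized in memory, spill to disk if needed, replicate to 2 nodes
  val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)

If you omit the parameter, the receiver's default is used (MEMORY_AND_DISK_SER_2 for most receivers).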
Re: Spark streaming alerting
What exactly do you mean by alerts? Something specific to your data, or general events of the Spark cluster? For the first, something like Akhil suggested should work. For the latter, I would suggest having a log consolidation system like Logstash in place and using that to generate alerts. Regards, Jeff 2015-03-23 7:39 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: What do you mean you can't send it directly from spark workers? Here's a simple approach which you could do: val data = ssc.textFileStream("sigmoid/") val dist = data.filter(_.contains("ERROR")).foreachRDD(rdd => alert("Errors : " + rdd.count())) And the alert() function could be anything triggering an email or sending an SMS alert. Thanks Best Regards On Sun, Mar 22, 2015 at 1:52 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a module in Spark Streaming that lets you listen to alerts/conditions as they happen in the streaming module? Generally Spark Streaming components will execute on a large set of clusters like HDFS or Cassandra; however, when it comes to alerting you generally can't send it directly from the Spark workers, which means you need a way to listen to the alerts.
Re: Load balancing
Hi Mohit, please make sure you use the "Reply to all" button and include the mailing list, otherwise only I will get your message ;) Regarding your question: Yes, that's also my understanding. You can partition streaming RDDs only by time intervals, not by size. So depending on your incoming rate, they may vary. I do not know exactly what the life cycle of the receiver is, but I don't think anything actually happens when you create the DStream. My guess would be that the receiver is allocated when you call StreamingContext#start(). Regards, Jeff 2015-03-21 21:19 GMT+01:00 Mohit Anchlia mohitanch...@gmail.com: Could somebody help me understand the question I posted earlier? On Fri, Mar 20, 2015 at 9:44 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Thanks for the pointer. Looking at the description below from the site, it looks like in Spark the block size is not fixed: it's determined by the block interval, and in fact for the same batch you could have different block sizes. Did I get it right? - Another parameter that should be considered is the receiver's blocking interval, which is determined by the configuration parameter spark.streaming.blockInterval (http://spark.apache.org/docs/1.2.1/configuration.html#spark-streaming). For most receivers, the received data is coalesced together into blocks of data before storing inside Spark's memory. The number of blocks in each batch determines the number of tasks that will be used to process the received data in a map-like transformation. The number of tasks per receiver per batch will be approximately (batch interval / block interval). For example, a block interval of 200 ms will create 10 tasks per 2 second batch. If the number of tasks is too low (that is, less than the number of cores per machine), then it will be inefficient as all available cores will not be used to process the data. To increase the number of tasks for a given batch interval, reduce the block interval. However, the recommended minimum value of block interval is about 50 ms, below which the task launching overheads may be a problem. -- Also, I am not clear about the data flow of the receiver. When a client gets a handle to a Spark context and calls something like val lines = ssc.socketTextStream("localhost", ...), is this the point when the Spark master is contacted to determine which Spark worker node the data is going to go to? On Fri, Mar 20, 2015 at 1:33 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Mohit, it also depends on what the source for your streaming application is. If you use Kafka, you can easily partition topics and have multiple receivers on different machines. If you have something like an HTTP, socket, etc. stream, you probably can't do that. The Spark RDDs generated by your receiver will be partitioned and processed in a distributed manner like usual Spark RDDs however. There are parameters to control that behavior (e.g. defaultParallelism and blockInterval). See here for more details: http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning Regards, Jeff 2015-03-20 8:02 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: 1. If you are consuming data from Kafka or any other receiver-based sources, then you can start 1-2 receivers per worker (assuming you'll have min 4 cores per worker) 2. If you have a single receiver or a fileStream, then what you can do to distribute the data across machines is to do a repartition. 
Thanks Best Regards On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to understand how to load balance the incoming data to multiple spark streaming workers. Could somebody help me understand how I can distribute my incoming data from various sources such that incoming data is going to multiple spark streaming nodes? Is it done by spark client with help of spark master similar to hadoop client asking namenodes for the list of datanodes?
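To illustrate the batch-interval/block-interval relationship quoted above, a minimal sketch (app name and values are placeholders; in these Spark versions the block interval is given in milliseconds):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("StreamingApp")
    .set("spark.streaming.blockInterval", "200") // 200 ms blocks
    .set("spark.default.parallelism", "8")

  // with a 2 s batch interval, each receiver produces roughly 2000 ms / 200 ms = 10 blocks,
  // i.e. about 10 map tasks per batch
  val ssc = new StreamingContext(conf, Seconds(2))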
Re: Spark per app logging
Hi, I'm not completely sure about this either, but this is what we are currently doing: Configure your logging to write to STDOUT, not explicitly to a file. Spark will capture stdout and stderr and separate the messages into an app/driver folder structure in the configured worker directory. We then use Logstash to collect the logs and index them into an Elasticsearch cluster (Spark seems to produce a lot of logging data). With some simple regex processing, you also get the application id as a searchable field. Regards, Jeff 2015-03-20 22:37 GMT+01:00 Ted Yu yuzhih...@gmail.com: Are these the same jobs, just run by different users, or different jobs? If the latter, can each application use its own log4j.properties? Cheers On Fri, Mar 20, 2015 at 1:43 PM, Udit Mehta ume...@groupon.com wrote: Hi, We have Spark set up such that there are various users running multiple jobs at the same time. Currently all the logs go to one file specified in the log4j.properties. Is it possible to configure log4j in Spark for per-app/user logging instead of sending all logs to the one file mentioned in the log4j.properties? Thanks Udit
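For illustration, a minimal log4j.properties sketch that sends everything to STDOUT so that Spark's per-application stdout/stderr capturing described above can take over (log level and pattern are just examples):

  log4j.rootCategory=INFO, console
  log4j.appender.console=org.apache.log4j.ConsoleAppender
  log4j.appender.console.target=System.out
  log4j.appender.console.layout=org.apache.log4j.PatternLayout
  log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n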
Re: Spark Streaming Not Reading Messages From Multiple Kafka Topics
Hey Eason! Weird problem indeed. More information will probably help to find the issue: Have you searched the logs for peculiar messages? What does your Spark environment look like? #workers, #threads, etc.? Does it work if you create separate receivers for the topics? Regards, Jeff 2015-03-21 2:27 GMT+01:00 EH eas...@gmail.com: Hi all, I'm building a Spark Streaming application that continuously reads multiple Kafka topics at the same time. However, I found a weird issue: it reads only hundreds of messages and then stops reading any more. If I change the three topics to only one topic, then it is fine and it will continue to consume. Below is the code I have.

val consumerThreadsPerInputDstream = 1
val topics = Map("raw_0" -> consumerThreadsPerInputDstream,
                 "raw_1" -> consumerThreadsPerInputDstream,
                 "raw_2" -> consumerThreadsPerInputDstream)
val msgs = KafkaUtils.createStream(ssc, "10.10.10.10:2181/hkafka", "group01", topics).map(_._2)
...

How come it no longer consumes after hundreds of messages when reading three topics? How can I resolve this issue? Thank you for your help, Eason -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Not-Reading-Messages-From-Multiple-Kafka-Topics-tp22170.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
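To illustrate the "separate receivers" suggestion, a minimal sketch that creates one receiver per topic and unions them into a single stream (Zookeeper quorum, group id and topic names are taken from the code in the question):

  val zkQuorum = "10.10.10.10:2181/hkafka"
  val group = "group01"
  val topicNames = Seq("raw_0", "raw_1", "raw_2")

  // one input DStream (receiver) per topic instead of one receiver reading three topics
  val streams = topicNames.map { topic =>
    KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
  }
  val msgs = ssc.union(streams)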
Re: Visualizing Spark Streaming data
I'll stay with my recommendation - that's exactly what Kibana is made for ;) 2015-03-20 9:06 GMT+01:00 Harut Martirosyan harut.martiros...@gmail.com: Hey Jeffrey. Thanks for reply. I already have something similar, I use Grafana and Graphite, and for simple metric streaming we've got all set-up right. My question is about interactive patterns. For instance, dynamically choose an event to monitor, dynamically choose group-by field or any sort of filter, then view results. This is easy when you have 1 user, but if you have team of analysts all specifying their own criteria, it becomes hard to manage them all. On 20 March 2015 at 12:02, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hey Harut, I don't think there'll by any general practices as this part heavily depends on your environment, skills and what you want to achieve. If you don't have a general direction yet, I'd suggest you to have a look at Elasticsearch+Kibana. It's very easy to set up, powerful and therefore gets a lot of traction currently. Regards, Jeff 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com: I'm trying to build a dashboard to visualize stream of events coming from mobile devices. For example, I have event called add_photo, from which I want to calculate trending tags for added photos for last x minutes. Then I'd like to aggregate that by country, etc. I've built the streaming part, which reads from Kafka, and calculates needed results and get appropriate RDDs, the question is now how to connect it to UI. Is there any general practices on how to pass parameters to spark from some custom built UI, how to organize data retrieval, what intermediate storages to use, etc. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- RGRDZ Harut
Re: Visualizing Spark Streaming data
Hey Harut, I don't think there'll be any general practices, as this part heavily depends on your environment, skills and what you want to achieve. If you don't have a general direction yet, I'd suggest you have a look at Elasticsearch+Kibana. It's very easy to set up, powerful, and therefore currently gets a lot of traction. Regards, Jeff 2015-03-20 8:43 GMT+01:00 Harut harut.martiros...@gmail.com: I'm trying to build a dashboard to visualize a stream of events coming from mobile devices. For example, I have an event called add_photo, from which I want to calculate trending tags for added photos for the last x minutes. Then I'd like to aggregate that by country, etc. I've built the streaming part, which reads from Kafka, calculates the needed results and gets the appropriate RDDs; the question now is how to connect it to the UI. Are there any general practices on how to pass parameters to Spark from some custom-built UI, how to organize data retrieval, what intermediate storages to use, etc.? Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Visualizing-Spark-Streaming-data-tp22160.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Load balancing
Hi Mohit, it also depends on what the source for your streaming application is. If you use Kafka, you can easily partition topics and have multiple receivers on different machines. If you have something like an HTTP, socket, etc. stream, you probably can't do that. The Spark RDDs generated by your receiver will be partitioned and processed in a distributed manner like usual Spark RDDs however. There are parameters to control that behavior (e.g. defaultParallelism and blockInterval). See here for more details: http://spark.apache.org/docs/1.2.1/streaming-programming-guide.html#performance-tuning Regards, Jeff 2015-03-20 8:02 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com: 1. If you are consuming data from Kafka or any other receiver-based sources, then you can start 1-2 receivers per worker (assuming you'll have min 4 cores per worker) 2. If you have a single receiver or a fileStream, then what you can do to distribute the data across machines is to do a repartition. Thanks Best Regards On Thu, Mar 19, 2015 at 11:32 PM, Mohit Anchlia mohitanch...@gmail.com wrote: I am trying to understand how to load balance the incoming data to multiple Spark Streaming workers. Could somebody help me understand how I can distribute my incoming data from various sources such that incoming data is going to multiple Spark Streaming nodes? Is it done by the Spark client with the help of the Spark master, similar to a Hadoop client asking the namenode for the list of datanodes?
Re: Writing Spark Streaming Programs
I second what has been said already. We just built a streaming app in Java, and I would definitely choose Scala this time. Regards, Jeff 2015-03-19 16:34 GMT+01:00 Emre Sevinc emre.sev...@gmail.com: Hello James, I've been working with Spark Streaming for the last 6 months, and I'm coding in Java 7. Even though I haven't encountered any blocking issues with that combination, I'd definitely pick Scala if the decision was up to me. I agree with Gerard and Charles on this one. If you can, go with Scala for Spark Streaming applications. Cheers, Emre Sevinç http://www.bigindustries.be/ On Thu, Mar 19, 2015 at 4:09 PM, James King jakwebin...@gmail.com wrote: Many thanks Gerard, this is very helpful. Cheers! On Thu, Mar 19, 2015 at 4:02 PM, Gerard Maas gerard.m...@gmail.com wrote: Try writing this Spark Streaming idiom in Java and you'll choose Scala soon enough: dstream.foreachRDD { rdd => rdd.foreachPartition { partition => ... } } When deciding between Java and Scala for Spark, IMHO Scala has the upper hand. If you're concerned with readability, have a look at the Scala coding style recently open sourced by Databricks: https://github.com/databricks/scala-style-guide (btw, I don't agree with a good part of it, but I recognize that it can keep the most complex Scala constructions out of your code) On Thu, Mar 19, 2015 at 3:50 PM, James King jakwebin...@gmail.com wrote: Hello All, I'm using Spark for streaming but I'm unclear on which implementation language to use: Java, Scala or Python. I don't know anything about Python, am familiar with Scala and have been doing Java for a long time. I think the above shouldn't influence my decision on which language to use, because I believe the tool should fit the problem. In terms of performance, Java and Scala are comparable. However Java is OO and Scala is FP; no idea what Python is. If you use Scala without applying a consistent style of programming, Scala code can become unreadable, but I do like the fact that it seems to be possible to do so much work with so much less code - that's a strong selling point for me. Also it could be that the type of programming done in Spark is best implemented in Scala as an FP language, not sure though. The question I would like your help with is: are there any other considerations I need to think about when deciding this? Are there any recommendations you can make in this regard? Regards jk -- Emre Sevinc
Re: Spark + Kafka
Probably 1.3.0 - it has some improvements in the included Kafka receiver for streaming. https://spark.apache.org/releases/spark-release-1-3-0.html Regards, Jeff 2015-03-18 10:38 GMT+01:00 James King jakwebin...@gmail.com: Hi All, Which build of Spark is best when using Kafka? Regards jk
Re: GraphX: Get edges for a vertex
Hi Mas, I never actually worked with GraphX, but one idea: As far as I know, you can directly access the vertex and edge RDDs of your Graph object. Why not simply run a .filter() on the edge RDD to get all edges that originate from or end at your vertex? Regards, Jeff 2015-03-18 10:52 GMT+01:00 mas mas.ha...@gmail.com: Hi, Just to continue with the question. I need to find the edges of one particular vertex. However, (collectNeighbors/collectNeighborIds) provides the neighbors/neighborids for all the vertices of the graph. Any help in this regard will be highly appreciated. Thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Get-edges-for-a-vertex-tp18880p22115.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
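A minimal sketch of that filter idea (the graph and vertex id are placeholders):

  import org.apache.spark.graphx._

  val vertexId: VertexId = 42L

  // all edges that originate from or end at the given vertex
  val incidentEdges = graph.edges.filter(e => e.srcId == vertexId || e.dstId == vertexId)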
Re: Spark + Kafka
What you call sub-categories are packages pre-built to run on certain Hadoop environments. It really depends on where you want to run Spark. As far as I know, this is mainly about the included HDFS binding - so if you just want to play around with Spark, any of the packages should be fine. I wouldn't use the source package though, because you'd have to compile it yourself. PS: Make sure to use "Reply to all". If you don't include the mailing list in your response, I'm the only one who will get your message. Regards, Jeff 2015-03-18 10:49 GMT+01:00 James King jakwebin...@gmail.com: Any sub-category recommendations: Hadoop, MapR, CDH? On Wed, Mar 18, 2015 at 10:48 AM, James King jakwebin...@gmail.com wrote: Many thanks Jeff, will give it a go. On Wed, Mar 18, 2015 at 10:47 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Probably 1.3.0 - it has some improvements in the included Kafka receiver for streaming. https://spark.apache.org/releases/spark-release-1-3-0.html Regards, Jeff 2015-03-18 10:38 GMT+01:00 James King jakwebin...@gmail.com: Hi All, Which build of Spark is best when using Kafka? Regards jk
IllegalAccessError in GraphX (Spark 1.3.0 LDA)
Hi all, I'm trying to use the new LDA in MLlib, but when trying to train the model, I'm getting the following error:

java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109)

Has anyone seen this yet and has an idea what might be the problem? It happens both with the provided sample data and with my own corpus. Full code + more stack below. Thx and Regards, Jeff

Code:
--
object LdaTest {

  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("LDA").setMaster("local[4]")
    val sc = new SparkContext(conf)

    //val data = scala.io.Source.fromFile("/home/jeff/nmf_compare/scikit_v.txt").getLines().toList
    //val parsedData = data.map(s => Vectors.dense(s.trim().split(" ").map(_.toDouble)))
    //val corpus = parsedData.zipWithIndex.map( t => (t._2.toLong, t._1) )

    //val data = sc.textFile("/home/jeff/nmf_compare/scikit_v.txt")
    val data = sc.textFile("/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.trim().split(" ").map(_.toDouble)))
    val corpus = parsedData.zipWithIndex.map(_.swap).cache()
    //val parCorpus = sc.parallelize(corpus)
    //println(parCorpus)

    val ldaModel = new LDA().setK(10).run(corpus)
    println(ldaModel)
  }
}

Stack: ... 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_0 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_0, waiting for it to finish... 15/03/17 09:48:50 INFO storage.BlockManager: Found block rdd_4_0 locally 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_4_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_1, waiting for it to finish... 
15/03/17 09:48:50 INFO rdd.HadoopRDD: Input split: file:/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt:132+132 15/03/17 09:48:50 INFO storage.MemoryStore: ensureFreeSpace(1048) called with curMem=47264, maxMem=1965104824 15/03/17 09:48:50 INFO spark.CacheManager: Finished waiting for rdd_8_0 15/03/17 09:48:50 ERROR executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:104) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:49) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/17 09:48:50 INFO spark.CacheManager: Whoever was loading rdd_8_0 failed; we'll try it ourselves 15/03/17 09:48:50 INFO storage.MemoryStore: Block rdd_4_1 stored as values in memory (estimated size 1048.0 B, free 1874.0 MB) 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_0 not found, computing it 15/03/17 09:48:50 INFO storage.BlockManagerInfo: Added rdd_4_1 in memory on 10.2.200.66:51465 (size: 1048.0 B, free: 1874.1 MB) 15/03/17 09:48:50 INFO storage.BlockManager: Found block rdd_4_0 locally 15/03/17 09:48:50 INFO
Re: IllegalAccessError in GraphX (Spark 1.3.0 LDA)
Hi Xiangrui, thank you a lot for the hint! I just tried on another machine with a clean project and there it worked like a charm. Will retry on the other machine tomorrow. Regards, Jeff 2015-03-17 19:57 GMT+01:00 Xiangrui Meng men...@gmail.com: Please check your classpath and make sure you don't have multiple Spark versions deployed. If the classpath looks correct, please create a JIRA for this issue. Thanks! -Xiangrui On Tue, Mar 17, 2015 at 2:03 AM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi all, I'm trying to use the new LDA in mllib, but when trying to train the model, I'm getting following error: java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) Has anyone seen this yet and has an idea what might be the problem? It happens both with the provided sample data and with my own corpus. Full code + more stack below. Thx and Regards, Jeff Code: -- object LdaTest { def main(args: Array[String]) = { val conf = new SparkConf().setAppName(LDA).setMaster(local[4]) val sc = new SparkContext(conf) //val data = scala.io.Source.fromFile(/home/jeff/nmf_compare/scikit_v.txt).getLines().toList //val parsedData = data.map(s = Vectors.dense(s.trim().split( ).map(_.toDouble))) //val corpus = parsedData.zipWithIndex.map( t = (t._2.toLong, t._1) ) //val data = sc.textFile(/home/jeff/nmf_compare/scikit_v.txt) val data = sc.textFile(/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt) val parsedData = data.map(s = Vectors.dense(s.trim().split( ).map(_.toDouble))) val corpus = parsedData.zipWithIndex.map(_.swap).cache() //val parCorpus = sc.parallelize(corpus) //println(parCorpus) val ldaModel = new LDA().setK(10).run(corpus) println(ldaModel) } } Stack: ... 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_0 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_8_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_0, waiting for it to finish... 15/03/17 09:48:50 INFO storage.BlockManager: Found block rdd_4_0 locally 15/03/17 09:48:50 INFO spark.CacheManager: Partition rdd_4_1 not found, computing it 15/03/17 09:48:50 INFO spark.CacheManager: Another thread is loading rdd_8_1, waiting for it to finish... 
15/03/17 09:48:50 INFO rdd.HadoopRDD: Input split: file:/home/jeff/Downloads/spark-1.3.0-bin-hadoop2.4/data/mllib/sample_lda_data.txt:132+132 15/03/17 09:48:50 INFO storage.MemoryStore: ensureFreeSpace(1048) called with curMem=47264, maxMem=1965104824 15/03/17 09:48:50 INFO spark.CacheManager: Finished waiting for rdd_8_0 15/03/17 09:48:50 ERROR executor.Executor: Exception in task 0.0 in stage 3.0 (TID 3) java.lang.IllegalAccessError: tried to access class org.apache.spark.util.collection.Sorter from class org.apache.spark.graphx.impl.EdgePartitionBuilder at org.apache.spark.graphx.impl.EdgePartitionBuilder.toEdgePartition(EdgePartitionBuilder.scala:39) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:109) at org.apache.spark.graphx.EdgeRDD$$anonfun$1.apply(EdgeRDD.scala:104) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:609) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:49) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177
Re: KafkaUtils and specifying a specific partition
Hi Colin, my understanding is that this is currently not possible with KafkaUtils. You would have to write a custom receiver using Kafka's SimpleConsumer API. https://spark.apache.org/docs/1.2.0/streaming-custom-receivers.html https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example Regards, Jeff 2015-03-12 17:58 GMT+01:00 ColinMc colin.mcqu...@shiftenergy.com: Hi, How do you use KafkaUtils to specify a specific partition? I'm writing customer Marathon jobs where a customer is given 1 partition in a topic in Kafka. The job will get the partition from our database for that customer and use that to get the messages for that customer. I misinterpreted KafkaUtils when creating the stream and didn't know that it was the number of partitions per topic in the map. If KafkaUtils doesn't support this, is there another Spark API call for Kafka that supports this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KafkaUtils-and-specifying-a-specific-partition-tp22018.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
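For illustration, a skeleton of such a custom receiver (class and parameter names are hypothetical; the actual SimpleConsumer fetch loop for the given topic/partition is only indicated by comments):

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  class SinglePartitionKafkaReceiver(topic: String, partition: Int)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

    def onStart(): Unit = {
      new Thread("single-partition-kafka-receiver") {
        override def run(): Unit = {
          // open a SimpleConsumer for (topic, partition), fetch messages in a loop
          // and hand each decoded message to Spark via store(message)
        }
      }.start()
    }

    def onStop(): Unit = {
      // close the SimpleConsumer and stop the fetch thread
    }
  }

  // usage: val stream = ssc.receiverStream(new SinglePartitionKafkaReceiver("myTopic", 0))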
Re: How to use the TF-IDF model?
Hi, well, it really depends on what you want to do ;) TF-IDF is a measure that originates in the information retrieval context and can be used to judge the relevancy of a document in the context of a given search term. It's also often used for text-related machine learning tasks - e.g. have a look at topic extraction using non-negative matrix factorization. Regards, Jeff 2015-03-09 7:39 GMT+01:00 Xi Shen davidshe...@gmail.com: Hi, I read this page: http://spark.apache.org/docs/1.2.0/mllib-feature-extraction.html. But I am wondering, how do I use this TF-IDF RDD? What does this TF-IDF vector look like? Can someone provide me some guidance? Thanks, Xi Shen http://about.me/davidshen
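For a concrete starting point, a minimal sketch based on the feature-extraction guide linked above (the input path and whitespace tokenization are placeholders); the resulting vectors can then be fed into MLlib algorithms such as clustering or classification:

  import org.apache.spark.mllib.feature.{HashingTF, IDF}
  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.rdd.RDD

  // each document is a sequence of terms
  val documents: RDD[Seq[String]] = sc.textFile("docs.txt").map(_.split(" ").toSeq)

  val hashingTF = new HashingTF()
  val tf: RDD[Vector] = hashingTF.transform(documents)

  tf.cache()
  val idf = new IDF().fit(tf)
  val tfidf: RDD[Vector] = idf.transform(tf) // one sparse weight vector per document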
Re: Augment more data to existing MatrixFactorization Model?
Hey Anish, machine learning models that are updated with incoming data are commonly known as online learning systems. Matrix factorization is one way to implement recommender systems, but not the only one. There are papers about how to do online matrix factorization, but you will likely have to implement this on your own. Have a look at: http://en.wikipedia.org/wiki/Recommender_system www0.cs.ucl.ac.uk/staff/l.capra/publications/seams11-vale.pdf Regards, Jeff 2015-02-26 19:40 GMT+01:00 anishm anish.mashan...@gmail.com: I am a beginner to the world of machine learning and the usage of Apache Spark. I have followed the tutorial at https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html#augmenting-matrix-factors and was successfully able to develop the application. Now, as today's web applications need to be powered by real-time recommendations, I would like my model to be ready for new data that keeps coming in on the server. The site says: *A better way to get the recommendations for you is training a matrix factorization model first and then augmenting the model using your ratings.* How do I do that? I am using Python to develop my application. Also, please tell me how I can persist the model to use it again, or give me an idea of how to interface this with a web service. Thanking you, Anish Mashankar A Data Science Enthusiast -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Augment-more-data-to-existing-MatrixFactorization-Model-tp21830.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: how to map and filter in one step?
Hi, we are using RDD#mapPartitions() to achieve the same. Are there advantages/disadvantages of using one method over the other? Regards, Jeff 2015-02-26 20:02 GMT+01:00 Mark Hamstra m...@clearstorydata.com: rdd.map(foo).filter(bar) and rdd.filter(bar).map(foo) will each already be pipelined into a single stage, so there generally isn't any need to complect the map and filter into a single function. Additionally, there is RDD#collect[U](f: PartialFunction[T, U])(implicit arg0: ClassTag[U]): RDD[U], which only applies the partial function to those elements of the RDD for which f is defined. On Thu, Feb 26, 2015 at 10:49 AM, Crystal Xing crystalxin...@gmail.com wrote: I see. The reason we can use flatMap to map to nothing but can't use map to map to null is that flatMap supports mapping to zero or more elements, while map only supports a 1-1 mapping? It seems flatMap is more equivalent to Hadoop's map. Thanks, Zheng zhen On Thu, Feb 26, 2015 at 10:44 AM, Sean Owen so...@cloudera.com wrote: You can flatMap: rdd.flatMap { in => if (condition(in)) { Some(transformation(in)) } else { None } } On Thu, Feb 26, 2015 at 6:39 PM, Crystal Xing crystalxin...@gmail.com wrote: Hi, I have a text file input and I want to parse it line by line and map each line to another format. But at the same time, I want to filter out some lines I do not need. I wonder if there is a way to filter out those lines in the map function. Do I have to do two steps, filter and map? In that way, I have to scan and parse the lines twice in order to filter and map. If I map those unwanted lines to null and filter out the nulls, will that work? I've never tried it yet. Thanks, Zheng zheng
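For comparison, a minimal sketch of the mapPartitions variant mentioned above (condition and transformation are placeholders):

  val result = rdd.mapPartitions { iter =>
    // filter and map in one pass over each partition
    iter.filter(in => condition(in)).map(in => transformation(in))
  }

Functionally this is equivalent to rdd.filter(condition).map(transformation); since Spark pipelines both into a single stage anyway, the difference is mostly a matter of style and of how much per-partition setup work (e.g. opening a connection) you need to do.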
Re: spark streaming, batchinterval,windowinterval and window sliding interval difference
If you read the streaming programming guide, you'll notice that Spark does not do real streaming but emulates it with a so-called mini-batching approach. Let's say you want to work with a continuous stream of incoming events from a computing centre: Batch interval: That's the basic heartbeat of your streaming application. If you set this to 1 second, Spark will create an RDD every second containing the events of that second. That's your mini-batch of data. Windowing: That's a way to do aggregations on your streaming data. Let's say you want to have a summary of how many warnings your system produced in the last hour. Then you would use a windowed reduce with a window size of 1h. Sliding: This tells Spark how often to perform your windowed operation. If you set this to 1h as well, you aggregate your data stream into consecutive 1h windows of data - no overlap. You could also tell Spark to create your 1h aggregation only twice a day by setting the sliding interval to 12h. Or you could tell Spark to create a 1h aggregation every 30 min; then each data window overlaps with the previous window, of course. I recommend reading the programming guide carefully - it explains these concepts pretty well. https://spark.apache.org/docs/latest/streaming-programming-guide.html Regards, Jeff 2015-02-26 18:51 GMT+01:00 Hafiz Mujadid hafizmujadi...@gmail.com: Can somebody explain the difference between batch interval, window interval and window sliding interval, with an example? Is there any real-time use case for these parameters? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-batchinterval-windowinterval-and-window-sliding-interval-difference-tp21829.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
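As a sketch of the 1h-window-every-30-min example above (events is a placeholder DStream[String]; the StreamingContext would be created with the batch interval, e.g. Seconds(1)):

  import org.apache.spark.streaming.Minutes

  // count WARN events over the last hour, recomputed every 30 minutes
  val warningCounts = events
    .filter(_.contains("WARN"))
    .window(Minutes(60), Minutes(30))
    .count()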
Re: SparkStreaming failing with exception Could not compute split, block input
I don't have an idea, but perhaps a little more context would be helpful. What is the source of your streaming data? What's the storage level you're using? What are you doing? Some kind of windows operations? Regards, Jeff 2015-02-26 18:59 GMT+01:00 Mukesh Jha me.mukesh@gmail.com: On Wed, Feb 25, 2015 at 8:09 PM, Mukesh Jha me.mukesh@gmail.com wrote: My application runs fine for ~3/4 hours and then hits this issue. On Wed, Feb 25, 2015 at 11:34 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hi Experts, My Spark Job is failing with below error. From the logs I can see that input-3-1424842351600 was added at 5:32:32 and was never purged out of memory. Also the available free memory for the executor is *2.1G*. Please help me figure out why executors cannot fetch this input. Txz for any help, Cheers. *Logs* 15/02/25 05:32:32 INFO storage.BlockManagerInfo: Added input-3-1424842351600 in memory on chsnmphbase31.usdc2.oraclecloud.com:50208 (size: 276.1 KB, free: 2.1 GB) . . 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added input-1-1424842362600 in memory on chsnmphbase30.usdc2.cloud.com:35919 (size: 232.3 KB, free: 2.1 GB) 15/02/25 05:32:43 INFO storage.BlockManagerInfo: Added input-4-1424842363000 in memory on chsnmphbase23.usdc2.cloud.com:37751 (size: 291.4 KB, free: 2.1 GB) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 32.1 in stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 37.1 in stage 451.0 (TID 22512, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 31.1 in stage 451.0 (TID 22513, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 34.1 in stage 451.0 (TID 22514, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 36.1 in stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 39.1 in stage 451.0 (TID 22516, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 30.1 in stage 451.0 (TID 22517, chsnmphbase30.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 33.1 in stage 451.0 (TID 22518, chsnmphbase26.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 35.1 in stage 451.0 (TID 22519, chsnmphbase19.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 INFO scheduler.TaskSetManager: Starting task 38.1 in stage 451.0 (TID 22520, chsnmphbase23.usdc2.cloud.com, RACK_LOCAL, 1288 bytes) 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 32.1 in stage 451.0 (TID 22511, chsnmphbase19.usdc2.cloud.com): java.lang.Exception: Could not compute split, block input-3-1424842351600 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/02/25 05:32:43 WARN scheduler.TaskSetManager: Lost task 36.1 in stage 451.0 (TID 22515, chsnmphbase19.usdc2.cloud.com): java.lang.Exception: Could not compute split, block input-3-1424842355600 not found at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51) -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com* -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com* -- Thanks Regards, *Mukesh Jha me.mukesh@gmail.com*
Re: Considering Spark for large data elements
Hi Rob, I fear your questions will be hard to answer without additional information about what kind of simulations you plan to do. int[r][c] basically means you have a matrix of integers? You could for example map this to a row-oriented RDD of integer arrays or to a column-oriented RDD of integer arrays. Which option is better will heavily depend on your workload. Also have a look at the algebraic data structures that come with MLlib ( https://spark.apache.org/docs/1.1.0/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors ). Regards, Jeff 2015-02-25 23:58 GMT+01:00 Rob Sargent rob.sarg...@utah.edu: I have an application which might benefit from Spark's distribution/analysis, but I'm worried about the size and structure of my data set. I need to perform several thousand simulations on a rather large data set, and I need access to all the generated simulations. The data element is largely an int[r][c] where r is 100 to 1000 and c is 20-80K (there's more, but that array is the bulk of the problem). I have machines and memory capable of doing 6-10 simulations simultaneously in separate JVMs. Is this data structure compatible with Spark's RDD notion? If yes, I will have a slew of how-to-get-started questions, the first of which is how to seed the run. My thinking is to use org.apache.spark.api.java.FlatMapFunction, starting with an EmptyRDD and the seed data. Would that be the way to go? Thanks
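For illustration, a minimal sketch of the row-oriented mapping mentioned above (the matrix here is just randomly generated on the driver as a stand-in for one simulation's int[r][c] data):

  import org.apache.spark.mllib.linalg.Vectors

  // stand-in for the real data: 100 rows x 50 columns of ints
  val matrix: Array[Array[Int]] = Array.fill(100, 50)(scala.util.Random.nextInt(10))

  // one RDD element per row; each row becomes an MLlib dense vector
  val rows = sc.parallelize(matrix.map(row => Vectors.dense(row.map(_.toDouble))))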
Re: spark streaming: stderr does not roll
So to summarize (I had a similar question): Spark's log4j is configured to log to the console by default? Those messages end up in the stderr files, and that approach does not support rolling? If I configure log4j to log to files, how can I keep the folder structure? Should I use relative paths and assume that those end up in the same folders the stderr files do? Regards, Jeff 2015-02-25 9:35 GMT+01:00 Sean Owen so...@cloudera.com: These settings don't control what happens to stderr, right? stderr is up to the process that invoked the driver to control. You may wish to configure log4j to log to files instead. On Wed, Nov 12, 2014 at 8:15 PM, Nguyen, Duc duc.ngu...@pearson.com wrote: I've also tried setting the aforementioned properties using System.setProperty() as well as on the command line while submitting the job using --conf key=value. All to no success. When I go to the Spark UI and click on that particular streaming job and then the Environment tab, I can see the properties are correctly set. But regardless of what I've tried, the stderr log file on the worker nodes does not roll and continues to grow...leading to a crash of the cluster once it claims 100% of disk. Has anyone else encountered this? Anyone? On Fri, Nov 7, 2014 at 3:35 PM, Nguyen, Duc duc.ngu...@pearson.com wrote: We are running Spark Streaming jobs (version 1.1.0). After a sufficient amount of time, the stderr file grows until the disk is full at 100% and crashes the cluster. I've read this https://github.com/apache/spark/pull/895 and also read this http://spark.apache.org/docs/latest/configuration.html#spark-streaming So I've tried testing with this in an attempt to get the stderr log file to roll. sparkConf.set("spark.executor.logs.rolling.strategy", "size") .set("spark.executor.logs.rolling.size.maxBytes", "1024") .set("spark.executor.logs.rolling.maxRetainedFiles", "3") Yet it does not roll and continues to grow. Am I missing something obvious? thanks, Duc - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to efficiently control concurrent Spark jobs
So basically you have lots of small ML tasks you want to run concurrently? By "I've used repartition and cache to store the sub-datasets on only one machine", do you mean that you reduced each RDD to only one partition? Maybe you want to give the fair scheduler a try to get more of your tasks executing concurrently. Just an idea... Regards, Jeff 2015-02-25 12:06 GMT+01:00 Staffan staffan.arvids...@gmail.com: Hi, Is there a good (recommended) way to control and run multiple Spark jobs within the same application? My application looks like this: 1) Run one Spark job on the 'full' dataset, which then creates a few thousand RDDs containing sub-datasets of the complete dataset. Each of the sub-datasets is independent from the others (the 'full' dataset is simply a dump from a database containing several different types of records). 2) Run some filtering and manipulations on each of the RDDs and finally do some ML on the data. (Each of the RDDs created in step 1) is completely independent, so this should run concurrently.) I've implemented this by using Scala Futures and executing the Spark jobs in 2) from a separate thread for each RDD. This works and improves runtime compared to a naive for-loop over step 2). Scaling is however not as good as I would expect it to be (28 minutes for 4 cores on 1 machine vs. 19 minutes for 12 cores on 3 machines). Each of the sub-datasets is fairly small, so I've used 'repartition' and 'cache' to store the sub-datasets on only one machine in step 1); this improved runtime by a few %. So, does anyone have a suggestion of how to do this in a better way, or perhaps is there a higher-level workflow tool that I can use on top of Spark? (The cool solution would have been to use nested RDDs and just map over them in a high-level way, but that is not supported afaik.) Thanks! Staffan
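A minimal sketch of the fair-scheduler idea (the pool name is a placeholder; each sub-dataset job would be submitted from its own thread/Future, as you already do):

  val conf = new SparkConf()
    .setAppName("ConcurrentSubJobs")
    .set("spark.scheduler.mode", "FAIR")
  val sc = new SparkContext(conf)

  // inside each Future, optionally assign the submitted jobs to a pool
  sc.setLocalProperty("spark.scheduler.pool", "subJobs")
  // ... run the filtering/ML pipeline for one sub-dataset here ...
  sc.setLocalProperty("spark.scheduler.pool", null)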
Re: Number of Executors per worker process
Hi Spico, Yes, I think an executor core in Spark is basically a thread in a worker pool. It's recommended to have one executor core per physical core on your machine for best performance, but I think in theory you can create as many threads as your OS allows. For deployment: There seems to be the actual worker JVM which coordinates the work on a worker node. I don't think the actual thread pool lives in there; rather, a separate JVM is created for each application that has cores allocated on the node. Otherwise it would be rather hard to impose memory limits at the application level, and it would have serious disadvantages regarding stability. You can check this behavior by looking at the processes on your machine: ps aux | grep spark.deploy => will show the master, worker (coordinator) and driver JVMs; ps aux | grep spark.executor => will show the actual executor JVMs. 2015-02-25 14:23 GMT+01:00 Spico Florin spicoflo...@gmail.com: Hello! I've read the documentation about the Spark architecture, and I have the following questions: 1. How many executors can be on a single worker process (JVM)? 2. Should I think of an executor like a Java thread executor where the pool size is equal to the number of the given cores (set up by SPARK_WORKER_CORES)? 3. If the worker can have many executors, how is this handled by Spark? Or can I set up the number of executors per JVM myself? I look forward to your answers. Regards, Florin
Re: Re: Many Receiver vs. Many threads per Receiver
As I understand the matter: Option 1) has benefits when you think that your network bandwidth may be a bottleneck, because Spark opens several network connections on possibly several different physical machines. Option 2) - as you already pointed out - has the benefit that you occupy fewer worker cores with receiver tasks. Regards, Jeff 2015-02-26 9:38 GMT+01:00 bit1...@163.com bit1...@163.com: Sure, Thanks Tathagata! -- bit1...@163.com *From:* Tathagata Das t...@databricks.com *Date:* 2015-02-26 14:47 *To:* bit1...@163.com *CC:* Akhil Das ak...@sigmoidanalytics.com; user user@spark.apache.org *Subject:* Re: Re: Many Receiver vs. Many threads per Receiver Spark Streaming has a new Kafka direct stream, to be released as an experimental feature with 1.3. That uses a low-level consumer. Not sure if it satisfies your purpose. If you want more control, it's best to create your own Receiver with the low-level Kafka API. TD On Tue, Feb 24, 2015 at 12:09 AM, bit1...@163.com bit1...@163.com wrote: Thanks Akhil. Not sure whether the low-level consumer https://github.com/dibbhatt/kafka-spark-consumer will be officially supported by Spark Streaming. So far, I don't see it mentioned/documented in the Spark Streaming programming guide. -- bit1...@163.com *From:* Akhil Das ak...@sigmoidanalytics.com *Date:* 2015-02-24 16:21 *To:* bit1...@163.com *CC:* user user@spark.apache.org *Subject:* Re: Many Receiver vs. Many threads per Receiver I believe when you go with 1, it will distribute the consumers across your cluster (possibly on 6 machines), but still I don't see a way to tell from which partition it will consume, etc. If you are looking to have a consumer where you can specify the partition details and all, then you are better off with the low-level consumer. https://github.com/dibbhatt/kafka-spark-consumer Thanks Best Regards On Tue, Feb 24, 2015 at 9:36 AM, bit1...@163.com bit1...@163.com wrote: Hi, I am experimenting with Spark Streaming and Kafka integration. To read messages from Kafka in parallel, there are basically two ways: 1. Create many receivers, like (1 to 6).map(_ => KafkaUtils.createStream(...)). 2. Specify many threads when calling KafkaUtils.createStream, like val topicMap = Map("myTopic" -> 6); this will create one receiver with 6 reading threads. My question is which option is better. Option 2 sounds better to me because it saves a lot of cores (one receiver = one core), but I learned from somewhere else that choice 1 is better, so I would like to ask and see how you guys elaborate on this. Thanks -- bit1...@163.com