Largest input data set observed for Spark.
All, what is the largest input data set y'all have come across that has been successfully processed in production using Spark? Ballpark?
Re: Relation between DStream and RDDs
If I may add my contribution to this discussion, if I understand your question well... DStream is a discretized stream. It discretizes the data stream over windows of time (according to the project code I've read, and the paper too). So when you write:

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(60 * 60 * 1000)); //1 hour

it means you are discretizing over a 1h window. Each batch, and so each RDD of the DStream, will collect data for 1h before going to the next RDD. So if you want to have more RDDs, you should reduce the batch size/duration...

Pascal

On Thu, Mar 20, 2014 at 7:51 AM, Tathagata Das tathagata.das1...@gmail.com wrote:

That is a good question. If I understand correctly, you need multiple RDDs from a DStream in *every batch*. Can you elaborate on why you need multiple RDDs every batch?

TD

On Wed, Mar 19, 2014 at 10:20 PM, Sanjay Awatramani sanjay_a...@yahoo.com wrote:

Hi,

As I understand, a DStream consists of 1 or more RDDs. And foreachRDD will run a given func on each and every RDD inside a DStream.

I created a simple program which reads log files from a folder every hour:

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(60 * 60 * 1000)); //1 hour
JavaDStream<String> obj = stcObj.textFileStream("/Users/path/to/Input");

When the interval is reached, Spark reads all the files and creates one and only one RDD (as I verified from a sysout inside foreachRDD).

The streaming doc in a lot of places gives an indication that many operations (e.g. flatMap) on a DStream are applied individually to an RDD, and the resulting DStream consists of the mapped RDDs in the same number as the input DStream. ref: https://spark.apache.org/docs/latest/streaming-programming-guide.html#dstreams

If that is the case, how can I generate a scenario wherein I have multiple RDDs inside a DStream in my example?

Regards,
Sanjay
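For illustration, a minimal sketch of the same idea in the Scala streaming API (the app name, input path, and 5-minute duration are just examples): with a shorter batch duration, the same textFileStream produces one RDD per batch instead of one per hour.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, StreamingContext}

    val conf = new SparkConf().setAppName("LogReader")   // example app name
    val ssc = new StreamingContext(conf, Minutes(5))     // 5-minute batches instead of 1 hour
    val lines = ssc.textFileStream("/Users/path/to/Input")
    lines.foreachRDD { rdd =>
      // called once per batch, i.e. once per RDD; shorter batches mean more RDDs over time
      println("batch RDD with " + rdd.count() + " records")
    }
    ssc.start()
    ssc.awaitTermination()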
Re: Relation between DStream and RDDs
@TD: I do not need multiple RDDs in a DStream in every batch. On the contrary, my logic would work fine if there is only 1 RDD. But the description for functions like reduce and count ("Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.") left me confused about whether I should account for the fact that a DStream can have multiple RDDs.

My streaming code processes a batch every hour. In the 2nd batch, I checked that the DStream contains only 1 RDD, i.e. the 2nd batch's RDD. I verified this using a sysout in foreachRDD. Does that mean that the DStream will always contain only 1 RDD? Is there a way to access the RDD of the 1st batch in the 2nd batch? The 1st batch may contain some records which were not relevant to the first batch and are to be processed in the 2nd batch. I know I can use the sliding window mechanism of streaming, but if I'm not using it and there is no way to access the previous batch's RDD, then it means that functions like count will always return a DStream containing only 1 RDD, am I correct?

@Pascal, yes your answer resolves my question partially, but the other part of the question (which I've clarified in the paragraph above) still remains.

Thanks for your answers!

Regards,
Sanjay

On Thursday, 20 March 2014 1:27 PM, Pascal Voitot Dev pascal.voitot@gmail.com wrote:

If I may add my contribution to this discussion, if I understand your question well... DStream is a discretized stream. It discretizes the data stream over windows of time (according to the project code I've read, and the paper too). So when you write:

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(60 * 60 * 1000)); //1 hour

it means you are discretizing over a 1h window. Each batch, and so each RDD of the DStream, will collect data for 1h before going to the next RDD. So if you want to have more RDDs, you should reduce the batch size/duration...

Pascal

On Thu, Mar 20, 2014 at 7:51 AM, Tathagata Das tathagata.das1...@gmail.com wrote:

That is a good question. If I understand correctly, you need multiple RDDs from a DStream in *every batch*. Can you elaborate on why you need multiple RDDs every batch?

TD

On Wed, Mar 19, 2014 at 10:20 PM, Sanjay Awatramani sanjay_a...@yahoo.com wrote:

Hi,

As I understand, a DStream consists of 1 or more RDDs. And foreachRDD will run a given func on each and every RDD inside a DStream.

I created a simple program which reads log files from a folder every hour:

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new Duration(60 * 60 * 1000)); //1 hour
JavaDStream<String> obj = stcObj.textFileStream("/Users/path/to/Input");

When the interval is reached, Spark reads all the files and creates one and only one RDD (as I verified from a sysout inside foreachRDD).

The streaming doc in a lot of places gives an indication that many operations (e.g. flatMap) on a DStream are applied individually to an RDD, and the resulting DStream consists of the mapped RDDs in the same number as the input DStream. ref: https://spark.apache.org/docs/latest/streaming-programming-guide.html#dstreams

If that is the case, how can I generate a scenario wherein I have multiple RDDs inside a DStream in my example?

Regards,
Sanjay
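If the goal is only to see the previous batch's records alongside the current ones, here is a rough sketch of the windowing route Sanjay mentions, in the Scala API (the 1-hour batch interval matches the example above; the variable names are assumptions):

    import org.apache.spark.streaming.Minutes

    // lines: DStream[String] read with textFileStream, 1-hour batch interval as above
    val twoBatchWindow = lines.window(Minutes(120), Minutes(60))
    twoBatchWindow.foreachRDD { rdd =>
      // each windowed RDD covers the current batch plus the previous one,
      // so records that really belong to the next hour can still be seen here
    }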
Error while reading from HDFS Simple application
VerifyError: class org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CreateSnapshotRequestProto overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;

What can be the cause of this error?

Regards,
Laeeq Ahmed,
PhD Student, HPCViz, KTH.
http://laeeprofile.weebly.com
Re: Spark worker threads waiting
This is what the web UI looks like:

[image: Inline image 1]

I also tail all the worker logs, and these are the last entries before the waiting begins:

14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, minRequest: 10066329
14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 29853 non-zero-bytes blocks out of 37714 blocks
14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 5 remote gets in 62 ms
[PSYoungGen: 12464967K->3767331K(10552192K)] 36074093K->29053085K(44805696K), 0.6765460 secs] [Times: user=5.35 sys=0.02, real=0.67 secs]
[PSYoungGen: 10779466K->3203826K(9806400K)] 35384386K->31562169K(44059904K), 0.6925730 secs] [Times: user=5.47 sys=0.00, real=0.70 secs]

From the screenshot above you can see that tasks take ~6 minutes to complete. The amount of time it takes the tasks to complete seems to depend on the amount of input data. If the S3 input pattern captures 2.5 times less data (less data to shuffle write and later read), the same tasks take 1 minute.

Any idea how to debug what the workers are doing?

Domen

On Wed, Mar 19, 2014 at 5:27 PM, Mayur Rustagi [via Apache Spark User List] ml-node+s1001560n2882...@n3.nabble.com wrote:

You could have some outlier task that is preventing the next set of stages from launching. Can you check the stages' state in the Spark WebUI: is any task running, or is everything halted?

Regards,
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Wed, Mar 19, 2014 at 5:40 AM, Domen Grabec [hidden email] wrote:

Hi,

I have a cluster with 16 nodes, each node has 69GB RAM (50GB goes to Spark) and 8 cores, running Spark 0.8.1. I have a groupByKey operation that causes a wide RDD dependency, so shuffle write and shuffle read are performed. For some reason all worker threads seem to sleep for about 3-4 minutes each time they perform a shuffle read and complete a set of tasks. See the graphs below: no resources are being utilized in specific time windows. Each time 3-4 minutes pass, the next set of tasks is grabbed and processed, and then another waiting period happens. Each task has an input of 80MB +- 5MB data to shuffle read.

[image: Inline image 1]

Here http://pastebin.com/UHWMdTRY is a link to a thread dump performed in the middle of the waiting period.

Any idea what could cause the long waits?

Kind regards,
Domen
Stages.png (204K) http://apache-spark-user-list.1001560.n3.nabble.com/attachment/2938/0/Stages.png

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-worker-threads-waiting-tp2859p2938.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Accessing the reduce key
Why are you trying to reduceByKey? Are you looking to work on the data sequentially? If I understand correctly, you are looking to filter your data using Bloom filters, where each Bloom filter is tied to the key that instantiates it. Following are some of the options:

- *Partition* your data by key, then use the mapPartitions operator to run a function on each partition independently. The same function will be applied to each partition.
- If your Bloom filters are large, you can bundle all of them in a broadcast variable and use it to apply the transformation on your data with a simple map operation; basically you look up the right Bloom filter for each key and apply the filter to it.
- Again, if deserializing the Bloom filter is time consuming, you can partition the data on key, then use the broadcast variable to look up the Bloom filter for each key and apply the filter to all data in that partition serially.

Regards,
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Mar 20, 2014 at 1:55 PM, Surendranauth Hiraman suren.hira...@velos.io wrote:

We ended up going with:

map() - set the group_id as the key in a Tuple
reduceByKey() - end up with (K, Seq[V])
map() - create the Bloom filter, loop through the Seq and persist the Bloom filter

This seems to be fine. I guess Spark cannot optimize the reduceByKey and map steps to occur together, since the fact that we are looping through the Seq is out of Spark's control.

-Suren

On Thu, Mar 20, 2014 at 9:48 AM, Surendranauth Hiraman suren.hira...@velos.io wrote:

Hi,

My team is trying to replicate an existing Map/Reduce process in Spark. Basically, we are creating Bloom Filters for quick set membership tests within our processing pipeline.

We have a single column (call it group_id) that we use to partition into sets. As you would expect, in the map phase we emit the group_id as the key, and in the reduce phase we instantiate the Bloom Filter for a given key in the setup() method and persist that Bloom Filter in the cleanup() method.

In Spark, we can do something similar with map() and reduceByKey(), but we have the following questions.

1. Accessing the reduce key
In reduceByKey(), how do we get access to the specific key within the reduce function?

2. Equivalent of setup/cleanup
Where should we instantiate and persist each Bloom Filter by key? In the driver and then pass in the references to the reduce function? But if so, how does the reduce function know which set's Bloom Filter it should be writing to (question 1 above)?

It seems if we use groupByKey and then reduceByKey, that gives us access to all of the values in one go. I assume there Spark will manage if those values all don't fit in memory in one go.

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io

--

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
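For illustration, a rough Scala sketch of the first option Mayur describes (partition by key, then build each filter inside mapPartitions). SimpleFilter is a hypothetical stand-in for a real Bloom filter implementation, and the input path and tab-separated format are assumptions:

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    import org.apache.spark.SparkContext._

    // Hypothetical stand-in for a real Bloom filter implementation.
    class SimpleFilter extends Serializable {
      private val bits = new java.util.BitSet(1 << 20)
      private def slot(s: String) = math.abs(s.hashCode) % (1 << 20)
      def add(s: String): Unit = bits.set(slot(s))
      def mightContain(s: String): Boolean = bits.get(slot(s))
    }

    val sc = new SparkContext(new SparkConf().setAppName("bloom-build"))
    val pairs = sc.textFile("/path/to/input")                        // assumed path, col 0 = group_id
      .map { line => val cols = line.split("\t"); (cols(0), line) }

    val filters = pairs
      .partitionBy(new HashPartitioner(16))
      .mapPartitions { iter =>
        // all records for a given key land in one partition, so each filter
        // is built in a single thread with no locking
        val perKey = scala.collection.mutable.Map[String, SimpleFilter]()
        iter.foreach { case (key, value) => perKey.getOrElseUpdate(key, new SimpleFilter).add(value) }
        perKey.iterator                                              // (group_id, filter) pairs
      }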
Re: sort order after reduceByKey / groupByKey
That's expected. I think sortByKey is an option too, probably a better one.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Mar 20, 2014 at 3:20 PM, Ameet Kini ameetk...@gmail.com wrote:

val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function)

I see that rdd2's partitions are not internally sorted. Can someone confirm that this is expected behavior? And if so, is the only way to get partitions internally sorted to follow it with something like this?

val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function).mapPartitions(p => sort(p))

Thanks,
Ameet
Re: sort order after reduceByKey / groupByKey
I saw that, but I don't need a global sort, only an intra-partition sort.

Ameet

On Thu, Mar 20, 2014 at 3:26 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

That's expected. I think sortByKey is an option too, probably a better one.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Mar 20, 2014 at 3:20 PM, Ameet Kini ameetk...@gmail.com wrote:

val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function)

I see that rdd2's partitions are not internally sorted. Can someone confirm that this is expected behavior? And if so, is the only way to get partitions internally sorted to follow it with something like this?

val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function).mapPartitions(p => sort(p))

Thanks,
Ameet
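For reference, a small sketch of the mapPartitions route when only intra-partition order is needed; the HashPartitioner and sum function are placeholders standing in for "my partitioner" / "some function", and rdd is assumed to be an RDD of key-value pairs:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._

    val rdd2 = rdd
      .partitionBy(new HashPartitioner(8))
      .reduceByKey(_ + _)
      .mapPartitions(iter => iter.toArray.sortBy(_._1).iterator,
                     preservesPartitioning = true)  // keeps the partitioner; only reorders within each partition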
Re: in SF until Friday
Would love to... but I am in NY till Friday :(

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Wed, Mar 19, 2014 at 7:34 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote:

I'm in San Francisco until Friday for a conference (visiting from Boston). If any of y'all are up for a drink or something, I'd love to meet you in person and say hi.

Nick

--
View this message in context: in SF until Friday http://apache-spark-user-list.1001560.n3.nabble.com/in-SF-until-Friday-tp2900.html
Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Re: Accessing the reduce key
Mayur,

To be a little clearer: for creating the Bloom Filters, I don't think broadcast variables are the way to go, though that would definitely work for using the Bloom Filters to filter data.

The reason is that the creation needs to happen in a single thread. Otherwise, some type of locking/distributed locking is needed on the individual Bloom Filter itself, with a performance impact. Agreed?

-Suren

On Thu, Mar 20, 2014 at 3:40 PM, Surendranauth Hiraman suren.hira...@velos.io wrote:

Mayur,

Thanks. This step is for creating the Bloom Filter, not using it to filter data, actually. But your answer still stands. Partitioning by key, having the Bloom filters as a broadcast variable and then doing mapPartitions makes sense.

Are there performance implications for this approach, such as with using the broadcast variable, versus the approach we used, in which the Bloom Filter (again, for creating it) is only referenced by the single map application?

-Suren

On Thu, Mar 20, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

Why are you trying to reduceByKey? Are you looking to work on the data sequentially? If I understand correctly, you are looking to filter your data using Bloom filters, where each Bloom filter is tied to the key that instantiates it. Following are some of the options:

- *Partition* your data by key, then use the mapPartitions operator to run a function on each partition independently. The same function will be applied to each partition.
- If your Bloom filters are large, you can bundle all of them in a broadcast variable and use it to apply the transformation on your data with a simple map operation; basically you look up the right Bloom filter for each key and apply the filter to it.
- Again, if deserializing the Bloom filter is time consuming, you can partition the data on key, then use the broadcast variable to look up the Bloom filter for each key and apply the filter to all data in that partition serially.

Regards,
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Mar 20, 2014 at 1:55 PM, Surendranauth Hiraman suren.hira...@velos.io wrote:

We ended up going with:

map() - set the group_id as the key in a Tuple
reduceByKey() - end up with (K, Seq[V])
map() - create the Bloom filter, loop through the Seq and persist the Bloom filter

This seems to be fine. I guess Spark cannot optimize the reduceByKey and map steps to occur together, since the fact that we are looping through the Seq is out of Spark's control.

-Suren

On Thu, Mar 20, 2014 at 9:48 AM, Surendranauth Hiraman suren.hira...@velos.io wrote:

Hi,

My team is trying to replicate an existing Map/Reduce process in Spark. Basically, we are creating Bloom Filters for quick set membership tests within our processing pipeline.

We have a single column (call it group_id) that we use to partition into sets. As you would expect, in the map phase we emit the group_id as the key, and in the reduce phase we instantiate the Bloom Filter for a given key in the setup() method and persist that Bloom Filter in the cleanup() method.

In Spark, we can do something similar with map() and reduceByKey(), but we have the following questions.

1. Accessing the reduce key
In reduceByKey(), how do we get access to the specific key within the reduce function?

2. Equivalent of setup/cleanup
Where should we instantiate and persist each Bloom Filter by key? In the driver and then pass in the references to the reduce function? But if so, how does the reduce function know which set's Bloom Filter it should be writing to (question 1 above)?

It seems if we use groupByKey and then reduceByKey, that gives us access to all of the values in one go. I assume there Spark will manage if those values all don't fit in memory in one go.

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
Re: Hadoop streaming like feature for Spark
Thank you Ewen. RDD.pipe is what I need and it works like a charm. On the other hand, RDD.mapPartitions seems interesting, but I can't figure out how to make it work.

Jaonary

On Thu, Mar 20, 2014 at 4:54 PM, Ewen Cheslack-Postava m...@ewencp.org wrote:

Take a look at RDD.pipe(). You could also accomplish the same thing using RDD.mapPartitions, to which you pass a function that processes the iterator for each partition rather than processing each element individually. This lets you only start up as many processes as there are partitions, pipe the contents of each iterator to them, then collect the output. This might be useful if, e.g., your external process doesn't use line-oriented input/output.

-Ewen

Jaonary Rabarisoa jaon...@gmail.com
March 20, 2014 at 1:04 AM

Dear all,

Does Spark have a kind of Hadoop streaming feature to run an external process to manipulate data from an RDD sent through stdin and stdout?

Best,
Jaonary
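For reference, a small Scala sketch of both routes (the path and the external command are just examples; sc is an existing SparkContext):

    // RDD.pipe: each element of a partition is written to the command's stdin
    // (one per line), and each stdout line of the command becomes an element
    // of the resulting RDD.
    val lines = sc.textFile("/path/to/input")    // example path
    val piped = lines.pipe("grep -i error")      // example external command
    piped.take(10).foreach(println)

    // mapPartitions: the function sees a whole partition as an iterator, so any
    // expensive per-process setup can happen once per partition instead of once per record.
    val lineLengths = lines.mapPartitions { iter =>
      iter.map(_.length)                         // replace with a call into an external process if needed
    }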
Re: Largest input data set observed for Spark.
Hi Reynold,

Nice! What Spark configuration parameters did you use to get your job to run successfully on a large dataset? My job is failing on 1TB of input data (uncompressed) on a 4-node cluster (64GB memory per node). No OutOfMemory errors, just lost executors.

Thanks,
Soila

On Mar 20, 2014 11:29 AM, Reynold Xin r...@databricks.com wrote:

I'm not really at liberty to discuss details of the job. It involves some expensive aggregated statistics, and took 10 hours to complete (mostly bottlenecked by network IO).

On Thu, Mar 20, 2014 at 11:12 AM, Surendranauth Hiraman suren.hira...@velos.io wrote:

Reynold,

How complex was that job (I guess in terms of number of transforms and actions), and how long did that take to process?

-Suren

On Thu, Mar 20, 2014 at 2:08 PM, Reynold Xin r...@databricks.com wrote:

Actually, we just ran a job with 70TB+ compressed data on 28 worker nodes. I didn't count the size of the uncompressed data, but I am guessing it is somewhere between 200TB and 700TB.

On Thu, Mar 20, 2014 at 12:23 AM, Usman Ghani us...@platfora.com wrote:

All, what is the largest input data set y'all have come across that has been successfully processed in production using Spark? Ballpark?

--

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
Re: Machine Learning on streaming data
Thanks TD, happy to share my experience with MLlib + Spark Streaming integration. Here's a gist with two examples I have working, one for StreamingLinearRegression and another for StreamingKMeans.

https://gist.github.com/freeman-lab/9672685

The goal in each case was to implement a streaming version of the algorithm, using as much as possible directly from MLlib. For Linear Regression this was straightforward, because the MLlib version already uses a (stochastic) update rule, which I just use to update the model inside a foreachRDD(), using each new batch of data. For KMeans, I used the model class from MLlib, but extended it to keep a running count for each cluster. I also had to re-implement a chunk of the core algorithm in the form of an update rule. Tighter integration in this case would, I think, require refactoring some of MLlib (e.g. to use something like this update function), but this works fine.

One unresolved issue: for these kinds of algorithms, the dimensionality of the data must be known in advance. It would be cool to automatically detect it based on the first record.

-- Jeremy

On Mar 19, 2014, at 9:03 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

Yes, of course you can conceptually apply a machine learning algorithm on Spark Streaming. However, the current MLlib does not yet have direct support for Spark Streaming's DStream. But since DStreams are essentially a sequence of RDDs, you can apply MLlib algorithms on those RDDs. Take a look at the DStream.transform() and DStream.foreachRDD() operations, which allow you to access the RDDs in a DStream. You can apply MLlib functions on them.

Some people have attempted a tighter integration between MLlib and Spark Streaming. Jeremy (cc'ed) can say more about his adventures.

TD

On Sun, Mar 16, 2014 at 5:56 PM, Nasir Khan nasirkhan.onl...@gmail.com wrote:

Hi, I'm on a project in which I have to take streaming URLs, filter them, and classify them as benign or suspicious. Now Machine Learning and Streaming are two separate things in Apache Spark (AFAIK). My question is: can we apply online machine learning algorithms on streams?

I am at a beginner level; kindly explain in a bit of detail, and if someone can direct me to some good material, that would be great.

Thanks,
Nasir Khan.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Machine-Learning-on-streaming-data-tp2732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
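As a rough illustration of the foreachRDD update pattern Jeremy describes (not his actual code: the gradient step is a toy stand-in for MLlib's updater, and the batch interval, input path, and dimensionality are assumptions; 0.9-era MLlib with Array-based LabeledPoint is assumed):

    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    var weights = Array.fill(3)(0.0)                       // dimensionality assumed known up front
    val ssc = new StreamingContext(sc, Seconds(10))        // sc: an existing SparkContext (assumed)
    val points = ssc.textFileStream("/stream/dir").map { line =>
      val v = line.split(',').map(_.toDouble)
      LabeledPoint(v.head, v.tail)                         // label first, then features
    }
    points.foreachRDD { rdd =>
      val n = rdd.count()
      if (n > 0) {
        val w = weights                                    // snapshot of the model for the closure
        val grad = rdd.map { p =>
          val err = p.label - p.features.zip(w).map { case (x, wi) => x * wi }.sum
          p.features.map(_ * err)
        }.reduce((a, b) => a.zip(b).map { case (x, y) => x + y })
        weights = weights.zip(grad).map { case (wi, g) => wi + 0.01 * g / n }  // toy SGD step per batch
      }
    }
    ssc.start()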
Re: Accessing the reduce key
Grouped by the group_id but not sorted.

-Suren

On Thu, Mar 20, 2014 at 5:52 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

You are using the data grouped (sorted?) to create the bloom filter?

On Mar 20, 2014 4:35 PM, Surendranauth Hiraman suren.hira...@velos.io wrote:

Mayur,

To be a little clearer: for creating the Bloom Filters, I don't think broadcast variables are the way to go, though that would definitely work for using the Bloom Filters to filter data.

The reason is that the creation needs to happen in a single thread. Otherwise, some type of locking/distributed locking is needed on the individual Bloom Filter itself, with a performance impact. Agreed?

-Suren

On Thu, Mar 20, 2014 at 3:40 PM, Surendranauth Hiraman suren.hira...@velos.io wrote:

Mayur,

Thanks. This step is for creating the Bloom Filter, not using it to filter data, actually. But your answer still stands. Partitioning by key, having the Bloom filters as a broadcast variable and then doing mapPartitions makes sense.

Are there performance implications for this approach, such as with using the broadcast variable, versus the approach we used, in which the Bloom Filter (again, for creating it) is only referenced by the single map application?

-Suren

On Thu, Mar 20, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

Why are you trying to reduceByKey? Are you looking to work on the data sequentially? If I understand correctly, you are looking to filter your data using Bloom filters, where each Bloom filter is tied to the key that instantiates it. Following are some of the options:

- *Partition* your data by key, then use the mapPartitions operator to run a function on each partition independently. The same function will be applied to each partition.
- If your Bloom filters are large, you can bundle all of them in a broadcast variable and use it to apply the transformation on your data with a simple map operation; basically you look up the right Bloom filter for each key and apply the filter to it.
- Again, if deserializing the Bloom filter is time consuming, you can partition the data on key, then use the broadcast variable to look up the Bloom filter for each key and apply the filter to all data in that partition serially.

Regards,
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi

On Thu, Mar 20, 2014 at 1:55 PM, Surendranauth Hiraman suren.hira...@velos.io wrote:

We ended up going with:

map() - set the group_id as the key in a Tuple
reduceByKey() - end up with (K, Seq[V])
map() - create the Bloom filter, loop through the Seq and persist the Bloom filter

This seems to be fine. I guess Spark cannot optimize the reduceByKey and map steps to occur together, since the fact that we are looping through the Seq is out of Spark's control.

-Suren

On Thu, Mar 20, 2014 at 9:48 AM, Surendranauth Hiraman suren.hira...@velos.io wrote:

Hi,

My team is trying to replicate an existing Map/Reduce process in Spark. Basically, we are creating Bloom Filters for quick set membership tests within our processing pipeline.

We have a single column (call it group_id) that we use to partition into sets. As you would expect, in the map phase we emit the group_id as the key, and in the reduce phase we instantiate the Bloom Filter for a given key in the setup() method and persist that Bloom Filter in the cleanup() method.

In Spark, we can do something similar with map() and reduceByKey(), but we have the following questions.

1. Accessing the reduce key
In reduceByKey(), how do we get access to the specific key within the reduce function?

2. Equivalent of setup/cleanup
Where should we instantiate and persist each Bloom Filter by key? In the driver and then pass in the references to the reduce function? But if so, how does the reduce function know which set's Bloom Filter it should be writing to (question 1 above)?

It seems if we use groupByKey and then reduceByKey, that gives us access to all of the values in one go. I assume there Spark will manage if those values all don't fit in memory in one go.

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning
440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hira...@velos.io
W: www.velos.io
Re: Pyspark worker memory
Jim, I'm starting to document the heap size settings all in one place, which has been a point of confusion for a lot of my peers. Maybe you can take a look at this ticket? https://spark-project.atlassian.net/browse/SPARK-1264

On Wed, Mar 19, 2014 at 12:53 AM, Jim Blomo jim.bl...@gmail.com wrote:

To document this, it would be nice to clarify which environment variables should be used to set which Java system properties, and what type of process they affect. I'd be happy to start a page if you can point me to the right place:

SPARK_JAVA_OPTS: -Dspark.executor.memory can be set on the machine running the driver (typically the master host) and will affect the memory available to the Executor running on a slave node
-D
SPARK_DAEMON_OPTS:

On Wed, Mar 19, 2014 at 12:48 AM, Jim Blomo jim.bl...@gmail.com wrote:

Thanks for the suggestion, Matei. I've tracked this down to a setting I had to make on the driver. It looks like spark-env.sh has no impact on the Executor, which confused me for a long while with settings like SPARK_EXECUTOR_MEMORY. The only setting that mattered was setting the system property in the *driver* (in this case pyspark/shell.py) or using -Dspark.executor.memory in SPARK_JAVA_OPTS *on the master*. I'm not sure how this differs from the 0.9.0 release, but it seems to work on SNAPSHOT.

On Tue, Mar 18, 2014 at 11:52 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Try checking spark-env.sh on the workers as well. Maybe code there is somehow overriding the spark.executor.memory setting.

Matei

On Mar 18, 2014, at 6:17 PM, Jim Blomo jim.bl...@gmail.com wrote:

Hello, I'm using the GitHub snapshot of PySpark and having trouble setting the worker memory correctly. I've set spark.executor.memory to 5g, but somewhere along the way Xmx is getting capped to 512M. This was not occurring with the same setup and 0.9.0. How many places do I need to configure the memory? Thank you!
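For reference, a sketch of roughly the Scala-driver analogue of the change Jim describes making in pyspark/shell.py (the 5g value, master URL, and app name are examples; this assumes the 0.9-era behavior where spark.executor.memory is read from system properties when the SparkContext is constructed):

    import org.apache.spark.SparkContext

    // The executor heap size is read from this property when the SparkContext
    // is created, so it has to be set on the driver before construction.
    System.setProperty("spark.executor.memory", "5g")          // example value
    val sc = new SparkContext("spark://master:7077", "MyApp")  // example master URL and app name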
Re: Pyspark worker memory
Yeah, this is definitely confusing. The motivation was that different users of the same cluster may want to set different memory sizes for their apps, so we decided to put this setting in the driver. However, if you put SPARK_JAVA_OPTS in spark-env.sh, it also applies to executors, which is confusing (though in this case it wouldn't overwrite spark.executor.memory AFAIK). We want to clean a bunch of this stuff up for 1.0, or at least document it better. Thanks for the suggestions.

Matei

On Mar 19, 2014, at 12:53 AM, Jim Blomo jim.bl...@gmail.com wrote:

To document this, it would be nice to clarify which environment variables should be used to set which Java system properties, and what type of process they affect. I'd be happy to start a page if you can point me to the right place:

SPARK_JAVA_OPTS: -Dspark.executor.memory can be set on the machine running the driver (typically the master host) and will affect the memory available to the Executor running on a slave node
-D
SPARK_DAEMON_OPTS:

On Wed, Mar 19, 2014 at 12:48 AM, Jim Blomo jim.bl...@gmail.com wrote:

Thanks for the suggestion, Matei. I've tracked this down to a setting I had to make on the driver. It looks like spark-env.sh has no impact on the Executor, which confused me for a long while with settings like SPARK_EXECUTOR_MEMORY. The only setting that mattered was setting the system property in the *driver* (in this case pyspark/shell.py) or using -Dspark.executor.memory in SPARK_JAVA_OPTS *on the master*. I'm not sure how this differs from the 0.9.0 release, but it seems to work on SNAPSHOT.

On Tue, Mar 18, 2014 at 11:52 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

Try checking spark-env.sh on the workers as well. Maybe code there is somehow overriding the spark.executor.memory setting.

Matei

On Mar 18, 2014, at 6:17 PM, Jim Blomo jim.bl...@gmail.com wrote:

Hello, I'm using the GitHub snapshot of PySpark and having trouble setting the worker memory correctly. I've set spark.executor.memory to 5g, but somewhere along the way Xmx is getting capped to 512M. This was not occurring with the same setup and 0.9.0. How many places do I need to configure the memory? Thank you!
Re: DStream spark paper
Hi Adrian, On every timestep of execution, we receive new data, then report updated word counts for that new data plus the past 30 seconds. The latency here is about how quickly you get these updated counts once the new batch of data comes in. It’s true that the count reflects some data from 30 seconds ago as well, but it doesn’t mean the overall processing latency is 30 seconds. Matei On Mar 20, 2014, at 1:36 PM, Adrian Mocanu amoc...@verticalscope.com wrote: I looked over the specs on page 9 from http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf The first paragraph mentions the window size is 30 seconds “Word-Count, which performs a sliding window count over 30s; and TopKCount, which finds the k most frequent words over the past 30s. “ The second paragraph mentions subsecond latency. Putting these 2 together, is the paper saying that in the 30 sec window the tuples are delayed at most 1 second? The paper explains “By “end-to-end latency,” we mean the time from when records are sent to the system to when results incorporating them appear.” This leads me to conclude that end-to-end latency for a 30 sec window should be at least 30 seconds because results won’t be incorporated until the entire window is completed ie: 30sec. At the same time the paper claims latency is sub second so clearly I’m misunderstanding something. -Adrian
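A small sketch of the distinction (input source, types, and intervals are assumptions): the slide/batch interval is what determines how quickly updated results appear, while the 30-second figure is only how much history each result covers.

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.StreamingContext._

    // lines: DStream[String] from any input source (assumed), with 1-second batches
    val words = lines.flatMap(_.split(" ")).map(w => (w, 1))
    val counts = words.reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,  // combine counts over the last 30 s of data
      Seconds(30),                // window duration: how much history each result covers
      Seconds(1))                 // slide interval: how often updated results are produced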
Spark Job stuck
Hi,

I have run a Spark application to process input data of size ~14GB with executor memory 10GB. The job got stuck with the below message:

14/03/21 05:02:07 WARN storage.BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, guavus-0102bf, 49347, 0) with no recent heart beats: 85563ms exceeds 45000ms

But the job completed successfully if I increase executor memory to 40GB. Any idea?

Thanks,
Mohit Goyal

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Sprak-Job-stuck-tp2979.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.