Largest input data set observed for Spark.

2014-03-20 Thread Usman Ghani
All,
What is the largest input data set y'all have come across that has been
successfully processed in production using Spark? Ballpark?


Re: Relation between DStream and RDDs

2014-03-20 Thread Pascal Voitot Dev
If I may add my contribution to this discussion, assuming I understand your
question correctly...

A DStream is a discretized stream: it discretizes the data stream over windows
of time (according to the project code and paper I've read). So when you write:

JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new
Duration(60 * 60 * 1000)); //1 hour

It means you are discretizing over a 1-hour window. Each batch, and thus each RDD
of the DStream, will collect data for 1 hour before moving on to the next RDD.
So if you want more RDDs, you should reduce the batch size/duration...

Pascal
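
A minimal Scala sketch of the same point (the input path and the 1-minute
interval are placeholders, not from this thread): each batch interval yields
exactly one RDD, so a shorter batch duration simply makes RDDs arrive more often.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object BatchDurationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("batch-duration-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(60)) // 1-minute batches instead of 1 hour

    // Placeholder path; textFileStream picks up files appearing in this directory.
    val lines = ssc.textFileStream("/Users/path/to/Input")

    // This closure runs once per batch interval, on that interval's single RDD.
    lines.foreachRDD(rdd => println("new batch RDD with " + rdd.count() + " records"))

    ssc.start()
    ssc.awaitTermination()
  }
}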


On Thu, Mar 20, 2014 at 7:51 AM, Tathagata Das
tathagata.das1...@gmail.com wrote:

 That is a good question. If I understand correctly, you need multiple RDDs
 from a DStream in *every batch*. Can you elaborate on why you need
 multiple RDDs every batch?

 TD


 On Wed, Mar 19, 2014 at 10:20 PM, Sanjay Awatramani sanjay_a...@yahoo.com
  wrote:

 Hi,

 As I understand, a DStream consists of 1 or more RDDs. And foreachRDD
 will run a given func on each and every RDD inside a DStream.

 I created a simple program which reads log files from a folder every hour:
 JavaStreamingContext stcObj = new JavaStreamingContext(confObj, new
 Duration(60 * 60 * 1000)); //1 hour
 JavaDStream<String> obj = stcObj.textFileStream("/Users/path/to/Input");

 When the interval is reached, Spark reads all the files and creates one
 and only one RDD (as I verified from a sysout inside foreachRDD).

 In many places the streaming doc indicates that operations (e.g. flatMap) on a
 DStream are applied to each RDD individually, and the resulting DStream consists
 of the mapped RDDs, in the same number as the input DStream.
 ref:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#dstreams

 If that is the case, how can I create a scenario in which I have
 multiple RDDs inside a DStream in my example?

 Regards,
 Sanjay





Re: Relation between DStream and RDDs

2014-03-20 Thread Sanjay Awatramani
@TD: I do not need multiple RDDs in a DStream in every batch. On the contrary,
my logic would work fine if there is only 1 RDD. But the description for
functions like reduce & count ("Return a new DStream of single-element RDDs by
counting the number of elements in each RDD of the source DStream.") left me
confused about whether I should account for the fact that a DStream can have
multiple RDDs. My streaming code processes a batch every hour. In the 2nd batch,
I checked that the DStream contains only 1 RDD, i.e. the 2nd batch's RDD (I
verified this using a sysout in foreachRDD). Does that mean that the DStream will
always contain only 1 RDD? Is there a way to access the RDD of the 1st batch
in the 2nd batch? The 1st batch may contain some records which were not
relevant to it and are to be processed in the 2nd batch. I know I can use the
sliding window mechanism of streaming, but if I'm not using it and there is no
way to access the previous batch's RDD, then it means that functions like count
will always return a DStream containing only 1 RDD. Am I correct?

@Pascal, yes, your answer resolves my question partially, but the other part of
the question (which I've clarified in the paragraph above) still remains.

Thanks for your answers !

Regards,
Sanjay
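
One possible direction, sketched below purely as an illustration (Scala API; the
hour-keying and paths are assumptions, not from this thread): carry records
forward to later batches with updateStateByKey instead of a sliding window, so
the 2nd batch can still see records that arrived during the 1st.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object CarryOverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("carry-over-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Minutes(60))
    ssc.checkpoint("/tmp/checkpoints") // stateful operations require checkpointing

    val lines = ssc.textFileStream("/Users/path/to/Input")

    // Key each record by the hour it logically belongs to; the parsing is purely
    // illustrative (pretend the first comma-separated field is that hour).
    val keyed = lines.map(line => (line.split(",")(0), line))

    // State = all records seen so far for that hour. Each batch appends its new
    // records, so a later batch still sees what earlier batches delivered.
    val byHour = keyed.updateStateByKey[Seq[String]] {
      (newRecords: Seq[String], state: Option[Seq[String]]) =>
        Some(state.getOrElse(Seq.empty[String]) ++ newRecords)
    }

    byHour.foreachRDD(rdd => println("tracking " + rdd.count() + " hourly buckets"))

    ssc.start()
    ssc.awaitTermination()
  }
}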





Error while reading from HDFS Simple application

2014-03-20 Thread Laeeq Ahmed
VerifyError: class 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$CreateSnapshotRequestProto
 overrides final method getUnknownFields.()Lcom/google/protobuf/UnknownFieldSet;


What can be the cause of this error?

Regards,
Laeeq Ahmed,
PhD Student,
HPCViz, KTH.

http://laeeprofile.weebly.com


Re: Spark worker threads waiting

2014-03-20 Thread sparrow
This is what the web UI looks like (screenshot attached as Stages.png):

I also tail all the worker logs, and these are the last entries before the
waiting begins:

14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
maxBytesInFlight: 50331648, minRequest: 10066329
14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Getting 29853 non-zero-bytes blocks out of 37714 blocks
14/03/20 13:29:10 INFO BlockFetcherIterator$BasicBlockFetcherIterator:
Started 5 remote gets in  62 ms
[PSYoungGen: 12464967K-3767331K(10552192K)]
36074093K-29053085K(44805696K), 0.6765460 secs] [Times: user=5.35
sys=0.02, real=0.67 secs]
[PSYoungGen: 10779466K-3203826K(9806400K)]
35384386K-31562169K(44059904K), 0.6925730 secs] [Times: user=5.47
sys=0.00, real=0.70 secs]

From the screenshot above you can see that tasks take ~6 minutes to
complete. The amount of time it takes the tasks to complete seems to depend
on the amount of input data: if the S3 input pattern captures 2.5 times less
data (less data to shuffle write & later read), the same tasks take 1
minute. Any idea how to debug what the workers are doing?

Domen

On Wed, Mar 19, 2014 at 5:27 PM, Mayur Rustagi [via Apache Spark User List]
ml-node+s1001560n2882...@n3.nabble.com wrote:

 You could have some outlier task that is preventing the next set of stages
 from launching. Can you check out stages state in the Spark WebUI, is any
 task running or is everything halted.
 Regards
 Mayur

 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi https://twitter.com/mayur_rustagi



 On Wed, Mar 19, 2014 at 5:40 AM, Domen Grabec [hidden email] wrote:

 Hi,

 I have a cluster with 16 nodes; each node has 69GB RAM (50GB goes to
 Spark) and 8 cores, running Spark 0.8.1. I have a groupByKey operation that
 causes a wide RDD dependency, so shuffle write and shuffle read are
 performed.

 For some reason all worker threads seem to sleep for about 3-4 minutes
 each time when performing a shuffle read and completing a set of tasks. See the
 graphs below for how no resources are being utilized in specific time windows.

 Each time 3-4 minutes pass, a next set of tasks are being grabbed and
 processed, and then another waiting period happens.

 Each task has an input of 80Mb +- 5Mb data to shuffle read.

  [image: resource utilization graphs, not included]

 Here is a link to a thread dump performed in the middle of the waiting
 period: http://pastebin.com/UHWMdTRY. Any idea what could cause the long
 waits?

 Kind regards, Domen







Stages.png (204K) 
http://apache-spark-user-list.1001560.n3.nabble.com/attachment/2938/0/Stages.png





Re: Accessing the reduce key

2014-03-20 Thread Mayur Rustagi
Why are you trying to reduceByKey? Are you looking to work on the data
sequentially?
If I understand correctly, you are looking to filter your data using the
bloom filter, & each bloom filter is tied to the key that instantiated it.
Following are some of the options:
*Partition* your data by key & use the mapPartitions operator to run a function
on each partition independently. The same function will be applied to each
partition.
If your bloom filter is large, then you can bundle all of them into a
broadcast variable & use it to apply the transformation on your data using
a simple map operation; basically you are looking up the right bloom filter
for each key & applying the filter with it. Again, if deserializing the bloom
filter is time consuming, then you can partition the data by key & then use
the broadcast variable to look up the bloom filter for each key & apply the
filter on all the data serially.

Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi
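
A rough Scala sketch of the partition-by-key + mapPartitions option; BloomFilter
below is a hypothetical stand-in for whatever implementation is actually in use,
not a Spark or thread-provided class.

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

object BloomFilterSketch {
  // Placeholder filter; a real implementation would hash values into a bit set.
  class BloomFilter(expectedInsertions: Int) extends Serializable {
    def add(value: String): Unit = { /* hash `value` and set the corresponding bits */ }
  }

  // Partition by key so every value for a group_id lands in one partition, then
  // build one filter per key inside mapPartitions; a single thread owns each
  // filter, so no locking is needed.
  def buildFilters(records: RDD[(String, String)]): RDD[(String, BloomFilter)] = {
    records
      .partitionBy(new HashPartitioner(records.partitions.length))
      .mapPartitions { iter =>
        val filters = scala.collection.mutable.Map[String, BloomFilter]()
        iter.foreach { case (groupId, value) =>
          filters.getOrElseUpdate(groupId, new BloomFilter(1000000)).add(value)
        }
        filters.iterator // one (group_id, filter) pair per key seen in this partition
      }
  }
}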



On Thu, Mar 20, 2014 at 1:55 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 We ended up going with:

 map() - set the group_id as the key in a Tuple
 reduceByKey() - end up with (K,Seq[V])
 map() - create the bloom filter and loop through the Seq and persist the
 Bloom filter

 This seems to be fine.

 I guess Spark cannot optimize the reduceByKey and map steps to occur
 together since the fact that we are looping through the Seq is out of
 Spark's control.

 -Suren




 On Thu, Mar 20, 2014 at 9:48 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 Hi,

 My team is trying to replicate an existing Map/Reduce process in Spark.

 Basically, we are creating Bloom Filters for quick set membership tests
 within our processing pipeline.

 We have a single column (call it group_id) that we use to partition into
 sets.

 As you would expect, in the map phase, we emit the group_id as the key
 and in the reduce phase, we instantiate the Bloom Filter for a given key in
 the setup() method and persist that Bloom Filter in the cleanup() method.

 In Spark, we can do something similar with map() and reduceByKey() but we
 have the following questions.


 1. Accessing the reduce key
 In reduceByKey(), how do we get access to the specific key within the
 reduce function?


 2. Equivalent of setup/cleanup
 Where should we instantiate and persist each Bloom Filter by key? In the
 driver and then pass in the references to the reduce function? But if so,
 how does the reduce function know which set's Bloom Filter it should be
 writing to (question 1 above)?

 It seems that if we use groupByKey and then reduceByKey, that gives us access
 to all of the values in one go. I assume that there, Spark will manage it if
 those values don't all fit in memory at once.



 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io




Re: sort order after reduceByKey / groupByKey

2014-03-20 Thread Mayur Rustagi
That's expected. I think sortByKey is an option too & probably a better one.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Thu, Mar 20, 2014 at 3:20 PM, Ameet Kini ameetk...@gmail.com wrote:


 val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function)

 I see that rdd2's partitions are not internally sorted. Can someone
 confirm that this is expected behavior? And if so, the only way to get
 partitions internally sorted is to follow it with something like this

 val rdd2 = rdd.partitionBy(my partitioner).reduceByKey(some function).mapPartitions(p => sort(p))

 Thanks,
 Ameet




Re: sort order after reduceByKey / groupByKey

2014-03-20 Thread Ameet Kini
I saw that but I don't need a global sort, only intra-partition sort.

Ameet
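
A minimal sketch of that intra-partition sort (hypothetical key/value types; it
buffers each partition to sort it, so each partition is assumed to fit in memory).

import org.apache.spark.rdd.RDD

object PartitionSortSketch {
  // Sort each partition's elements by key after the shuffle, without a global
  // sortByKey. preservesPartitioning keeps the partitioner, since only the order
  // of elements within each partition changes.
  def sortWithinPartitions(rdd: RDD[(Int, String)]): RDD[(Int, String)] = {
    rdd.mapPartitions(
      iter => iter.toArray.sortBy(_._1).iterator, // buffers the whole partition
      preservesPartitioning = true
    )
  }
}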







Re: in SF until Friday

2014-03-20 Thread Mayur Rustagi
Would love to .. but I am in NY till Friday :(

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi https://twitter.com/mayur_rustagi



On Wed, Mar 19, 2014 at 7:34 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 I'm in San Francisco until Friday for a conference (visiting from Boston).

 If any of y'all are up for a drink or something, I'd love to meet you in
 person and say hi.

 Nick





Re: Accessing the reduce key

2014-03-20 Thread Surendranauth Hiraman
Mayur,

To be a little clearer, for creating the Bloom Filters, I don't think
broadcast variables are the way to go, though definitely that would work
for using the Bloom Filters to filter data.

The reason why is that the creation needs to happen in a single thread.
Otherwise, some type of locking/distributed locking is needed on the
individual Bloom Filter itself, with performance impact.

Agreed?

-Suren




On Thu, Mar 20, 2014 at 3:40 PM, Surendranauth Hiraman 
suren.hira...@velos.io wrote:

 Mayur,

 Thanks. This step is for creating the Bloom Filter, not using it to filter
 data, actually. But your answer still stands.

 Partitioning by key, having the bloom filters as a broadcast variable and
 then doing mapPartitions makes sense.

 Are there performance implications for this approach, such as with using
 the broadcast variable, versus the approach we used, in which the Bloom
 Filter (again, for creating it) is only referenced by the single map
 application?

 -Suren









-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@velos.io
W: www.velos.io


Re: Hadoop streaming like feature for Spark

2014-03-20 Thread Jaonary Rabarisoa
Thank you Ewen. RDD.pipe is what I need and it works like a charm. On the
other hand, RDD.mapPartitions seems interesting, but I can't figure out
how to make it work.

Jaonary
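
For reference, a minimal sketch of RDD.pipe (the script name is a placeholder and
would have to exist on every worker): each partition's elements are written to the
external command's stdin one per line, and the command's stdout lines become the
resulting RDD.

import org.apache.spark.{SparkConf, SparkContext}

object PipeSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("pipe-sketch").setMaster("local[2]")
    val sc   = new SparkContext(conf)

    val input = sc.parallelize(Seq("alpha", "beta", "gamma"))

    // Placeholder script: reads lines on stdin, writes transformed lines to stdout.
    val piped = input.pipe("./transform.sh")

    piped.collect().foreach(println)
    sc.stop()
  }
}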


On Thu, Mar 20, 2014 at 4:54 PM, Ewen Cheslack-Postava m...@ewencp.org wrote:

 Take a look at RDD.pipe().

 You could also accomplish the same thing using RDD.mapPartitions, which
 you pass a function that processes the iterator for each partition rather
 than processing each element individually. This lets you only start up as
 many processes as there are partitions, pipe the contents of each iterator
 to them, then collect the output. This might be useful if, e.g., your
 external process doesn't use line-oriented input/output.

 -Ewen

   Jaonary Rabarisoa jaon...@gmail.com
  March 20, 2014 at 1:04 AM
 Dear all,

 Does Spark have a kind of Hadoop streaming feature to run an external process
 to manipulate data from an RDD sent through stdin and stdout?

 Best,

 Jaonary



Re: Largest input data set observed for Spark.

2014-03-20 Thread Soila Pertet Kavulya
Hi Reynold,

Nice! What Spark configuration parameters did you use to get your job to
run successfully on a large dataset? My job is failing on 1TB of input data
(uncompressed) on a 4-node cluster (64GB memory per node). No OutOfMemory
errors, just lost executors.

Thanks,

Soila
On Mar 20, 2014 11:29 AM, Reynold Xin r...@databricks.com wrote:

 I'm not really at liberty to discuss details of the job. It involves some
 expensive aggregated statistics, and took 10 hours to complete (mostly
 bottlenecked by network & I/O).





 On Thu, Mar 20, 2014 at 11:12 AM, Surendranauth Hiraman 
 suren.hira...@velos.io wrote:

 Reynold,

 How complex was that job (I guess in terms of number of transforms and
 actions) and how long did that take to process?

 -Suren



 On Thu, Mar 20, 2014 at 2:08 PM, Reynold Xin r...@databricks.com wrote:

  Actually we just ran a job with 70TB+ compressed data on 28 worker nodes -
  I didn't count the size of the uncompressed data, but I am guessing it is
  somewhere between 200TB and 700TB.
 
 
 
  On Thu, Mar 20, 2014 at 12:23 AM, Usman Ghani us...@platfora.com
 wrote:
 
   All,
   What is the largest input data set y'all have come across that has been
   successfully processed in production using Spark? Ballpark?
 



 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

 440 NINTH AVENUE, 11TH FLOOR
 NEW YORK, NY 10001
 O: (917) 525-2466 ext. 105
 F: 646.349.4063
 E: suren.hiraman@velos.io
 W: www.velos.io





Re: Machine Learning on streaming data

2014-03-20 Thread Jeremy Freeman
Thanks TD, happy to share my experience with MLLib + Spark Streaming 
integration.

Here's a gist with two examples I have working, one for 
StreamingLinearRegression and another for StreamingKMeans.

https://gist.github.com/freeman-lab/9672685

The goal in each case was to implement a streaming version of the algorithm, 
using as much as possible directly from MLLib. For Linear Regression this was 
straightforward, because the MLLib version already uses a (stochastic) update 
rule, which I just use to update the model inside a foreachRDD(), using each 
new batch of data. For KMeans, I used the model class from MLLib, but extended 
it to keep a running count for each cluster. I also had to re-implement a chunk 
of the core algorithm in the form of an update rule. Tighter integration in 
this case would, I think, require refactoring some of MLLib (e.g. to use 
something like this update function), but this works fine.

One unresolved issue: for these kinds of algorithms, the dimensionality of the 
data must be known in advance. Would be cool to automatically detect it based 
on the first record.

-- Jeremy
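
For reference, a minimal sketch (not the gist code; the parsing, path, and
10-second batches are assumptions, API roughly as of Spark 0.9) of the looser
integration TD describes below: applying an MLlib algorithm to each batch's RDD
from inside foreachRDD.

import org.apache.spark.SparkConf
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingMLlibSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("streaming-mllib-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Assume each line looks like "label,f1 f2 f3"; the format is illustrative only.
    val points = ssc.textFileStream("/path/to/stream").map { line =>
      val Array(label, features) = line.split(",")
      LabeledPoint(label.toDouble, features.trim.split(" ").map(_.toDouble))
    }

    points.foreachRDD { rdd =>
      if (rdd.count() > 0) {
        // Retrains on just this batch; a true streaming version would instead fold
        // the batch into an existing model via its update rule, as described above.
        val model = LinearRegressionWithSGD.train(rdd, 20)
        println("weights after this batch: " + model.weights.mkString(", "))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}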

On Mar 19, 2014, at 9:03 PM, Tathagata Das tathagata.das1...@gmail.com wrote:

 Yes, of course you can conceptually apply a machine learning algorithm on Spark
 Streaming. However, the current MLLib does not yet have direct support for
 Spark Streaming's DStreams. However, since DStreams are essentially sequences
 of RDDs, you can apply MLLib algorithms on those RDDs. Take a look at the
 DStream.transform() and DStream.foreachRDD() operations, which allow you to
 access the RDDs in a DStream. You can apply MLLib functions to them.
 
 Some people have attempted to make a tighter integration between MLLib and 
 Spark Streaming. Jeremy (cc'ed) can say more about his adventures. 
 
 TD
 
 
 On Sun, Mar 16, 2014 at 5:56 PM, Nasir Khan nasirkhan.onl...@gmail.com 
 wrote:
 Hi, I'm into a project in which I have to get streaming URLs, filter them, and
 classify them as benign or suspicious. Now Machine Learning and Streaming
 are two separate things in Apache Spark (AFAIK). My question is: can we apply
 online machine learning algorithms on streams?

 I am at a beginner level. Kindly explain in a bit of detail, and if someone can
 direct me to some good material, that would be great.

 Thanks,
 Nasir Khan.
 
 
 
 



Re: Accessing the reduce key

2014-03-20 Thread Surendranauth Hiraman
Grouped by the group_id but not sorted.

-Suren



On Thu, Mar 20, 2014 at 5:52 PM, Mayur Rustagi mayur.rust...@gmail.com wrote:

 You are using the data grouped (sorted?) to create the bloom filter?




 --

 SUREN HIRAMAN, VP TECHNOLOGY
 Velos
 Accelerating Machine Learning

Re: Pyspark worker memory

2014-03-20 Thread Andrew Ash
Jim, I'm starting to document the heap size settings all in one place,
which has been a source of confusion for a lot of my peers. Maybe you can take
a look at this ticket?

https://spark-project.atlassian.net/browse/SPARK-1264


On Wed, Mar 19, 2014 at 12:53 AM, Jim Blomo jim.bl...@gmail.com wrote:

 To document this, it would be nice to clarify what environment
 variables should be used to set which Java system properties, and what
 type of process they affect.  I'd be happy to start a page if you can
 point me to the right place:

 SPARK_JAVA_OPTS:
   -Dspark.executor.memory can be set on the machine running the driver
 (typically the master host) and will affect the memory available to
 the Executor running on a slave node
   -D

 SPARK_DAEMON_OPTS:
   

 On Wed, Mar 19, 2014 at 12:48 AM, Jim Blomo jim.bl...@gmail.com wrote:
  Thanks for the suggestion, Matei.  I've tracked this down to a setting
  I had to make on the Driver.  It looks like spark-env.sh has no impact
  on the Executor, which confused me for a long while with settings like
  SPARK_EXECUTOR_MEMORY.  The only setting that mattered was setting the
  system property in the *driver* (in this case pyspark/shell.py) or
  using -Dspark.executor.memory in SPARK_JAVA_OPTS *on the master*.  I'm
  not sure how this varies from 0.9.0 release, but it seems to work on
  SNAPSHOT.
 
  On Tue, Mar 18, 2014 at 11:52 PM, Matei Zaharia matei.zaha...@gmail.com
 wrote:
  Try checking spark-env.sh on the workers as well. Maybe code there is
  somehow overriding the spark.executor.memory setting.
 
  Matei
 
  On Mar 18, 2014, at 6:17 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
  Hello, I'm using the Github snapshot of PySpark and having trouble
 setting
  the worker memory correctly. I've set spark.executor.memory to 5g, but
  somewhere along the way Xmx is getting capped to 512M. This was not
  occurring with the same setup and 0.9.0. How many places do I need to
  configure the memory? Thank you!
 
 



Re: Pyspark worker memory

2014-03-20 Thread Matei Zaharia
Yeah, this is definitely confusing. The motivation for this was that different 
users of the same cluster may want to set different memory sizes for their 
apps, so we decided to put this setting in the driver. However, if you put 
SPARK_JAVA_OPTS in spark-env.sh, it also applies to executors, which is 
confusing (though in this case it wouldn’t overwrite spark.executor.memory 
AFAIK).

We want to clean a bunch of this stuff up for 1.0, or at least document it 
better. Thanks for the suggestions.

Matei
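
For reference, a minimal sketch of the pattern Jim describes below (Spark 0.9-era
configuration; the master URL and memory value are placeholders): set
spark.executor.memory as a system property in the driver before the SparkContext
is created.

import org.apache.spark.SparkContext

object DriverMemorySketch {
  def main(args: Array[String]): Unit = {
    // Must run before the SparkContext (and its SparkConf snapshot) is created,
    // or executors fall back to the default heap size.
    System.setProperty("spark.executor.memory", "5g")

    val sc = new SparkContext("spark://master:7077", "driver-memory-sketch") // placeholder master URL
    println(sc.getConf.get("spark.executor.memory")) // should print 5g
    sc.stop()
  }
}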

On Mar 19, 2014, at 12:53 AM, Jim Blomo jim.bl...@gmail.com wrote:

 To document this, it would be nice to clarify what environment
 variables should be used to set which Java system properties, and what
 type of process they affect.  I'd be happy to start a page if you can
 point me to the right place:
 
 SPARK_JAVA_OPTS:
  -Dspark.executor.memory can be set on the machine running the driver
 (typically the master host) and will affect the memory available to
 the Executor running on a slave node
  -D
 
 SPARK_DAEMON_OPTS:
  
 
 On Wed, Mar 19, 2014 at 12:48 AM, Jim Blomo jim.bl...@gmail.com wrote:
 Thanks for the suggestion, Matei.  I've tracked this down to a setting
 I had to make on the Driver.  It looks like spark-env.sh has no impact
 on the Executor, which confused me for a long while with settings like
 SPARK_EXECUTOR_MEMORY.  The only setting that mattered was setting the
 system property in the *driver* (in this case pyspark/shell.py) or
 using -Dspark.executor.memory in SPARK_JAVA_OPTS *on the master*.  I'm
 not sure how this varies from 0.9.0 release, but it seems to work on
 SNAPSHOT.
 
 On Tue, Mar 18, 2014 at 11:52 PM, Matei Zaharia matei.zaha...@gmail.com 
 wrote:
 Try checking spark-env.sh on the workers as well. Maybe code there is
 somehow overriding the spark.executor.memory setting.
 
 Matei
 
 On Mar 18, 2014, at 6:17 PM, Jim Blomo jim.bl...@gmail.com wrote:
 
 Hello, I'm using the Github snapshot of PySpark and having trouble setting
 the worker memory correctly. I've set spark.executor.memory to 5g, but
 somewhere along the way Xmx is getting capped to 512M. This was not
 occurring with the same setup and 0.9.0. How many places do I need to
 configure the memory? Thank you!
 
 



Re: DStream spark paper

2014-03-20 Thread Matei Zaharia
Hi Adrian,

On every timestep of execution, we receive new data, then report updated word 
counts for that new data plus the past 30 seconds. The latency here is about 
how quickly you get these updated counts once the new batch of data comes in. 
It’s true that the count reflects some data from 30 seconds ago as well, but it 
doesn’t mean the overall processing latency is 30 seconds.

Matei
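
A small sketch to make the distinction concrete (the socket source and 1-second
batch interval are assumptions): each emitted result covers the last 30 seconds
of data, but a new result is produced every second, so updated counts appear with
roughly batch-level latency rather than every 30 seconds.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowLatencySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("window-latency-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/tmp/checkpoints") // incremental windowed counting needs checkpointing

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

    // One set of (word, count) results per second, each covering the past 30 seconds.
    val counts = words.countByValueAndWindow(Seconds(30), Seconds(1))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}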

On Mar 20, 2014, at 1:36 PM, Adrian Mocanu amoc...@verticalscope.com wrote:

 I looked over the specs on page 9 from 
 http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf
 The first paragraph mentions the window size is 30 seconds “Word-Count, which 
 performs a sliding window count over 30s;
 and TopKCount, which finds the k most frequent words over the past 30s. “
  
 The second paragraph mentions subsecond latency.
  
 Putting these 2 together, is the paper saying that in the 30 sec window the 
 tuples are delayed at most 1 second?
  
 The paper explains “By “end-to-end latency,” we mean the time from when 
 records are sent to the system to when results incorporating them appear.” 
 This leads me to conclude that end-to-end latency for a 30 sec window should 
 be at least 30 seconds because results won’t be incorporated until the entire 
 window is completed ie: 30sec. At the same time the paper claims latency is 
 sub second so clearly I’m misunderstanding something.
  
 -Adrian



Spark Job stuck

2014-03-20 Thread mohit.goyal
Hi,

I have run a Spark application to process input data of size ~14GB with
10GB executor memory. The job got stuck with the message below:

14/03/21 05:02:07 WARN storage.BlockManagerMasterActor: Removing
BlockManager BlockManagerId(0, guavus-0102bf, 49347, 0) with no recent heart
beats: 85563ms exceeds 45000ms

But the job completed successfully if I increase executor memory to 40GB.

Any idea?

Thanks
Mohit Goyal



