Re: Spark with Parquet

2014-04-27 Thread Matei Zaharia
Spark uses the Hadoop InputFormat and OutputFormat classes, so you can simply 
create a JobConf to read the data and pass that to SparkContext.hadoopFile. 
There are some examples for Parquet usage here: 
http://zenfractal.com/2013/08/21/a-powerful-big-data-trio/ and here: 
http://engineering.ooyala.com/blog/using-parquet-and-scrooge-spark.
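As a rough sketch of one way to do the read side (using the new-API variant, newAPIHadoopFile; this assumes the old parquet-avro artifact of that era is on the classpath, MyRecord is a hypothetical Avro-generated class, and package names moved to org.apache.parquet.* in later Parquet releases):

    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.SparkContext
    import parquet.avro.AvroReadSupport
    import parquet.hadoop.ParquetInputFormat

    // sc is an existing SparkContext; MyRecord is a hypothetical Avro-generated record class
    def readParquet(sc: SparkContext, path: String) = {
      val job = new Job()
      // tell the input format how to materialize records (here via Avro)
      ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[MyRecord]])
      sc.newAPIHadoopFile(path,
        classOf[ParquetInputFormat[MyRecord]],
        classOf[Void],          // Parquet uses Void keys
        classOf[MyRecord],
        job.getConfiguration)
        .map(_._2)              // keep only the record values
    }

The posts linked above show the matching write side (ParquetOutputFormat plus a write support class).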

Matei

On Apr 27, 2014, at 11:41 PM, Sai Prasanna  wrote:

> Hi All,
> 
> I want to store a csv-text file in Parquet format in HDFS and then do some 
> processing in Spark.
> 
> Somehow my search to find the way to do was futile. More help was available 
> for parquet with impala. 
> 
> Any guidance here? Thanks !!
> 



Spark with Parquet

2014-04-27 Thread Sai Prasanna
Hi All,

I want to store a csv-text file in Parquet format in HDFS and then do some
processing in Spark.

Somehow my search for a way to do this was futile. More help was available
for Parquet with Impala.

Any guidance here? Thanks !!


Re: Running out of memory Naive Bayes

2014-04-27 Thread Matei Zaharia
Not sure if this is always ideal for Naive Bayes, but you could also hash the 
features into a lower-dimensional space (e.g. reduce it to 50,000 features). 
For each feature, simply take MurmurHash3(featureID) % 50000, for example.
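To make the hashing trick concrete, here is a minimal sketch in plain Scala (assuming string feature IDs and 50,000 buckets; both are arbitrary choices, not anything from MLlib):

    import scala.util.hashing.MurmurHash3

    val numBuckets = 50000   // target dimensionality, chosen by you

    // map a sparse example (featureID -> value) into the hashed space,
    // summing the values of features that collide in the same bucket
    def hashFeatures(example: Seq[(String, Double)]): Map[Int, Double] =
      example
        .map { case (featureId, value) =>
          val h = MurmurHash3.stringHash(featureId)
          (((h % numBuckets) + numBuckets) % numBuckets, value)   // non-negative bucket index
        }
        .groupBy(_._1)
        .mapValues(_.map(_._2).sum)
        .toMap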

Matei

On Apr 27, 2014, at 11:24 PM, DB Tsai  wrote:

> Our customer asked us to implement Naive Bayes which should be able to at 
> least train news20 one year ago, and we implemented for them in Hadoop using 
> distributed cache to store the model.
> 
> 
> Sincerely,
> 
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
> 
> 
> On Sun, Apr 27, 2014 at 11:03 PM, Xiangrui Meng  wrote:
> How big is your problem and how many labels? -Xiangrui
> 
> On Sun, Apr 27, 2014 at 10:28 PM, DB Tsai  wrote:
> > Hi Xiangrui,
> >
> > We also run into this issue at Alpine Data Labs. We ended up using LRU cache
> > to store the counts, and splitting those least used counts to distributed
> > cache in HDFS.
> >
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Sun, Apr 27, 2014 at 7:34 PM, Xiangrui Meng  wrote:
> >>
> >> Even the features are sparse, the conditional probabilities are stored
> >> in a dense matrix. With 200 labels and 2 million features, you need to
> >> store at least 4e8 doubles on the driver node. With multiple
> >> partitions, you may need more memory on the driver. Could you try
> >> reducing the number of partitions and giving driver more ram and see
> >> whether it can help? -Xiangrui
> >>
> >> On Sun, Apr 27, 2014 at 3:33 PM, John King 
> >> wrote:
> >> > I'm already using the SparseVector class.
> >> >
> >> > ~200 labels
> >> >
> >> >
> >> > On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng 
> >> > wrote:
> >> >>
> >> >> How many labels does your dataset have? -Xiangrui
> >> >>
> >> >> On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai  wrote:
> >> >> > Which version of mllib are you using? For Spark 1.0, mllib will
> >> >> > support sparse feature vector which will improve performance a lot
> >> >> > when computing the distance between points and centroid.
> >> >> >
> >> >> > Sincerely,
> >> >> >
> >> >> > DB Tsai
> >> >> > ---
> >> >> > My Blog: https://www.dbtsai.com
> >> >> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >> >> >
> >> >> >
> >> >> > On Sat, Apr 26, 2014 at 5:49 AM, John King
> >> >> >  wrote:
> >> >> >> I'm just wondering are the SparkVector calculations really taking
> >> >> >> into
> >> >> >> account the sparsity or just converting to dense?
> >> >> >>
> >> >> >>
> >> >> >> On Fri, Apr 25, 2014 at 10:06 PM, John King
> >> >> >> 
> >> >> >> wrote:
> >> >> >>>
> >> >> >>> I've been trying to use the Naive Bayes classifier. Each example in
> >> >> >>> the
> >> >> >>> dataset is about 2 million features, only about 20-50 of which are
> >> >> >>> non-zero,
> >> >> >>> so the vectors are very sparse. I keep running out of memory
> >> >> >>> though,
> >> >> >>> even
> >> >> >>> for about 1000 examples on 30gb RAM while the entire dataset is 4
> >> >> >>> million
> >> >> >>> examples. And I would also like to note that I'm using the sparse
> >> >> >>> vector
> >> >> >>> class.
> >> >> >>
> >> >> >>
> >> >
> >> >
> >
> >
> 



Re: Running out of memory Naive Bayes

2014-04-27 Thread DB Tsai
A year ago, our customer asked us to implement a Naive Bayes that could at
least train on news20, and we implemented it for them in Hadoop, using the
distributed cache to store the model.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Sun, Apr 27, 2014 at 11:03 PM, Xiangrui Meng  wrote:

> How big is your problem and how many labels? -Xiangrui
>
> On Sun, Apr 27, 2014 at 10:28 PM, DB Tsai  wrote:
> > Hi Xiangrui,
> >
> > We also run into this issue at Alpine Data Labs. We ended up using LRU
> cache
> > to store the counts, and splitting those least used counts to distributed
> > cache in HDFS.
> >
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Sun, Apr 27, 2014 at 7:34 PM, Xiangrui Meng  wrote:
> >>
> >> Even the features are sparse, the conditional probabilities are stored
> >> in a dense matrix. With 200 labels and 2 million features, you need to
> >> store at least 4e8 doubles on the driver node. With multiple
> >> partitions, you may need more memory on the driver. Could you try
> >> reducing the number of partitions and giving driver more ram and see
> >> whether it can help? -Xiangrui
> >>
> >> On Sun, Apr 27, 2014 at 3:33 PM, John King <
> usedforprinting...@gmail.com>
> >> wrote:
> >> > I'm already using the SparseVector class.
> >> >
> >> > ~200 labels
> >> >
> >> >
> >> > On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng 
> >> > wrote:
> >> >>
> >> >> How many labels does your dataset have? -Xiangrui
> >> >>
> >> >> On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai 
> wrote:
> >> >> > Which version of mllib are you using? For Spark 1.0, mllib will
> >> >> > support sparse feature vector which will improve performance a lot
> >> >> > when computing the distance between points and centroid.
> >> >> >
> >> >> > Sincerely,
> >> >> >
> >> >> > DB Tsai
> >> >> > ---
> >> >> > My Blog: https://www.dbtsai.com
> >> >> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >> >> >
> >> >> >
> >> >> > On Sat, Apr 26, 2014 at 5:49 AM, John King
> >> >> >  wrote:
> >> >> >> I'm just wondering are the SparkVector calculations really taking
> >> >> >> into
> >> >> >> account the sparsity or just converting to dense?
> >> >> >>
> >> >> >>
> >> >> >> On Fri, Apr 25, 2014 at 10:06 PM, John King
> >> >> >> 
> >> >> >> wrote:
> >> >> >>>
> >> >> >>> I've been trying to use the Naive Bayes classifier. Each example
> in
> >> >> >>> the
> >> >> >>> dataset is about 2 million features, only about 20-50 of which
> are
> >> >> >>> non-zero,
> >> >> >>> so the vectors are very sparse. I keep running out of memory
> >> >> >>> though,
> >> >> >>> even
> >> >> >>> for about 1000 examples on 30gb RAM while the entire dataset is 4
> >> >> >>> million
> >> >> >>> examples. And I would also like to note that I'm using the sparse
> >> >> >>> vector
> >> >> >>> class.
> >> >> >>
> >> >> >>
> >> >
> >> >
> >
> >
>


Re: Running out of memory Naive Bayes

2014-04-27 Thread Xiangrui Meng
How big is your problem and how many labels? -Xiangrui

On Sun, Apr 27, 2014 at 10:28 PM, DB Tsai  wrote:
> Hi Xiangrui,
>
> We also run into this issue at Alpine Data Labs. We ended up using LRU cache
> to store the counts, and splitting those least used counts to distributed
> cache in HDFS.
>
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Sun, Apr 27, 2014 at 7:34 PM, Xiangrui Meng  wrote:
>>
>> Even the features are sparse, the conditional probabilities are stored
>> in a dense matrix. With 200 labels and 2 million features, you need to
>> store at least 4e8 doubles on the driver node. With multiple
>> partitions, you may need more memory on the driver. Could you try
>> reducing the number of partitions and giving driver more ram and see
>> whether it can help? -Xiangrui
>>
>> On Sun, Apr 27, 2014 at 3:33 PM, John King 
>> wrote:
>> > I'm already using the SparseVector class.
>> >
>> > ~200 labels
>> >
>> >
>> > On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng 
>> > wrote:
>> >>
>> >> How many labels does your dataset have? -Xiangrui
>> >>
>> >> On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai  wrote:
>> >> > Which version of mllib are you using? For Spark 1.0, mllib will
>> >> > support sparse feature vector which will improve performance a lot
>> >> > when computing the distance between points and centroid.
>> >> >
>> >> > Sincerely,
>> >> >
>> >> > DB Tsai
>> >> > ---
>> >> > My Blog: https://www.dbtsai.com
>> >> > LinkedIn: https://www.linkedin.com/in/dbtsai
>> >> >
>> >> >
>> >> > On Sat, Apr 26, 2014 at 5:49 AM, John King
>> >> >  wrote:
>> >> >> I'm just wondering are the SparkVector calculations really taking
>> >> >> into
>> >> >> account the sparsity or just converting to dense?
>> >> >>
>> >> >>
>> >> >> On Fri, Apr 25, 2014 at 10:06 PM, John King
>> >> >> 
>> >> >> wrote:
>> >> >>>
>> >> >>> I've been trying to use the Naive Bayes classifier. Each example in
>> >> >>> the
>> >> >>> dataset is about 2 million features, only about 20-50 of which are
>> >> >>> non-zero,
>> >> >>> so the vectors are very sparse. I keep running out of memory
>> >> >>> though,
>> >> >>> even
>> >> >>> for about 1000 examples on 30gb RAM while the entire dataset is 4
>> >> >>> million
>> >> >>> examples. And I would also like to note that I'm using the sparse
>> >> >>> vector
>> >> >>> class.
>> >> >>
>> >> >>
>> >
>> >
>
>


Re: Running out of memory Naive Bayes

2014-04-27 Thread DB Tsai
Hi Xiangrui,

We also ran into this issue at Alpine Data Labs. We ended up using an LRU
cache to store the counts, and moving the least-used counts out to the
distributed cache in HDFS.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Sun, Apr 27, 2014 at 7:34 PM, Xiangrui Meng  wrote:

> Even the features are sparse, the conditional probabilities are stored
> in a dense matrix. With 200 labels and 2 million features, you need to
> store at least 4e8 doubles on the driver node. With multiple
> partitions, you may need more memory on the driver. Could you try
> reducing the number of partitions and giving driver more ram and see
> whether it can help? -Xiangrui
>
> On Sun, Apr 27, 2014 at 3:33 PM, John King 
> wrote:
> > I'm already using the SparseVector class.
> >
> > ~200 labels
> >
> >
> > On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng 
> wrote:
> >>
> >> How many labels does your dataset have? -Xiangrui
> >>
> >> On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai  wrote:
> >> > Which version of mllib are you using? For Spark 1.0, mllib will
> >> > support sparse feature vector which will improve performance a lot
> >> > when computing the distance between points and centroid.
> >> >
> >> > Sincerely,
> >> >
> >> > DB Tsai
> >> > ---
> >> > My Blog: https://www.dbtsai.com
> >> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >> >
> >> >
> >> > On Sat, Apr 26, 2014 at 5:49 AM, John King
> >> >  wrote:
> >> >> I'm just wondering are the SparkVector calculations really taking
> into
> >> >> account the sparsity or just converting to dense?
> >> >>
> >> >>
> >> >> On Fri, Apr 25, 2014 at 10:06 PM, John King
> >> >> 
> >> >> wrote:
> >> >>>
> >> >>> I've been trying to use the Naive Bayes classifier. Each example in
> >> >>> the
> >> >>> dataset is about 2 million features, only about 20-50 of which are
> >> >>> non-zero,
> >> >>> so the vectors are very sparse. I keep running out of memory though,
> >> >>> even
> >> >>> for about 1000 examples on 30gb RAM while the entire dataset is 4
> >> >>> million
> >> >>> examples. And I would also like to note that I'm using the sparse
> >> >>> vector
> >> >>> class.
> >> >>
> >> >>
> >
> >
>


Re: is it okay to reuse objects across RDD's?

2014-04-27 Thread DB Tsai
Hi Todd,

As Patrick and you already pointed out, it's really dangerous to mutate the
state of an RDD. However, when we implemented glmnet in Spark, reusing the
residuals for each row of the RDD computed in the previous step gave us a
4~5x speedup.

As a result, we add an extra column to the RDD for bookkeeping the residual
of each row, and initialize it to NaN first. When the next iteration finds
that the residual for a row is NaN, it means that either the RDD ended up on
disk or the job failed, so we recompute the residuals for those rows. This
solves the problem of fault tolerance and data spilling to disk.

It would be nice to have an API that supports this type of bookkeeping
natively.
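A minimal sketch of that NaN-bookkeeping idea (hypothetical names and a plain linear residual, not the actual glmnet code):

    import org.apache.spark.rdd.RDD

    // Each row carries a mutable residual; NaN means "not computed yet, or lost because
    // the partition was recomputed from lineage / read back from disk".
    case class Row(features: Array[Double], label: Double, var residual: Double = Double.NaN)

    def dot(w: Array[Double], x: Array[Double]): Double =
      w.zip(x).map { case (a, b) => a * b }.sum

    // One step: reuse the cached residual when present, recompute it when it is NaN.
    def refreshResiduals(rows: RDD[Row], weights: Array[Double]): RDD[Row] =
      rows.map { row =>
        if (row.residual.isNaN) {
          row.residual = row.label - dot(weights, row.features)   // recompute lost residuals
        }
        row   // note: mutating rows in place is exactly the pattern discussed as risky in this thread
      }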


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Sat, Apr 26, 2014 at 11:22 PM, Patrick Wendell wrote:

> Hey Todd,
>
> This approach violates the normal semantics of RDD transformations as you
> point out. I think you pointed out some issues already, and there are
> others. For instance say you cache originalRDD and some of the partitions
> end up in memory and others end up on disk. The ones that end up in memory
> will be mutated in-place when you create transformedRDD, the ones that are
> serialized to disk won't actually be changed (because there will be a copy
> into memory from the serialized on-disk data). So you could end up where
> originalRDD is partially mutated.
>
> Also, in the case of failures your map might run twice (e.g. run partially
> once, fail, then get re-run and succeed). So if your mutation e.g. relied
> on the current state of the object, it could end up having unexpected
> behavior.
>
> We'll probably never "disallow" this in Spark because we can't really
> control what you do inside of the function. But I'd be careful using this
> approach...
>
> - Patrick
>
>
> On Sat, Apr 26, 2014 at 5:59 AM, Lisonbee, Todd 
> wrote:
>
>> For example,
>>
>> val originalRDD: RDD[SomeCaseClass] = ...
>>
>> // Option 1: objects are copied, setting prop1 in the process
>> val transformedRDD = originalRDD.map( item => item.copy(prop1 = calculation()) )
>>
>> // Option 2: objects are re-used and modified
>> val transformedRDD = originalRDD.map( item => item.prop1 = calculation() )
>>
>> I did a couple of small tests with option 2 and noticed less time was
>> spent in garbage collection.  It didn't add up to much but with a large
>> enough data set it would make a difference.  Also, it seems that less
>> memory would be used.
>>
>> Potential gotchas:
>>
>> - Objects in originalRDD are being modified, so you can't expect them to
>> have not changed
>> - You also can't rely on objects in originalRDD having the new value
>> because originalRDD might be re-calculated
>> - If originalRDD was a PairRDD, and you modified the keys, it could cause
>> issues
>> - more?
>>
>> Other than the potential gotchas, is there any reason not to reuse
>> objects across RDD's?  Is it a recommended practice for reducing memory
>> usage and garbage collection or not?
>>
>> Is it safe to do this in code you expect to work on future versions of
>> Spark?
>>
>> Thanks in advance,
>>
>> Todd
>>
>
>
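For reference, a self-contained version of the copy-based option 1 above (with a made-up case class); because originalRDD is never mutated, the partial-caching and re-run gotchas Patrick describes do not apply:

    import org.apache.spark.{SparkConf, SparkContext}

    case class SomeCaseClass(prop1: Double, prop2: String)

    object CopyExample {
      def calculation(): Double = math.random   // stand-in for the real computation

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("copy-example").setMaster("local[2]"))
        val originalRDD = sc.parallelize(Seq(SomeCaseClass(0.0, "a"), SomeCaseClass(0.0, "b")))

        // each element is copied with prop1 replaced; objects in originalRDD stay untouched
        val transformedRDD = originalRDD.map(item => item.copy(prop1 = calculation()))

        transformedRDD.collect().foreach(println)
        sc.stop()
      }
    }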


NullPointerException when run SparkPI using YARN env

2014-04-27 Thread martin.ou
1.my hadoop 2.3.0
2.SPARK_HADOOP_VERSION=2.3.0 SPARK_YARN=true sbt/sbt assembly
3.SPARK_YARN_MODE=true
SPARK_JAR=$SPARK_HOME/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.3.0.jar
SPARK_YARN_APP_JAR=$SPARK_HOME/examples/target/scala-2.10/spark-examples-assembly-0.9.1.jar
MASTER=yarn-client $SPARK_HOME/bin/spark-shell

java.lang.NullPointerException
        at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:114)
        at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:114)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:32)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at org.apache.spark.deploy.yarn.Client$.populateHadoopClasspath(Client.scala:498)
        at org.apache.spark.deploy.yarn.Client$.populateClasspath(Client.scala:519)
        at org.apache.spark.deploy.yarn.Client.setupLaunchEnv(Client.scala:333)
        at org.apache.spark.deploy.yarn.Client.runApp(Client.scala:94)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:78)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:125)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:200)
        at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:959)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NullPointerException-when-run-SparkPI-using-YARN-env-tp4917.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Running out of memory Naive Bayes

2014-04-27 Thread Xiangrui Meng
Even if the features are sparse, the conditional probabilities are stored
in a dense matrix. With 200 labels and 2 million features, you need to
store at least 4e8 doubles on the driver node. With multiple
partitions, you may need more memory on the driver. Could you try
reducing the number of partitions and giving the driver more RAM to see
whether that helps? -Xiangrui
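For a back-of-the-envelope check of that number:

    val numLabels   = 200
    val numFeatures = 2000000L
    val entries     = numLabels * numFeatures     // 4e8 doubles in the dense matrix
    val bytes       = entries * 8                 // 8 bytes per double
    println(bytes / (1024.0 * 1024 * 1024))       // ~3 GB, before any JVM or aggregation overhead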

On Sun, Apr 27, 2014 at 3:33 PM, John King  wrote:
> I'm already using the SparseVector class.
>
> ~200 labels
>
>
> On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng  wrote:
>>
>> How many labels does your dataset have? -Xiangrui
>>
>> On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai  wrote:
>> > Which version of mllib are you using? For Spark 1.0, mllib will
>> > support sparse feature vector which will improve performance a lot
>> > when computing the distance between points and centroid.
>> >
>> > Sincerely,
>> >
>> > DB Tsai
>> > ---
>> > My Blog: https://www.dbtsai.com
>> > LinkedIn: https://www.linkedin.com/in/dbtsai
>> >
>> >
>> > On Sat, Apr 26, 2014 at 5:49 AM, John King
>> >  wrote:
>> >> I'm just wondering are the SparkVector calculations really taking into
>> >> account the sparsity or just converting to dense?
>> >>
>> >>
>> >> On Fri, Apr 25, 2014 at 10:06 PM, John King
>> >> 
>> >> wrote:
>> >>>
>> >>> I've been trying to use the Naive Bayes classifier. Each example in
>> >>> the
>> >>> dataset is about 2 million features, only about 20-50 of which are
>> >>> non-zero,
>> >>> so the vectors are very sparse. I keep running out of memory though,
>> >>> even
>> >>> for about 1000 examples on 30gb RAM while the entire dataset is 4
>> >>> million
>> >>> examples. And I would also like to note that I'm using the sparse
>> >>> vector
>> >>> class.
>> >>
>> >>
>
>


spark running examples error

2014-04-27 Thread Joe L

I applied this 
./bin/run-example org.apache.spark.examples.SparkPi spark://MASTERIP:7077


but I am getting the following error; it seems the master is not connecting to
the slave nodes.
 

Any suggestion?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-running-examples-error-tp4915.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-27 Thread Earthson
It's my fault! I uploaded a wrong jar when I changed the number of partitions,
and now it just works fine :)

The size of word_mapping is 2444185.

So it will take a very long time for large object serialization? I don't think
two million is very large, because the cost locally for such a size is
typically less than one second. 

Thanks for the help:)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4914.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Strange lookup behavior. Possible bug?

2014-04-27 Thread Yadid Ayzenberg

Can someone please suggest how I can move forward with this?
My spark version is 0.9.1.
The big challenge is that this issue is not recreated when running in 
local mode. What could be the difference?


I would really appreciate any pointers, as currently the job just hangs.



On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

Some additional information - maybe this rings a bell with someone:

I suspect this happens when the lookup returns more than one value.
For 0 and 1 values, the function behaves as you would expect.

Anyone ?



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

Im running a lookup on a JavaPairRDD.
When running on a local machine, the lookup is successful. However, 
when running on a standalone cluster with the exact same dataset, one 
of the tasks never ends (constantly in RUNNING status).
When viewing the worker log, it seems that the task has finished 
successfully:


14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0 locally
14/04/25 13:40:38 INFO Executor: Serialized size of result for 2 is 
10896794

14/04/25 13:40:38 INFO Executor: Sending result for 2 directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs indefinitely.

If I execute a count prior to the lookup, I get the correct number, 
which suggests that the cluster is operating as expected.


The exact same scenario works with a different type of key (Tuple2): 
JavaPairRDD.


Any ideas on how to debug this problem ?

Thanks,

Yadid







Re: questions about debugging a spark application

2014-04-27 Thread wxhsdp
Or should I run my app on the Spark shell by using addJars?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/questions-about-debugging-a-spark-application-tp4891p4911.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: questions about debugging a spark application

2014-04-27 Thread wxhsdp
Or should I run my app in the Spark shell by using addJars?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/questions-about-debugging-a-spark-application-tp4891p4910.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Any advice for using big spark.cleaner.delay value in Spark Streaming?

2014-04-27 Thread Tathagata Das
Hello,

If you want to do aggregations like counts that span across days, weeks or
months, AND do not want the result in real time, then Spark Streaming is
probably not the best thing to use. You should probably store all the data
in a data store (HDFS files or a database) and then use Spark jobs / SQL
queries to do the counting. Spark Streaming is most useful when you want
the processing results based on incoming data streams within seconds of
receiving the data. In case you want to do aggregations across a day's
data and do it in real time and continuously (e.g. every 5 seconds, count
the records received in the last 1 day), then you probably have to do
something a little bit smarter - keep per-10-minute / per-hour counts,
which get continuously combined with the latest partial-hour counts.

And regarding the cleaner setting, it should be set according to the
computation. If you are using window operations that use data in the last
30 minutes, then the cleaner TTL should be more than 30 minutes. The
default of one hour should work fine, unless you need to use data that is
more than an hour old. If that is indeed necessary, consider using
(almost-to-be-released) Spark 1.0. That eliminates the requirement of
setting cleaner TTL for Spark Streaming, because Spark core has been made
smart enough to do GC based clean up of unused RDDs and shuffle files.

Regarding the second part, I am not sure what you meant by " Spark
Streaming creates new DStreams for each interval".  Spark Streaming creates
RDDs in each interval. And if you want to count all records received by
Spark Streaming over time you can do something like this.


// variable in the driver
var numRecordsReceivedTillNow: Long = 0


yourDStream.foreachRDD { rdd =>
  val numRecordsInBatch = rdd.count
  // increment the counter in the driver with the count in each batch / each RDD
  numRecordsReceivedTillNow += numRecordsInBatch
}
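If instead you want the running total as a DStream, the updateStateByKey approach mentioned in the original question can be sketched like this (it assumes a checkpoint directory has been set on the StreamingContext and that the pair-DStream implicits are imported):

    // import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations
    // ssc.checkpoint("hdfs://...")                            // required for stateful ops

    val totalCount = yourDStream
      .map(_ => ("total", 1L))                       // a single key gives one global counter
      .updateStateByKey[Long] { (counts: Seq[Long], state: Option[Long]) =>
        Some(state.getOrElse(0L) + counts.sum)       // add this batch's count to the total
      }

    totalCount.print()                               // emits the cumulative count every batch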

Hope this helps!

TD



On Sun, Apr 27, 2014 at 5:40 AM, buremba  wrote:

> It seems default value for spark.cleaner.delay is 3600 seconds but I need
> to
> be able to count things on daily, weekly or even monthly based.
>
> I suppose the aim of DStream batches and spark.cleaner.delay is to avoid
> space issues (running out of memory etc.). I usually use HyperLogLog for
> counting unique things to save space, and AFAIK, the other metrics are
> simply long values which doesn't require much space.
>
> When I start learning Spark Streaming it really confused me because in my
> first "Hello World" example all I wanted is to count all events processed
> by
> Spark Streaming. DStream batches are nice but when I need simple counting
> operations it becomes complex. Since Spark Streaming creates new DStreams
> for each interval, I needed to merge them in a single DStream so I used
> updateStateByKey() to generate a StateDStream. I seems it works now but I'm
> not sure whether it's efficient or not because I all need is a single
> global
> counter but now Spark has counters for all 2 seconds intervals plus a
> global
> counter for StateDStream.
>
> I don't have any specific purpose like "Show me this type of unique things
> for last 10 minutes", instead I need to be able to count things in a large
> scale; it can be both 10 minutes or 1 month. I create pre-aggregation rules
> on the fly and when I need simple monthly based counter, Spark seems
> overkill to me for now.
>
> Do you have any advice for me to use efficiently using Spark Streaming?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: parallelize for a large Seq is extreamly slow.

2014-04-27 Thread Matei Zaharia
How many values are in that sequence? I.e. what is its size?

You can also profile your program while it’s running to see where it’s spending 
time. The easiest way is to get a single stack trace with jstack . 
Maybe some of the serialization methods for this data are super inefficient, or 
toSeq on a map is inefficient. You could try word_mapping.value.toArray. I’m 
also wondering if something earlier in the program is slow and this is just not 
obvious from the output.
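For reference, the two suggestions combined might look roughly like this (variable names taken from the thread; the partition count is arbitrary):

    // avoid handing parallelize a huge Map-backed Seq; a plain Array is usually cheaper to serialize
    val pairs = word_mapping.value.toArray
    val rdd = sc.parallelize(pairs, 400)   // spread over an explicit number of partitions
    println(rdd.count)

    // and, from another terminal, see what the driver is doing right now:
    //   jstack <driver-pid> > driver-stack.txt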

Matei

On Apr 27, 2014, at 9:47 AM, Earthson  wrote:

> That's not work. I don't think it is just slow, It never ends(with 30+ hours,
> and I killed it). 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4900.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Running a spark-submit compatible app in spark-shell

2014-04-27 Thread Matei Zaharia
Hi Roger,

You should be able to use the --jars argument of spark-shell to add JARs onto 
the classpath and then work with those classes in the shell. (A recent patch, 
https://github.com/apache/spark/pull/542, made spark-shell use the same 
command-line arguments as spark-submit). But this is a great question, we 
should test it out and see whether anything else would make development easier.

SBT also has an interactive shell where you can run classes in your project, 
but unfortunately Spark can’t deal with closures typed directly into that shell 
the right way. However, if you write your Spark logic in a method and just call 
that method from the SBT shell, that should work.
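Concretely, that workflow could look like this (the jar path and object name are made up):

    // build the app jar, then launch the shell with it on the classpath:
    //   sbt package
    //   ./bin/spark-shell --jars target/scala-2.10/myapp_2.10-0.1.jar
    //
    // inside the shell, call into your code using the shell's SparkContext:
    val result = com.example.MyJob.run(sc, "hdfs:///path/to/input")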

Matei

On Apr 27, 2014, at 3:14 PM, Roger Hoover  wrote:

> Hi,
> 
> From the meetup talk about the 1.0 release, I saw that spark-submit will be 
> the preferred way to launch apps going forward.
> 
> How do you recommend launching such jobs in a development cycle?  For 
> example, how can I load an app that's expecting to a given to spark-submit 
> into spark-shell?
> 
> Also, can anyone recommend other tricks for rapid development?  I'm new to 
> Scala, sbt, etc.  I think sbt can watch for changes in source files and 
> compile them automatically.
> 
> I want to be able to make code changes and quickly get into a spark-shell to 
> play around with them.
> 
> I appreciate any advice.  Thanks,
> 
> Roger



Re: Running out of memory Naive Bayes

2014-04-27 Thread John King
I'm already using the SparseVector class.

~200 labels


On Sun, Apr 27, 2014 at 12:26 AM, Xiangrui Meng  wrote:

> How many labels does your dataset have? -Xiangrui
>
> On Sat, Apr 26, 2014 at 6:03 PM, DB Tsai  wrote:
> > Which version of mllib are you using? For Spark 1.0, mllib will
> > support sparse feature vector which will improve performance a lot
> > when computing the distance between points and centroid.
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Sat, Apr 26, 2014 at 5:49 AM, John King 
> wrote:
> >> I'm just wondering are the SparkVector calculations really taking into
> >> account the sparsity or just converting to dense?
> >>
> >>
> >> On Fri, Apr 25, 2014 at 10:06 PM, John King <
> usedforprinting...@gmail.com>
> >> wrote:
> >>>
> >>> I've been trying to use the Naive Bayes classifier. Each example in the
> >>> dataset is about 2 million features, only about 20-50 of which are
> non-zero,
> >>> so the vectors are very sparse. I keep running out of memory though,
> even
> >>> for about 1000 examples on 30gb RAM while the entire dataset is 4
> million
> >>> examples. And I would also like to note that I'm using the sparse
> vector
> >>> class.
> >>
> >>
>


Running a spark-submit compatible app in spark-shell

2014-04-27 Thread Roger Hoover
Hi,

From the meetup talk about the 1.0 release, I saw that spark-submit will be
the preferred way to launch apps going forward.

How do you recommend launching such jobs in a development cycle?  For
example, how can I load an app that's expecting to be given to spark-submit
into spark-shell?

Also, can anyone recommend other tricks for rapid development?  I'm new to
Scala, sbt, etc.  I think sbt can watch for changes in source files and
compile them automatically.

I want to be able to make code changes and quickly get into a spark-shell
to play around with them.

I appreciate any advice.  Thanks,

Roger


Re: Spark on Yarn or Mesos?

2014-04-27 Thread Andrew Ash
Much thanks for the perspective Matei.


On Sun, Apr 27, 2014 at 10:51 PM, Matei Zaharia wrote:

> From my point of view, both are supported equally. The YARN support is
> newer and that’s why there’s been a lot more action there in recent months.
>
> Matei
>
> On Apr 27, 2014, at 12:08 PM, Andrew Ash  wrote:
>
> That thread was mostly about benchmarking YARN vs standalone, and the
> results are what I'd expect -- spinning up a Spark cluster on demand
> through YARN has higher startup latency than using a standalone cluster,
> where the JVMs are already initialized and ready.
>
> Given that there's a lot more commit activity around YARN as compared to
> Mesos, does that mean that YARN integration is just earlier in the maturity
> curve, or does it mean that YARN is the future and Mesos is in
> maintenance-only mode?
>
> That may be more a question for the Databricks team though: will YARN and
> Mesos be supported equally, or will one become the preferred method of
> doing cluster management under Spark?
>
> Andrew
>
>
> On Thu, Apr 17, 2014 at 1:27 PM, Arpit Tak 
> wrote:
>
>> Hi Wel,
>>
>> Take a look at this post...
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html
>>
>> Regards,
>> Arpit Tak
>>
>>
>> On Thu, Apr 17, 2014 at 3:42 PM, Wei Wang  wrote:
>>
>>> Hi, there
>>>
>>> I would like to know is there any differences between Spark on Yarn and
> Spark on Mesos. Is there any comparison between them? What are the
>>> advantages and disadvantages for each of them. Is there any criterion for
>>> choosing between Yarn and Mesos?
>>>
>>> BTW, we need MPI in our framework, and I saw MPICH2 is included in
>>> Mesos. Should it be the reason for choosing Mesos?
>>>
>>> Thanks a lot!
>>>
>>>
>>> Weida
>>>
>>
>>
>
>


Re: Spark on Yarn or Mesos?

2014-04-27 Thread Matei Zaharia
From my point of view, both are supported equally. The YARN support is newer 
and that’s why there’s been a lot more action there in recent months.

Matei

On Apr 27, 2014, at 12:08 PM, Andrew Ash  wrote:

> That thread was mostly about benchmarking YARN vs standalone, and the results 
> are what I'd expect -- spinning up a Spark cluster on demand through YARN has 
> higher startup latency than using a standalone cluster, where the JVMs are 
> already initialized and ready.
> 
> Given that there's a lot more commit activity around YARN as compared to 
> Mesos, does that mean that YARN integration is just earlier in the maturity 
> curve, or does it mean that YARN is the future and Mesos is in 
> maintenance-only mode?
> 
> That may be more a question for the Databricks team though: will YARN and 
> Mesos be supported equally, or will one become the preferred method of doing 
> cluster management under Spark?
> 
> Andrew
> 
> 
> On Thu, Apr 17, 2014 at 1:27 PM, Arpit Tak  
> wrote:
> Hi Wel,
> 
> Take a look at this post...
> http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html
> 
> Regards,
> Arpit Tak
> 
> 
> On Thu, Apr 17, 2014 at 3:42 PM, Wei Wang  wrote:
> Hi, there
> 
> I would like to know is there any differences between Spark on Yarn and Spark 
> on Mesos. Is there any comparison between them? What are the advantages and 
> disadvantages for each of them. Is there any criterion for choosing between 
> Yarn and Mesos? 
> 
> BTW, we need MPI in our framework, and I saw MPICH2 is included in Mesos. 
> Should it be the reason for choosing Mesos? 
> 
> Thanks a lot!
> 
> 
> Weida
> 
> 



Re: Spark on Yarn or Mesos?

2014-04-27 Thread Andrew Ash
That thread was mostly about benchmarking YARN vs standalone, and the
results are what I'd expect -- spinning up a Spark cluster on demand
through YARN has higher startup latency than using a standalone cluster,
where the JVMs are already initialized and ready.

Given that there's a lot more commit activity around YARN as compared to
Mesos, does that mean that YARN integration is just earlier in the maturity
curve, or does it mean that YARN is the future and Mesos is in
maintenance-only mode?

That may be more a question for the Databricks team though: will YARN and
Mesos be supported equally, or will one become the preferred method of
doing cluster management under Spark?

Andrew


On Thu, Apr 17, 2014 at 1:27 PM, Arpit Tak wrote:

> Hi Wel,
>
> Take a look at this post...
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-td2016.html
>
> Regards,
> Arpit Tak
>
>
> On Thu, Apr 17, 2014 at 3:42 PM, Wei Wang  wrote:
>
>> Hi, there
>>
>> I would like to know is there any differences between Spark on Yarn and
>> Spark on Mesos. Is there any comparison between them? What are the
>> advantages and disadvantages for each of them. Is there any criterion for
>> choosing between Yarn and Mesos?
>>
>> BTW, we need MPI in our framework, and I saw MPICH2 is included in Mesos.
>> Should it be the reason for choosing Mesos?
>>
>> Thanks a lot!
>>
>>
>> Weida
>>
>
>


help

2014-04-27 Thread Joe L
I am getting this error; please help me to fix it:

4/04/28 02:16:20 INFO SparkDeploySchedulerBackend: Executor
app-20140428021620-0007/10 removed: class java.io.IOException: Cannot run
program "/home/exobrain/install/spark-0.9.1/bin/compute-classpath.sh" (in
directory "."): error=13,



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/help-tp4901.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: parallelize for a large Seq is extreamly slow.

2014-04-27 Thread Earthson
That doesn't work. I don't think it is just slow; it never ends (30+ hours,
and I killed it). 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/parallelize-for-a-large-Seq-is-extreamly-slow-tp4801p4900.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Re: Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark

2014-04-27 Thread Qin Wei






Thanks a lot for your reply, it gave me much inspiration.


qinwei
 From: Sean Owen-2 [via Apache Spark User List]
 Date: 2014-04-25 14:11
 To: Qin Wei
 Subject: Re: Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark

So you are computing all-pairs similarity over 20M users?
This going to take about 200 trillion similarity computations, no?
I don't think there's any way to make that fundamentally fast.

I see you're copying the data set to all workers, which helps make it
faster at the expense of memory consumption.

If you really want to do this and can tolerate some approximation, I
think you want to do some kind of location sensitive hashing to bucket
the vectors and then evaluate similarity to only the other items in
the bucket.


On Fri, Apr 25, 2014 at 5:55 AM, Qin Wei <[hidden email]> wrote:
> Hi All,
>
> I have a problem with the Item-Based Collaborative Filtering Recommendation
> Algorithms in spark.
> The basic flow is as below:
>                  (Item1, (User1, Score1))
>     RDD1 ==>     (Item2, (User2, Score2))
>                  (Item1, (User2, Score3))
>                  (Item2, (User1, Score4))
>
>     RDD1.groupByKey ==> RDD2
>                  (Item1, ((User1, Score1), (User2, Score3)))
>                  (Item2, ((User1, Score4), (User2, Score2)))
>
> The similarity of Vector ((User1, Score1), (User2, Score3)) and
> ((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.
>
> In my situation, RDD2 contains 20 million records, my spark programm is
> extreamly slow, the source code is as below:
>
>     val conf = new SparkConf().setMaster("spark://211.151.121.184:7077")
>       .setAppName("Score Calcu Total")
>       .set("spark.executor.memory", "20g")
>       .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
>     val sc = new SparkContext(conf)
>
>     val mongoRDD = sc.textFile(args(0).toString, 400)
>     val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))
>
>     val newRDD = jsonRDD.map(arg => {
>       var score = haha(arg.get("a").asInstanceOf[JSONObject])
>       // set score to 0.5 for testing
>       arg.put("score", 0.5)
>       arg
>     })
>
>     val resourceScoresRDD = newRDD.map(arg =>
>       (arg.get("rid").toString.toLong, (arg.get("zid").toString,
>       arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
>     val resourceScores = resourceScoresRDD.collect()
>     val bcResourceScores = sc.broadcast(resourceScores)
>
>     val simRDD = resourceScoresRDD.mapPartitions({iter =>
>       val m = bcResourceScores.value
>       for{ (r1, v1) <- iter
>            (r2, v2) <- m
>            if r1 > r2
>          } yield (r1, r2, cosSimilarity(v1, v2))}, true).filter(arg => arg._3 > 0.1)
>
>     println(simRDD.count)
>
> And I saw this in Spark Web UI:
>
> My standalone cluster has 3 worker node (16 core and 32G RAM),and the
> workload of the machine in my cluster is heavy when the spark program is
> running.
>
> Is there any better way to do the algorithm?
>
> Thanks!
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-the-Item-Based-Collaborative-Filtering-Recommendation-Algorithms-in-spark-tp4808.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

















--
View this message in context: 
http://apache-spa


Re: Re: Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark

2014-04-27 Thread qinwei






Thanks a lot for your reply, it gave me much inspiration.


qinwei
 From: Sean Owen
 Date: 2014-04-25 14:10
 To: user
 Subject: Re: Problem with the Item-Based Collaborative Filtering Recommendation Algorithms in spark

So you are computing all-pairs similarity over 20M users?
This going to take about 200 trillion similarity computations, no?
I don't think there's any way to make that fundamentally fast.
 
I see you're copying the data set to all workers, which helps make it
faster at the expense of memory consumption.
 
If you really want to do this and can tolerate some approximation, I
think you want to do some kind of location sensitive hashing to bucket
the vectors and then evaluate similarity to only the other items in
the bucket.
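A minimal sketch of that bucketing idea, using the names from the code quoted below (bucketKey() is a made-up stand-in for a proper LSH signature, here just the item's top-scoring users, so only items sharing a signature get compared):

    // resourceScoresRDD: RDD[(Long, Seq[(String, Double)])], as in the quoted code
    def bucketKey(v: Seq[(String, Double)]): String =
      v.sortBy { case (_, score) => -score }.take(3).map(_._1).sorted.mkString("|")

    val buckets = resourceScoresRDD
      .map { case (id, v) => (bucketKey(v), (id, v)) }
      .groupByKey()

    val simRDD = buckets.flatMap { case (_, items) =>
      val arr = items.toArray
      for {
        i <- arr.indices.iterator
        j <- (i + 1) until arr.length
      } yield (arr(i)._1, arr(j)._1, cosSimilarity(arr(i)._2, arr(j)._2))   // their cosSimilarity
    }.filter(_._3 > 0.1)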
 
 
On Fri, Apr 25, 2014 at 5:55 AM, Qin Wei  wrote:
> Hi All,
>
> I have a problem with the Item-Based Collaborative Filtering Recommendation
> Algorithms in spark.
> The basic flow is as below:
> (Item1    ,  (User1 ,
> Score1))
>    RDD1 ==>    (Item2    ,  (User2 ,   Score2))
> (Item1    ,  (User2 ,
> Score3))
> (Item2    ,  (User1 ,
> Score4))
>
>    RDD1.groupByKey   ==>  RDD2
> (Item1,  ((User1,   Score1),
> (User2,   Score3)))
> (Item2,  ((User1,   Score4),
> (User2,   Score2)))
>
> The similarity of Vector  ((User1,   Score1),   (User2,   Score3)) and
> ((User1,   Score4),   (User2,   Score2)) is the similarity of Item1 and
> Item2.
>
> In my situation, RDD2 contains 20 million records, my spark programm is
> extreamly slow, the source code is as below:
> val conf = new
> SparkConf().setMaster("spark://211.151.121.184:7077").setAppName("Score
> Calcu Total").set("spark.executor.memory",
> "20g").setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
> val sc = new SparkContext(conf)
>
> val mongoRDD = sc.textFile(args(0).toString,
> 400)
> val jsonRDD = mongoRDD.map(arg => new
> JSONObject(arg))
>
> val newRDD = jsonRDD.map(arg => {
> var score =
> haha(arg.get("a").asInstanceOf[JSONObject])
>
> // set score to 0.5 for testing
> arg.put("score", 0.5)
> arg
> })
>
> val resourceScoresRDD = newRDD.map(arg =>
> (arg.get("rid").toString.toLong, (arg.get("zid").toString,
> arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
> val resourceScores =
> resourceScoresRDD.collect()
> val bcResourceScores =
> sc.broadcast(resourceScores)
>
> val simRDD =
> resourceScoresRDD.mapPartitions({iter =>
> val m = bcResourceScores.value
> for{ (r1, v1) <- iter
>    (r2, v2) <- m
>    if r1 > r2
> } yield (r1, r2, cosSimilarity(v1,
> v2))}, true).filter(arg => arg._3 > 0.1)
>
> println(simRDD.count)
>
> And I saw this in Spark Web UI:
> 
> 
>
> My standalone cluster has 3 worker node (16 core and 32G RAM),and the
> workload of the machine in my cluster is heavy when the spark program is
> running.
>
> Is there any better way to do the algorithm?
>
> Thanks!
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-the-Item-Based-Collaborative-Filtering-Recommendation-Algorithms-in-spark-tp4808.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Re: what is the best way to do cartesian

2014-04-27 Thread qinwei






Thanks a lot for your reply, but I have tried the built-in RDD.cartesian() 
method before; it didn't make it faster.


qinwei
 From: Alex Boisvert
 Date: 2014-04-26 00:32
 To: user
 Subject: Re: what is the best way to do cartesian

You might want to try the built-in RDD.cartesian() method.
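For completeness, with the names from the quoted code below, the built-in method looks like this; note it still materializes all pairs, so it does not change the fundamental cost Sean pointed out:

    val simRDD = resourceScoresRDD.cartesian(resourceScoresRDD)
      .filter { case ((r1, _), (r2, _)) => r1 > r2 }                         // keep each unordered pair once
      .map { case ((r1, v1), (r2, v2)) => (r1, r2, cosSimilarity(v1, v2)) }  // their cosSimilarity
      .filter(_._3 > 0.1)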


On Thu, Apr 24, 2014 at 9:05 PM, Qin Wei  wrote:

Hi All,

I have a problem with the Item-Based Collaborative Filtering Recommendation
Algorithms in spark.
The basic flow is as below:
                 (Item1, (User1, Score1))
    RDD1 ==>     (Item2, (User2, Score2))
                 (Item1, (User2, Score3))
                 (Item2, (User1, Score4))

    RDD1.groupByKey ==> RDD2
                 (Item1, ((User1, Score1), (User2, Score3)))
                 (Item2, ((User1, Score4), (User2, Score2)))

The similarity of Vector ((User1, Score1), (User2, Score3)) and
((User1, Score4), (User2, Score2)) is the similarity of Item1 and Item2.

In my situation, RDD2 contains 20 million records, my spark programm is
extreamly slow, the source code is as below:

    val conf = new SparkConf().setMaster("spark://211.151.121.184:7077")
      .setAppName("Score Calcu Total")
      .set("spark.executor.memory", "20g")
      .setJars(Seq("/home/deployer/score-calcu-assembly-1.0.jar"))
    val sc = new SparkContext(conf)

    val mongoRDD = sc.textFile(args(0).toString, 400)
    val jsonRDD = mongoRDD.map(arg => new JSONObject(arg))

    val newRDD = jsonRDD.map(arg => {
      var score = haha(arg.get("a").asInstanceOf[JSONObject])
      // set score to 0.5 for testing
      arg.put("score", 0.5)
      arg
    })

    val resourceScoresRDD = newRDD.map(arg =>
      (arg.get("rid").toString.toLong, (arg.get("zid").toString,
      arg.get("score").asInstanceOf[Number].doubleValue))).groupByKey().cache()
    val resourceScores = resourceScoresRDD.collect()
    val bcResourceScores = sc.broadcast(resourceScores)

    val simRDD = resourceScoresRDD.mapPartitions({iter =>
      val m = bcResourceScores.value
      for{ (r1, v1) <- iter
           (r2, v2) <- m
           if r1 > r2
         } yield (r1, r2, cosSimilarity(v1, v2))}, true).filter(arg => arg._3 > 0.1)

    println(simRDD.count)

And I saw this in Spark Web UI:

My standalone cluster has 3 worker node (16 core and 32G RAM),and the
workload of the machine in my cluster is heavy when the spark program is
running.

Is there any better way to do the algorithm?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-is-the-best-way-to-do-cartesian-tp4807.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.






Any advice for using big spark.cleaner.delay value in Spark Streaming?

2014-04-27 Thread buremba
It seems the default value for spark.cleaner.delay is 3600 seconds, but I need
to be able to count things on a daily, weekly or even monthly basis.

I suppose the aim of DStream batches and spark.cleaner.delay is to avoid
space issues (running out of memory etc.). I usually use HyperLogLog for
counting unique things to save space, and AFAIK, the other metrics are
simply long values which don't require much space.

When I started learning Spark Streaming it really confused me, because in my
first "Hello World" example all I wanted was to count all events processed by
Spark Streaming. DStream batches are nice, but when I need simple counting
operations it becomes complex. Since Spark Streaming creates new DStreams
for each interval, I needed to merge them into a single DStream, so I used
updateStateByKey() to generate a StateDStream. It seems to work now, but I'm
not sure whether it's efficient or not, because all I need is a single global
counter, but now Spark has counters for all 2-second intervals plus a global
counter for the StateDStream.

I don't have any specific purpose like "Show me this type of unique things
for the last 10 minutes"; instead I need to be able to count things at a large
scale, whether over 10 minutes or 1 month. I create pre-aggregation rules
on the fly, and when I need a simple monthly counter, Spark seems
overkill to me for now.

Do you have any advice on how to use Spark Streaming efficiently?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.