Re: Selecting the top 100 records per group by?

2016-09-10 Thread Karl Higley
Would `topByKey` help?

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctions.scala#L42
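
For reference, a rough sketch of how that might look (assuming your posts can
be keyed as (user_id, score) pairs; swap in whatever value type you actually
want to rank by, since topByKey needs an Ordering on the value):

import org.apache.spark.mllib.rdd.MLPairRDDFunctions._
import org.apache.spark.rdd.RDD

// Hypothetical shape: (user_id, score) pairs. topByKey keeps the `num`
// largest values per key according to the implicit Ordering on the value.
def top100PerUser(posts: RDD[(Long, Double)]): RDD[(Long, Array[Double])] =
  posts.topByKey(100)

If you want whole records back rather than just the scores, you can keep the
full record as the value and pass an Ordering that compares on whatever field
you want to rank by.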

Best,
Karl

On Sat, Sep 10, 2016 at 9:04 PM Kevin Burton  wrote:

> I'm trying to figure out a way to group by and return the top 100 records
> in each group.
>
> Something like:
>
> SELECT TOP(100, user_id) FROM posts GROUP BY user_id;
>
> But I can't really figure out the best way to do this...
>
> There are FIRST and LAST aggregate functions, but those only return one
> column.
>
> I could do something like:
>
> SELECT * FROM posts WHERE user_id IN ( /* select top users here */ ) LIMIT
> 100;
>
> But that limit is applied to ALL the records, not to each individual user.
>
> The only other thing I can think of is to do a manual map reduce and then
> have the reducer only return the top 100 each time...
>
> Would LOVE some advice here...
>
> --
>
> We’re hiring if you know of any awesome Java Devops or Linux Operations
> Engineers!
>
> Founder/CEO Spinn3r.com
> Location: *San Francisco, CA*
> blog: http://burtonator.wordpress.com
> … or check out my Google+ profile
> 
>
>


Re: Locality sensitive hashing

2016-07-24 Thread Karl Higley
Hi Janardhan,

I collected some LSH papers while working on an RDD-based implementation.
Links at the end of the README here:
https://github.com/karlhigley/spark-neighbors

Keep me posted on what you come up with!

Best,
Karl

On Sun, Jul 24, 2016 at 9:54 AM janardhan shetty 
wrote:

> I was looking into implementing locality sensitive hashing on
> DataFrames.
> Any pointers for reference?
>


Re: How to recommend most similar users using Spark ML

2016-07-17 Thread Karl Higley
There are also some Spark packages for finding approximate nearest
neighbors using locality sensitive hashing:
https://spark-packages.org/?q=tags%3Alsh

On Fri, Jul 15, 2016 at 7:45 AM nguyen duc Tuan 
wrote:

> Hi jeremycod,
> If you want to find the top N nearest neighbors for all users using an
> exact top-k algorithm, I recommend using the same approach as
> used in MLlib:
> https://github.com/apache/spark/blob/85d6b0db9f5bd425c36482ffcb1c3b9fd0fcdb31/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L272
>
> If the number of users is large, the exact top-k algorithm can be rather
> slow; try an approximate nearest neighbors algorithm instead. There's a good
> benchmark of various libraries here:
> https://github.com/erikbern/ann-benchmarks
>
> 2016-07-15 10:36 GMT+07:00 jeremycod :
>
>> Hi,
>>
>> I need to develop a service that will recommend to each user other similar
>> users that they can connect to. For each user I have data about user
>> preferences for specific items in the form:
>>
>> user, item, preference
>> 1,75,   0.89
>> 2,168,  0.478
>> 2,99,   0.321
>> 3,31,   0.012
>>
>> So far, I have implemented an approach using cosine similarity that compares
>> one user's feature vector with other users':
>>
>> def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
>>   vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
>> }
>>
>> def user2usersimilarity(userid: Integer, recNumber: Integer): Unit = {
>>   val userFactor = model.userFeatures.lookup(userid).head
>>   val userVector = new DoubleMatrix(userFactor)
>>   val s1 = cosineSimilarity(userVector, userVector)
>>   val sims = model.userFeatures.map { case (id, factor) =>
>>     val factorVector = new DoubleMatrix(factor)
>>     val sim = cosineSimilarity(factorVector, userVector)
>>     (id, sim)
>>   }
>>   val sortedSims = sims.top(recNumber + 1)(
>>     Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
>>   println(sortedSims.slice(1, recNumber + 1).mkString("\n"))
>> }
>>
>> This approach works fine with the MovieLens dataset in terms of quality of
>> recommendations. However, my concern is the performance of such an
>> algorithm. Since I have to generate recommendations for all users in the
>> system, with this approach I would compare each user with all other users
>> in the system.
>>
>> I would appreciate it if somebody could suggest how to limit the comparison
>> to each user's top N neighbors, or some other algorithm that would work
>> better for my use case.
>>
>> Thanks,
>> Zoran
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-recommend-most-similar-users-using-Spark-ML-tp27342.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Compute

2016-04-27 Thread Karl Higley
You're right that there are some duplicate distance computations happening in
the implementation I mentioned. I ran into the kinds of issues you
describe, and I ended up accepting the duplicate computational work in
exchange for significantly reduced memory usage. I couldn't figure out a
way to avoid materializing the pairs (to save shuffle size and memory)
while also avoiding duplicate distance computations. I'd be very curious to
know if there is one, though!



On Wed, Apr 27, 2016, 11:22 PM nguyen duc tuan <newvalu...@gmail.com> wrote:

> I've seen this implementation before. The problem here is that if, after
> several hashes, a pair of points appears K times in a bucket (with
> respect to K hashes), the distance needs to be computed K times, and in
> total the data will be shuffled up to K times. So it reduces to my problem.
> I'm trying a new approach and I think it will be better than my original
> one:
> val part1 = rdd1.map(x => (x._1, x)).join(rdd2).map(_._2)
> val part2 = rdd1.map(x => (x._2, x)).join(rdd2).map(_._2)
> val distances = part1.join(part2)
>   .mapValues(v => measure.compute(v._1, v._2))
>
> And I'm sorry for the ugly title of the email. I forgot to check it before sending.
>
> 2016-04-28 10:10 GMT+07:00 Karl Higley <kmhig...@gmail.com>:
>
>> One idea is to avoid materializing the pairs of points before computing
>> the distances between them. You could do that using the LSH signatures by
>> building (Signature, (Int, Vector)) tuples, grouping by signature, and then
>> iterating pairwise over the resulting lists of points to compute the
>> distances between them. The points still have to be shuffled over the
>> network, but at least the shuffle doesn't create multiple copies of each
>> point (like a join by point ids would).
>>
>> Here's an implementation of that idea in the context of finding nearest
>> neighbors:
>>
>> https://github.com/karlhigley/spark-neighbors/blob/master/src/main/scala/com/github/karlhigley/spark/neighbors/ANNModel.scala#L33-L34
>>
>> Best,
>> Karl
>>
>>
>>
>> On Wed, Apr 27, 2016 at 10:22 PM nguyen duc tuan <newvalu...@gmail.com>
>> wrote:
>>
>>> Hi all,
>>> Currently, I'm working on implementing LSH on Spark, which leads to the
>>> following problem. I have an RDD[(Int, Int)] that stores all pairs of ids of
>>> vectors whose distance needs to be computed, and another RDD[(Int, Vector)]
>>> that stores all vectors with their ids. Can anyone suggest an efficient way
>>> to compute the distances? The simple version that I tried first is as
>>> follows, but it's inefficient because it requires a lot of data shuffling
>>> over the network.
>>>
>>> rdd1: RDD[(Int, Int)] = ..
>>> rdd2: RDD[(Int, Vector)] = ...
>>> val distances = rdd2.cartesian(rdd2)
>>>   .map(x => ((x._1._1, x._2._1), (x._1._2, x._2._2)))
>>>   .join(rdd1.map(x => (x, 1)))
>>>   .mapValues(x => {
>>>  measure.compute(x._1._1, x._1._2)
>>>   })
>>>
>>> Thanks for any suggestion.
>>>
>>
>


Re: Compute

2016-04-27 Thread Karl Higley
One idea is to avoid materializing the pairs of points before computing the
distances between them. You could do that using the LSH signatures by
building (Signature, (Int, Vector)) tuples, grouping by signature, and then
iterating pairwise over the resulting lists of points to compute the
distances between them. The points still have to be shuffled over the
network, but at least the shuffle doesn't create multiple copies of each
point (like a join by point ids would).

Here's an implementation of that idea in the context of finding nearest
neighbors:
https://github.com/karlhigley/spark-neighbors/blob/master/src/main/scala/com/github/karlhigley/spark/neighbors/ANNModel.scala#L33-L34
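
In outline, it's something like the following sketch (a minimal sketch rather
than the actual spark-neighbors code; `Signature` and `compute` stand in for
whatever hash type and distance measure you're using):

import scala.reflect.ClassTag
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def pairwiseDistances[Signature: ClassTag](
    signed: RDD[(Signature, (Int, Vector))],
    compute: (Vector, Vector) => Double): RDD[((Int, Int), Double)] = {
  signed
    .groupByKey()  // each point is shuffled once per signature, not once per candidate pair
    .flatMap { case (_, points) =>
      // iterate pairwise within each bucket; a pair that collides under
      // several signatures still has its distance computed more than once
      points.toSeq.combinations(2).map {
        case Seq((idA, vecA), (idB, vecB)) => ((idA, idB), compute(vecA, vecB))
      }
    }
}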

Best,
Karl



On Wed, Apr 27, 2016 at 10:22 PM nguyen duc tuan 
wrote:

> Hi all,
> Currently, I'm working on implementing LSH on Spark, which leads to the
> following problem. I have an RDD[(Int, Int)] that stores all pairs of ids of
> vectors whose distance needs to be computed, and another RDD[(Int, Vector)]
> that stores all vectors with their ids. Can anyone suggest an efficient way
> to compute the distances? The simple version that I tried first is as follows,
> but it's inefficient because it requires a lot of data shuffling over the
> network.
>
> rdd1: RDD[(Int, Int)] = ..
> rdd2: RDD[(Int, Vector)] = ...
> val distances = rdd2.cartesian(rdd2)
>   .map(x => ((x._1._1, x._2._1), (x._1._2, x._2._2)))
>   .join(rdd1.map(x => (x, 1)))
>   .mapValues(x => {
>  measure.compute(x._1._1, x._1._2)
>   })
>
> Thanks for any suggestion.
>


Re: Reindexing in graphx

2016-02-25 Thread Karl Higley
For real-time graph mutations and queries, you might consider a graph
database like Neo4j or TitanDB. Titan can be backed by HBase, which you're
already using, so that's probably worth a look.

On Thu, Feb 25, 2016, 9:55 AM Udbhav Agarwal 
wrote:

> That’s a good thing you pointed out. Let me check that. Thanks.
>
>
>
> Another thing I was struggling with: while this process of adding vertices
> to the graph (named *inputGraph*) is happening, I am not able to
> access it or run queries over it. Currently, when I query the graph
> during the addition of vertices, it only gives results after the addition is
> over. I have also tried creating and querying another variable,
> tempInputGraph, where I store the state of inputGraph, which is updated
> whenever the addition process is over. But querying this is also being
> delayed due to the background process.
>
> I have set the number of executors to 8, as per my 8-core system.
>
> Please suggest how I can keep this graph always available to users even if
> a background process is happening over it. Let me know whether it is
> possible or not, since you said GraphX is not really designed for real-time
> needs.
>
>
>
> If not GraphX, which other tool could I consider for real-time needs?
> To elaborate, I want a real-time system which can store data as and when
> it arrives and which I can query in real time.
>
> At present I am using GraphX. My data enters the system via Kafka
> and Spark Streaming and then updates a graph of, let’s say, orders. One
> copy of this is sent to HBase, where the data is persisted for later use.
> Now I want to query this graph to get various insights into this orders
> data. I was using GraphX because graphs are really helpful if we
> want to analyse related/connected information, e.g. friends of friends and
> other such things.
>
>
>
> I really appreciate your valuable help, Robin. Thank you in advance.
>
>
>
> Udbhav.
>
> *From:* Robin East [mailto:robin.e...@xense.co.uk]
> *Sent:* Thursday, February 25, 2016 7:42 PM
>
>
> *To:* Udbhav Agarwal 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Reindexing in graphx
>
>
>
> So first up, GraphX is not really designed for real-time graph mutation
> situations. That’s not to say it can’t be done, but you may be butting
> up against some of the design limitations in that area. As a first point of
> investigation you should look at the Web UI to see which particular
> tasks/stages are taking a long time, and what resource (CPU, IO, network,
> shuffles) they seem to be bottlenecking on.
>
>
> ---
>
> Robin East
>
> *Spark GraphX in Action *Michael Malak and Robin East
>
> Manning Publications Co.
>
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
>
>
>
>
> On 24 Feb 2016, at 12:05, Udbhav Agarwal 
> wrote:
>
>
>
> Sounds useful, Robin. Thanks, I will try that. But FYI, in another case I
> tested adding only one vertex to the graph. In that case, too, the
> latency of subsequent additions kept increasing: the first addition of a
> vertex took 3 seconds, the second took 7 seconds, and so on. This is a
> case where I want to add vertices to the graph as and when they arrive in
> our system, since it’s a real-time system I am trying to build, so
> vertices will keep on coming.
>
>
>
> Thanks.
>
> *From:* Robin East [mailto:robin.e...@xense.co.uk]
> *Sent:* Wednesday, February 24, 2016 3:54 PM
> *To:* Udbhav Agarwal 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Reindexing in graphx
>
>
>
> It looks like you are adding vertices one by one; you definitely don’t want
> to do that. What happens when you batch together 400 vertices into an RDD
> and then add all 400 in one go?
>
>
> ---
>
> Robin East
>
> *Spark GraphX in Action *Michael Malak and Robin East
>
> Manning Publications Co.
>
> http://www.manning.com/books/spark-graphx-in-action
>
>
>
>
>
>
>
>
>
> On 24 Feb 2016, at 05:49, Udbhav Agarwal 
> wrote:
>
>
>
> Thank you Robin for your reply.
>
> Actually I am adding a bunch of vertices to a graph in GraphX using the
> following method. I am facing a latency problem. The first time, an
> addition of say 400 vertices to a graph with 100,000 nodes takes around 7
> seconds; the next time it takes 15 seconds. So every subsequent addition
> takes more time than the previous one. Hence I tried to do reindex() so
> that subsequent operations can also be performed quickly.
>
> FYI, my cluster presently has one machine with 8 cores and 8 GB RAM. I
> am running in local mode.
>
>
>
> def addVertex(rdd: RDD[String], sc: SparkContext, session: String): Long =
> {
> val defaultUser = (0, 0)
> 

Re: Computing hamming distance over large data set

2016-02-11 Thread Karl Higley
Hi,

It sounds like you're trying to solve the approximate nearest neighbor
(ANN) problem. With a large dataset, parallelizing a brute force O(n^2)
approach isn't likely to help all that much, because the number of pairwise
comparisons grows quickly as the size of the dataset increases. I'd look at
ways to avoid computing the similarity between all pairs, like
locality-sensitive hashing. (Unfortunately Spark doesn't yet support LSH --
it's currently slated for the Spark 2.0.0 release, but AFAIK development on
it hasn't started yet.)

There are a bunch of Python libraries that support various approaches to
the ANN problem (including LSH), though. It sounds like you need fast
lookups, so you might check out https://github.com/spotify/annoy. For other
alternatives, see this performance comparison of Python ANN libraries:
https://github.com/erikbern/ann-benchmarks.

Hope that helps,
Karl

On Wed, Feb 10, 2016 at 10:29 PM rokclimb15  wrote:

> Hi everyone, new to this list and Spark, so I'm hoping someone can point me
> in the right direction.
>
> I'm trying to perform this same sort of task:
>
> http://stackoverflow.com/questions/14925151/hamming-distance-optimization-for-mysql-or-postgresql
>
> and I'm running into the same problem - it doesn't scale.  Even on a very
> fast processor, MySQL pegs out one CPU core at 100% and takes 8 hours to
> find a match with 30 million+ rows.
>
> What I would like to do is to load this data set from MySQL into Spark and
> compute the Hamming distance using all available cores, then select the
> rows
> matching a maximum distance.  I'm most familiar with Python, so would
> prefer
> to use that.
>
> I found an example of loading data from MySQL
>
>
> http://blog.predikto.com/2015/04/10/using-the-spark-datasource-api-to-access-a-database/
>
> I found a related DataFrame commit and docs, but I'm not exactly sure how
> to
> put this all together.
>
>
> https://mail-archives.apache.org/mod_mbox/spark-commits/201505.mbox/%3c707d439f5fcb478b99aa411e23abb...@git.apache.org%3E
>
>
> http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.bitwiseXOR
>
> Could anyone please point me to a similar example I could follow as a Spark
> newb to try this out?  Is this even worth attempting, or will it similarly
> fail performance-wise?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Computing-hamming-distance-over-large-data-set-tp26202.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Product similarity with TF/IDF and Cosine similarity (DIMSUM)

2016-02-03 Thread Karl Higley
Hi Alan,

I'm slow responding, so you may have already figured this out. Just in
case, though:

val approx = mat.columnSimilarities(0.1)
approxEntries.first()
res18: ((Long, Long), Double) = ((1638,966248),0.632455532033676)

The above is returning the cosine similarity between columns 1638 and
966248. Since you're providing documents as rows, this is conceptually
something like the similarity between terms based on which documents they
occur in.

In order to get the similarity between documents based on the terms they
contain, you'd need to build a RowMatrix where each row represents one term
and each column represents one document. One way to do that would be to
construct a CoordinateMatrix from your vectors, call transpose() on it,
then convert it to a RowMatrix via toRowMatrix().
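
Roughly something like this (an untested sketch built around your snippet;
zipWithIndex is just one way to assign document ids):

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD

def docSimilarities(tfidf: RDD[Vector], threshold: Double): CoordinateMatrix = {
  // (docId, termId, weight) entries for a document x term matrix
  val entries: RDD[MatrixEntry] = tfidf.zipWithIndex().flatMap { case (vec, docId) =>
    val sv = vec.toSparse
    sv.indices.zip(sv.values).map { case (termId, weight) =>
      MatrixEntry(docId, termId, weight)
    }
  }
  // transpose so rows are terms and columns are documents, then compare columns
  val termByDoc: RowMatrix = new CoordinateMatrix(entries).transpose().toRowMatrix()
  termByDoc.columnSimilarities(threshold) // entries: ((docI, docJ), cosine similarity)
}

The entries of the result are then keyed by (document i, document j) rather
than by term indices, where the document ids come from zipWithIndex, so you
can map them back to your products.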

Hope that helps!

Best,
Karl

On Sat, Jan 30, 2016 at 4:30 PM Alan Prando  wrote:

> Hi Folks!
>
> I am trying to implement a Spark job to calculate the similarity of the
> products in my database, using only their names and descriptions.
> I would like to use TF-IDF to represent my text data and cosine similarity
> to calculate all similarities.
>
> My goal is, after the job completes, to get all similarities as a list.
> For example:
> Prod1 = ((Prod2, 0.98), (Prod3, 0.88))
> Prod2 = ((Prod1, 0.98), (Prod4, 0.53))
> Prod3 = ((Prod1, 0.98))
> Prod4 = ((Prod1, 0.53))
>
> However, I am new to Spark and I am having trouble understanding
> what cosine similarity returns!
>
> My code:
> val documents: RDD[Seq[String]] = sc.textFile(filename).map(_.split("
> ").toSeq)
>
> val hashingTF = new HashingTF()
> val tf: RDD[Vector] = hashingTF.transform(documents)
> tf.cache()
>
> val idf = new IDF(minDocFreq = 2).fit(tf)
> val tfidf: RDD[Vector] = idf.transform(tf)
>
> val mat = new RowMatrix(tfidf)
>
> // Compute similar columns perfectly, with brute force.
> val exact = mat.columnSimilarities()
>
> // Compute similar columns with estimation using DIMSUM
> val approx = mat.columnSimilarities(0.1)
>
> val exactEntries = exact.entries.map { case MatrixEntry(i, j, u) =>
> ((i, j), u) }
> val approxEntries = approx.entries.map { case MatrixEntry(i, j, v) =>
> ((i, j), v) }
>
> The file just has a product's name and description in each row.
>
> The result I got:
> approxEntries.first()
> res18: ((Long, Long), Double) = ((1638,966248),0.632455532033676)
>
> How can I figure out which rows this result refers to?
>
> Thanks in advance! =]
>
>
>
>
>


Re: Spark : merging object with approximation

2015-11-25 Thread Karl Higley
Hi,

What merge behavior do you want when A~=B, B~=C but A!=C? Should the merge
emit ABC? AB and BC? Something else?

Best,
Karl

On Sat, Nov 21, 2015 at 5:24 AM OcterA  wrote:

> Hello,
>
> I have a set of X records (around 30M entries), and I have to run a batch
> job to merge records which are similar; at the end I will have around X/2
> records.
>
> At this moment, I've done the basics: open the files, map to a usable Object,
> but I'm stuck at the merge part...
>
> The merge condition is composed of various conditions:
>
> A.get*Start*Point == B.get*End*Point
> Difference between A.getStartDate and B.getStartDate is less than X1
> second
> Difference between A.getEndDate and B.getEndDate is less than X2 second
> A.getField1 startWith B.getField1
> some more like that...
>
> As a result, I can have A~=B, B~=C but A!=C. From my understanding of Spark,
> this is a problem, because otherwise I could use a hash to greatly reduce
> the scan time...
>
> Do you have any advice to resolve my problem, or pointers to methods which
> could help me? Maybe another tool from the Hadoop ecosystem?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-merging-object-with-approximation-tp25445.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>