[ https://issues.apache.org/jira/browse/MAHOUT-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14029469#comment-14029469 ]
ASF GitHub Bot commented on MAHOUT-1464:
----------------------------------------
Github user pferrel commented on a diff in the pull request:
https://github.com/apache/mahout/pull/12#discussion_r13714354
--- Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala ---
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf
+
+import org.apache.mahout.math._
+import scalabindings._
+import RLikeOps._
+import drm._
+import RLikeDrmOps._
+import org.apache.mahout.sparkbindings._
+import scala.collection.JavaConversions._
+import org.apache.mahout.math.stats.LogLikelihood
+import collection._
+import org.apache.mahout.common.RandomUtils
+import org.apache.mahout.math.function.{VectorFunction, Functions}
+
+
+/**
+ * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning,
Innovations in Recommendation",
+ * available at http://www.mapr.com/practical-machine-learning
+ *
+ * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
+ * Scalable Similarity-Based Neighborhood Methods with MapReduce
+ * ACM Conference on Recommender Systems 2012"
+ */
+object CooccurrenceAnalysis extends Serializable {
+
+ /** Compares (Int,Double) pairs by the second value */
+ private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2 }
+
+ def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50,
+ maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
+
+ implicit val distributedContext = drmARaw.context
+
+ // Apply selective downsampling, pin resulting matrix
+ val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
+
+ // num users, which equals the maximum number of interactions per item
+ val numUsers = drmA.nrow.toInt
+
+ // Compute & broadcast the number of interactions per thing in A
+ val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts)
+
+ // Compute co-occurrence matrix A'A
+ val drmAtA = drmA.t %*% drmA
+
+ // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix
+ val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA,
+ bcastInteractionsPerItemA, crossCooccurrence = false)
+
+ var indicatorMatrices = List(drmIndicatorsAtA)
+
+ // Now look at cross-co-occurrences
+ for (drmBRaw <- drmBs) {
+ // Down-sample and pin other interaction matrix
+ val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
+
+ // Compute & broadcast the number of interactions per thing in B
+ val bcastInteractionsPerThingB = drmBroadcast(drmB.colCounts)
+
+ // Compute cross-co-occurrence matrix B'A
+ val drmBtA = drmB.t %*% drmA
+
+ val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing,
+ bcastInteractionsPerThingB, bcastInteractionsPerItemA)
+
+ indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
+
+ drmB.uncache()
+ }
+
+ // Unpin downsampled interaction matrix
+ drmA.uncache()
+
+ // Return list of indicator matrices
+ indicatorMatrices
+ }
+
+ /**
+ * Compute loglikelihood ratio
+ * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details
+ **/
+ def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
+ numInteractionsWithAandB: Long, numInteractions: Long) = {
+
+ val k11 = numInteractionsWithAandB
+ val k12 = numInteractionsWithA - numInteractionsWithAandB
+ val k21 = numInteractionsWithB - numInteractionsWithAandB
+ val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
+
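+ // e.g. numInteractions = 1000, numInteractionsWithA = 100, numInteractionsWithB = 50 and
+ // numInteractionsWithAandB = 20 give k11 = 20, k12 = 80, k21 = 30, k22 = 870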
+ LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
+ }
+
+ def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int,
+ bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector],
+ crossCooccurrence: Boolean = true) = {
+ drmBtA.mapBlock() {
+ case (keys, block) =>
+
+ val llrBlock = block.like()
+ val numInteractionsB: Vector = bcastNumInteractionsB
+ val numInteractionsA: Vector = bcastNumInteractionsA
+
+ for (index <- 0 until keys.size) {
+
+ val thingB = keys(index)
+
+ // PriorityQueue to select the top-k items
+ val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
+
+ block(index, ::).nonZeroes().foreach { elem =>
+ val thingA = elem.index
+ val cooccurrences = elem.get
+
+ // exclude co-occurrences of the item with itself
+ if (crossCooccurrence || thingB != thingA) {
+ // Compute loglikelihood ratio
+ val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
+ cooccurrences.toLong, numUsers)
+ val candidate = thingA -> llrRatio
+
+ // Enqueue item with score, if belonging to the top-k
+ if (topItemsPerThing.size < maxInterestingItemsPerThing) {
+ topItemsPerThing.enqueue(candidate)
+ } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
+ topItemsPerThing.dequeue()
+ topItemsPerThing.enqueue(candidate)
+ }
+ }
+ }
+
+ // Add top-k interesting items to the output matrix
+ topItemsPerThing.dequeueAll.foreach {
+ case (otherThing, llrScore) =>
+ llrBlock(index, otherThing) = llrScore
+ }
+ }
+
+ keys -> llrBlock
+ }
+ }
+
+ /**
+ * Selectively downsample users and things with an anomalous amount of interactions, inspired by
+ * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
+ *
+ * additionally binarizes the input matrix, as we're only interested in knowing whether interactions happened or not
+ */
+ def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
+
+ implicit val distributedContext = drmM.context
+
+ // Pin raw interaction matrix
+ val drmI = drmM.checkpoint()
+
+ // Broadcast vector containing the number of interactions with each thing
+ val bcastNumInteractions = drmBroadcast(drmI.colCounts)
+
+ val downSampledDrmI = drmI.mapBlock() {
+ case (keys, block) =>
+ val numInteractions: Vector = bcastNumInteractions
+
+ // Use a hash of the unique first key to seed the RNG, which makes this computation repeatable in case of failures
+ val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
+
+ val downsampledBlock = block.like()
+
+ // Downsample the interaction vector of each user
+ for (userIndex <- 0 until keys.size) {
+
+ val interactionsOfUser = block(userIndex, ::)
+
+ //todo: can we trust getNumNonZeroElements or is this the upper limit? May have to actually count them?
--- End diff --
got it, I'll remove the comment since we do rely on it in the code
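(For reference: if getNumNonZeroElements ever does turn out to be an upper bound, a minimal hypothetical helper that counts the truly non-zero entries explicitly could look like the sketch below. Vector and nonZeroes() are the Mahout math APIs already used in the patch; countNonZeroes is an assumed name, not part of this patch.)

    import org.apache.mahout.math.Vector
    import scala.collection.JavaConversions._

    // Count only entries whose value is actually non-zero, instead of
    // relying on Vector.getNumNonZeroElements
    def countNonZeroes(v: Vector): Int =
      v.nonZeroes().count(_.get != 0.0)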
> Cooccurrence Analysis on Spark
> ------------------------------
>
> Key: MAHOUT-1464
> URL: https://issues.apache.org/jira/browse/MAHOUT-1464
> Project: Mahout
> Issue Type: Improvement
> Components: Collaborative Filtering
> Environment: hadoop, spark
> Reporter: Pat Ferrel
> Assignee: Pat Ferrel
> Fix For: 1.0
>
> Attachments: MAHOUT-1464.patch, MAHOUT-1464.patch, MAHOUT-1464.patch,
> MAHOUT-1464.patch, MAHOUT-1464.patch, MAHOUT-1464.patch, run-spark-xrsj.sh
>
>
> Create a version of Cooccurrence Analysis (RowSimilarityJob with LLR) that
> runs on Spark. This should be compatible with the Mahout Spark DRM DSL so a
> DRM can be used as input.
> Ideally this would extend to cover MAHOUT-1422. This cross-cooccurrence has
> several applications including cross-action recommendations.
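For illustration, a minimal usage sketch against the cooccurrences API in the diff above; drmPurchases and drmViews are assumed, hypothetical DrmLike[Int] interaction matrices (rows = users, columns = items):

    import org.apache.mahout.math.drm.DrmLike
    import org.apache.mahout.cf.CooccurrenceAnalysis

    // Primary action A (e.g. purchases) plus one secondary action B (e.g. views);
    // returns the LLR-sparsified indicator matrices for A'A and then B'A
    def indicatorsFor(drmPurchases: DrmLike[Int],
                      drmViews: DrmLike[Int]): List[DrmLike[Int]] =
      CooccurrenceAnalysis.cooccurrences(drmPurchases, drmBs = Array(drmViews))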
--
This message was sent by Atlassian JIRA
(v6.2#6252)