[ https://issues.apache.org/jira/browse/MAHOUT-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14029469#comment-14029469 ]
ASF GitHub Bot commented on MAHOUT-1464: ---------------------------------------- Github user pferrel commented on a diff in the pull request: https://github.com/apache/mahout/pull/12#discussion_r13714354 --- Diff: spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala --- @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf + +import org.apache.mahout.math._ +import scalabindings._ +import RLikeOps._ +import drm._ +import RLikeDrmOps._ +import org.apache.mahout.sparkbindings._ +import scala.collection.JavaConversions._ +import org.apache.mahout.math.stats.LogLikelihood +import collection._ +import org.apache.mahout.common.RandomUtils +import org.apache.mahout.math.function.{VectorFunction, Functions} + + +/** + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation", + * available at http://www.mapr.com/practical-machine-learning + * + * see also "Sebastian Schelter, Christoph Boden, Volker Markl: + * Scalable Similarity-Based Neighborhood Methods with MapReduce + * ACM Conference on Recommender Systems 2012" + */ +object CooccurrenceAnalysis extends Serializable { + + /** Compares (Int,Double) pairs by the second value */ + private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2} + + def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, maxInterestingItemsPerThing: Int = 50, + maxNumInteractions: Int = 500, drmBs: Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = { + + implicit val distributedContext = drmARaw.context + + // Apply selective downsampling, pin resulting matrix + val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions) + + // num users, which equals the maximum number of interactions per item + val numUsers = drmA.nrow.toInt + + // Compute & broadcast the number of interactions per thing in A + val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts) + + // Compute co-occurrence matrix A'A + val drmAtA = drmA.t %*% drmA + + // Compute loglikelihood scores and sparsify the resulting matrix to get the indicator matrix + val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, maxInterestingItemsPerThing, bcastInteractionsPerItemA, + bcastInteractionsPerItemA, crossCooccurrence = false) + + var indicatorMatrices = List(drmIndicatorsAtA) + + // Now look at cross-co-occurrences + for (drmBRaw <- drmBs) { + // Down-sample and pin other interaction matrix + val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint() + + // Compute & broadcast the number of interactions per thing in B + val bcastInteractionsPerThingB = drmBroadcast(drmB.colCounts) + + // Compute cross-co-occurrence matrix B'A + val drmBtA = drmB.t %*% drmA + + val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, maxInterestingItemsPerThing, + bcastInteractionsPerThingB, bcastInteractionsPerItemA) + + indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA + + drmB.uncache() + } + + // Unpin downsampled interaction matrix + drmA.uncache() + + // Return list of indicator matrices + indicatorMatrices + } + + /** + * Compute loglikelihood ratio + * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details + **/ + def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long, + numInteractionsWithAandB: Long, numInteractions: Long) = { + + val k11 = numInteractionsWithAandB + val k12 = numInteractionsWithA - numInteractionsWithAandB + val k21 = numInteractionsWithB - numInteractionsWithAandB + val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB + + LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22) + } + + def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, maxInterestingItemsPerThing: Int, + bcastNumInteractionsB: BCast[Vector], bcastNumInteractionsA: BCast[Vector], + crossCooccurrence: Boolean = true) = { + drmBtA.mapBlock() { + case (keys, block) => + + val llrBlock = block.like() + val numInteractionsB: Vector = bcastNumInteractionsB + val numInteractionsA: Vector = bcastNumInteractionsA + + for (index <- 0 until keys.size) { + + val thingB = keys(index) + + // PriorityQueue to select the top-k items + val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore) + + block(index, ::).nonZeroes().foreach { elem => + val thingA = elem.index + val cooccurrences = elem.get + + // exclude co-occurrences of the item with itself + if (crossCooccurrence || thingB != thingA) { + // Compute loglikelihood ratio + val llrRatio = loglikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong, + cooccurrences.toLong, numUsers) + val candidate = thingA -> llrRatio + + // Enqueue item with score, if belonging to the top-k + if (topItemsPerThing.size < maxInterestingItemsPerThing) { + topItemsPerThing.enqueue(candidate) + } else if (orderByScore.lt(candidate, topItemsPerThing.head)) { + topItemsPerThing.dequeue() + topItemsPerThing.enqueue(candidate) + } + } + } + + // Add top-k interesting items to the output matrix + topItemsPerThing.dequeueAll.foreach { + case (otherThing, llrScore) => + llrBlock(index, otherThing) = llrScore + } + } + + keys -> llrBlock + } + } + + /** + * Selectively downsample users and things with an anomalous amount of interactions, inspired by + * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java + * + * additionally binarizes input matrix, as we're only interesting in knowing whether interactions happened or not + */ + def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = { + + implicit val distributedContext = drmM.context + + // Pin raw interaction matrix + val drmI = drmM.checkpoint() + + // Broadcast vector containing the number of interactions with each thing + val bcastNumInteractions = drmBroadcast(drmI.colCounts) + + val downSampledDrmI = drmI.mapBlock() { + case (keys, block) => + val numInteractions: Vector = bcastNumInteractions + + // Use a hash of the unique first key to seed the RNG, makes this computation repeatable in case of failures + val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed)) + + val downsampledBlock = block.like() + + // Downsample the interaction vector of each user + for (userIndex <- 0 until keys.size) { + + val interactionsOfUser = block(userIndex, ::) + + //todo: can we trust getNumNonZeroElements or is this the upper limit? May have to actually count them? --- End diff -- got it, I'll remove the comment since we do rely on it in the code > Cooccurrence Analysis on Spark > ------------------------------ > > Key: MAHOUT-1464 > URL: https://issues.apache.org/jira/browse/MAHOUT-1464 > Project: Mahout > Issue Type: Improvement > Components: Collaborative Filtering > Environment: hadoop, spark > Reporter: Pat Ferrel > Assignee: Pat Ferrel > Fix For: 1.0 > > Attachments: MAHOUT-1464.patch, MAHOUT-1464.patch, MAHOUT-1464.patch, > MAHOUT-1464.patch, MAHOUT-1464.patch, MAHOUT-1464.patch, run-spark-xrsj.sh > > > Create a version of Cooccurrence Analysis (RowSimilarityJob with LLR) that > runs on Spark. This should be compatible with Mahout Spark DRM DSL so a DRM > can be used as input. > Ideally this would extend to cover MAHOUT-1422. This cross-cooccurrence has > several applications including cross-action recommendations. -- This message was sent by Atlassian JIRA (v6.2#6252)