
ASF GitHub Bot commented on MAHOUT-1464:

Github user pferrel commented on a diff in the pull request:

    --- Diff: 
spark/src/main/scala/org/apache/mahout/cf/CooccurrenceAnalysis.scala ---
    @@ -0,0 +1,214 @@
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.mahout.cf
    +import org.apache.mahout.math._
    +import scalabindings._
    +import RLikeOps._
    +import drm._
    +import RLikeDrmOps._
    +import org.apache.mahout.sparkbindings._
    +import scala.collection.JavaConversions._
    +import org.apache.mahout.math.stats.LogLikelihood
    +import collection._
    +import org.apache.mahout.common.RandomUtils
    +import org.apache.mahout.math.function.{VectorFunction, Functions}
    + * based on "Ted Dunnning & Ellen Friedman: Practical Machine Learning, 
Innovations in Recommendation",
    + * available at http://www.mapr.com/practical-machine-learning
    + *
    + * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
    + * Scalable Similarity-Based Neighborhood Methods with MapReduce
    + * ACM Conference on Recommender Systems 2012"
    + */
    +object CooccurrenceAnalysis extends Serializable {
    +  /** Compares (Int,Double) pairs by the second value */
    +  private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case 
((_, score1), (_, score2)) => score1 > score2}
    +  def cooccurrences(drmARaw: DrmLike[Int], randomSeed: Int = 0xdeadbeef, 
maxInterestingItemsPerThing: Int = 50,
    +                    maxNumInteractions: Int = 500, drmBs: 
Array[DrmLike[Int]] = Array()): List[DrmLike[Int]] = {
    +    implicit val distributedContext = drmARaw.context
    +    // Apply selective downsampling, pin resulting matrix
    +    val drmA = sampleDownAndBinarize(drmARaw, randomSeed, 
    +    // num users, which equals the maximum number of interactions per item
    +    val numUsers = drmA.nrow.toInt
    +    // Compute & broadcast the number of interactions per thing in A
    +    val bcastInteractionsPerItemA = drmBroadcast(drmA.colCounts)
    +    // Compute co-occurrence matrix A'A
    +    val drmAtA = drmA.t %*% drmA
    +    // Compute loglikelihood scores and sparsify the resulting matrix to 
get the indicator matrix
    +    val drmIndicatorsAtA = computeIndicators(drmAtA, numUsers, 
maxInterestingItemsPerThing, bcastInteractionsPerItemA,
    +      bcastInteractionsPerItemA, crossCooccurrence = false)
    +    var indicatorMatrices = List(drmIndicatorsAtA)
    +    // Now look at cross-co-occurrences
    +    for (drmBRaw <- drmBs) {
    +      // Down-sample and pin other interaction matrix
    +      val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, 
    +      // Compute & broadcast the number of interactions per thing in B
    +      val bcastInteractionsPerThingB = drmBroadcast(drmB.colCounts)
    +      // Compute cross-co-occurrence matrix B'A
    +      val drmBtA = drmB.t %*% drmA
    +      val drmIndicatorsBtA = computeIndicators(drmBtA, numUsers, 
    +        bcastInteractionsPerThingB, bcastInteractionsPerItemA)
    +      indicatorMatrices = indicatorMatrices :+ drmIndicatorsBtA
    +      drmB.uncache()
    +    }
    +    // Unpin downsampled interaction matrix
    +    drmA.uncache()
    +    // Return list of indicator matrices
    +    indicatorMatrices
    +  }
    +  /**
    +   * Compute loglikelihood ratio
    +   * see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html 
for details
    +   **/
    +  def loglikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: 
    +                         numInteractionsWithAandB: Long, numInteractions: 
Long) = {
    +    val k11 = numInteractionsWithAandB
    +    val k12 = numInteractionsWithA - numInteractionsWithAandB
    +    val k21 = numInteractionsWithB - numInteractionsWithAandB
    +    val k22 = numInteractions - numInteractionsWithA - 
numInteractionsWithB + numInteractionsWithAandB
    +    LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
    +  }
    +  def computeIndicators(drmBtA: DrmLike[Int], numUsers: Int, 
maxInterestingItemsPerThing: Int,
    +                        bcastNumInteractionsB: BCast[Vector], 
bcastNumInteractionsA: BCast[Vector],
    +                        crossCooccurrence: Boolean = true) = {
    +    drmBtA.mapBlock() {
    +      case (keys, block) =>
    +        val llrBlock = block.like()
    +        val numInteractionsB: Vector = bcastNumInteractionsB
    +        val numInteractionsA: Vector = bcastNumInteractionsA
    +        for (index <- 0 until keys.size) {
    +          val thingB = keys(index)
    +          // PriorityQueue to select the top-k items
    +          val topItemsPerThing = new mutable.PriorityQueue[(Int, 
    +          block(index, ::).nonZeroes().foreach { elem =>
    +            val thingA = elem.index
    +            val cooccurrences = elem.get
    +            // exclude co-occurrences of the item with itself
    +            if (crossCooccurrence || thingB != thingA) {
    +              // Compute loglikelihood ratio
    +              val llrRatio = 
    +                cooccurrences.toLong, numUsers)
    +              val candidate = thingA -> llrRatio
    +              // Enqueue item with score, if belonging to the top-k
    +              if (topItemsPerThing.size < maxInterestingItemsPerThing) {
    +                topItemsPerThing.enqueue(candidate)
    +              } else if (orderByScore.lt(candidate, 
topItemsPerThing.head)) {
    +                topItemsPerThing.dequeue()
    +                topItemsPerThing.enqueue(candidate)
    +              }
    +            }
    +          }
    +          // Add top-k interesting items to the output matrix
    +          topItemsPerThing.dequeueAll.foreach {
    +            case (otherThing, llrScore) =>
    +              llrBlock(index, otherThing) = llrScore
    +          }
    +        }
    +        keys -> llrBlock
    +    }
    +  }
    +  /**
    +   * Selectively downsample users and things with an anomalous amount of 
interactions, inspired by
    +   * 
    +   *
    +   * additionally binarizes input matrix, as we're only interesting in 
knowing whether interactions happened or not
    +   */
    +  def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, 
maxNumInteractions: Int) = {
    +    implicit val distributedContext = drmM.context
    +    // Pin raw interaction matrix
    +    val drmI = drmM.checkpoint()
    +    // Broadcast vector containing the number of interactions with each 
    +    val bcastNumInteractions = drmBroadcast(drmI.colCounts)
    +    val downSampledDrmI = drmI.mapBlock() {
    +      case (keys, block) =>
    +        val numInteractions: Vector = bcastNumInteractions
    +        // Use a hash of the unique first key to seed the RNG, makes this 
computation repeatable in case of failures
    +        val random = RandomUtils.getRandom(MurmurHash.hash(keys(0), seed))
    +        val downsampledBlock = block.like()
    +        // Downsample the interaction vector of each user
    +        for (userIndex <- 0 until keys.size) {
    +          val interactionsOfUser = block(userIndex, ::)
    +          //todo: can we trust getNumNonZeroElements or is this the upper 
limit? May have to actually count them?
    --- End diff --
    got it, I'll remove the comment since we do rely on it in the code

> Cooccurrence Analysis on Spark
> ------------------------------
>                 Key: MAHOUT-1464
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1464
>             Project: Mahout
>          Issue Type: Improvement
>          Components: Collaborative Filtering
>         Environment: hadoop, spark
>            Reporter: Pat Ferrel
>            Assignee: Pat Ferrel
>             Fix For: 1.0
>         Attachments: MAHOUT-1464.patch, MAHOUT-1464.patch, MAHOUT-1464.patch, 
> MAHOUT-1464.patch, MAHOUT-1464.patch, MAHOUT-1464.patch, run-spark-xrsj.sh
> Create a version of Cooccurrence Analysis (RowSimilarityJob with LLR) that 
> runs on Spark. This should be compatible with Mahout Spark DRM DSL so a DRM 
> can be used as input. 
> Ideally this would extend to cover MAHOUT-1422. This cross-cooccurrence has 
> several applications including cross-action recommendations. 

This message was sent by Atlassian JIRA

Reply via email to