Github user WeichenXu123 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18624#discussion_r150170451
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
 ---
    @@ -286,40 +288,119 @@ object MatrixFactorizationModel extends 
Loader[MatrixFactorizationModel] {
           srcFeatures: RDD[(Int, Array[Double])],
           dstFeatures: RDD[(Int, Array[Double])],
           num: Int): RDD[(Int, Array[(Int, Double)])] = {
    -    val srcBlocks = blockify(srcFeatures)
    -    val dstBlocks = blockify(dstFeatures)
    -    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, 
dstIter) =>
    -      val m = srcIter.size
    -      val n = math.min(dstIter.size, num)
    -      val output = new Array[(Int, (Int, Double))](m * n)
    +    val srcBlocks = blockify(rank, srcFeatures).zipWithIndex()
    +    val dstBlocks = blockify(rank, dstFeatures)
    +    val ratings = srcBlocks.cartesian(dstBlocks).map {
    +      case (((srcIds, srcFactors), index), (dstIds, dstFactors)) =>
    +        val m = srcIds.length
    +        val n = dstIds.length
    +        val dstIdMatrix = new Array[Int](m * num)
    +        val scoreMatrix = Array.fill[Double](m * 
num)(Double.NegativeInfinity)
    +        val pq = new BoundedPriorityQueue[(Int, 
Double)](num)(Ordering.by(_._2))
    +
    +        val ratings = srcFactors.transpose.multiply(dstFactors)
    +        var i = 0
    +        var j = 0
    +        while (i < m) {
    +          var k = 0
    +          while (k < n) {
    +            pq += dstIds(k) -> ratings(i, k)
    +            k += 1
    +          }
    +          k = 0
    +          pq.toArray.sortBy(-_._2).foreach { case (id, score) =>
    +            dstIdMatrix(j + k) = id
    +            scoreMatrix(j + k) = score
    +            k += 1
    +          }
    +          // pq.size maybe less than num, corner case
    +          j += num
    +          i += 1
    +          pq.clear()
    +        }
    +        (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, 
true)))
    +    }
    +    ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: 
DenseMatrix)(
    +      (rateSum, rate) => mergeFunc(rateSum, rate, num),
    +      (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num)
    +    ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) =>
    +      // to avoid corner case that the number of items is less than 
recommendation num
    +      var col: Int = 0
    +      while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) {
    +        col += 1
    +      }
    +      val row = scoreMatrix.numRows
    +      val output = new Array[(Int, Array[(Int, Double)])](row)
           var i = 0
    -      val pq = new BoundedPriorityQueue[(Int, 
Double)](n)(Ordering.by(_._2))
    -      srcIter.foreach { case (srcId, srcFactor) =>
    -        dstIter.foreach { case (dstId, dstFactor) =>
    -          // We use F2jBLAS which is faster than a call to native BLAS for 
vector dot product
    -          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
    -          pq += dstId -> score
    +      while (i < row) {
    +        val factors = new Array[(Int, Double)](col)
    +        var j = 0
    +        while (j < col) {
    +          factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j))
    +          j += 1
             }
    -        pq.foreach { case (dstId, score) =>
    -          output(i) = (srcId, (dstId, score))
    -          i += 1
    +        output(i) = (srcIds(i), factors)
    +        i += 1
    +      }
    +     output.toSeq}
    +  }
    +
    +  private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix),
    +                        rate: (Array[Int], Array[Int], DenseMatrix),
    +                        num: Int): (Array[Int], Array[Int], DenseMatrix) = 
{
    +    if (rateSum._1 == null) {
    +      rate
    +    } else {
    +      val row = rateSum._3.numRows
    +      var i = 0
    +      val tempIdMatrix = new Array[Int](row * num)
    +      val tempScoreMatrix = Array.fill[Double](row * 
num)(Double.NegativeInfinity)
    +      while (i < row) {
    +        var j = 0
    +        var sum_index = 0
    +        var rate_index = 0
    +        val matrixIndex = i * num
    +        while (j < num) {
    +          if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) {
    +            tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + 
rate_index)
    +            tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index)
    +            rate_index += 1
    +          } else {
    +            tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + 
sum_index)
    +            tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index)
    +            sum_index += 1
    +          }
    +          j += 1
             }
    -        pq.clear()
    +        i += 1
           }
    -      output.toSeq
    +      (rateSum._1, tempIdMatrix, new DenseMatrix(row, num, 
tempScoreMatrix, true))
         }
    -    ratings.topByKey(num)(Ordering.by(_._2))
       }
     
       /**
        * Blockifies features to improve the efficiency of cartesian product
        * TODO: SPARK-20443 - expose blockSize as a param?
        */
    -  private def blockify(
    -      features: RDD[(Int, Array[Double])],
    -      blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = {
    +  def blockify(
    +      rank: Int,
    +      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] 
= {
    +    val blockSize = 2000 // TODO: tune the block size
    --- End diff --
    
    So will you add a parameter for this block size?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to