Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/18624#discussion_r150170451 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala --- @@ -286,40 +288,119 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { srcFeatures: RDD[(Int, Array[Double])], dstFeatures: RDD[(Int, Array[Double])], num: Int): RDD[(Int, Array[(Int, Double)])] = { - val srcBlocks = blockify(srcFeatures) - val dstBlocks = blockify(dstFeatures) - val ratings = srcBlocks.cartesian(dstBlocks).flatMap { case (srcIter, dstIter) => - val m = srcIter.size - val n = math.min(dstIter.size, num) - val output = new Array[(Int, (Int, Double))](m * n) + val srcBlocks = blockify(rank, srcFeatures).zipWithIndex() + val dstBlocks = blockify(rank, dstFeatures) + val ratings = srcBlocks.cartesian(dstBlocks).map { + case (((srcIds, srcFactors), index), (dstIds, dstFactors)) => + val m = srcIds.length + val n = dstIds.length + val dstIdMatrix = new Array[Int](m * num) + val scoreMatrix = Array.fill[Double](m * num)(Double.NegativeInfinity) + val pq = new BoundedPriorityQueue[(Int, Double)](num)(Ordering.by(_._2)) + + val ratings = srcFactors.transpose.multiply(dstFactors) + var i = 0 + var j = 0 + while (i < m) { + var k = 0 + while (k < n) { + pq += dstIds(k) -> ratings(i, k) + k += 1 + } + k = 0 + pq.toArray.sortBy(-_._2).foreach { case (id, score) => + dstIdMatrix(j + k) = id + scoreMatrix(j + k) = score + k += 1 + } + // pq.size maybe less than num, corner case + j += num + i += 1 + pq.clear() + } + (index, (srcIds, dstIdMatrix, new DenseMatrix(m, num, scoreMatrix, true))) + } + ratings.aggregateByKey(null: Array[Int], null: Array[Int], null: DenseMatrix)( + (rateSum, rate) => mergeFunc(rateSum, rate, num), + (rateSum1, rateSum2) => mergeFunc(rateSum1, rateSum2, num) + ).flatMap { case (index, (srcIds, dstIdMatrix, scoreMatrix)) => + // to avoid corner case that the number of items is less 
than recommendation num + var col: Int = 0 + while (col < num && scoreMatrix(0, col) > Double.NegativeInfinity) { + col += 1 + } + val row = scoreMatrix.numRows + val output = new Array[(Int, Array[(Int, Double)])](row) var i = 0 - val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2)) - srcIter.foreach { case (srcId, srcFactor) => - dstIter.foreach { case (dstId, dstFactor) => - // We use F2jBLAS which is faster than a call to native BLAS for vector dot product - val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1) - pq += dstId -> score + while (i < row) { + val factors = new Array[(Int, Double)](col) + var j = 0 + while (j < col) { + factors(j) = (dstIdMatrix(i * num + j), scoreMatrix(i, j)) + j += 1 } - pq.foreach { case (dstId, score) => - output(i) = (srcId, (dstId, score)) - i += 1 + output(i) = (srcIds(i), factors) + i += 1 + } + output.toSeq} + } + + private def mergeFunc(rateSum: (Array[Int], Array[Int], DenseMatrix), + rate: (Array[Int], Array[Int], DenseMatrix), + num: Int): (Array[Int], Array[Int], DenseMatrix) = { + if (rateSum._1 == null) { + rate + } else { + val row = rateSum._3.numRows + var i = 0 + val tempIdMatrix = new Array[Int](row * num) + val tempScoreMatrix = Array.fill[Double](row * num)(Double.NegativeInfinity) + while (i < row) { + var j = 0 + var sum_index = 0 + var rate_index = 0 + val matrixIndex = i * num + while (j < num) { + if (rate._3(i, rate_index) > rateSum._3(i, sum_index)) { + tempIdMatrix(matrixIndex + j) = rate._2(matrixIndex + rate_index) + tempScoreMatrix(matrixIndex + j) = rate._3(i, rate_index) + rate_index += 1 + } else { + tempIdMatrix(matrixIndex + j) = rateSum._2(matrixIndex + sum_index) + tempScoreMatrix(matrixIndex + j) = rateSum._3(i, sum_index) + sum_index += 1 + } + j += 1 } - pq.clear() + i += 1 } - output.toSeq + (rateSum._1, tempIdMatrix, new DenseMatrix(row, num, tempScoreMatrix, true)) } - ratings.topByKey(num)(Ordering.by(_._2)) } /** * Blockifies features to improve the 
efficiency of cartesian product * TODO: SPARK-20443 - expose blockSize as a param? */ - private def blockify( - features: RDD[(Int, Array[Double])], - blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = { + def blockify( + rank: Int, + features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = { + val blockSize = 2000 // TODO: tune the block size --- End diff -- So will you add a parameter for this block size?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org