Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3cad66e5e -> 3686c2e96


[SPARK-20790][MLLIB] Correctly handle negative values for implicit feedback in ALS

## What changes were proposed in this pull request?

Revert the handling of negative values in ALS with implicit feedback, so that
the confidence is a function of the absolute value of the rating and the
preference is 0 for negative ratings. This restores the original behavior.
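
As a minimal standalone sketch of the restored semantics (the function names here are illustrative, not the actual ALS internals; `alpha` is the implicit-feedback confidence parameter):

```scala
// Restored implicit-feedback semantics: the confidence is a function of
// |rating|, so it is never negative, while the preference is 1 only for
// strictly positive ratings.
def confidence(rating: Double, alpha: Double): Double =
  1.0 + alpha * math.abs(rating)

def preference(rating: Double): Double =
  if (rating > 0.0) 1.0 else 0.0
```

A negative rating therefore still contributes a high-confidence term pulling the prediction toward a preference of 0, instead of being dropped from the least-squares problem.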

## How was this patch tested?

This patch was tested with the existing unit tests and an added unit test to 
ensure that negative ratings are not ignored.

mengxr

Author: David Eis <d...@bloomberg.net>

Closes #18022 from davideis/bugfix/negative-rating.

(cherry picked from commit d52f636228e833db89045bc7a0c17b72da13f138)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3686c2e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3686c2e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3686c2e9

Branch: refs/heads/branch-2.2
Commit: 3686c2e965758f471f9784b3e06223ce143b6aca
Parents: 3cad66e
Author: David Eis <d...@bloomberg.net>
Authored: Wed May 31 13:52:55 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed May 31 13:53:05 2017 +0100

----------------------------------------------------------------------
 .../apache/spark/ml/recommendation/ALS.scala    | 22 +++++----
 .../spark/ml/recommendation/ALSSuite.scala      | 50 +++++++++++++++++++-
 2 files changed, 62 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3686c2e9/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index b2e3dba..3aa6457 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -763,11 +763,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
   /**
   * Representing a normal equation to solve the following weighted least squares problem:
    *
-   * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - b,,i,,)^2^ + lambda * x^T^ x.
+   * minimize \sum,,i,, c,,i,, (a,,i,,^T^ x - d,,i,,)^2^ + lambda * x^T^ x.
    *
    * Its normal equation is given by
    *
-   * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - b,,i,, a,,i,,) + lambda * x = 0.
+   * \sum,,i,, c,,i,, (a,,i,, a,,i,,^T^ x - d,,i,, a,,i,,) + lambda * x = 0.
+   *
+   * Distributing and letting b,,i,, = c,,i,, * d,,i,,
+   *
+   * \sum,,i,, c,,i,, a,,i,, a,,i,,^T^ x - b,,i,, a,,i,, + lambda * x = 0.
    */
   private[recommendation] class NormalEquation(val k: Int) extends Serializable {
 
@@ -796,7 +800,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       copyToDouble(a)
       blas.dspr(upper, k, c, da, 1, ata)
       if (b != 0.0) {
-        blas.daxpy(k, c * b, da, 1, atb, 1)
+        blas.daxpy(k, b, da, 1, atb, 1)
       }
       this
     }
@@ -1456,15 +1460,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
             val srcFactor = sortedSrcFactors(blockId)(localIndex)
             val rating = ratings(i)
             if (implicitPrefs) {
-              // Extension to the original paper to handle b < 0. confidence is a function of |b|
-              // instead so that it is never negative. c1 is confidence - 1.0.
+              // Extension to the original paper to handle rating < 0. confidence is a function
+              // of |rating| instead so that it is never negative. c1 is confidence - 1.
               val c1 = alpha * math.abs(rating)
-              // For rating <= 0, the corresponding preference is 0. So the term below is only added
-              // for rating > 0. Because YtY is already added, we need to adjust the scaling here.
-              if (rating > 0) {
+              // For rating <= 0, the corresponding preference is 0. So the second argument of add
+              // is only there for rating > 0.
+              if (rating > 0.0) {
                 numExplicits += 1
-                ls.add(srcFactor, (c1 + 1.0) / c1, c1)
               }
+              ls.add(srcFactor, if (rating > 0.0) 1.0 + c1 else 0.0, c1)
             } else {
               ls.add(srcFactor, rating)
               numExplicits += 1

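For readability, the equations in the updated `NormalEquation` doc comment above (written in ScalaDoc wiki markup) are, in standard notation:

```latex
% Weighted least squares problem solved by NormalEquation:
\min_x \; \sum_i c_i \, (a_i^T x - d_i)^2 + \lambda \, x^T x
% Its normal equation:
\sum_i c_i \, (a_i a_i^T x - d_i a_i) + \lambda x = 0
% Distributing and letting b_i = c_i d_i:
\sum_i \left( c_i \, a_i a_i^T x - b_i a_i \right) + \lambda x = 0
```

In other words, `add(a, b, c)` now expects the caller to pass b = c * d already multiplied through, which is why the `daxpy` call above drops its `c *` factor, and why the weighted `NormalEquation` test below passes 12.0 (= 6.0 * 2.0) where it previously passed 6.0.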
http://git-wip-us.apache.org/repos/asf/spark/blob/3686c2e9/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 9d31e79..701040f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.ml.recommendation.ALS._
 import org.apache.spark.ml.recommendation.ALS.Rating
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.recommendation.MatrixFactorizationModelSuite
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}
@@ -78,7 +79,7 @@ class ALSSuite
     val k = 2
     val ne0 = new NormalEquation(k)
       .add(Array(1.0f, 2.0f), 3.0)
-      .add(Array(4.0f, 5.0f), 6.0, 2.0) // weighted
+      .add(Array(4.0f, 5.0f), 12.0, 2.0) // weighted
     assert(ne0.k === k)
     assert(ne0.triK === k * (k + 1) / 2)
     // NumPy code that computes the expected values:
@@ -348,6 +349,37 @@ class ALSSuite
   }
 
   /**
+  * Train ALS using the given training set and parameters
+  * @param training training dataset
+  * @param rank rank of the matrix factorization
+  * @param maxIter max number of iterations
+  * @param regParam regularization constant
+  * @param implicitPrefs whether to use implicit preference
+  * @param numUserBlocks number of user blocks
+  * @param numItemBlocks number of item blocks
+  * @return a trained ALSModel
+  */
+  def trainALS(
+    training: RDD[Rating[Int]],
+    rank: Int,
+    maxIter: Int,
+    regParam: Double,
+    implicitPrefs: Boolean = false,
+    numUserBlocks: Int = 2,
+    numItemBlocks: Int = 3): ALSModel = {
+    val spark = this.spark
+    import spark.implicits._
+    val als = new ALS()
+      .setRank(rank)
+      .setRegParam(regParam)
+      .setImplicitPrefs(implicitPrefs)
+      .setNumUserBlocks(numUserBlocks)
+      .setNumItemBlocks(numItemBlocks)
+      .setSeed(0)
+    als.fit(training.toDF())
+  }
+
+  /**
    * Test ALS using the given training/test splits and parameters.
    * @param training training dataset
    * @param test test dataset
@@ -455,6 +487,22 @@ class ALSSuite
       targetRMSE = 0.3)
   }
 
+  test("implicit feedback regression") {
+    val trainingWithNeg = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, -3)))
+    val trainingWithZero = sc.parallelize(Array(Rating(0, 0, 1), Rating(1, 1, 1), Rating(0, 1, 0)))
+    val modelWithNeg =
+      trainALS(trainingWithNeg, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true)
+    val modelWithZero =
+      trainALS(trainingWithZero, rank = 1, maxIter = 5, regParam = 0.01, implicitPrefs = true)
+    val userFactorsNeg = modelWithNeg.userFactors
+    val itemFactorsNeg = modelWithNeg.itemFactors
+    val userFactorsZero = modelWithZero.userFactors
+    val itemFactorsZero = modelWithZero.itemFactors
+    userFactorsNeg.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" ")))
+    userFactorsZero.collect().foreach(arr => logInfo(s"implicit test " + arr.mkString(" ")))
+    assert(userFactorsNeg.intersect(userFactorsZero).count() == 0)
+    assert(itemFactorsNeg.intersect(itemFactorsZero).count() == 0)
+  }
   test("using generic ID types") {
    val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2, noiseStd = 0.01)
 
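As a rough standalone illustration of why the new regression test expects the two models to differ (values are illustrative; `alpha` defaults to 1.0 in ALS):

```scala
// Per-rating contribution to the normal equation under the restored behavior.
// c1 is confidence - 1 and b is the preference-weighted coefficient; they
// correspond to the c and b arguments of NormalEquation.add.
def contribution(rating: Double, alpha: Double = 1.0): (Double, Double) = {
  val c1 = alpha * math.abs(rating)
  val b = if (rating > 0.0) 1.0 + c1 else 0.0
  (c1, b)
}

contribution(0.0)  // (0.0, 0.0): a zero rating contributes nothing
contribution(-3.0) // (3.0, 0.0): a negative rating contributes a
                   // high-confidence term with preference 0
// Before this fix, the add call was skipped entirely for rating <= 0, so the
// negative and zero training sets produced identical factors and the
// intersect-based assertions above would fail.
```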

