Repository: spark
Updated Branches:
  refs/heads/master 4b5cfc988 -> 740b034f1


[SPARK-4362] [MLLIB] Make prediction probability available in NaiveBayesModel

Add predictProbabilities to Naive Bayes, return class probabilities.

Continues https://github.com/apache/spark/pull/6761

Author: Sean Owen <so...@cloudera.com>

Closes #7376 from srowen/SPARK-4362 and squashes the following commits:

23d5a76 [Sean Owen] Fix model.labels -> model.theta
95d91fb [Sean Owen] Check that predicted probabilities sum to 1
b32d1c8 [Sean Owen] Add predictProbabilities to Naive Bayes, return class probabilities


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/740b034f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/740b034f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/740b034f

Branch: refs/heads/master
Commit: 740b034f1ca885a386f5a9ef7e0c81c714b047ff
Parents: 4b5cfc9
Author: Sean Owen <so...@cloudera.com>
Authored: Tue Jul 14 22:44:54 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 14 22:44:54 2015 +0100

----------------------------------------------------------------------
 .../spark/mllib/classification/NaiveBayes.scala | 76 +++++++++++++++-----
 .../mllib/classification/NaiveBayesSuite.scala  | 55 +++++++++++++-
 2 files changed, 113 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/740b034f/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index f51ee36..9e379d7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -93,26 +93,70 @@ class NaiveBayesModel private[mllib] (
   override def predict(testData: Vector): Double = {
     modelType match {
       case Multinomial =>
-        val prob = thetaMatrix.multiply(testData)
-        BLAS.axpy(1.0, piVector, prob)
-        labels(prob.argmax)
+        labels(multinomialCalculation(testData).argmax)
       case Bernoulli =>
-        testData.foreachActive { (index, value) =>
-          if (value != 0.0 && value != 1.0) {
-            throw new SparkException(
-              s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$testData.")
-          }
-        }
-        val prob = thetaMinusNegTheta.get.multiply(testData)
-        BLAS.axpy(1.0, piVector, prob)
-        BLAS.axpy(1.0, negThetaSum.get, prob)
-        labels(prob.argmax)
-      case _ =>
-        // This should never happen.
-        throw new UnknownError(s"Invalid modelType: $modelType.")
+        labels(bernoulliCalculation(testData).argmax)
+    }
+  }
+
+  /**
+   * Predict posterior class probabilities for the given data set using the model trained.
+   *
+   * @param testData RDD representing data points to be predicted
+   * @return an RDD[Vector] where each entry contains the predicted posterior class probabilities,
+   *         in the same order as class labels
+   */
+  def predictProbabilities(testData: RDD[Vector]): RDD[Vector] = {
+    val bcModel = testData.context.broadcast(this)
+    testData.mapPartitions { iter =>
+      val model = bcModel.value
+      iter.map(model.predictProbabilities)
     }
   }
 
+  /**
+   * Predict posterior class probabilities for a single data point using the model trained.
+   *
+   * @param testData vector representing a single data point
+   * @return predicted posterior class probabilities from the trained model,
+   *         in the same order as class labels
+   */
+  def predictProbabilities(testData: Vector): Vector = {
+    modelType match {
+      case Multinomial =>
+        posteriorProbabilities(multinomialCalculation(testData))
+      case Bernoulli =>
+        posteriorProbabilities(bernoulliCalculation(testData))
+    }
+  }
+
+  private def multinomialCalculation(testData: Vector) = {
+    val prob = thetaMatrix.multiply(testData)
+    BLAS.axpy(1.0, piVector, prob)
+    prob
+  }
+
+  private def bernoulliCalculation(testData: Vector) = {
+    testData.foreachActive((_, value) =>
+      if (value != 0.0 && value != 1.0) {
+        throw new SparkException(
+          s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$testData.")
+      }
+    )
+    val prob = thetaMinusNegTheta.get.multiply(testData)
+    BLAS.axpy(1.0, piVector, prob)
+    BLAS.axpy(1.0, negThetaSum.get, prob)
+    prob
+  }
+
+  private def posteriorProbabilities(logProb: DenseVector) = {
+    val logProbArray = logProb.toArray
+    val maxLog = logProbArray.max
+    val scaledProbs = logProbArray.map(lp => math.exp(lp - maxLog))
+    val probSum = scaledProbs.sum
+    new DenseVector(scaledProbs.map(_ / probSum))
+  }
+
   override def save(sc: SparkContext, path: String): Unit = {
     val data = NaiveBayesModel.SaveLoadV2_0.Data(labels, pi, theta, modelType)
     NaiveBayesModel.SaveLoadV2_0.save(sc, path, data)

http://git-wip-us.apache.org/repos/asf/spark/blob/740b034f/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index f7fc873..cffa1ab 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -19,13 +19,14 @@ package org.apache.spark.mllib.classification
 
 import scala.util.Random
 
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => 
brzArgmax, sum => brzSum}
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, Vector => BV}
 import breeze.stats.distributions.{Multinomial => BrzMultinomial}
 
 import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.{LocalClusterSparkContext, 
MLlibTestSparkContext}
+import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.util.Utils
 
 object NaiveBayesSuite {
@@ -154,6 +155,29 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), 
validationData)
+
+    // Test posteriors
+    validationData.map(_.features).foreach { features =>
+      val predicted = model.predictProbabilities(features).toArray
+      assert(predicted.sum ~== 1.0 relTol 1.0e-10)
+      val expected = expectedMultinomialProbabilities(model, features)
+      expected.zip(predicted).foreach { case (e, p) => assert(e ~== p relTol 
1.0e-10) }
+    }
+  }
+
+  /**
+   * @param model Multinomial Naive Bayes model
+   * @param testData input to compute posterior probabilities for
+   * @return posterior class probabilities (in order of labels) for input
+   */
+  private def expectedMultinomialProbabilities(model: NaiveBayesModel, 
testData: Vector) = {
+    val piVector = new BDV(model.pi)
+    // model.theta is row-major; treat it as col-major representation of transpose, and transpose:
+    val thetaMatrix = new BDM(model.theta(0).length, model.theta.length, 
model.theta.flatten).t
+    val logClassProbs: BV[Double] = piVector + (thetaMatrix * 
testData.toBreeze)
+    val classProbs = logClassProbs.toArray.map(math.exp)
+    val classProbsSum = classProbs.sum
+    classProbs.map(_ / classProbsSum)
   }
 
   test("Naive Bayes Bernoulli") {
@@ -182,6 +206,33 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), 
validationData)
+
+    // Test posteriors
+    validationData.map(_.features).foreach { features =>
+      val predicted = model.predictProbabilities(features).toArray
+      assert(predicted.sum ~== 1.0 relTol 1.0e-10)
+      val expected = expectedBernoulliProbabilities(model, features)
+      expected.zip(predicted).foreach { case (e, p) => assert(e ~== p relTol 
1.0e-10) }
+    }
+  }
+
+  /**
+   * @param model Bernoulli Naive Bayes model
+   * @param testData input to compute posterior probabilities for
+   * @return posterior class probabilities (in order of labels) for input
+   */
+  private def expectedBernoulliProbabilities(model: NaiveBayesModel, testData: 
Vector) = {
+    val piVector = new BDV(model.pi)
+    val thetaMatrix = new BDM(model.theta(0).length, model.theta.length, 
model.theta.flatten).t
+    val negThetaMatrix = new BDM(model.theta(0).length, model.theta.length,
+      model.theta.flatten.map(v => math.log(1.0 - math.exp(v)))).t
+    val testBreeze = testData.toBreeze
+    val negTestBreeze = new BDV(Array.fill(testBreeze.size)(1.0)) - testBreeze
+    val piTheta: BV[Double] = piVector + (thetaMatrix * testBreeze)
+    val logClassProbs: BV[Double] = piTheta + (negThetaMatrix * negTestBreeze)
+    val classProbs = logClassProbs.toArray.map(math.exp)
+    val classProbsSum = classProbs.sum
+    classProbs.map(_ / classProbsSum)
   }
 
   test("detect negative values") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to