[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-11-09 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r87151780
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +118,89 @@ class NaiveBayes @Since("1.5.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val oldDataset: RDD[OldLabeledPoint] =
-      extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-    val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+
+      require(values.forall(_ >= 0.0),
+        s"Naive Bayes requires nonnegative feature values but found $v.")
+    }
+
+    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+
+      require(values.forall(v => v == 0.0 || v == 1.0),
+        s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
+    }
+
+    val requireValues: Vector => Unit = {
+      $(modelType) match {
--- End diff --

@thunterdb Good catch! I updated it in #15826. Thanks.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-11-08 Thread thunterdb
Github user thunterdb commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r87029278
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +118,89 @@ class NaiveBayes @Since("1.5.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val oldDataset: RDD[OldLabeledPoint] =
-      extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-    val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+
+      require(values.forall(_ >= 0.0),
+        s"Naive Bayes requires nonnegative feature values but found $v.")
+    }
+
+    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+
+      require(values.forall(v => v == 0.0 || v == 1.0),
+        s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
+    }
+
+    val requireValues: Vector => Unit = {
+      $(modelType) match {
--- End diff --

Looking at the decompiled bytecode, the situation is a bit more complicated: the pattern match is inlined and is not wrapped in a closure, so it would work as you would expect. However, because this relies on a compiler optimization, we should still evaluate the model type outside `requireValues`.
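
For illustration, a minimal sketch of what evaluating the model type eagerly could look like (the `case` constants and error message are taken from the diffs quoted in this thread; the local name `modelTypeValue` is hypothetical, and the actual fix landed in #15826):

```
// Read the param once on the driver; the resulting function value then
// captures only a local String instead of the enclosing estimator.
val modelTypeValue = $(modelType)
val requireValues: Vector => Unit = modelTypeValue match {
  case Multinomial => requireNonnegativeValues
  case Bernoulli => requireZeroOneBernoulliValues
  case _ =>
    // This should never happen.
    throw new UnknownError(s"Invalid modelType: $modelTypeValue.")
}
```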





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-11-08 Thread thunterdb
Github user thunterdb commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r87025609
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +118,89 @@ class NaiveBayes @Since("1.5.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val oldDataset: RDD[OldLabeledPoint] =
-      extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-    val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+
+      require(values.forall(_ >= 0.0),
+        s"Naive Bayes requires nonnegative feature values but found $v.")
+    }
+
+    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+
+      require(values.forall(v => v == 0.0 || v == 1.0),
+        s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
+    }
+
+    val requireValues: Vector => Unit = {
+      $(modelType) match {
--- End diff --

@zhengruifeng (major) you are capturing the outer object through `$` and `modelType`. You should make a `val` outside the method.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-11-08 Thread thunterdb
Github user thunterdb commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r87025222
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +118,89 @@ class NaiveBayes @Since("1.5.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val oldDataset: RDD[OldLabeledPoint] =
-      extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-    val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
--- End diff --

Also, this will help reduce the length of this method, which is very long.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-11-08 Thread thunterdb
Github user thunterdb commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r87025073
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +118,89 @@ class NaiveBayes @Since("1.5.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val oldDataset: RDD[OldLabeledPoint] =
-      extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-    val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
--- End diff --

@zhengruifeng (minor) this method is static and does not depend on the state of the object. It is preferable to put it in the companion object. Also, because not everyone has a functional programming background, it is better to define it with `def` rather than as a `val`.
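
As a rough sketch of that suggestion (the placement and visibility below are assumptions, not the merged patch), the check could become a `def` on the companion object:

```
object NaiveBayes {
  // Static validation: it reads no estimator state, so it can live here and
  // be referenced from train() without capturing the estimator instance.
  private[classification] def requireNonnegativeValues(v: Vector): Unit = {
    val values = v match {
      case sv: SparseVector => sv.values
      case dv: DenseVector => dv.values
    }
    require(values.forall(_ >= 0.0),
      s"Naive Bayes requires nonnegative feature values but found $v.")
  }
}
```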





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/12819





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-30 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81285828
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +356,33 @@ class NaiveBayes private (
     */
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(_ >= 0.0)) {
-        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
-      }
-    }
+    val spark = SparkSession
+      .builder()
+      .sparkContext(data.context)
+      .getOrCreate()
 
-    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(v => v == 0.0 || v == 1.0)) {
-        throw new SparkException(
-          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
-      }
-    }
+    import spark.implicits._
 
-    // Aggregates term frequencies per label.
-    // TODO: Calling combineByKey and collect creates two stages, we can implement something
-    // TODO: similar to reduceByKeyLocally to save one stage.
-    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
-      createCombiner = (v: Vector) => {
-        if (modelType == Bernoulli) {
-          requireZeroOneBernoulliValues(v)
-        } else {
-          requireNonnegativeValues(v)
-        }
-        (1L, v.copy.toDense)
-      },
-      mergeValue = (c: (Long, DenseVector), v: Vector) => {
-        requireNonnegativeValues(v)
-        BLAS.axpy(1.0, v, c._2)
-        (c._1 + 1L, c._2)
-      },
-      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
-        BLAS.axpy(1.0, c2._2, c1._2)
-        (c1._1 + c2._1, c1._2)
-      }
-    ).collect().sortBy(_._1)
+    val nb = new NewNaiveBayes()
+      .setModelType(modelType)
+      .setSmoothing(lambda)
 
-    val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
-    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-    val labels = new Array[Double](numLabels)
-    val pi = new Array[Double](numLabels)
-    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-    val piLogDenom = math.log(numDocuments + numLabels * lambda)
-    var i = 0
-    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-      labels(i) = label
-      pi(i) = math.log(n + lambda) - piLogDenom
-      val thetaLogDenom = modelType match {
-        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
-        case Bernoulli => math.log(n + 2.0 * lambda)
-        case _ =>
-          // This should never happen.
-          throw new UnknownError(s"Invalid modelType: $modelType.")
-      }
-      var j = 0
-      while (j < numFeatures) {
-        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-        j += 1
-      }
-      i += 1
+    val labels = data.map(_.label).distinct().collect().sorted
--- End diff --

Yeah, leaving the old mllib implementation untouched is one of the candidates, but it's not an elegant solution. I'm ambivalent about this issue. Let's leave it as follow-up work. Thanks.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81256496
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +356,33 @@ class NaiveBayes private (
     */
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(_ >= 0.0)) {
-        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
-      }
-    }
+    val spark = SparkSession
+      .builder()
+      .sparkContext(data.context)
+      .getOrCreate()
 
-    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(v => v == 0.0 || v == 1.0)) {
-        throw new SparkException(
-          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
-      }
-    }
+    import spark.implicits._
 
-    // Aggregates term frequencies per label.
-    // TODO: Calling combineByKey and collect creates two stages, we can implement something
-    // TODO: similar to reduceByKeyLocally to save one stage.
-    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
-      createCombiner = (v: Vector) => {
-        if (modelType == Bernoulli) {
-          requireZeroOneBernoulliValues(v)
-        } else {
-          requireNonnegativeValues(v)
-        }
-        (1L, v.copy.toDense)
-      },
-      mergeValue = (c: (Long, DenseVector), v: Vector) => {
-        requireNonnegativeValues(v)
-        BLAS.axpy(1.0, v, c._2)
-        (c._1 + 1L, c._2)
-      },
-      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
-        BLAS.axpy(1.0, c2._2, c1._2)
-        (c1._1 + c2._1, c1._2)
-      }
-    ).collect().sortBy(_._1)
+    val nb = new NewNaiveBayes()
+      .setModelType(modelType)
+      .setSmoothing(lambda)
 
-    val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
-    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-    val labels = new Array[Double](numLabels)
-    val pi = new Array[Double](numLabels)
-    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-    val piLogDenom = math.log(numDocuments + numLabels * lambda)
-    var i = 0
-    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-      labels(i) = label
-      pi(i) = math.log(n + lambda) - piLogDenom
-      val thetaLogDenom = modelType match {
-        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
-        case Bernoulli => math.log(n + 2.0 * lambda)
-        case _ =>
-          // This should never happen.
-          throw new UnknownError(s"Invalid modelType: $modelType.")
-      }
-      var j = 0
-      while (j < numFeatures) {
-        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-        j += 1
-      }
-      i += 1
+    val labels = data.map(_.label).distinct().collect().sorted
+
+    // Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] must be
+    // in range [0, numClasses).
+    val dataset = data.map {
--- End diff --

`data.toDF()` doesn't work, because there are two types of `VectorUDT`. This change would cause MLlib test failures, so I reverted it.
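
For context, the explicit conversion the PR keeps instead (quoted verbatim from a diff later in this digest) maps each mllib `LabeledPoint` by hand and converts its mllib vector to an ml vector with `asML`:

```
// Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] must be
// in range [0, numClasses).
val dataset = data.map {
  case LabeledPoint(label, features) =>
    (labels.indexOf(label).toDouble, features.asML)
}.toDF("label", "features")
```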





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81127281
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,90 @@ class NaiveBayes @Since("1.5.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val oldDataset: RDD[OldLabeledPoint] =
-      extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-    val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+      if (!values.forall(_ >= 0.0)) {
--- End diff --

let's just use `require` here.
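
The `require` form, as it appears in later revisions of this diff (quoted earlier in this digest), folds the condition and message into one call; note that `require` throws an `IllegalArgumentException` rather than a `SparkException`:

```
require(values.forall(_ >= 0.0),
  s"Naive Bayes requires nonnegative feature values but found $v.")
```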





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81225025
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +356,33 @@ class NaiveBayes private (
     */
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(_ >= 0.0)) {
-        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
-      }
-    }
+    val spark = SparkSession
+      .builder()
+      .sparkContext(data.context)
+      .getOrCreate()
 
-    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(v => v == 0.0 || v == 1.0)) {
-        throw new SparkException(
-          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
-      }
-    }
+    import spark.implicits._
 
-    // Aggregates term frequencies per label.
-    // TODO: Calling combineByKey and collect creates two stages, we can implement something
-    // TODO: similar to reduceByKeyLocally to save one stage.
-    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
-      createCombiner = (v: Vector) => {
-        if (modelType == Bernoulli) {
-          requireZeroOneBernoulliValues(v)
-        } else {
-          requireNonnegativeValues(v)
-        }
-        (1L, v.copy.toDense)
-      },
-      mergeValue = (c: (Long, DenseVector), v: Vector) => {
-        requireNonnegativeValues(v)
-        BLAS.axpy(1.0, v, c._2)
-        (c._1 + 1L, c._2)
-      },
-      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
-        BLAS.axpy(1.0, c2._2, c1._2)
-        (c1._1 + c2._1, c1._2)
-      }
-    ).collect().sortBy(_._1)
+    val nb = new NewNaiveBayes()
+      .setModelType(modelType)
+      .setSmoothing(lambda)
 
-    val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
-    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-    val labels = new Array[Double](numLabels)
-    val pi = new Array[Double](numLabels)
-    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-    val piLogDenom = math.log(numDocuments + numLabels * lambda)
-    var i = 0
-    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-      labels(i) = label
-      pi(i) = math.log(n + lambda) - piLogDenom
-      val thetaLogDenom = modelType match {
-        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
-        case Bernoulli => math.log(n + 2.0 * lambda)
-        case _ =>
-          // This should never happen.
-          throw new UnknownError(s"Invalid modelType: $modelType.")
-      }
-      var j = 0
-      while (j < numFeatures) {
-        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-        j += 1
-      }
-      i += 1
+    val labels = data.map(_.label).distinct().collect().sorted
--- End diff --

Yeah, so I think this is going to introduce a performance regression into the mllib version. Before, you could get the labels from the aggregates, but if we make mllib call into the ml implementation, we need an extra pass through the data. One way to avoid this is to leave the mllib implementation untouched, which has been done in the past with GBT and RF. It's not a great solution, so I'm open to other suggestions.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81127329
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,90 @@ class NaiveBayes @Since("1.5.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val oldDataset: RDD[OldLabeledPoint] =
-      extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-    val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+      if (!values.forall(_ >= 0.0)) {
+        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
+      }
+    }
+
+    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+      if (!values.forall(v => v == 0.0 || v == 1.0)) {
--- End diff --

ditto.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81132746
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -280,15 +366,14 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
 
       val dataPath = new Path(path, "data").toString
-      val data = sparkSession.read.parquet(dataPath)
-      val vecConverted = MLUtils.convertVectorColumnsToML(data, "pi")
-      val Row(pi: Vector, theta: Matrix) = MLUtils.convertMatrixColumnsToML(vecConverted, "theta")
-        .select("pi", "theta")
-        .head()
+      val data = sparkSession.read.parquet(dataPath).select("pi", "theta").head()
+      val pi = data.getAs[Vector](0)
+      val theta = data.getAs[Matrix](1)
       val model = new NaiveBayesModel(metadata.uid, pi, theta)
 
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
     }
   }
+
--- End diff --

remove





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81128334
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +356,33 @@ class NaiveBayes private (
     */
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(_ >= 0.0)) {
-        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
-      }
-    }
+    val spark = SparkSession
+      .builder()
+      .sparkContext(data.context)
+      .getOrCreate()
 
-    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(v => v == 0.0 || v == 1.0)) {
-        throw new SparkException(
-          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
-      }
-    }
+    import spark.implicits._
 
-    // Aggregates term frequencies per label.
-    // TODO: Calling combineByKey and collect creates two stages, we can implement something
-    // TODO: similar to reduceByKeyLocally to save one stage.
-    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
-      createCombiner = (v: Vector) => {
-        if (modelType == Bernoulli) {
-          requireZeroOneBernoulliValues(v)
-        } else {
-          requireNonnegativeValues(v)
-        }
-        (1L, v.copy.toDense)
-      },
-      mergeValue = (c: (Long, DenseVector), v: Vector) => {
-        requireNonnegativeValues(v)
-        BLAS.axpy(1.0, v, c._2)
-        (c._1 + 1L, c._2)
-      },
-      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
-        BLAS.axpy(1.0, c2._2, c1._2)
-        (c1._1 + c2._1, c1._2)
-      }
-    ).collect().sortBy(_._1)
+    val nb = new NewNaiveBayes()
+      .setModelType(modelType)
+      .setSmoothing(lambda)
 
-    val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
-    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-    val labels = new Array[Double](numLabels)
-    val pi = new Array[Double](numLabels)
-    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-    val piLogDenom = math.log(numDocuments + numLabels * lambda)
-    var i = 0
-    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-      labels(i) = label
-      pi(i) = math.log(n + lambda) - piLogDenom
-      val thetaLogDenom = modelType match {
-        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
-        case Bernoulli => math.log(n + 2.0 * lambda)
-        case _ =>
-          // This should never happen.
-          throw new UnknownError(s"Invalid modelType: $modelType.")
-      }
-      var j = 0
-      while (j < numFeatures) {
-        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-        j += 1
-      }
-      i += 1
+    val labels = data.map(_.label).distinct().collect().sorted
--- End diff --

Labels in mllib's NB implementation are not guaranteed to be in range [0, numClasses), and the following code with the label set `{-1, +1}` runs successfully:
```
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.NaiveBayes

val points = Seq(
  LabeledPoint(-1.0, Vectors.dense(Array(1.0, 2.0))),
  LabeledPoint(+1.0, Vectors.dense(Array(1.0, 2.0))))
val rdd = sc.parallelize(points)
val nbm = NaiveBayes.train(rdd)
```







[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81107309
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +356,33 @@ class NaiveBayes private (
     */
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(_ >= 0.0)) {
-        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
-      }
-    }
+    val spark = SparkSession
+      .builder()
+      .sparkContext(data.context)
+      .getOrCreate()
 
-    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(v => v == 0.0 || v == 1.0)) {
-        throw new SparkException(
-          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
-      }
-    }
+    import spark.implicits._
 
-    // Aggregates term frequencies per label.
-    // TODO: Calling combineByKey and collect creates two stages, we can implement something
-    // TODO: similar to reduceByKeyLocally to save one stage.
-    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
-      createCombiner = (v: Vector) => {
-        if (modelType == Bernoulli) {
-          requireZeroOneBernoulliValues(v)
-        } else {
-          requireNonnegativeValues(v)
-        }
-        (1L, v.copy.toDense)
-      },
-      mergeValue = (c: (Long, DenseVector), v: Vector) => {
-        requireNonnegativeValues(v)
-        BLAS.axpy(1.0, v, c._2)
-        (c._1 + 1L, c._2)
-      },
-      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
-        BLAS.axpy(1.0, c2._2, c1._2)
-        (c1._1 + c2._1, c1._2)
-      }
-    ).collect().sortBy(_._1)
+    val nb = new NewNaiveBayes()
+      .setModelType(modelType)
+      .setSmoothing(lambda)
 
-    val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
-    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-    val labels = new Array[Double](numLabels)
-    val pi = new Array[Double](numLabels)
-    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-    val piLogDenom = math.log(numDocuments + numLabels * lambda)
-    var i = 0
-    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-      labels(i) = label
-      pi(i) = math.log(n + lambda) - piLogDenom
-      val thetaLogDenom = modelType match {
-        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
-        case Bernoulli => math.log(n + 2.0 * lambda)
-        case _ =>
-          // This should never happen.
-          throw new UnknownError(s"Invalid modelType: $modelType.")
-      }
-      var j = 0
-      while (j < numFeatures) {
-        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-        j += 1
-      }
-      i += 1
+    val labels = data.map(_.label).distinct().collect().sorted
+
+    // Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] must be
+    // in range [0, numClasses).
+    val dataset = data.map {
--- End diff --

```val dataset = data.toDF()```





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81106353
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala ---
@@ -150,6 +150,54 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     validateProbabilities(featureAndProbabilities, model, "multinomial")
   }
 
+  test("Naive Bayes Multinomial with weighted samples") {
+    val nPoints = 1000
+    val piArray = Array(0.5, 0.1, 0.4).map(math.log)
+    val thetaArray = Array(
+      Array(0.70, 0.10, 0.10, 0.10), // label 0
+      Array(0.10, 0.70, 0.10, 0.10), // label 1
+      Array(0.10, 0.10, 0.70, 0.10) // label 2
+    ).map(_.map(math.log))
+
+    val testData = spark.createDataFrame(
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial"))
+    val (overSampledData, weightedData) =
+      MLTestingUtils.genEquivalentOversampledAndWeightedInstances(testData,
+        "label", "features", 42L)
+    val nb = new NaiveBayes().setModelType("multinomial")
+    val unweightedModel = nb.fit(weightedData)
+    val overSampledModel = nb.fit(overSampledData)
+    val weightedModel = nb.setWeightCol("weight").fit(weightedData)
+    assert(weightedModel.theta ~== overSampledModel.theta relTol 0.001)
+    assert(weightedModel.pi ~== overSampledModel.pi relTol 0.001)
+    assert(unweightedModel.theta !~= overSampledModel.theta relTol 0.001)
+    assert(unweightedModel.pi !~= overSampledModel.pi relTol 0.001)
+  }
+
+  test("Naive Bayes Bernoulli with weighted samples") {
+    val nPoints = 1
+    val piArray = Array(0.5, 0.3, 0.2).map(math.log)
+    val thetaArray = Array(
+      Array(0.50, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.40), // label 0
+      Array(0.02, 0.70, 0.10, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02), // label 1
+      Array(0.02, 0.02, 0.60, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.30)  // label 2
+    ).map(_.map(math.log))
+
+    val testData = spark.createDataFrame(
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "bernoulli"))
--- End diff --

Ditto.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81105501
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +356,33 @@ class NaiveBayes private (
     */
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(_ >= 0.0)) {
-        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
-      }
-    }
+    val spark = SparkSession
+      .builder()
+      .sparkContext(data.context)
+      .getOrCreate()
 
-    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(v => v == 0.0 || v == 1.0)) {
-        throw new SparkException(
-          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
-      }
-    }
+    import spark.implicits._
 
-    // Aggregates term frequencies per label.
-    // TODO: Calling combineByKey and collect creates two stages, we can implement something
-    // TODO: similar to reduceByKeyLocally to save one stage.
-    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
-      createCombiner = (v: Vector) => {
-        if (modelType == Bernoulli) {
-          requireZeroOneBernoulliValues(v)
-        } else {
-          requireNonnegativeValues(v)
-        }
-        (1L, v.copy.toDense)
-      },
-      mergeValue = (c: (Long, DenseVector), v: Vector) => {
-        requireNonnegativeValues(v)
-        BLAS.axpy(1.0, v, c._2)
-        (c._1 + 1L, c._2)
-      },
-      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
-        BLAS.axpy(1.0, c2._2, c1._2)
-        (c1._1 + c2._1, c1._2)
-      }
-    ).collect().sortBy(_._1)
+    val nb = new NewNaiveBayes()
+      .setModelType(modelType)
+      .setSmoothing(lambda)
 
-    val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
-    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-    val labels = new Array[Double](numLabels)
-    val pi = new Array[Double](numLabels)
-    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-    val piLogDenom = math.log(numDocuments + numLabels * lambda)
-    var i = 0
-    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-      labels(i) = label
-      pi(i) = math.log(n + lambda) - piLogDenom
-      val thetaLogDenom = modelType match {
-        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
-        case Bernoulli => math.log(n + 2.0 * lambda)
-        case _ =>
-          // This should never happen.
-          throw new UnknownError(s"Invalid modelType: $modelType.")
-      }
-      var j = 0
-      while (j < numFeatures) {
-        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-        j += 1
-      }
-      i += 1
+    val labels = data.map(_.label).distinct().collect().sorted
--- End diff --

It's not necessary to get the labels by an RDD operation, which is expensive. Since we sort the labels and guarantee they are in range [0, numClasses), we can use ```val labels = pi.indices.map(_.toDouble).toArray``` directly.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81106187
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala ---
@@ -150,6 +150,54 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     validateProbabilities(featureAndProbabilities, model, "multinomial")
   }
 
+  test("Naive Bayes Multinomial with weighted samples") {
+    val nPoints = 1000
+    val piArray = Array(0.5, 0.1, 0.4).map(math.log)
+    val thetaArray = Array(
+      Array(0.70, 0.10, 0.10, 0.10), // label 0
+      Array(0.10, 0.70, 0.10, 0.10), // label 1
+      Array(0.10, 0.10, 0.70, 0.10) // label 2
+    ).map(_.map(math.log))
+
+    val testData = spark.createDataFrame(
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial"))
--- End diff --

```val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()```
We promote ```toDF()```, which is more succinct.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81105095
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -27,11 +27,14 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVector, Vector}
+import org.apache.spark.ml.classification.{NaiveBayes => NewNaiveBayes}
+import org.apache.spark.ml.linalg.VectorUDT
+import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, Vector}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.types._
--- End diff --

Remove unused import.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-29 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r81105041
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -27,11 +27,14 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVector, Vector}
+import org.apache.spark.ml.classification.{NaiveBayes => NewNaiveBayes}
+import org.apache.spark.ml.linalg.VectorUDT
--- End diff --

Remove unused import.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-28 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r80884362
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala ---
@@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     validateProbabilities(featureAndProbabilities, model, "multinomial")
   }
 
+  test("Naive Bayes Multinomial with weighted samples") {
+    val (dataset, weightedDataset) = {
+      val nPoints = 1000
+      val piArray = Array(0.5, 0.1, 0.4).map(math.log)
+      val thetaArray = Array(
+        Array(0.70, 0.10, 0.10, 0.10), // label 0
+        Array(0.10, 0.70, 0.10, 0.10), // label 1
+        Array(0.10, 0.10, 0.70, 0.10) // label 2
+      ).map(_.map(math.log))
+      val pi = Vectors.dense(piArray)
+      val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
+
+      val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial")
+
+      // Let's over-sample the label-1 samples twice, label-2 samples triple.
+      val data1 = testData.flatMap { case labeledPoint: LabeledPoint =>
+        labeledPoint.label match {
+          case 0.0 => Iterator(labeledPoint)
+          case 1.0 => Iterator(labeledPoint, labeledPoint)
+          case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint)
+        }
+      }
+
+      val rnd = new Random(8392)
+      val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) =>
--- End diff --

@sethah Thanks a lot!





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-21 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79889093
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala ---
@@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     validateProbabilities(featureAndProbabilities, model, "multinomial")
   }
 
+  test("Naive Bayes Multinomial with weighted samples") {
+    val (dataset, weightedDataset) = {
+      val nPoints = 1000
+      val piArray = Array(0.5, 0.1, 0.4).map(math.log)
+      val thetaArray = Array(
+        Array(0.70, 0.10, 0.10, 0.10), // label 0
+        Array(0.10, 0.70, 0.10, 0.10), // label 1
+        Array(0.10, 0.10, 0.70, 0.10) // label 2
+      ).map(_.map(math.log))
+      val pi = Vectors.dense(piArray)
+      val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
+
+      val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial")
+
+      // Let's over-sample the label-1 samples twice, label-2 samples triple.
+      val data1 = testData.flatMap { case labeledPoint: LabeledPoint =>
+        labeledPoint.label match {
+          case 0.0 => Iterator(labeledPoint)
+          case 1.0 => Iterator(labeledPoint, labeledPoint)
+          case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint)
+        }
+      }
+
+      val rnd = new Random(8392)
+      val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) =>
--- End diff --

I submitted a PR to your PR, with the weighted tests. (Hopefully I've done that correctly.) Actually, I also think it is nice to test a case where the majority of the samples are outliers but have small weights, so they should not affect the predictions. This is semi-automated in MLUtils, but since NaiveBayes requires a certain type of features (0/1 in some cases), I don't think it integrates nicely yet. I think we should create a JIRA to automate weighted testing, where we can think about this all together. For now, this test should be sufficient.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-21 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79799464
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala ---
@@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     validateProbabilities(featureAndProbabilities, model, "multinomial")
   }
 
+  test("Naive Bayes Multinomial with weighted samples") {
+    val (dataset, weightedDataset) = {
+      val nPoints = 1000
+      val piArray = Array(0.5, 0.1, 0.4).map(math.log)
+      val thetaArray = Array(
+        Array(0.70, 0.10, 0.10, 0.10), // label 0
+        Array(0.10, 0.70, 0.10, 0.10), // label 1
+        Array(0.10, 0.10, 0.70, 0.10) // label 2
+      ).map(_.map(math.log))
+      val pi = Vectors.dense(piArray)
+      val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
+
+      val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial")
+
+      // Let's over-sample the label-1 samples twice, label-2 samples triple.
+      val data1 = testData.flatMap { case labeledPoint: LabeledPoint =>
+        labeledPoint.label match {
+          case 0.0 => Iterator(labeledPoint)
+          case 1.0 => Iterator(labeledPoint, labeledPoint)
+          case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint)
+        }
+      }
+
+      val rnd = new Random(8392)
+      val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) =>
--- End diff --

I think it will take some time before I can harness the new framework. How about leaving it alone for now?





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-21 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79795778
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +357,32 @@ class NaiveBayes private (
     */
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(_ >= 0.0)) {
-        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
-      }
-    }
+    val spark = SparkSession
+      .builder()
+      .getOrCreate()
 
-    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(v => v == 0.0 || v == 1.0)) {
-        throw new SparkException(
-          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
-      }
-    }
+    import spark.implicits._
 
-    // Aggregates term frequencies per label.
-    // TODO: Calling combineByKey and collect creates two stages, we can implement something
-    // TODO: similar to reduceByKeyLocally to save one stage.
-    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
-      createCombiner = (v: Vector) => {
-        if (modelType == Bernoulli) {
-          requireZeroOneBernoulliValues(v)
-        } else {
-          requireNonnegativeValues(v)
-        }
-        (1L, v.copy.toDense)
-      },
-      mergeValue = (c: (Long, DenseVector), v: Vector) => {
-        requireNonnegativeValues(v)
-        BLAS.axpy(1.0, v, c._2)
-        (c._1 + 1L, c._2)
-      },
-      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
-        BLAS.axpy(1.0, c2._2, c1._2)
-        (c1._1 + c2._1, c1._2)
-      }
-    ).collect().sortBy(_._1)
+    val nb = new NewNaiveBayes()
+      .setModelType(modelType)
+      .setSmoothing(lambda)
 
-    val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
-    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-    val labels = new Array[Double](numLabels)
-    val pi = new Array[Double](numLabels)
-    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-    val piLogDenom = math.log(numDocuments + numLabels * lambda)
-    var i = 0
-    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-      labels(i) = label
-      pi(i) = math.log(n + lambda) - piLogDenom
-      val thetaLogDenom = modelType match {
-        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
-        case Bernoulli => math.log(n + 2.0 * lambda)
-        case _ =>
-          // This should never happen.
-          throw new UnknownError(s"Invalid modelType: $modelType.")
-      }
-      var j = 0
-      while (j < numFeatures) {
-        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-        j += 1
-      }
-      i += 1
+    val labels = data.map(_.label).distinct().collect().sorted
+
+    // Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] must be
+    // in range [0, numClasses).
+    val dataset = data.map {
+      case LabeledPoint(label, features) =>
+        (labels.indexOf(label).toDouble, features.asML)
+    }.toDF("label", "features")
--- End diff --

@yanboliang I tested `toDF` and `createDF`; the implementation with `toDF` is faster.
dataset: mnist.scale, numFeatures: 780, numSamples: 6
run 1000 times
Duration: `toDF` 85488, `createDF` 97122.
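
A minimal sketch of how such a micro-benchmark could be set up (the harness below is an assumption — the original script was not posted — and it relies on `import spark.implicits._` as in the diff above):

```
// Times 1000 conversions of the same RDD; count() forces each plan to run.
def timeMs(block: => Unit): Long = {
  val start = System.nanoTime()
  block
  (System.nanoTime() - start) / 1000000L
}

val toDFMs = timeMs {
  (0 until 1000).foreach { _ =>
    data.map { case LabeledPoint(label, features) =>
      (label, features.asML)
    }.toDF("label", "features").count()
  }
}
```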





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79752969
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala ---
@@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     validateProbabilities(featureAndProbabilities, model, "multinomial")
   }
 
+  test("Naive Bayes Multinomial with weighted samples") {
+    val (dataset, weightedDataset) = {
+      val nPoints = 1000
+      val piArray = Array(0.5, 0.1, 0.4).map(math.log)
+      val thetaArray = Array(
+        Array(0.70, 0.10, 0.10, 0.10), // label 0
+        Array(0.10, 0.70, 0.10, 0.10), // label 1
+        Array(0.10, 0.10, 0.70, 0.10) // label 2
+      ).map(_.map(math.log))
+      val pi = Vectors.dense(piArray)
+      val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
+
+      val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial")
+
+      // Let's over-sample the label-1 samples twice, label-2 samples triple.
+      val data1 = testData.flatMap { case labeledPoint: LabeledPoint =>
+        labeledPoint.label match {
+          case 0.0 => Iterator(labeledPoint)
+          case 1.0 => Iterator(labeledPoint, labeledPoint)
+          case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint)
+        }
+      }
+
+      val rnd = new Random(8392)
+      val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) =>
--- End diff --

Good point. Of course, I will update this test suite to keep it in line with the other algorithms.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79632172
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala 
---
@@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
 validateProbabilities(featureAndProbabilities, model, "multinomial")
   }
 
+  test("Naive Bayes Multinomial with weighted samples") {
+val (dataset, weightedDataset) = {
+  val nPoints = 1000
+  val piArray = Array(0.5, 0.1, 0.4).map(math.log)
+  val thetaArray = Array(
+Array(0.70, 0.10, 0.10, 0.10), // label 0
+Array(0.10, 0.70, 0.10, 0.10), // label 1
+Array(0.10, 0.10, 0.70, 0.10) // label 2
+  ).map(_.map(math.log))
+  val pi = Vectors.dense(piArray)
+  val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
+
+  val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 
42, "multinomial")
+
+  // Let's over-sample the label-1 samples twice, label-2 samples 
triple.
+  val data1 = testData.flatMap { case labeledPoint: LabeledPoint =>
+labeledPoint.label match {
+  case 0.0 => Iterator(labeledPoint)
+  case 1.0 => Iterator(labeledPoint, labeledPoint)
+  case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint)
+}
+  }
+
+  val rnd = new Random(8392)
+  val data2 = testData.flatMap { case LabeledPoint(label: Double, 
features: Vector) =>
--- End diff --

So, we are adding more and more algorithms in MLlib that accept weights, and I think we need to take care to create standardized unit tests that we can reuse. Could you take a look at the tests for `LogisticRegression` and reuse that framework here?
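
Roughly, the reusable pattern looks like this sketch (the DataFrames, the `weight` column name, and the tolerance are hypothetical; `setWeightCol` is the setter this patch adds):

```scala
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.sql.DataFrame

// oversampledDF: rows duplicated k times; weightedDF: the same rows once, with
// a "weight" column holding k. The fitted parameters should (nearly) coincide.
def checkOversamplingVsWeighting(oversampledDF: DataFrame, weightedDF: DataFrame): Unit = {
  val plain = new NaiveBayes().fit(oversampledDF)
  val weighted = new NaiveBayes().setWeightCol("weight").fit(weightedDF)
  // Compare the class log-priors; theta could be compared the same way.
  plain.pi.toArray.zip(weighted.pi.toArray).foreach { case (a, b) =>
    assert(math.abs(a - b) < 1e-8, s"pi mismatch: $a vs $b")
  }
}
```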





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79603681
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
+  .map { row => (row.getDouble(0), (row.getDouble(1), 
row.getAs[Vector](2)))
+  }.aggregateByKey[(Double, DenseVector)]((0.0, 
Vectors.zeros(numFeatures).toDense))(
+  seqOp = {
+ case (agg, (weight, features)) =>
+   requireValues(features)
+   BLAS.axpy(weight, features, agg._2)
+   (agg._1 + weight, agg._2)
+  },
+  combOp = {
+ case (agg1, agg2) =>
+   BLAS.axpy(1.0, agg2._2, agg1._2)
+   (agg1._1 + agg2._1, agg1._2)
+  }).collect().sortBy(_._1)
+
+val numLabels = aggregated.length
+val numDocuments = aggregated.map(_._2._1).sum
+
+val pi = Array.fill[Double](numLabels)(0.0)
+val theta = Array.fill[Double](numLabels, numFeatures)(0.0)
+
+val lambda = $(smoothing)
+val piLogDenom = math.log(numDocuments + numLabels * lambda)
+var i = 0
+aggregated.foreach { case (label, (n, sumTermFreqs)) =>
+  pi(i) = math.log(n + lambda) - piLogDenom
+  val thetaLogDenom = $(modelType) match {
+case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures 
* lambda)
+case Bernoulli => math.log(n + 2.0 * lambda)
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+  var j = 0
+  while (j < numFeatures) {
+theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
+j += 1
+  }
+  i += 1
+}
+
+val uid = Identifiable.randomUID("nb")
+val piVector = Vectors.dense(pi)
+val thetaMatrix = new DenseMatrix(numLabels, theta(0).length, 
theta.flatten, true)
--- End diff --

agreed.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79603503
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
+  .map { row => (row.getDouble(0), (row.getDouble(1), 
row.getAs[Vector](2)))
+  }.aggregateByKey[(Double, DenseVector)]((0.0, 
Vectors.zeros(numFeatures).toDense))(
+  seqOp = {
+ case (agg, (weight, features)) =>
--- End diff --

ok.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79603468
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
+  .map { row => (row.getDouble(0), (row.getDouble(1), 
row.getAs[Vector](2)))
+  }.aggregateByKey[(Double, DenseVector)]((0.0, 
Vectors.zeros(numFeatures).toDense))(
+  seqOp = {
+ case (agg, (weight, features)) =>
+   requireValues(features)
+   BLAS.axpy(weight, features, agg._2)
+   (agg._1 + weight, agg._2)
+  },
+  combOp = {
+ case (agg1, agg2) =>
+   BLAS.axpy(1.0, agg2._2, agg1._2)
+   (agg1._1 + agg2._1, agg1._2)
+  }).collect().sortBy(_._1)
+
+val numLabels = aggregated.length
+val numDocuments = aggregated.map(_._2._1).sum
+
+val pi = Array.fill[Double](numLabels)(0.0)
+val theta = Array.fill[Double](numLabels, numFeatures)(0.0)
+
+val lambda = $(smoothing)
+val piLogDenom = math.log(numDocuments + numLabels * lambda)
+var i = 0
+aggregated.foreach { case (label, (n, sumTermFreqs)) =>
+  pi(i) = math.log(n + lambda) - piLogDenom
+  val thetaLogDenom = $(modelType) match {
+case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures 
* lambda)
+case Bernoulli => math.log(n + 2.0 * lambda)
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+  var j = 0
+  while (j < numFeatures) {
+theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
+j += 1
+  }
+  i += 1
+}
+
+val uid = Identifiable.randomUID("nb")
--- End diff --

ok, I will remove this line.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79602209
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
--- End diff --

OK, but this TODO and these comments should be added in the ml implementation.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79567686
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
+  .map { row => (row.getDouble(0), (row.getDouble(1), 
row.getAs[Vector](2)))
+  }.aggregateByKey[(Double, DenseVector)]((0.0, 
Vectors.zeros(numFeatures).toDense))(
+  seqOp = {
+ case (agg, (weight, features)) =>
+   requireValues(features)
+   BLAS.axpy(weight, features, agg._2)
+   (agg._1 + weight, agg._2)
+  },
+  combOp = {
+ case (agg1, agg2) =>
+   BLAS.axpy(1.0, agg2._2, agg1._2)
+   (agg1._1 + agg2._1, agg1._2)
+  }).collect().sortBy(_._1)
+
+val numLabels = aggregated.length
+val numDocuments = aggregated.map(_._2._1).sum
+
+val pi = Array.fill[Double](numLabels)(0.0)
+val theta = Array.fill[Double](numLabels, numFeatures)(0.0)
+
+val lambda = $(smoothing)
+val piLogDenom = math.log(numDocuments + numLabels * lambda)
+var i = 0
+aggregated.foreach { case (label, (n, sumTermFreqs)) =>
+  pi(i) = math.log(n + lambda) - piLogDenom
+  val thetaLogDenom = $(modelType) match {
+case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures 
* lambda)
+case Bernoulli => math.log(n + 2.0 * lambda)
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+  var j = 0
+  while (j < numFeatures) {
+theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
+j += 1
+  }
+  i += 1
+}
+
+val uid = Identifiable.randomUID("nb")
+val piVector = Vectors.dense(pi)
+val thetaMatrix = new DenseMatrix(numLabels, theta(0).length, 
theta.flatten, true)
--- End diff --

Wouldn't ```pi -> piArray, piVector -> pi, theta -> thetaArrays, thetaMatrix -> theta``` be better?
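
That is, something along the lines of this sketch (only the naming is the point; `numLabels` and `numFeatures` come from the surrounding `train()` code):

```scala
import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, Vectors}

def buildParams(numLabels: Int, numFeatures: Int): (DenseVector, DenseMatrix) = {
  // Accumulate into plainly suffixed arrays...
  val piArray = new Array[Double](numLabels)               // filled in the training loop
  val thetaArray = Array.fill(numLabels, numFeatures)(0.0) // likewise
  // ...and reserve the short names for the model parameters themselves.
  val pi = Vectors.dense(piArray).toDense
  val theta = new DenseMatrix(numLabels, numFeatures, thetaArray.flatten, true)
  (pi, theta)
}
```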





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79564439
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
+  .map { row => (row.getDouble(0), (row.getDouble(1), 
row.getAs[Vector](2)))
+  }.aggregateByKey[(Double, DenseVector)]((0.0, 
Vectors.zeros(numFeatures).toDense))(
+  seqOp = {
+ case (agg, (weight, features)) =>
--- End diff --

```agg -> (weightSum: Double, featureSum: Vector)```: we should avoid using ```._1```, and it's better to mark the types explicitly.
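
A self-contained sketch of the suggested shape, using `Array[Double]` in place of the ml vectors so it runs outside the Spark source tree (where `BLAS` is package-private):

```scala
import org.apache.spark.rdd.RDD

// (label -> (weightSum, featureSum)) with the accumulator destructured and
// explicitly named instead of being accessed through agg._1 / agg._2.
def aggregate(rows: RDD[(Double, (Double, Array[Double]))], numFeatures: Int)
  : Array[(Double, (Double, Array[Double]))] = {
  rows.aggregateByKey((0.0, new Array[Double](numFeatures)))(
    seqOp = { case ((weightSum, featureSum), (weight, features)) =>
      var j = 0
      while (j < numFeatures) { featureSum(j) += weight * features(j); j += 1 }
      (weightSum + weight, featureSum)
    },
    combOp = { case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
      var j = 0
      while (j < numFeatures) { featureSum1(j) += featureSum2(j); j += 1 }
      (weightSum1 + weightSum2, featureSum1)
    }).collect().sortBy(_._1)
}
```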





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79569671
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +357,32 @@ class NaiveBayes private (
*/
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-  val values = v match {
-case sv: SparseVector => sv.values
-case dv: DenseVector => dv.values
-  }
-  if (!values.forall(_ >= 0.0)) {
-throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
-  }
-}
+val spark = SparkSession
+  .builder()
+  .getOrCreate()
 
-val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-  val values = v match {
-case sv: SparseVector => sv.values
-case dv: DenseVector => dv.values
-  }
-  if (!values.forall(v => v == 0.0 || v == 1.0)) {
-throw new SparkException(
-  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
-  }
-}
+import spark.implicits._
 
-// Aggregates term frequencies per label.
-// TODO: Calling combineByKey and collect creates two stages, we can 
implement something
-// TODO: similar to reduceByKeyLocally to save one stage.
-val aggregated = data.map(p => (p.label, 
p.features)).combineByKey[(Long, DenseVector)](
-  createCombiner = (v: Vector) => {
-if (modelType == Bernoulli) {
-  requireZeroOneBernoulliValues(v)
-} else {
-  requireNonnegativeValues(v)
-}
-(1L, v.copy.toDense)
-  },
-  mergeValue = (c: (Long, DenseVector), v: Vector) => {
-requireNonnegativeValues(v)
-BLAS.axpy(1.0, v, c._2)
-(c._1 + 1L, c._2)
-  },
-  mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) 
=> {
-BLAS.axpy(1.0, c2._2, c1._2)
-(c1._1 + c2._1, c1._2)
-  }
-).collect().sortBy(_._1)
+val nb = new NewNaiveBayes()
+  .setModelType(modelType)
+  .setSmoothing(lambda)
 
-val numLabels = aggregated.length
-var numDocuments = 0L
-aggregated.foreach { case (_, (n, _)) =>
-  numDocuments += n
-}
-val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-val labels = new Array[Double](numLabels)
-val pi = new Array[Double](numLabels)
-val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-val piLogDenom = math.log(numDocuments + numLabels * lambda)
-var i = 0
-aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-  labels(i) = label
-  pi(i) = math.log(n + lambda) - piLogDenom
-  val thetaLogDenom = modelType match {
-case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures 
* lambda)
-case Bernoulli => math.log(n + 2.0 * lambda)
-case _ =>
-  // This should never happen.
-  throw new UnknownError(s"Invalid modelType: $modelType.")
-  }
-  var j = 0
-  while (j < numFeatures) {
-theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-j += 1
-  }
-  i += 1
+val labels = data.map(_.label).distinct().collect().sorted
+
+// Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] 
must be
+// in range [0, numClasses).
+val dataset = data.map {
+  case LabeledPoint(label, features) =>
+(labels.indexOf(label).toDouble, features.asML)
+}.toDF("label", "features")
--- End diff --

It's better to compare the performance of ```toDF``` and ```createDataFrame```, and a check for performance regressions is also necessary.
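
For example, a minimal timing harness for such a comparison could look like this sketch (the `count()` action used to force evaluation is an assumption):

```scala
import org.apache.spark.sql.DataFrame

// Times n repetitions of a DataFrame construction path, in milliseconds.
def timeMillis(n: Int)(build: => DataFrame): Long = {
  build.count()                        // warm-up run
  val start = System.nanoTime()
  var i = 0
  while (i < n) { build.count(); i += 1 }
  (System.nanoTime() - start) / 1000000L
}
```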





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79561754
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
--- End diff --

It's better to keep the original comments and TODOs here; they help developers and users keep improving the code.
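
For example, the comment block from the mllib implementation that could be carried over (adapted from `combineByKey` to the new `aggregateByKey` call):

```scala
// Aggregates term frequencies per label.
// TODO: Calling aggregateByKey and collect creates two stages, we can implement
// something similar to reduceByKeyLocally to save one stage.
```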





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79569085
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---
@@ -355,79 +357,32 @@ class NaiveBayes private (
*/
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-  val values = v match {
-case sv: SparseVector => sv.values
-case dv: DenseVector => dv.values
-  }
-  if (!values.forall(_ >= 0.0)) {
-throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
-  }
-}
+val spark = SparkSession
+  .builder()
+  .getOrCreate()
--- End diff --

```val spark = SparkSession.builder().sparkContext(data.context).getOrCreate()```
We should guarantee that the SparkSession is created from the existing data RDD's SparkContext.





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r79566824
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
+  .map { row => (row.getDouble(0), (row.getDouble(1), 
row.getAs[Vector](2)))
+  }.aggregateByKey[(Double, DenseVector)]((0.0, 
Vectors.zeros(numFeatures).toDense))(
+  seqOp = {
+ case (agg, (weight, features)) =>
+   requireValues(features)
+   BLAS.axpy(weight, features, agg._2)
+   (agg._1 + weight, agg._2)
+  },
+  combOp = {
+ case (agg1, agg2) =>
+   BLAS.axpy(1.0, agg2._2, agg1._2)
+   (agg1._1 + agg2._1, agg1._2)
+  }).collect().sortBy(_._1)
+
+val numLabels = aggregated.length
+val numDocuments = aggregated.map(_._2._1).sum
+
+val pi = Array.fill[Double](numLabels)(0.0)
+val theta = Array.fill[Double](numLabels, numFeatures)(0.0)
+
+val lambda = $(smoothing)
+val piLogDenom = math.log(numDocuments + numLabels * lambda)
+var i = 0
+aggregated.foreach { case (label, (n, sumTermFreqs)) =>
+  pi(i) = math.log(n + lambda) - piLogDenom
+  val thetaLogDenom = $(modelType) match {
+case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures 
* lambda)
+case Bernoulli => math.log(n + 2.0 * lambda)
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+  var j = 0
+  while (j < numFeatures) {
+theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
+j += 1
+  }
+  i += 1
+}
+
+val uid = Identifiable.randomUID("nb")
--- End diff --

```uid``` has already been generated in ```def this() = this(Identifiable.randomUID("nb"))```; you should use it directly rather than generating a new one.
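
That is, inside `train()` the model should be constructed with the estimator's own `uid` (a sketch of the usual pattern; it relies on the package-private model constructor, so it only applies inside the Spark source):

```scala
// `uid` is inherited from the estimator's constructor; no randomUID call here.
val model = new NaiveBayesModel(uid, piVector, thetaMatrix).setParent(this)
copyValues(model)
```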





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-20 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r7956
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(_ >= 0.0)) {
+throw new SparkException(s"Naive Bayes requires nonnegative 
feature values but found $v.")
+  }
+}
+
+val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+  val values = v match {
+case sv: SparseVector => sv.values
+case dv: DenseVector => dv.values
+  }
+  if (!values.forall(v => v == 0.0 || v == 1.0)) {
+throw new SparkException(
+  s"Bernoulli naive Bayes requires 0 or 1 feature values but found 
$v.")
+  }
+}
+
+val requireValues: Vector => Unit = {
+  $(modelType) match {
+case Multinomial =>
+  requireNonnegativeValues
+case Bernoulli =>
+  requireZeroOneBernoulliValues
+case _ =>
+  // This should never happen.
+  throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
+  }
+}
+
+val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
+
+val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, 
col($(featuresCol))).rdd
+  .map { row => (row.getDouble(0), (row.getDouble(1), 
row.getAs[Vector](2)))
+  }.aggregateByKey[(Double, DenseVector)]((0.0, 
Vectors.zeros(numFeatures).toDense))(
+  seqOp = {
+ case (agg, (weight, features)) =>
+   requireValues(features)
+   BLAS.axpy(weight, features, agg._2)
+   (agg._1 + weight, agg._2)
+  },
+  combOp = {
+ case (agg1, agg2) =>
--- End diff --

Ditto: ```(agg1, agg2) -> ((weightSum1: Double, featureSum1: Vector), (weightSum2: Double, featureSum2: Vector))```





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-12 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r78325688
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -109,10 +120,51 @@ class NaiveBayes @Since("1.5.0") (
 s" numClasses=$numClasses, but thresholds has length 
${$(thresholds).length}")
 }
 
-val oldDataset: RDD[OldLabeledPoint] =
-  extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), 
$(modelType))
-NaiveBayesModel.fromOld(oldModel, this)
+val numFeatures = 
dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+val wvsum = new WeightedVectorSum($(modelType), numFeatures)
+
+val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+
+val aggregated =
+  dataset.select(col($(labelCol)).cast(DoubleType).as("label"), 
w.as("weight"),
+col($(featuresCol)).as("features"))
+.groupBy(col($(labelCol)))
+.agg(sum(col("weight")), wvsum(col("weight"), col("features")))
+.collect().map { row =>
+(row.getDouble(0), (row.getDouble(1), 
row.getAs[Vector](2).toDense))
+  }.sortBy(_._1)
--- End diff --

Do you have any performance tests for the switch to the Dataset-based operation?





[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...

2016-09-12 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/12819#discussion_r78325579
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---
@@ -98,7 +99,17 @@ class NaiveBayes @Since("1.5.0") (
*/
   @Since("1.5.0")
   def setModelType(value: String): this.type = set(modelType, value)
-  setDefault(modelType -> OldNaiveBayes.Multinomial)
+  setDefault(modelType -> NaiveBayes.Multinomial)
+
+  /**
+   * Whether to over-/under-sample training instances according to the 
given weights in weightCol.
+   * If empty, all instances are treated equally (weight 1.0).
+   * Default is empty, so all instances have weight one.
+   * @group setParam
+   */
+  @Since("2.1.0")
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
--- End diff --

It's not necessary to set a default for ```weightCol```. You can refer to other places where ```weightCol``` is used.
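
That is, drop the `setDefault` and handle the unset or empty case at fit time, as this patch already does in `train()`:

```scala
import org.apache.spark.sql.functions.{col, lit}

// Weight 1.0 when weightCol is unset or empty; otherwise use the column.
val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
```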

