[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r87151780

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---

```diff
@@ -109,10 +118,89 @@ class NaiveBayes @Since("1.5.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }

-    val oldDataset: RDD[OldLabeledPoint] =
-      extractLabeledPoints(dataset).map(OldLabeledPoint.fromML)
-    val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType))
-    NaiveBayesModel.fromOld(oldModel, this)
+    val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
+
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+
+      require(values.forall(_ >= 0.0),
+        s"Naive Bayes requires nonnegative feature values but found $v.")
+    }
+
+    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+
+      require(values.forall(v => v == 0.0 || v == 1.0),
+        s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
+    }
+
+    val requireValues: Vector => Unit = {
+      $(modelType) match {
```
--- End diff --

@thunterdb Good catch! I updated at #15826. Thanks.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user thunterdb commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r87029278

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---

```diff
+    val requireValues: Vector => Unit = {
+      $(modelType) match {
```
--- End diff --

Looking at the decompiled bytecode, the situation is a bit more complicated: the pattern match is inlined and is not wrapped in a closure, so it would work as you would expect. However, because this relies on a compiler optimization, we should still evaluate the model type outside `requireValues`.
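A minimal pure-Scala sketch of the suggestion in the message above: read the parameter once into a local value, then build the validator from that value, so correctness does not depend on the compiler inlining the pattern match. All names here are illustrative stand-ins, not Spark's actual API.

```scala
import scala.util.Try

// Hypothetical validator builder. `modelType` stands in for $(modelType);
// it is evaluated once, outside any closure, before the match.
def buildValidator(modelType: String): Array[Double] => Unit = {
  val mt = modelType // evaluated eagerly, not inside the returned closure
  mt match {
    case "multinomial" =>
      (vs: Array[Double]) =>
        require(vs.forall(_ >= 0.0), "nonnegative feature values required")
    case "bernoulli" =>
      (vs: Array[Double]) =>
        require(vs.forall(v => v == 0.0 || v == 1.0), "0 or 1 feature values required")
  }
}
```

Because the match is evaluated when `buildValidator` runs, the returned closure captures only the chosen branch, not the decision itself.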
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user thunterdb commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r87025609

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---

```diff
+    val requireValues: Vector => Unit = {
+      $(modelType) match {
```
--- End diff --

@zhengruifeng (major) you are capturing the outer object through `$` and `modelType`. You should make a `val` outside the method.
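The capture problem flagged above can be sketched in plain Scala: a closure that reads a field goes through `this`, so the whole enclosing object is captured (and must be serialized when the closure is shipped to executors). Copying the field into a local `val` first means only that value is captured. The `Trainer` class here is an illustrative stand-in, not Spark's.

```scala
class Trainer(val modelType: String) {
  // Captures `this`: the function body reads the field through the object,
  // so the entire Trainer would be serialized with the closure.
  def fieldCapturing: Double => Boolean =
    x => modelType == "bernoulli" && x == 1.0

  // Captures only the local String copied out of the field.
  def localCapturing: Double => Boolean = {
    val mt = modelType
    x => mt == "bernoulli" && x == 1.0
  }
}
```

Both functions compute the same result; they differ only in what the closure drags along.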
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user thunterdb commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r87025222

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---

```diff
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
```
--- End diff --

Also, this will help reduce the length of this method, which is very long.
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user thunterdb commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r87025073

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---

```diff
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
```
--- End diff --

@zhengruifeng (minor) this method is static and does not depend on the state of the object, so it is preferable to put it in the companion object. Also, because not everyone has a functional programming background, it is better to define it with `def` rather than as a `val`.
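The refactor suggested above can be sketched as follows; `NaiveBayes` here is a stand-in class, not Spark's, and the validator takes a plain `Array[Double]` instead of an ML `Vector` to keep the sketch self-contained.

```scala
import scala.util.Try

class NaiveBayes // stand-in for the estimator class

object NaiveBayes {
  // Stateless check moved into the companion object, defined with `def`:
  // it reads nothing from any instance, so nothing is captured.
  def requireNonnegativeValues(values: Array[Double]): Unit =
    require(values.forall(_ >= 0.0),
      s"Naive Bayes requires nonnegative feature values but found " +
        values.mkString("[", ",", "]") + ".")
}
```

Callers invoke it as `NaiveBayes.requireNonnegativeValues(values)`, which makes the lack of instance state explicit at the call site.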
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/12819
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81285828

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---

```diff
@@ -355,79 +356,33 @@ class NaiveBayes private (
    */
   @Since("0.9.0")
   def run(data: RDD[LabeledPoint]): NaiveBayesModel = {
-    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(_ >= 0.0)) {
-        throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.")
-      }
-    }
+    val spark = SparkSession
+      .builder()
+      .sparkContext(data.context)
+      .getOrCreate()

-    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
-      val values = v match {
-        case sv: SparseVector => sv.values
-        case dv: DenseVector => dv.values
-      }
-      if (!values.forall(v => v == 0.0 || v == 1.0)) {
-        throw new SparkException(
-          s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.")
-      }
-    }
+    import spark.implicits._

-    // Aggregates term frequencies per label.
-    // TODO: Calling combineByKey and collect creates two stages, we can implement something
-    // TODO: similar to reduceByKeyLocally to save one stage.
-    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)](
-      createCombiner = (v: Vector) => {
-        if (modelType == Bernoulli) {
-          requireZeroOneBernoulliValues(v)
-        } else {
-          requireNonnegativeValues(v)
-        }
-        (1L, v.copy.toDense)
-      },
-      mergeValue = (c: (Long, DenseVector), v: Vector) => {
-        requireNonnegativeValues(v)
-        BLAS.axpy(1.0, v, c._2)
-        (c._1 + 1L, c._2)
-      },
-      mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => {
-        BLAS.axpy(1.0, c2._2, c1._2)
-        (c1._1 + c2._1, c1._2)
-      }
-    ).collect().sortBy(_._1)
+    val nb = new NewNaiveBayes()
+      .setModelType(modelType)
+      .setSmoothing(lambda)

-    val numLabels = aggregated.length
-    var numDocuments = 0L
-    aggregated.foreach { case (_, (n, _)) =>
-      numDocuments += n
-    }
-    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
-
-    val labels = new Array[Double](numLabels)
-    val pi = new Array[Double](numLabels)
-    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
-
-    val piLogDenom = math.log(numDocuments + numLabels * lambda)
-    var i = 0
-    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
-      labels(i) = label
-      pi(i) = math.log(n + lambda) - piLogDenom
-      val thetaLogDenom = modelType match {
-        case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)
-        case Bernoulli => math.log(n + 2.0 * lambda)
-        case _ =>
-          // This should never happen.
-          throw new UnknownError(s"Invalid modelType: $modelType.")
-      }
-      var j = 0
-      while (j < numFeatures) {
-        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
-        j += 1
-      }
-      i += 1
+    val labels = data.map(_.label).distinct().collect().sorted
```
--- End diff --

Yeah, leaving the old mllib implementation untouched is one of the candidates, but not an elegant one. I'm ambivalent about this issue; let's leave it as follow-up work. Thanks.
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81256496

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---

```diff
+    val labels = data.map(_.label).distinct().collect().sorted
+
+    // Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] must be
+    // in range [0, numClasses).
+    val dataset = data.map {
```
--- End diff --

`data.toDF()` doesn't work because there are two types of `VectorUDT`. This change caused MLlib's test failures, so I reverted it.
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81127281

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---

```diff
+    val requireNonnegativeValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+      if (!values.forall(_ >= 0.0)) {
```
--- End diff --

let's just use `require` here.
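The simplification suggested above, sketched in plain Scala: `require(cond, msg)` throws an `IllegalArgumentException` with the given message when the condition is false, replacing the manual `if (!cond) throw ...` pattern (modulo the exception type, which was `SparkException` in the original).

```scala
// Stand-in data; the real code checks an ML Vector's values.
val values = Array(1.0, 0.0, 2.0)

// One line instead of an if/throw block:
require(values.forall(_ >= 0.0),
  s"Naive Bayes requires nonnegative feature values but found ${values.mkString(",")}.")
```

On a failing input, `require` raises `IllegalArgumentException: requirement failed: ...` with the interpolated message.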
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81225025

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---

```diff
+    val labels = data.map(_.label).distinct().collect().sorted
```
--- End diff --

Yeah, so I think this is going to introduce a performance regression into the mllib version. Before, you could get the labels from the aggregates, but if we make mllib call into the ml implementation we need an extra pass through the data.

One way to avoid this is to leave the mllib implementation untouched, which has been done in the past with GBT and RF. It's not a great solution; I'm open to other suggestions.
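The trade-off described above can be illustrated with plain Scala collections standing in for the RDD: once per-label aggregates have been collected, the label set is just the aggregate keys, whereas the refactored path pays a separate `distinct()` pass over the full data.

```scala
// Hypothetical per-label aggregates, as (label, documentCount) pairs,
// already collected to the driver.
val aggregated: Seq[(Double, Long)] = Seq((1.0, 5L), (0.0, 10L))

// Labels fall out of the aggregate keys: no second scan of the data.
val labelsFromAggregates = aggregated.map(_._1).sorted

// The refactored path instead scans the labels again; on an RDD this
// distinct() is an extra distributed pass.
val data = Seq(0.0, 1.0, 0.0, 1.0) // labels only, standing in for the RDD
val labelsFromExtraPass = data.distinct.sorted
```

Both routes produce the same sorted label set; they differ only in how many passes over the data they cost.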
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81127329

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---

```diff
+    val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => {
+      val values = v match {
+        case sv: SparseVector => sv.values
+        case dv: DenseVector => dv.values
+      }
+      if (!values.forall(v => v == 0.0 || v == 1.0)) {
```
--- End diff --

ditto.
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81132746

--- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala ---

```diff
@@ -280,15 +366,14 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
       val dataPath = new Path(path, "data").toString
-      val data = sparkSession.read.parquet(dataPath)
-      val vecConverted = MLUtils.convertVectorColumnsToML(data, "pi")
-      val Row(pi: Vector, theta: Matrix) = MLUtils.convertMatrixColumnsToML(vecConverted, "theta")
-        .select("pi", "theta")
-        .head()
+      val data = sparkSession.read.parquet(dataPath).select("pi", "theta").head()
+      val pi = data.getAs[Vector](0)
+      val theta = data.getAs[Matrix](1)
       val model = new NaiveBayesModel(metadata.uid, pi, theta)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model
     }
   }
+
```
--- End diff --

remove
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81128334

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala ---

```diff
+    val labels = data.map(_.label).distinct().collect().sorted
```
--- End diff --

Labels in mllib's NB implementation are not guaranteed to be in range [0, numClasses), and the following code with labels set to `{-1, +1}` runs successfully:

```scala
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.NaiveBayes

val points = Seq(
  LabeledPoint(-1.0, Vectors.dense(Array(1.0, 2.0))),
  LabeledPoint(+1.0, Vectors.dense(Array(1.0, 2.0))))
val rdd = sc.parallelize(points)
val nbm = NaiveBayes.train(rdd)
```
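The remapping the wrapper therefore needs can be sketched in plain Scala: mllib labels may be arbitrary doubles (e.g. -1/+1), while the ml implementation requires labels in [0, numClasses), so the distinct labels are sorted and replaced by their indices. Names here are illustrative, not Spark's.

```scala
// Raw labels as mllib accepts them: any distinct doubles.
val rawLabels = Seq(1.0, -1.0, 1.0, -1.0)

// Sort the distinct labels and assign each an index in [0, numClasses).
val labels = rawLabels.distinct.sorted
val labelIndex: Map[Double, Double] =
  labels.zipWithIndex.map { case (l, i) => (l, i.toDouble) }.toMap

// Remapped labels now satisfy the ml-side contract.
val remapped = rawLabels.map(labelIndex)
```

The inverse mapping (`labels(i)`) recovers the original label when converting the fitted ml model back to an mllib `NaiveBayesModel`.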
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81107309 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala --- @@ -355,79 +356,33 @@ class NaiveBayes private ( */ @Since("0.9.0") def run(data: RDD[LabeledPoint]): NaiveBayesModel = { -val requireNonnegativeValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(_ >= 0.0)) { -throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") - } -} +val spark = SparkSession + .builder() + .sparkContext(data.context) + .getOrCreate() -val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(v => v == 0.0 || v == 1.0)) { -throw new SparkException( - s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") - } -} +import spark.implicits._ -// Aggregates term frequencies per label. -// TODO: Calling combineByKey and collect creates two stages, we can implement something -// TODO: similar to reduceByKeyLocally to save one stage. 
-val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)]( - createCombiner = (v: Vector) => { -if (modelType == Bernoulli) { - requireZeroOneBernoulliValues(v) -} else { - requireNonnegativeValues(v) -} -(1L, v.copy.toDense) - }, - mergeValue = (c: (Long, DenseVector), v: Vector) => { -requireNonnegativeValues(v) -BLAS.axpy(1.0, v, c._2) -(c._1 + 1L, c._2) - }, - mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => { -BLAS.axpy(1.0, c2._2, c1._2) -(c1._1 + c2._1, c1._2) - } -).collect().sortBy(_._1) +val nb = new NewNaiveBayes() + .setModelType(modelType) + .setSmoothing(lambda) -val numLabels = aggregated.length -var numDocuments = 0L -aggregated.foreach { case (_, (n, _)) => - numDocuments += n -} -val numFeatures = aggregated.head match { case (_, (_, v)) => v.size } - -val labels = new Array[Double](numLabels) -val pi = new Array[Double](numLabels) -val theta = Array.fill(numLabels)(new Array[Double](numFeatures)) - -val piLogDenom = math.log(numDocuments + numLabels * lambda) -var i = 0 -aggregated.foreach { case (label, (n, sumTermFreqs)) => - labels(i) = label - pi(i) = math.log(n + lambda) - piLogDenom - val thetaLogDenom = modelType match { -case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) -case Bernoulli => math.log(n + 2.0 * lambda) -case _ => - // This should never happen. - throw new UnknownError(s"Invalid modelType: $modelType.") - } - var j = 0 - while (j < numFeatures) { -theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom -j += 1 - } - i += 1 +val labels = data.map(_.label).distinct().collect().sorted + +// Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] must be +// in range [0, numClasses). +val dataset = data.map { --- End diff -- ```val dataset = data.toDF()``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
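The validators quoted in the diff above reduce to simple predicates over a vector's value array. A plain-Scala sketch, with `Array[Double]` standing in for the values of Spark's `SparseVector`/`DenseVector` (a simplified illustration, not the Spark implementation):

```scala
// Nonnegativity check used by multinomial naive Bayes: term frequencies
// cannot be negative.
def requireNonnegativeValues(values: Array[Double]): Unit =
  require(values.forall(_ >= 0.0),
    s"Naive Bayes requires nonnegative feature values but found ${values.mkString("[", ", ", "]")}.")

// Bernoulli naive Bayes models presence/absence, so every value must be 0 or 1.
def requireZeroOneBernoulliValues(values: Array[Double]): Unit =
  require(values.forall(v => v == 0.0 || v == 1.0),
    s"Bernoulli naive Bayes requires 0 or 1 feature values but found ${values.mkString("[", ", ", "]")}.")
```

In the refactored code the check runs inside the aggregation step, so an invalid row fails fast during the pass over the data.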
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81106353 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala --- @@ -150,6 +150,54 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa validateProbabilities(featureAndProbabilities, model, "multinomial") } + test("Naive Bayes Multinomial with weighted samples") { +val nPoints = 1000 +val piArray = Array(0.5, 0.1, 0.4).map(math.log) +val thetaArray = Array( + Array(0.70, 0.10, 0.10, 0.10), // label 0 + Array(0.10, 0.70, 0.10, 0.10), // label 1 + Array(0.10, 0.10, 0.70, 0.10) // label 2 +).map(_.map(math.log)) + +val testData = spark.createDataFrame( + generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial")) +val (overSampledData, weightedData) = + MLTestingUtils.genEquivalentOversampledAndWeightedInstances(testData, +"label", "features", 42L) +val nb = new NaiveBayes().setModelType("multinomial") +val unweightedModel = nb.fit(weightedData) +val overSampledModel = nb.fit(overSampledData) +val weightedModel = nb.setWeightCol("weight").fit(weightedData) +assert(weightedModel.theta ~== overSampledModel.theta relTol 0.001) +assert(weightedModel.pi ~== overSampledModel.pi relTol 0.001) +assert(unweightedModel.theta !~= overSampledModel.theta relTol 0.001) +assert(unweightedModel.pi !~= overSampledModel.pi relTol 0.001) + } + + test("Naive Bayes Bernoulli with weighted samples") { +val nPoints = 1 +val piArray = Array(0.5, 0.3, 0.2).map(math.log) +val thetaArray = Array( + Array(0.50, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.40), // label 0 + Array(0.02, 0.70, 0.10, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02), // label 1 + Array(0.02, 0.02, 0.60, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.02, 0.30) // label 2 +).map(_.map(math.log)) + +val testData = spark.createDataFrame( + generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, 
"bernoulli")) --- End diff -- Ditto. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81105501 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala --- @@ -355,79 +356,33 @@ class NaiveBayes private ( */ @Since("0.9.0") def run(data: RDD[LabeledPoint]): NaiveBayesModel = { -val requireNonnegativeValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(_ >= 0.0)) { -throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") - } -} +val spark = SparkSession + .builder() + .sparkContext(data.context) + .getOrCreate() -val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(v => v == 0.0 || v == 1.0)) { -throw new SparkException( - s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") - } -} +import spark.implicits._ -// Aggregates term frequencies per label. -// TODO: Calling combineByKey and collect creates two stages, we can implement something -// TODO: similar to reduceByKeyLocally to save one stage. 
-val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)]( - createCombiner = (v: Vector) => { -if (modelType == Bernoulli) { - requireZeroOneBernoulliValues(v) -} else { - requireNonnegativeValues(v) -} -(1L, v.copy.toDense) - }, - mergeValue = (c: (Long, DenseVector), v: Vector) => { -requireNonnegativeValues(v) -BLAS.axpy(1.0, v, c._2) -(c._1 + 1L, c._2) - }, - mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => { -BLAS.axpy(1.0, c2._2, c1._2) -(c1._1 + c2._1, c1._2) - } -).collect().sortBy(_._1) +val nb = new NewNaiveBayes() + .setModelType(modelType) + .setSmoothing(lambda) -val numLabels = aggregated.length -var numDocuments = 0L -aggregated.foreach { case (_, (n, _)) => - numDocuments += n -} -val numFeatures = aggregated.head match { case (_, (_, v)) => v.size } - -val labels = new Array[Double](numLabels) -val pi = new Array[Double](numLabels) -val theta = Array.fill(numLabels)(new Array[Double](numFeatures)) - -val piLogDenom = math.log(numDocuments + numLabels * lambda) -var i = 0 -aggregated.foreach { case (label, (n, sumTermFreqs)) => - labels(i) = label - pi(i) = math.log(n + lambda) - piLogDenom - val thetaLogDenom = modelType match { -case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) -case Bernoulli => math.log(n + 2.0 * lambda) -case _ => - // This should never happen. - throw new UnknownError(s"Invalid modelType: $modelType.") - } - var j = 0 - while (j < numFeatures) { -theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom -j += 1 - } - i += 1 +val labels = data.map(_.label).distinct().collect().sorted --- End diff -- It's not necessary to get labels by RDD operation which is expensive. Since we sort the labels and guarantee it in range [0, numClass), we can use ```val labels = pi.indices.map(_.toDouble).toArray``` directly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
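The suggestion above works because the labels are re-encoded into the range [0, numClasses) before training, so after sorting, the i-th class prior corresponds to label i. A minimal plain-Scala illustration (toy `pi` values, not a trained model):

```scala
// Toy class log-priors, one entry per class; only the length matters here.
val pi = Array(-0.69, -1.20, -1.61)

// Labels are exactly the class indices, so no extra RDD pass is needed.
val labels: Array[Double] = pi.indices.map(_.toDouble).toArray
// labels contains 0.0, 1.0, 2.0 -- one entry per class
```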
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81106187 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala --- @@ -150,6 +150,54 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa validateProbabilities(featureAndProbabilities, model, "multinomial") } + test("Naive Bayes Multinomial with weighted samples") { +val nPoints = 1000 +val piArray = Array(0.5, 0.1, 0.4).map(math.log) +val thetaArray = Array( + Array(0.70, 0.10, 0.10, 0.10), // label 0 + Array(0.10, 0.70, 0.10, 0.10), // label 1 + Array(0.10, 0.10, 0.70, 0.10) // label 2 +).map(_.map(math.log)) + +val testData = spark.createDataFrame( + generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial")) --- End diff -- ```val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()```. We prefer ```toDF()```, which is more succinct.
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81105095 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala --- @@ -27,11 +27,14 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVector, Vector} +import org.apache.spark.ml.classification.{NaiveBayes => NewNaiveBayes} +import org.apache.spark.ml.linalg.VectorUDT +import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, Vector} import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.{Loader, Saveable} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.types._ --- End diff -- Remove unused import.
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r81105041 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala --- @@ -27,11 +27,14 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.annotation.Since import org.apache.spark.internal.Logging -import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, DenseVector, SparseVector, Vector} +import org.apache.spark.ml.classification.{NaiveBayes => NewNaiveBayes} +import org.apache.spark.ml.linalg.VectorUDT --- End diff -- Remove unused import.
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r80884362 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala --- @@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa validateProbabilities(featureAndProbabilities, model, "multinomial") } + test("Naive Bayes Multinomial with weighted samples") { +val (dataset, weightedDataset) = { + val nPoints = 1000 + val piArray = Array(0.5, 0.1, 0.4).map(math.log) + val thetaArray = Array( +Array(0.70, 0.10, 0.10, 0.10), // label 0 +Array(0.10, 0.70, 0.10, 0.10), // label 1 +Array(0.10, 0.10, 0.70, 0.10) // label 2 + ).map(_.map(math.log)) + val pi = Vectors.dense(piArray) + val theta = new DenseMatrix(3, 4, thetaArray.flatten, true) + + val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial") + + // Let's over-sample the label-1 samples twice, label-2 samples triple. + val data1 = testData.flatMap { case labeledPoint: LabeledPoint => +labeledPoint.label match { + case 0.0 => Iterator(labeledPoint) + case 1.0 => Iterator(labeledPoint, labeledPoint) + case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint) +} + } + + val rnd = new Random(8392) + val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) => --- End diff -- @sethah Thanks a lot!
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79889093 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala --- @@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa validateProbabilities(featureAndProbabilities, model, "multinomial") } + test("Naive Bayes Multinomial with weighted samples") { +val (dataset, weightedDataset) = { + val nPoints = 1000 + val piArray = Array(0.5, 0.1, 0.4).map(math.log) + val thetaArray = Array( +Array(0.70, 0.10, 0.10, 0.10), // label 0 +Array(0.10, 0.70, 0.10, 0.10), // label 1 +Array(0.10, 0.10, 0.70, 0.10) // label 2 + ).map(_.map(math.log)) + val pi = Vectors.dense(piArray) + val theta = new DenseMatrix(3, 4, thetaArray.flatten, true) + + val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial") + + // Let's over-sample the label-1 samples twice, label-2 samples triple. + val data1 = testData.flatMap { case labeledPoint: LabeledPoint => +labeledPoint.label match { + case 0.0 => Iterator(labeledPoint) + case 1.0 => Iterator(labeledPoint, labeledPoint) + case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint) +} + } + + val rnd = new Random(8392) + val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) => --- End diff -- I submitted a pr to your pr, with the weighted tests. (Hopefully I've done that correctly). Actually, I also think it is nice to test a case where the majority of the samples are outliers, but have small weights so they should not affect the predictions. This is semi-automated in MLUtils, but since NaiveBayes requires a certain type of features (0/1 in some cases) I don't think it integrates nicely yet. I think we should create a JIRA to automate weighted testing where we can think about this all together. For now, this test should be sufficient. 
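These tests rest on the equivalence between duplicating a sample k times and assigning it weight k. The identity is easy to see for a weighted mean; a plain-Scala sketch with made-up numbers (not the suite's generated data):

```scala
// (value, weight) pairs; integer weights let us simulate over-sampling.
val weighted = Seq((2.0, 1.0), (3.0, 2.0), (5.0, 3.0))

// Over-sample each value `weight` times instead of weighting it.
val overSampled = weighted.flatMap { case (v, w) => Seq.fill(w.toInt)(v) }

val weightedMean = weighted.map { case (v, w) => v * w }.sum / weighted.map(_._2).sum
val overSampledMean = overSampled.sum / overSampled.size

// Both are (2 + 3*2 + 5*3) / 6 = 23/6.
assert(math.abs(weightedMean - overSampledMean) < 1e-12)
```

The same argument applies to every sum naive Bayes computes (class weights and per-feature frequency sums), which is why the oversampled and weighted models in the test should agree up to numerical tolerance.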
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79799464 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala --- @@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa validateProbabilities(featureAndProbabilities, model, "multinomial") } + test("Naive Bayes Multinomial with weighted samples") { +val (dataset, weightedDataset) = { + val nPoints = 1000 + val piArray = Array(0.5, 0.1, 0.4).map(math.log) + val thetaArray = Array( +Array(0.70, 0.10, 0.10, 0.10), // label 0 +Array(0.10, 0.70, 0.10, 0.10), // label 1 +Array(0.10, 0.10, 0.70, 0.10) // label 2 + ).map(_.map(math.log)) + val pi = Vectors.dense(piArray) + val theta = new DenseMatrix(3, 4, thetaArray.flatten, true) + + val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial") + + // Let's over-sample the label-1 samples twice, label-2 samples triple. + val data1 = testData.flatMap { case labeledPoint: LabeledPoint => +labeledPoint.label match { + case 0.0 => Iterator(labeledPoint) + case 1.0 => Iterator(labeledPoint, labeledPoint) + case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint) +} + } + + val rnd = new Random(8392) + val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) => --- End diff -- I think it will take some time before I can harness the new framework. What about leaving it alone for now?
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79795778 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala --- @@ -355,79 +357,32 @@ class NaiveBayes private ( */ @Since("0.9.0") def run(data: RDD[LabeledPoint]): NaiveBayesModel = { -val requireNonnegativeValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(_ >= 0.0)) { -throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") - } -} +val spark = SparkSession + .builder() + .getOrCreate() -val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(v => v == 0.0 || v == 1.0)) { -throw new SparkException( - s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") - } -} +import spark.implicits._ -// Aggregates term frequencies per label. -// TODO: Calling combineByKey and collect creates two stages, we can implement something -// TODO: similar to reduceByKeyLocally to save one stage. 
-val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)]( - createCombiner = (v: Vector) => { -if (modelType == Bernoulli) { - requireZeroOneBernoulliValues(v) -} else { - requireNonnegativeValues(v) -} -(1L, v.copy.toDense) - }, - mergeValue = (c: (Long, DenseVector), v: Vector) => { -requireNonnegativeValues(v) -BLAS.axpy(1.0, v, c._2) -(c._1 + 1L, c._2) - }, - mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => { -BLAS.axpy(1.0, c2._2, c1._2) -(c1._1 + c2._1, c1._2) - } -).collect().sortBy(_._1) +val nb = new NewNaiveBayes() + .setModelType(modelType) + .setSmoothing(lambda) -val numLabels = aggregated.length -var numDocuments = 0L -aggregated.foreach { case (_, (n, _)) => - numDocuments += n -} -val numFeatures = aggregated.head match { case (_, (_, v)) => v.size } - -val labels = new Array[Double](numLabels) -val pi = new Array[Double](numLabels) -val theta = Array.fill(numLabels)(new Array[Double](numFeatures)) - -val piLogDenom = math.log(numDocuments + numLabels * lambda) -var i = 0 -aggregated.foreach { case (label, (n, sumTermFreqs)) => - labels(i) = label - pi(i) = math.log(n + lambda) - piLogDenom - val thetaLogDenom = modelType match { -case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) -case Bernoulli => math.log(n + 2.0 * lambda) -case _ => - // This should never happen. - throw new UnknownError(s"Invalid modelType: $modelType.") - } - var j = 0 - while (j < numFeatures) { -theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom -j += 1 - } - i += 1 +val labels = data.map(_.label).distinct().collect().sorted + +// Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] must be +// in range [0, numClasses). 
+val dataset = data.map { + case LabeledPoint(label, features) => +(labels.indexOf(label).toDouble, features.asML) +}.toDF("label", "features") --- End diff -- @yanboliang I tested `toDF` and `createDF`, and the implementation with `toDF` is faster. Dataset: mnist.scale, numFeatures: 780, numSamples: 6; run 1000 times. Duration: `toDF` 85488, `createDF` 97122.
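The durations above come from repeating the conversion many times. A minimal timing helper of that shape (a hypothetical sketch; `timeMillis` is not the harness actually used for those numbers):

```scala
// Run `body` `reps` times and return the total elapsed wall time in milliseconds.
def timeMillis(reps: Int)(body: => Unit): Long = {
  val start = System.nanoTime()
  var i = 0
  while (i < reps) { body; i += 1 }
  (System.nanoTime() - start) / 1000000L
}
```

Usage would be e.g. `timeMillis(1000) { buildDataFrame() }`, where `buildDataFrame` stands for the `toDF`- or `createDataFrame`-based conversion under test.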
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79752969 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala --- @@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa validateProbabilities(featureAndProbabilities, model, "multinomial") } + test("Naive Bayes Multinomial with weighted samples") { +val (dataset, weightedDataset) = { + val nPoints = 1000 + val piArray = Array(0.5, 0.1, 0.4).map(math.log) + val thetaArray = Array( +Array(0.70, 0.10, 0.10, 0.10), // label 0 +Array(0.10, 0.70, 0.10, 0.10), // label 1 +Array(0.10, 0.10, 0.70, 0.10) // label 2 + ).map(_.map(math.log)) + val pi = Vectors.dense(piArray) + val theta = new DenseMatrix(3, 4, thetaArray.flatten, true) + + val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial") + + // Let's over-sample the label-1 samples twice, label-2 samples triple. + val data1 = testData.flatMap { case labeledPoint: LabeledPoint => +labeledPoint.label match { + case 0.0 => Iterator(labeledPoint) + case 1.0 => Iterator(labeledPoint, labeledPoint) + case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint) +} + } + + val rnd = new Random(8392) + val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) => --- End diff -- Good point. Of course, I will update this test suite to keep it in line with other algorithms.
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79632172 --- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/NaiveBayesSuite.scala --- @@ -150,6 +150,75 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa validateProbabilities(featureAndProbabilities, model, "multinomial") } + test("Naive Bayes Multinomial with weighted samples") { +val (dataset, weightedDataset) = { + val nPoints = 1000 + val piArray = Array(0.5, 0.1, 0.4).map(math.log) + val thetaArray = Array( +Array(0.70, 0.10, 0.10, 0.10), // label 0 +Array(0.10, 0.70, 0.10, 0.10), // label 1 +Array(0.10, 0.10, 0.70, 0.10) // label 2 + ).map(_.map(math.log)) + val pi = Vectors.dense(piArray) + val theta = new DenseMatrix(3, 4, thetaArray.flatten, true) + + val testData = generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial") + + // Let's over-sample the label-1 samples twice, label-2 samples triple. + val data1 = testData.flatMap { case labeledPoint: LabeledPoint => +labeledPoint.label match { + case 0.0 => Iterator(labeledPoint) + case 1.0 => Iterator(labeledPoint, labeledPoint) + case 2.0 => Iterator(labeledPoint, labeledPoint, labeledPoint) +} + } + + val rnd = new Random(8392) + val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) => --- End diff -- So, we are adding more and more algorithms in MLlib that are accepting weights, and I think we need to take care to create standardized unit tests that we can reuse. Could you take a look at the test for `LogisticRegression` and reuse that framework here?
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79603681 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. 
+ throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd + .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2))) + }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))( + seqOp = { + case (agg, (weight, features)) => + requireValues(features) + BLAS.axpy(weight, features, agg._2) + (agg._1 + weight, agg._2) + }, + combOp = { + case (agg1, agg2) => + BLAS.axpy(1.0, agg2._2, agg1._2) + (agg1._1 + agg2._1, agg1._2) + }).collect().sortBy(_._1) + +val numLabels = aggregated.length +val numDocuments = aggregated.map(_._2._1).sum + +val pi = Array.fill[Double](numLabels)(0.0) +val theta = Array.fill[Double](numLabels, numFeatures)(0.0) + +val lambda = $(smoothing) +val piLogDenom = math.log(numDocuments + numLabels * lambda) +var i = 0 +aggregated.foreach { case (label, (n, sumTermFreqs)) => + pi(i) = math.log(n + lambda) - piLogDenom + val thetaLogDenom = $(modelType) match { +case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) +case Bernoulli => math.log(n + 2.0 * lambda) +case _ => + // This should never happen. + throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } + var j = 0 + while (j < numFeatures) { +theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom +j += 1 + } + i += 1 +} + +val uid = Identifiable.randomUID("nb") +val piVector = Vectors.dense(pi) +val thetaMatrix = new DenseMatrix(numLabels, theta(0).length, theta.flatten, true) --- End diff -- agreed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
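After the aggregation in the diff above, each class is reduced to a pair (total weight n, summed term frequencies), from which the log-priors `pi` and log-conditionals `theta` are computed with Laplace smoothing `lambda`. The same arithmetic can be sketched in plain Scala on toy counts (multinomial case only):

```scala
// Per-class aggregates (n, sumTermFreqs), as produced by the aggregateByKey
// step; the numbers here are made up for illustration.
val aggregated = Array(
  (10.0, Array(4.0, 6.0)),   // class 0
  (20.0, Array(15.0, 5.0))   // class 1
)
val numFeatures = 2
val lambda = 1.0                               // smoothing parameter
val numDocuments = aggregated.map(_._1).sum
val piLogDenom = math.log(numDocuments + aggregated.length * lambda)

// Smoothed class log-priors.
val pi = aggregated.map { case (n, _) => math.log(n + lambda) - piLogDenom }

// Smoothed per-feature log-conditionals; Bernoulli would instead use
// math.log(n + 2.0 * lambda) as the denominator.
val theta = aggregated.map { case (_, sumTermFreqs) =>
  val thetaLogDenom = math.log(sumTermFreqs.sum + numFeatures * lambda)
  sumTermFreqs.map(f => math.log(f + lambda) - thetaLogDenom)
}
```

By construction, `pi` and each row of `theta` exponentiate to proper probability distributions.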
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79603503 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. + throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd + .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2))) + }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))( + seqOp = { + case (agg, (weight, features)) => --- End diff -- ok. 
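The `seqOp`/`combOp` pair discussed above implements a weighted per-label sum. Its effect can be sketched with plain Scala collections (toy data, no Spark; the real code updates a mutable `DenseVector` in place via `BLAS.axpy` instead of copying arrays):

```scala
val numFeatures = 2
val points = Seq(                  // (label, weight, features)
  (0.0, 1.0, Array(1.0, 0.0)),
  (0.0, 2.0, Array(0.0, 1.0)),
  (1.0, 1.0, Array(1.0, 1.0))
)

// For each label accumulate (total weight, weighted sum of feature vectors),
// mirroring the aggregateByKey step.
val aggregated: Map[Double, (Double, Array[Double])] =
  points.foldLeft(Map.empty[Double, (Double, Array[Double])]) {
    case (acc, (label, weight, features)) =>
      val (n, sum) = acc.getOrElse(label, (0.0, Array.fill(numFeatures)(0.0)))
      // axpy-style update: sum + weight * features, element-wise
      val updated = sum.zip(features).map { case (s, f) => s + weight * f }
      acc.updated(label, (n + weight, updated))
  }
// For label 0.0: total weight 3.0, summed features (1.0, 2.0).
```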
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79603468 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. 
+ throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd + .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2))) + }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))( + seqOp = { + case (agg, (weight, features)) => + requireValues(features) + BLAS.axpy(weight, features, agg._2) + (agg._1 + weight, agg._2) + }, + combOp = { + case (agg1, agg2) => + BLAS.axpy(1.0, agg2._2, agg1._2) + (agg1._1 + agg2._1, agg1._2) + }).collect().sortBy(_._1) + +val numLabels = aggregated.length +val numDocuments = aggregated.map(_._2._1).sum + +val pi = Array.fill[Double](numLabels)(0.0) +val theta = Array.fill[Double](numLabels, numFeatures)(0.0) + +val lambda = $(smoothing) +val piLogDenom = math.log(numDocuments + numLabels * lambda) +var i = 0 +aggregated.foreach { case (label, (n, sumTermFreqs)) => + pi(i) = math.log(n + lambda) - piLogDenom + val thetaLogDenom = $(modelType) match { +case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) +case Bernoulli => math.log(n + 2.0 * lambda) +case _ => + // This should never happen. + throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } + var j = 0 + while (j < numFeatures) { +theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom +j += 1 + } + i += 1 +} + +val uid = Identifiable.randomUID("nb") --- End diff -- ok, I will remove this line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
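The diff above computes the class log-priors `pi` and feature log-likelihoods `theta` with Laplace smoothing. A minimal standalone sketch of that arithmetic (plain Scala, no Spark; the toy per-class counts are invented for illustration):

```scala
// Smoothed multinomial NB estimates, mirroring the diff's pi/theta loop.
val lambda = 1.0
// (label, (weight sum n, per-feature term-frequency sums)) -- toy data
val aggregated = Array(
  (0.0, (2.0, Array(1.0, 3.0))),
  (1.0, (3.0, Array(4.0, 0.0))))
val numLabels = aggregated.length
val numFeatures = 2
val numDocuments = aggregated.map(_._2._1).sum
val piLogDenom = math.log(numDocuments + numLabels * lambda)
// log-prior per class: log(n + lambda) - log(N + numLabels * lambda)
val pi = aggregated.map { case (_, (n, _)) => math.log(n + lambda) - piLogDenom }
// log-likelihood per (class, feature), multinomial denominator
val theta = aggregated.map { case (_, (_, sumTermFreqs)) =>
  val thetaLogDenom = math.log(sumTermFreqs.sum + numFeatures * lambda)
  sumTermFreqs.map(c => math.log(c + lambda) - thetaLogDenom)
}
```

Because of the smoothing, each row of `exp(theta)` and the vector `exp(pi)` are proper probability distributions, which is a quick sanity check on the formulas.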
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79602209 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. + throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd --- End diff -- ok, but this TODO and comments should be added in ml's implement. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79567686 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. 
+ throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd + .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2))) + }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))( + seqOp = { + case (agg, (weight, features)) => + requireValues(features) + BLAS.axpy(weight, features, agg._2) + (agg._1 + weight, agg._2) + }, + combOp = { + case (agg1, agg2) => + BLAS.axpy(1.0, agg2._2, agg1._2) + (agg1._1 + agg2._1, agg1._2) + }).collect().sortBy(_._1) + +val numLabels = aggregated.length +val numDocuments = aggregated.map(_._2._1).sum + +val pi = Array.fill[Double](numLabels)(0.0) +val theta = Array.fill[Double](numLabels, numFeatures)(0.0) + +val lambda = $(smoothing) +val piLogDenom = math.log(numDocuments + numLabels * lambda) +var i = 0 +aggregated.foreach { case (label, (n, sumTermFreqs)) => + pi(i) = math.log(n + lambda) - piLogDenom + val thetaLogDenom = $(modelType) match { +case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) +case Bernoulli => math.log(n + 2.0 * lambda) +case _ => + // This should never happen. + throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } + var j = 0 + while (j < numFeatures) { +theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom +j += 1 + } + i += 1 +} + +val uid = Identifiable.randomUID("nb") +val piVector = Vectors.dense(pi) +val thetaMatrix = new DenseMatrix(numLabels, theta(0).length, theta.flatten, true) --- End diff -- ```pi -> piArray, piVector -> pi, theta -> thetaArrays, thetaMatrix -> theta``` should be better? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79564439 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. 
+ throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd + .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2))) + }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))( + seqOp = { + case (agg, (weight, features)) => --- End diff -- ```agg -> (weightSum: Double, featureSum: Vector)```, we should avoid use ```._1``` and it's better to mark type explicitly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
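The reviewer's suggestion is to destructure the `seqOp` accumulator into named fields instead of reaching through `._1`/`._2`. A plain-Scala sketch of that style (names and array-based types are illustrative, not the actual Spark code, which uses `BLAS.axpy` on vectors):

```scala
// seqOp with named accumulator fields: (weightSum, featureSum).
def seqOp(
    acc: (Double, Array[Double]),
    instance: (Double, Array[Double])): (Double, Array[Double]) = {
  val (weightSum, featureSum) = acc
  val (weight, features) = instance
  var j = 0
  while (j < features.length) {
    featureSum(j) += weight * features(j) // in-place axpy: featureSum += weight * features
    j += 1
  }
  (weightSum + weight, featureSum)
}
```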
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79569671 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala --- @@ -355,79 +357,32 @@ class NaiveBayes private ( */ @Since("0.9.0") def run(data: RDD[LabeledPoint]): NaiveBayesModel = { -val requireNonnegativeValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(_ >= 0.0)) { -throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") - } -} +val spark = SparkSession + .builder() + .getOrCreate() -val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(v => v == 0.0 || v == 1.0)) { -throw new SparkException( - s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") - } -} +import spark.implicits._ -// Aggregates term frequencies per label. -// TODO: Calling combineByKey and collect creates two stages, we can implement something -// TODO: similar to reduceByKeyLocally to save one stage. 
-val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, DenseVector)]( - createCombiner = (v: Vector) => { -if (modelType == Bernoulli) { - requireZeroOneBernoulliValues(v) -} else { - requireNonnegativeValues(v) -} -(1L, v.copy.toDense) - }, - mergeValue = (c: (Long, DenseVector), v: Vector) => { -requireNonnegativeValues(v) -BLAS.axpy(1.0, v, c._2) -(c._1 + 1L, c._2) - }, - mergeCombiners = (c1: (Long, DenseVector), c2: (Long, DenseVector)) => { -BLAS.axpy(1.0, c2._2, c1._2) -(c1._1 + c2._1, c1._2) - } -).collect().sortBy(_._1) +val nb = new NewNaiveBayes() + .setModelType(modelType) + .setSmoothing(lambda) -val numLabels = aggregated.length -var numDocuments = 0L -aggregated.foreach { case (_, (n, _)) => - numDocuments += n -} -val numFeatures = aggregated.head match { case (_, (_, v)) => v.size } - -val labels = new Array[Double](numLabels) -val pi = new Array[Double](numLabels) -val theta = Array.fill(numLabels)(new Array[Double](numFeatures)) - -val piLogDenom = math.log(numDocuments + numLabels * lambda) -var i = 0 -aggregated.foreach { case (label, (n, sumTermFreqs)) => - labels(i) = label - pi(i) = math.log(n + lambda) - piLogDenom - val thetaLogDenom = modelType match { -case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) -case Bernoulli => math.log(n + 2.0 * lambda) -case _ => - // This should never happen. - throw new UnknownError(s"Invalid modelType: $modelType.") - } - var j = 0 - while (j < numFeatures) { -theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom -j += 1 - } - i += 1 +val labels = data.map(_.label).distinct().collect().sorted + +// Input labels for [[org.apache.spark.ml.classification.NaiveBayes]] must be +// in range [0, numClasses). 
+val dataset = data.map { + case LabeledPoint(label, features) => +(labels.indexOf(label).toDouble, features.asML) +}.toDF("label", "features") --- End diff -- It's better to compare the performance between ```toDF``` and ```createDataFrame```, and the regression performance is also necessary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
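The remapping in the diff above exists because the ml estimator requires labels in `[0, numClasses)`, while the mllib API accepts arbitrary label values. A toy sketch of that index-based remapping (label values are invented for illustration):

```scala
// Distinct labels are collected and sorted, then each original label is
// replaced by its index, yielding values in [0, numClasses).
val labels = Array(5.0, -1.0, 1.0).distinct.sorted // -> Array(-1.0, 1.0, 5.0)
val remapped = Array(5.0, -1.0, 1.0).map(l => labels.indexOf(l).toDouble)
```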
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79561754 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. + throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd --- End diff -- It's better to keep the original comments and TODOs here, and it can help developers or users to improve the code continuously. 
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79569085 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala --- @@ -355,79 +357,32 @@ class NaiveBayes private ( */ @Since("0.9.0") def run(data: RDD[LabeledPoint]): NaiveBayesModel = { -val requireNonnegativeValues: Vector => Unit = (v: Vector) => { - val values = v match { -case sv: SparseVector => sv.values -case dv: DenseVector => dv.values - } - if (!values.forall(_ >= 0.0)) { -throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") - } -} +val spark = SparkSession + .builder() + .getOrCreate() --- End diff -- ```val spark = SparkSession.builder().sparkContext(data.context).getOrCreate()``` We should guarantee SparkSession was generated based on the existing data RDD context. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r79566824 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. 
+ throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd + .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2))) + }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))( + seqOp = { + case (agg, (weight, features)) => + requireValues(features) + BLAS.axpy(weight, features, agg._2) + (agg._1 + weight, agg._2) + }, + combOp = { + case (agg1, agg2) => + BLAS.axpy(1.0, agg2._2, agg1._2) + (agg1._1 + agg2._1, agg1._2) + }).collect().sortBy(_._1) + +val numLabels = aggregated.length +val numDocuments = aggregated.map(_._2._1).sum + +val pi = Array.fill[Double](numLabels)(0.0) +val theta = Array.fill[Double](numLabels, numFeatures)(0.0) + +val lambda = $(smoothing) +val piLogDenom = math.log(numDocuments + numLabels * lambda) +var i = 0 +aggregated.foreach { case (label, (n, sumTermFreqs)) => + pi(i) = math.log(n + lambda) - piLogDenom + val thetaLogDenom = $(modelType) match { +case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda) +case Bernoulli => math.log(n + 2.0 * lambda) +case _ => + // This should never happen. + throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } + var j = 0 + while (j < numFeatures) { +theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom +j += 1 + } + i += 1 +} + +val uid = Identifiable.randomUID("nb") --- End diff -- ```uid``` has been generated at ```def this() = this(Identifiable.randomUID("nb"))```, you should use it directly rather than generate a new one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r7956 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +119,88 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val requireNonnegativeValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(_ >= 0.0)) { +throw new SparkException(s"Naive Bayes requires nonnegative feature values but found $v.") + } +} + +val requireZeroOneBernoulliValues: Vector => Unit = (v: Vector) => { + val values = v match { +case sv: SparseVector => sv.values +case dv: DenseVector => dv.values + } + if (!values.forall(v => v == 0.0 || v == 1.0)) { +throw new SparkException( + s"Bernoulli naive Bayes requires 0 or 1 feature values but found $v.") + } +} + +val requireValues: Vector => Unit = { + $(modelType) match { +case Multinomial => + requireNonnegativeValues +case Bernoulli => + requireZeroOneBernoulliValues +case _ => + // This should never happen. 
+ throw new UnknownError(s"Invalid modelType: ${$(modelType)}.") + } +} + +val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = dataset.select(col($(labelCol)).cast(DoubleType), w, col($(featuresCol))).rdd + .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2))) + }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))( + seqOp = { + case (agg, (weight, features)) => + requireValues(features) + BLAS.axpy(weight, features, agg._2) + (agg._1 + weight, agg._2) + }, + combOp = { + case (agg1, agg2) => --- End diff -- Ditto: ```(agg1, agg2) -> ((weightSum1: Double, featureSum1: Vector), (weightSum2: Double, featureSum2: Vetor))``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r78325688 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -109,10 +120,51 @@ class NaiveBayes @Since("1.5.0") ( s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}") } -val oldDataset: RDD[OldLabeledPoint] = - extractLabeledPoints(dataset).map(OldLabeledPoint.fromML) -val oldModel = OldNaiveBayes.train(oldDataset, $(smoothing), $(modelType)) -NaiveBayesModel.fromOld(oldModel, this) +val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size + +val wvsum = new WeightedVectorSum($(modelType), numFeatures) + +val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol)) + +val aggregated = + dataset.select(col($(labelCol)).cast(DoubleType).as("label"), w.as("weight"), +col($(featuresCol)).as("features")) +.groupBy(col($(labelCol))) +.agg(sum(col("weight")), wvsum(col("weight"), col("features"))) +.collect().map { row => +(row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2).toDense)) + }.sortBy(_._1) --- End diff -- Do you have some performance test for switching to Dataset based operation? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #12819: [SPARK-14077][ML] Refactor NaiveBayes to support ...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/12819#discussion_r78325579 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala --- @@ -98,7 +99,17 @@ class NaiveBayes @Since("1.5.0") ( */ @Since("1.5.0") def setModelType(value: String): this.type = set(modelType, value) - setDefault(modelType -> OldNaiveBayes.Multinomial) + setDefault(modelType -> NaiveBayes.Multinomial) + + /** + * Whether to over-/under-sample training instances according to the given weights in weightCol. + * If empty, all instances are treated equally (weight 1.0). + * Default is empty, so all instances have weight one. + * @group setParam + */ + @Since("2.1.0") + def setWeightCol(value: String): this.type = set(weightCol, value) + setDefault(weightCol -> "") --- End diff -- It's not necessary to set default for ```weightCol```. You can refer other place where used ```weightCol```. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
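The reviewer's point is that no default is needed for `weightCol`: when the param is unset or empty, each instance implicitly gets weight 1.0 (in the diff this is the `if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))` expression). A hedged plain-Scala sketch of that fallback logic, with invented names standing in for Spark's param machinery:

```scala
// If no (non-empty) weight column is configured, fall back to weight 1.0;
// otherwise look the weight up in the row. Names are illustrative only.
def instanceWeight(weightCol: Option[String], row: Map[String, Double]): Double =
  weightCol.filter(_.nonEmpty).map(row).getOrElse(1.0)
```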