[GitHub] spark pull request #20875: [MINOR] Fix Java lint from new JavaKolmogorovSmir...
GitHub user jkbradley opened a pull request: https://github.com/apache/spark/pull/20875 [MINOR] Fix Java lint from new JavaKolmogorovSmirnovTestSuite ## What changes were proposed in this pull request? Fix Java lint errors introduced by the addition of JavaKolmogorovSmirnovTestSuite in https://github.com/apache/spark/pull/19108. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jkbradley/spark kstest-lint-fix Alternatively, you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20875.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20875 commit d3236cce0113177c0807937c8795daf51524dd6a Author: Joseph K. Bradley Date: 2018-03-21T19:36:31Z fix java lint
spark git commit: [SPARK-10884][ML] Support prediction on single instance for regression and classification related models
Repository: spark Updated Branches: refs/heads/master 500b21c3d -> bf09f2f71 [SPARK-10884][ML] Support prediction on single instance for regression and classification related models ## What changes were proposed in this pull request? Support prediction on single instance for regression and classification related models (i.e., PredictionModel, ClassificationModel and their sub classes). Add corresponding test cases. ## How was this patch tested? Test cases added. Author: WeichenXu Closes #19381 from WeichenXu123/single_prediction. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf09f2f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf09f2f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf09f2f7 Branch: refs/heads/master Commit: bf09f2f71276d3b3a84a8f89109bd785a066c3e6 Parents: 500b21c Author: WeichenXu Authored: Wed Mar 21 09:39:14 2018 -0700 Committer: Joseph K. Bradley Committed: Wed Mar 21 09:39:14 2018 -0700 -- .../scala/org/apache/spark/ml/Predictor.scala | 5 ++-- .../spark/ml/classification/Classifier.scala| 6 ++--- .../classification/DecisionTreeClassifier.scala | 2 +- .../spark/ml/classification/GBTClassifier.scala | 2 +- .../spark/ml/classification/LinearSVC.scala | 2 +- .../ml/classification/LogisticRegression.scala | 2 +- .../MultilayerPerceptronClassifier.scala| 2 +- .../ml/regression/DecisionTreeRegressor.scala | 2 +- .../spark/ml/regression/GBTRegressor.scala | 2 +- .../GeneralizedLinearRegression.scala | 2 +- .../spark/ml/regression/LinearRegression.scala | 2 +- .../ml/regression/RandomForestRegressor.scala | 2 +- .../DecisionTreeClassifierSuite.scala | 17 +- .../ml/classification/GBTClassifierSuite.scala | 9 .../ml/classification/LinearSVCSuite.scala | 6 + .../LogisticRegressionSuite.scala | 9 .../MultilayerPerceptronClassifierSuite.scala | 12 ++ .../ml/classification/NaiveBayesSuite.scala | 22 ++ .../RandomForestClassifierSuite.scala | 16 + .../regression/DecisionTreeRegressorSuite.scala | 15 .../spark/ml/regression/GBTRegressorSuite.scala | 8 +++ .../GeneralizedLinearRegressionSuite.scala | 8 +++ .../ml/regression/LinearRegressionSuite.scala | 7 ++ .../regression/RandomForestRegressorSuite.scala | 24 .../scala/org/apache/spark/ml/util/MLTest.scala | 15 ++-- 25 files changed, 176 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bf09f2f7/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala index 08b0cb9..d8f3dfa 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala @@ -219,7 +219,8 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType, /** * Predict label for the given features. - * This internal method is used to implement `transform()` and output [[predictionCol]]. + * This method is used to implement `transform()` and output [[predictionCol]]. 
*/ - protected def predict(features: FeaturesType): Double + @Since("2.4.0") + def predict(features: FeaturesType): Double } http://git-wip-us.apache.org/repos/asf/spark/blob/bf09f2f7/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala index 9d1d5aa..7e5790a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.classification import org.apache.spark.SparkException -import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams} import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.linalg.{Vector, VectorUDT} @@ -192,12 +192,12 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur /** * Predict label for the given features. - * This internal method is used to implement `transform()` and output [[predictionCol]]. + * This method is used to implement `transform()` and output [[predictionCol]]. *
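For context, a minimal usage sketch of the newly public single-instance `predict` API added by this commit; the model type, save path, and feature values are illustrative and not taken from the patch:

```scala
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg.Vectors

// Any fitted PredictionModel subclass works; LogisticRegressionModel is just an example,
// and the path below is a placeholder.
val model = LogisticRegressionModel.load("/tmp/lr-model")

// predict() is now a public @Since("2.4.0") method, so a single feature vector can be
// scored directly instead of wrapping it in a one-row DataFrame and calling transform().
val prediction: Double = model.predict(Vectors.dense(0.5, 1.2, -0.3))
```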
[GitHub] spark issue #19381: [SPARK-10884][ML] Support prediction on single instance ...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19381 Merging with master
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r175965924 --- Diff: mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala --- @@ -169,4 +179,54 @@ class DefaultReadWriteSuite extends SparkFunSuite with MLlibTestSparkContext val myParams = new MyParams("my_params") testDefaultReadWrite(myParams) } + + test("default param shouldn't become user-supplied param after persistence") { +val myParams = new MyParams("my_params") +myParams.set(myParams.shouldNotSetIfSetintParamWithDefault, 1) +myParams.checkExclusiveParams() +val loadedMyParams = testDefaultReadWrite(myParams) +loadedMyParams.checkExclusiveParams() +assert(loadedMyParams.getDefault(loadedMyParams.intParamWithDefault) == + myParams.getDefault(myParams.intParamWithDefault)) + +loadedMyParams.set(myParams.intParamWithDefault, 1) +intercept[SparkException] { + loadedMyParams.checkExclusiveParams() +} + } + + test("User-suppiled value for default param should be kept after persistence") { --- End diff -- suppiled -> supplied --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r175965272 --- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala --- @@ -351,27 +359,88 @@ private[ml] object DefaultParamsReader { timestamp: Long, sparkVersion: String, params: JValue, + defaultParams: JValue, metadata: JValue, metadataJson: String) { + +private def getValueFromParams(params: JValue): Seq[(String, JValue)] = { + params match { +case JObject(pairs) => pairs +case _ => + throw new IllegalArgumentException( +s"Cannot recognize JSON metadata: $metadataJson.") + } +} + /** * Get the JSON value of the [[org.apache.spark.ml.param.Param]] of the given name. * This can be useful for getting a Param value before an instance of `Params` - * is available. + * is available. This will look up `params` first, if not existing then looking up + * `defaultParams`. */ def getParamValue(paramName: String): JValue = { implicit val format = DefaultFormats - params match { + + // Looking up for `params` first. + var pairs = getValueFromParams(params) + var foundPairs = pairs.filter { case (pName, jsonValue) => +pName == paramName + } + if (foundPairs.length == 0) { +// Looking up for `defaultParams` then. +pairs = getValueFromParams(defaultParams) +foundPairs = pairs.filter { case (pName, jsonValue) => + pName == paramName +} + } + assert(foundPairs.length == 1, s"Expected one instance of Param '$paramName' but found" + +s" ${foundPairs.length} in JSON Params: " + pairs.map(_.toString).mkString(", ")) + + foundPairs.map(_._2).head +} + +/** + * Extract Params from metadata, and set them in the instance. + * This works if all Params (except params included by `skipParams` list) implement + * [[org.apache.spark.ml.param.Param.jsonDecode()]]. + * + * @param skipParams The params included in `skipParams` won't be set. This is useful if some + * params don't implement [[org.apache.spark.ml.param.Param.jsonDecode()]] + * and need special handling. + */ +def getAndSetParams( +instance: Params, +skipParams: Option[List[String]] = None): Unit = { + setParams(instance, false, skipParams) + setParams(instance, true, skipParams) +} + +private def setParams( +instance: Params, +isDefault: Boolean, +skipParams: Option[List[String]]): Unit = { + implicit val format = DefaultFormats + val (major, minor) = VersionUtils.majorMinorVersion(sparkVersion) + val paramsToSet = if (isDefault) defaultParams else params + paramsToSet match { case JObject(pairs) => - val values = pairs.filter { case (pName, jsonValue) => -pName == paramName - }.map(_._2) - assert(values.length == 1, s"Expected one instance of Param '$paramName' but found" + -s" ${values.length} in JSON Params: " + pairs.map(_.toString).mkString(", ")) - values.head + pairs.foreach { case (paramName, jsonValue) => +if (skipParams == None || !skipParams.get.contains(paramName)) { + val param = instance.getParam(paramName) + val value = param.jsonDecode(compact(render(jsonValue))) + if (isDefault) { +instance.setDefault(param, value) + } else { +instance.set(param, value) + } +} + } +// For metadata file prior to Spark 2.4, there is no default section. +case JNothing if isDefault && (major == 2 && minor < 4 || major < 2) => --- End diff -- This logic would be simpler if this check were put in the getAndSetParams method, which could just skip calling setParams(instance, true, skipParams) for Spark 2.3-. 
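A rough sketch of the simplification suggested above, i.e. doing the version check once in `getAndSetParams` and skipping the defaults pass for older metadata. This is a reviewer-style suggestion, not code from the PR, and it assumes the `Metadata` fields (`sparkVersion`, `setParams`, `skipParams`) shown in the diff:

```scala
def getAndSetParams(
    instance: Params,
    skipParams: Option[List[String]] = None): Unit = {
  setParams(instance, isDefault = false, skipParams = skipParams)

  // Metadata written by Spark 2.3 and earlier has no default-params section, so the
  // defaults pass can simply be skipped here instead of special-casing JNothing
  // inside setParams.
  val (major, minor) = VersionUtils.majorMinorVersion(sparkVersion)
  if (major > 2 || (major == 2 && minor >= 4)) {
    setParams(instance, isDefault = true, skipParams = skipParams)
  }
}
```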
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r175961795 --- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala --- @@ -791,7 +791,7 @@ trait Params extends Identifiable with Serializable { * this method gets called. * @param value the default value */ - protected final def setDefault[T](param: Param[T], value: T): this.type = { + private[ml] final def setDefault[T](param: Param[T], value: T): this.type = { --- End diff -- We should leave this as protected. It's important that 3rd-party libraries be able to extend MLlib APIs, and this is one they need. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
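To illustrate the point about external extensions: a hypothetical third-party Params implementation that compiles only while setDefault stays protected. All names below are made up for the example:

```scala
import org.apache.spark.ml.param.{IntParam, ParamMap, Params}
import org.apache.spark.ml.util.Identifiable

class MyThirdPartyParams(override val uid: String) extends Params {
  def this() = this(Identifiable.randomUID("myThirdPartyParams"))

  val maxIter = new IntParam(this, "maxIter", "maximum number of iterations (>= 0)")

  // Works because setDefault is protected; with private[ml] this line would not
  // compile outside the org.apache.spark.ml package.
  setDefault(maxIter, 10)

  override def copy(extra: ParamMap): MyThirdPartyParams = defaultCopy(extra)
}
```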
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r175966231 --- Diff: mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala --- @@ -169,4 +179,54 @@ class DefaultReadWriteSuite extends SparkFunSuite with MLlibTestSparkContext val myParams = new MyParams("my_params") testDefaultReadWrite(myParams) } + + test("default param shouldn't become user-supplied param after persistence") { +val myParams = new MyParams("my_params") +myParams.set(myParams.shouldNotSetIfSetintParamWithDefault, 1) +myParams.checkExclusiveParams() +val loadedMyParams = testDefaultReadWrite(myParams) +loadedMyParams.checkExclusiveParams() +assert(loadedMyParams.getDefault(loadedMyParams.intParamWithDefault) == + myParams.getDefault(myParams.intParamWithDefault)) + +loadedMyParams.set(myParams.intParamWithDefault, 1) +intercept[SparkException] { + loadedMyParams.checkExclusiveParams() +} + } + + test("User-suppiled value for default param should be kept after persistence") { +val myParams = new MyParams("my_params") +myParams.set(myParams.intParamWithDefault, 100) +val loadedMyParams = testDefaultReadWrite(myParams) +assert(loadedMyParams.get(myParams.intParamWithDefault).get == 100) + } + + test("Read metadata without default field prior to 2.4") { --- End diff -- Nice! I like this setup. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r175963464 --- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala --- @@ -351,27 +359,88 @@ private[ml] object DefaultParamsReader { timestamp: Long, sparkVersion: String, params: JValue, + defaultParams: JValue, metadata: JValue, metadataJson: String) { + +private def getValueFromParams(params: JValue): Seq[(String, JValue)] = { + params match { +case JObject(pairs) => pairs +case _ => + throw new IllegalArgumentException( +s"Cannot recognize JSON metadata: $metadataJson.") + } +} + /** * Get the JSON value of the [[org.apache.spark.ml.param.Param]] of the given name. * This can be useful for getting a Param value before an instance of `Params` - * is available. + * is available. This will look up `params` first, if not existing then looking up + * `defaultParams`. */ def getParamValue(paramName: String): JValue = { implicit val format = DefaultFormats - params match { + + // Looking up for `params` first. + var pairs = getValueFromParams(params) + var foundPairs = pairs.filter { case (pName, jsonValue) => +pName == paramName + } + if (foundPairs.length == 0) { +// Looking up for `defaultParams` then. +pairs = getValueFromParams(defaultParams) +foundPairs = pairs.filter { case (pName, jsonValue) => + pName == paramName +} + } + assert(foundPairs.length == 1, s"Expected one instance of Param '$paramName' but found" + +s" ${foundPairs.length} in JSON Params: " + pairs.map(_.toString).mkString(", ")) + + foundPairs.map(_._2).head +} + +/** + * Extract Params from metadata, and set them in the instance. + * This works if all Params (except params included by `skipParams` list) implement + * [[org.apache.spark.ml.param.Param.jsonDecode()]]. + * + * @param skipParams The params included in `skipParams` won't be set. This is useful if some + * params don't implement [[org.apache.spark.ml.param.Param.jsonDecode()]] + * and need special handling. + */ +def getAndSetParams( +instance: Params, +skipParams: Option[List[String]] = None): Unit = { + setParams(instance, false, skipParams) --- End diff -- style nit: It's nice to pass boolean args by name --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20633#discussion_r175962188 --- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala --- @@ -296,14 +297,19 @@ private[ml] object DefaultParamsWriter { paramMap: Option[JValue] = None): String = { val uid = instance.uid val cls = instance.getClass.getName -val params = instance.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]] +val params = instance.paramMap.toSeq +val defaultParams = instance.defaultParamMap.toSeq val jsonParams = paramMap.getOrElse(render(params.map { case ParamPair(p, v) => p.name -> parse(p.jsonEncode(v)) }.toList)) +val jsonDefaultParams = render(defaultParams.map { case ParamPair(p, v) => + p.name -> parse(p.jsonEncode(v)) +}.toList) val basicMetadata = ("class" -> cls) ~ ("timestamp" -> System.currentTimeMillis()) ~ ("sparkVersion" -> sc.version) ~ ("uid" -> uid) ~ + ("defaultParamMap" -> jsonDefaultParams) ~ --- End diff -- nit: How about putting this below paramMap since that's nicer for viewing the JSON? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175957326 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( -override val prediction: Double, -override val impurity: Double, -override private[ml] val impurityStats: ImpurityCalculator) extends Node { +trait ClassificationNode extends Node { + + @Since("2.4.0") + def getLabelCount(label: Int): Double = { --- End diff -- Add doc string to this (same for regression node methods) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175957767 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -135,32 +175,59 @@ class LeafNode private[ml] ( override private[ml] def maxSplitFeatureIndex(): Int = -1 +} + +/** + * Decision tree leaf node for classification. + * @param prediction Prediction this node makes --- End diff -- I think you don't need to add these Param doc strings since you're overriding methods with doc strings. The base trait doc string should show up. (here and elsewhere) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175955388 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], --- End diff -- Please fix scala style (here and elsewhere) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
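For reference, the parameter-list style the comment is asking for (one parameter per line, four-space indent, as in the rest of the codebase); the body is elided here and only the declaration formatting differs:

```scala
def fromOld(
    oldNode: OldNode,
    categoricalFeatures: Map[Int, Int],
    isClassification: Boolean): Node = {
  ???  // body unchanged; only the signature layout is being fixed
}
```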
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175958393 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -240,6 +302,69 @@ private object InternalNode { } } +/** + * Internal Decision Tree node for regression. + * @param prediction Prediction this node would make if it were a leaf node + * @param impurity Impurity measure at this node (for training data) + * @param gain Information gain value. Values less than 0 indicate missing values; + * this quirk will be removed with future updates. + * @param leftChild Left-hand child node + * @param rightChild Right-hand child node + * @param split Information about the test used to split to the left or right child. + */ +class ClassificationInternalNode private[ml] ( +override val prediction: Double, +override val impurity: Double, +override val gain: Double, +override val leftChild: ClassificationNode, +override val rightChild: ClassificationNode, +val split: Split, --- End diff -- override? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175957538 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( -override val prediction: Double, -override val impurity: Double, -override private[ml] val impurityStats: ImpurityCalculator) extends Node { +trait ClassificationNode extends Node { + + @Since("2.4.0") + def getLabelCount(label: Int): Double = { +require(label >= 0 && label < impurityStats.stats.length) +impurityStats.stats(label) + } +} + +trait RegressionNode extends Node { + + @Since("2.4.0") + def getCount(): Double = impurityStats.stats(0) + + @Since("2.4.0") + def getSum(): Double = impurityStats.stats(1) + + @Since("2.4.0") + def getSquareSum(): Double = impurityStats.stats(2) +} + +trait LeafNode extends Node { + + /** Prediction this node make. */ --- End diff -- make -> makes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175959353 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala --- @@ -212,15 +212,13 @@ private[ml] object TreeEnsembleModel { def computeFeatureImportance( node: Node, importances: OpenHashMap[Int, Double]): Unit = { -node match { - case n: InternalNode => -val feature = n.split.featureIndex -val scaledGain = n.gain * n.impurityStats.count -importances.changeValue(feature, scaledGain, _ + scaledGain) -computeFeatureImportance(n.leftChild, importances) -computeFeatureImportance(n.rightChild, importances) - case n: LeafNode => - // do nothing +if (node.isInstanceOf[InternalNode]) { --- End diff -- Why change this? It may be better to use match-case since that will give warnings if we ever add new types of nodes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
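The match-case shape being defended above. This is essentially the code the diff removes, kept here for comparison, and it assumes the InternalNode/LeafNode types defined in the same file:

```scala
node match {
  case n: InternalNode =>
    val feature = n.split.featureIndex
    val scaledGain = n.gain * n.impurityStats.count
    importances.changeValue(feature, scaledGain, _ + scaledGain)
    computeFeatureImportance(n.leftChild, importances)
    computeFeatureImportance(n.rightChild, importances)
  case _: LeafNode =>
    // do nothing: leaf nodes contribute no split gain
}
```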
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175957188 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( -override val prediction: Double, -override val impurity: Double, -override private[ml] val impurityStats: ImpurityCalculator) extends Node { +trait ClassificationNode extends Node { --- End diff -- Please add Since annotations to the traits and classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175959372 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala --- @@ -306,17 +304,21 @@ private[ml] object DecisionTreeModelReadWrite { * The nodes are returned in pre-order traversal (root first) so that it is easy to * get the ID of the subtree's root node. */ -def build(node: Node, id: Int): (Seq[NodeData], Int) = node match { - case n: InternalNode => +def build(node: Node, id: Int): (Seq[NodeData], Int) = { --- End diff -- ditto: why change this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175957487 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( -override val prediction: Double, -override val impurity: Double, -override private[ml] val impurityStats: ImpurityCalculator) extends Node { +trait ClassificationNode extends Node { + + @Since("2.4.0") + def getLabelCount(label: Int): Double = { +require(label >= 0 && label < impurityStats.stats.length) +impurityStats.stats(label) + } +} + +trait RegressionNode extends Node { + + @Since("2.4.0") + def getCount(): Double = impurityStats.stats(0) + + @Since("2.4.0") + def getSum(): Double = impurityStats.stats(1) + + @Since("2.4.0") + def getSquareSum(): Double = impurityStats.stats(2) --- End diff -- rename: getSumOfSquares --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175957245 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( -override val prediction: Double, -override val impurity: Double, -override private[ml] val impurityStats: ImpurityCalculator) extends Node { +trait ClassificationNode extends Node { + + @Since("2.4.0") + def getLabelCount(label: Int): Double = { +require(label >= 0 && label < impurityStats.stats.length) --- End diff -- Add error message with more info --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
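One possible wording for the requested error message (the message text is illustrative, not from the PR):

```scala
@Since("2.4.0")
def getLabelCount(label: Int): Double = {
  require(label >= 0 && label < impurityStats.stats.length,
    s"label must be in the range [0, ${impurityStats.stats.length}) " +
      s"(the number of classes at this node), but got $label")
  impurityStats.stats(label)
}
```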
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175956585 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala --- @@ -276,8 +276,9 @@ object DecisionTreeClassificationModel extends MLReadable[DecisionTreeClassifica val metadata = DefaultParamsReader.loadMetadata(path, sc, className) val numFeatures = (metadata.metadata \ "numFeatures").extract[Int] val numClasses = (metadata.metadata \ "numClasses").extract[Int] - val root = loadTreeNodes(path, metadata, sparkSession) - val model = new DecisionTreeClassificationModel(metadata.uid, root, numFeatures, numClasses) + val root = loadTreeNodes(path, metadata, sparkSession, true) --- End diff -- scalastyle: use named parameter for last arg "true" since it's unclear what the value means --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
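Applied to the line under review, the named-argument form would read roughly as follows; the parameter name isClassification is assumed from the rest of this PR rather than confirmed in this hunk:

```scala
val root = loadTreeNodes(path, metadata, sparkSession, isClassification = true)
```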
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175957437 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( -override val prediction: Double, -override val impurity: Double, -override private[ml] val impurityStats: ImpurityCalculator) extends Node { +trait ClassificationNode extends Node { + + @Since("2.4.0") + def getLabelCount(label: Int): Double = { +require(label >= 0 && label < impurityStats.stats.length) +impurityStats.stats(label) + } +} + +trait RegressionNode extends Node { + + @Since("2.4.0") + def getCount(): Double = impurityStats.stats(0) --- End diff -- nit: Let's remove the parentheses for these methods --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r175955065 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( --- End diff -- I guess it's fine; let's proceed! I'll keep reviewing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19381: [SPARK-10884][ML] Support prediction on single instance ...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19381 LGTM. Merging after fresh tests run. Thanks a lot @WeichenXu123 for the PR and others for reviews!
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175908732 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { --- End diff -- make more explicit: + " when keepInvalid = true" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175907866 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -37,24 +37,26 @@ class VectorAssemblerSuite test("assemble") { import org.apache.spark.ml.feature.VectorAssembler.assemble -assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) -assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) +assert(assemble(Seq(1), true)(0.0) === Vectors.sparse(1, Array.empty, Array.empty)) +assert(assemble(Seq(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0))) val dv = Vectors.dense(2.0, 0.0) -assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) +assert(assemble(Seq(1, 2, 1), true)(0.0, dv, 1.0) === + Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0))) val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0)) -assert(assemble(0.0, dv, 1.0, sv) === +assert(assemble(Seq(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) === Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0))) -for (v <- Seq(1, "a", null)) { - intercept[SparkException](assemble(v)) - intercept[SparkException](assemble(1.0, v)) +for (v <- Seq(1, "a")) { + intercept[SparkException](assemble(Seq(1), true)(v)) + intercept[SparkException](assemble(Seq(1, 1), true)(1.0, v)) } } test("assemble should compress vectors") { import org.apache.spark.ml.feature.VectorAssembler.assemble -val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0)) +val v1 = assemble(Seq(1, 1, 1, 4), true)(0.0, 0.0, 0.0, Vectors.dense(4.0)) --- End diff -- We probably want this to fail, right? It expects a Vector of length 4 but is given a Vector of length 1. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
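The behaviour the comment argues for, written as a test fragment in the style of the surrounding suite (uses ScalaTest's intercept as the existing tests do); whether assemble actually throws here depends on the final implementation:

```scala
// Declared length 4 for the last column, but Vectors.dense(4.0) has length 1, so the
// mismatch should surface as an error rather than silently producing a shorter vector.
intercept[SparkException] {
  assemble(Seq(1, 1, 1, 4), keepInvalid = true)(0.0, 0.0, 0.0, Vectors.dense(4.0))
}
```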
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175904796 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala --- @@ -234,7 +234,7 @@ class StringIndexerModel ( val metadata = NominalAttribute.defaultAttr .withName($(outputCol)).withValues(filteredLabels).toMetadata() // If we are skipping invalid records, filter them out. -val (filteredDataset, keepInvalid) = getHandleInvalid match { --- End diff -- For the record, in general, I would not bother making changes like this. The one exception I do make is IntelliJ style complaints since those can be annoying for developers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175913440 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { --- End diff -- This is great that you're testing this carefully, but I recommend we make sure to pass better exceptions to users. E.g., they won't know what to do with a NullPointerException, so we could instead tell them something like: "Column x in the first row of the dataset has a null entry, but VectorAssembler expected a non-null entry. This can be fixed by explicitly specifying the expected size using VectorSizeHint." --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175912462 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) --- End diff -- I think you can do a direct comparison: ``` assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")) === Map("y" -> 2)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175910846 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], --- End diff -- Also, it's unclear what this method does until I read the code. Possibly make the method name more explicit ("getVectorLengthsFromFirstRow"), and definitely document that "columns" must all be of Vector type. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
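Folding this and the earlier error-message comment together, the helper in VectorAssembler.scala might end up looking roughly like this; the rename, doc, and message wording are all suggestions rather than code from the PR:

```scala
/**
 * Returns the size of each Vector column in `columns`, inferred from the first row of
 * `dataset`. All listed columns must be of Vector type.
 */
private[feature] def getVectorLengthsFromFirstRow(
    dataset: Dataset[_],
    columns: Seq[String]): Map[String, Int] = {
  try {
    val firstRow = dataset.toDF().select(columns.map(col): _*).first()
    columns.zip(firstRow.toSeq).map {
      case (c, x) => c -> x.asInstanceOf[Vector].size
    }.toMap
  } catch {
    case e: NullPointerException => throw new NullPointerException(
      s"Saw a null value in columns ${columns.mkString("[", ", ", "]")} in the first row; " +
        "vector sizes cannot be inferred from nulls. Consider declaring the expected size " +
        s"explicitly with VectorSizeHint. Original error: $e")
    case e: NoSuchElementException => throw new NoSuchElementException(
      s"Cannot infer vector sizes for columns ${columns.mkString("[", ", ", "]")} from an " +
        s"empty DataFrame. Consider declaring the sizes with VectorSizeHint. Original error: $e")
  }
}
```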
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175909539 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], + columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF.select(columns.map(col): _*).first + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +"Saw null value on the first row: " + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +"Cannot infer vector size from all empty DataFrame" + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { + case (true, VectorAssembler.ERROR_INVALID) => +getLengthsFromFirst(dataset, missing_columns) + case (true, VectorAssembler.SKIP_INVALID) => +getLengthsFromFirst(dataset.na.drop, missing_columns) + case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException( +"Consider using VectorSizeHint for columns: " + missing_columns.mkString("[", ",", "]")) + case (_, _) => Map.empty +} +group_sizes ++ first_sizes + } + + @Since("1.6.0") override def load(path: String): VectorAssembler = super.load(path) - private[feature] def assemble(vv: Any*): Vector = { + private[feature] def assemble(lengths: Seq[Int], keepInvalid: Boolean)(vv: Any*): Vector = { --- End diff -- Also, I'd add doc explaining requirements, especially that this assumes that lengths and vv have the same length. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
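A possible shape for the requested doc, based on the behaviour exercised by the tests in this PR; the wording is illustrative and the body is elided:

```scala
/**
 * Assembles the given values `vv` into a single Vector.
 *
 * @param lengths the expected length contributed by each input column; must contain exactly
 *                one entry per element of `vv`, in the same order
 * @param keepInvalid if true, a null element of `vv` is expanded into the corresponding
 *                    number of NaN entries; if false, a null element causes a SparkException
 * @param vv the column values to assemble: Double, Vector, or null
 */
private[feature] def assemble(lengths: Seq[Int], keepInvalid: Boolean)(vv: Any*): Vector = {
  ???  // existing implementation unchanged
}
```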
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175910402 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], + columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF.select(columns.map(col): _*).first --- End diff -- first -> first() --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175908930 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) --- End diff -- No need to compare with anything; just call assemble() --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
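For illustration, the simplified assertions would read roughly like this inside the existing "assemble should throw errors" test (a sketch only; it assumes the suite's existing imports of SparkException and ScalaTest's intercept):

import org.apache.spark.ml.feature.VectorAssembler.assemble

// intercept only needs the call that is expected to throw; the === comparison against an
// expected vector on the right-hand side is never reached and can simply be dropped.
intercept[SparkException] {
  assemble(Seq(1, 1), false)(1.0, null)
}
intercept[SparkException] {
  assemble(Seq(1), false)(null)
}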
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175914154 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( --- End diff -- Also, if there are "trash" columns not used by VectorAssembler, maybe name them as such and add a few null values in them for better testing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175913755 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( --- End diff -- Since this is shared across multiple tests, just make it a shared value. See e.g. https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala#L55 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
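A hedged sketch of the shared-fixture pattern being pointed to (the field name dfWithNulls is illustrative; the rows are the ones already used in these tests):

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.DataFrame

class VectorAssemblerSuite extends MLTest with DefaultReadWriteTest {

  import testImplicits._

  @transient var dfWithNulls: DataFrame = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    dfWithNulls = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)](
      (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L),
      (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
      (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L),
      (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
    ).toDF("id1", "id2", "x", "y", "name", "z", "n")
  }

  // individual tests then refer to dfWithNulls instead of rebuilding the same DataFrame
}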
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175915187 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") + +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +// behavior when first row has information +assert(assembler.setHandleInvalid("skip").transform(df).count() == 1) + intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect()) + intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect()) + +// numeric column is all null --- End diff -- Did you want to test: * extraction of metadata from the first row (which is what this is testing, I believe), or * transformation on an all-null column (which this never reaches)? 
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175908248 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.4.0") def setOutputCol(value: String): this.type = set(outputCol, value) + /** @group setParam */ + @Since("2.4.0") + def setHandleInvalid(value: String): this.type = set(handleInvalid, value) + + /** + * Param for how to handle invalid data (NULL values). Options are 'skip' (filter out rows with --- End diff -- It would be good to expand this doc to explain the behavior: how various types of invalid values are treated (null, NaN, incorrect Vector length) and how computationally expensive different options can be. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
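As a concrete illustration of the kind of expanded doc being requested (the wording is illustrative only, not the text that was eventually committed; it assumes the usual Param/ParamValidators imports and that the param overrides the shared HasHandleInvalid param):

/**
 * Param for how to handle invalid data (NULL values). Options are:
 *  - 'skip': filter out rows that contain an invalid value in any of the input columns,
 *  - 'error': throw an error,
 *  - 'keep': keep the rows and output NaN in place of each missing entry.
 * Vector input columns whose size is not recorded in the column metadata need either a
 * VectorSizeHint upstream (required for 'keep') or a scan of the first row ('skip'/'error'),
 * which is where the extra computational cost of this option comes from.
 */
@Since("2.4.0")
override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
  "How to handle invalid data (NULL values): 'skip', 'error' or 'keep'.",
  ParamValidators.inArray(VectorAssembler.supportedHandleInvalids))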
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175914695 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") + +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +// behavior when first row has information +assert(assembler.setHandleInvalid("skip").transform(df).count() == 1) + intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect()) --- End diff -- Should this fail? I thought it should pad with NaNs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175915257 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") +assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == "y" -> 2)) + intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"), Seq("y"))) +intercept[NoSuchElementException]( + VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y"))) + +assert(VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == "y" -> 2)) +intercept[NullPointerException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID)) +intercept[RuntimeException](VectorAssembler.getLengths( + df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID)) + } + + test("Handle Invalid should behave properly") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( + (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L), + (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L), + (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L), + (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L) +).toDF("id1", "id2", "x", "y", "name", "z", "n") + +val assembler = new VectorAssembler() + .setInputCols(Array("x", "y", "z", "n")) + .setOutputCol("features") + +// behavior when first row has information +assert(assembler.setHandleInvalid("skip").transform(df).count() == 1) + intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect()) + intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect()) + +// numeric column is all null +intercept[RuntimeException]( + assembler.setHandleInvalid("keep").transform(df.filter("id1==3")).count() == 1) + +// vector column is all null --- End diff -- ditto --- - To unsubscribe, e-mail: 
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175909880 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN)) +intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) === + Vectors.dense(1.0, Double.NaN, Double.NaN)) +intercept[SparkException](assemble(Seq(1), false)(null) === Vectors.dense(Double.NaN)) +intercept[SparkException](assemble(Seq(2), false)(null) === + Vectors.dense(Double.NaN, Double.NaN)) + } + + test("get lengths function") { +val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)]( --- End diff -- Why java.lang.Double instead of Double? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
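For readers following along, one likely reason (an explanatory sketch, not a statement of the author's intent): scala.Double is a primitive and cannot hold null, so a tuple column that needs to contain nulls has to be typed as the boxed java.lang.Double.

object BoxedDoubleSketch {
  def main(args: Array[String]): Unit = {
    val boxed: java.lang.Double = null   // compiles: the reference type can represent a missing value
    // val primitive: Double = null      // does not compile: scala.Double has no null value
    println(boxed == null)               // true
  }
}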
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175911314 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], + columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF.select(columns.map(col): _*).first + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +"Saw null value on the first row: " + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +"Cannot infer vector size from all empty DataFrame" + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { + case (true, VectorAssembler.ERROR_INVALID) => +getLengthsFromFirst(dataset, missing_columns) + case (true, VectorAssembler.SKIP_INVALID) => +getLengthsFromFirst(dataset.na.drop, missing_columns) --- End diff -- This will drop Rows with NA values in extraneous columns. I.e., even if the VectorAssembler is only assembling columns A and B, if there is a NAN in column C, this will drop that row. Pass the list of columns you care about to the drop() method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
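A minimal sketch of the suggested fix: restrict the null-drop to the columns actually being assembled, so that a null in an unrelated column does not remove the row. (The helper name below is made up; na.drop(cols) is the existing Dataset API.)

import org.apache.spark.sql.DataFrame

// df.na.drop() with no arguments drops a row if *any* column contains a null,
// while df.na.drop(cols) only considers the given columns.
def dropNullsInAssembledColumns(df: DataFrame, assembledColumns: Seq[String]): DataFrame =
  df.na.drop(assembledColumns)

// In getLengths, the SKIP_INVALID branch would then pass the relevant columns, e.g.:
//   getLengthsFromFirst(dataset.na.drop(columns), missing_columns)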
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175910352 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], + columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF.select(columns.map(col): _*).first --- End diff -- Fix (new) IntelliJ style warnings: "toDF" -> "toDF()" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175910188 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], --- End diff -- scala style: For multiline class and method headers, put the first argument on the next line, with +4 space indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
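Sketched on the method from the diff, the requested header style (also folding in the toDF()/first() nits from the other comments) would look roughly like this, assuming the file's existing imports of Dataset, col and Vector:

private[feature] def getLengthsFromFirst(
    dataset: Dataset[_],
    columns: Seq[String]): Map[String, Int] = {
  try {
    val firstRow = dataset.toDF().select(columns.map(col): _*).first()
    columns.zip(firstRow.toSeq).map {
      case (c, x) => c -> x.asInstanceOf[Vector].size
    }.toMap
  } catch {
    case e: NullPointerException => throw new NullPointerException(
      "Saw null value on the first row: " + e.toString)
    case e: NoSuchElementException => throw new NoSuchElementException(
      "Cannot infer vector size from all empty DataFrame: " + e.toString)
  }
}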
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175906193 --- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala --- @@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("1.6.0") object VectorAssembler extends DefaultParamsReadable[VectorAssembler] { + private[feature] val SKIP_INVALID: String = "skip" + private[feature] val ERROR_INVALID: String = "error" + private[feature] val KEEP_INVALID: String = "keep" + private[feature] val supportedHandleInvalids: Array[String] = +Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID) + + + private[feature] def getLengthsFromFirst(dataset: Dataset[_], + columns: Seq[String]): Map[String, Int] = { +try { + val first_row = dataset.toDF.select(columns.map(col): _*).first + columns.zip(first_row.toSeq).map { +case (c, x) => c -> x.asInstanceOf[Vector].size + }.toMap +} catch { + case e: NullPointerException => throw new NullPointerException( +"Saw null value on the first row: " + e.toString) + case e: NoSuchElementException => throw new NoSuchElementException( +"Cannot infer vector size from all empty DataFrame" + e.toString) +} + } + + private[feature] def getLengths(dataset: Dataset[_], columns: Seq[String], + handleInvalid: String) = { +val group_sizes = columns.map { c => + c -> AttributeGroup.fromStructField(dataset.schema(c)).size +}.toMap +val missing_columns: Seq[String] = group_sizes.filter(_._2 == -1).keys.toSeq +val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, handleInvalid) match { + case (true, VectorAssembler.ERROR_INVALID) => +getLengthsFromFirst(dataset, missing_columns) + case (true, VectorAssembler.SKIP_INVALID) => +getLengthsFromFirst(dataset.na.drop, missing_columns) + case (true, VectorAssembler.KEEP_INVALID) => throw new RuntimeException( +"Consider using VectorSizeHint for columns: " + missing_columns.mkString("[", ",", "]")) + case (_, _) => Map.empty +} +group_sizes ++ first_sizes + } + + @Since("1.6.0") override def load(path: String): VectorAssembler = super.load(path) - private[feature] def assemble(vv: Any*): Vector = { + private[feature] def assemble(lengths: Seq[Int], keepInvalid: Boolean)(vv: Any*): Vector = { --- End diff -- nit: Use Array[Int] for faster access --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20829#discussion_r175908774 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -147,4 +149,72 @@ class VectorAssemblerSuite .filter(vectorUDF($"features") > 1) .count() == 1) } + + test("assemble should keep nulls") { +import org.apache.spark.ml.feature.VectorAssembler.assemble +assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, Double.NaN)) +assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, Double.NaN, Double.NaN)) +assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN)) +assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, Double.NaN)) + } + + test("assemble should throw errors") { --- End diff -- similarly: make more explicit: + " when keepInvalid = false" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20829 @hootoconnor Please refrain from making non-constructive comments. If you did not intend to leave the comment here, please remove it. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-21898][ML] Feature parity for KolmogorovSmirnovTest in MLlib
Repository: spark Updated Branches: refs/heads/master 5e7bc2ace -> 7f5e8aa26 [SPARK-21898][ML] Feature parity for KolmogorovSmirnovTest in MLlib ## What changes were proposed in this pull request? Feature parity for KolmogorovSmirnovTest in MLlib. Implement `DataFrame` interface for `KolmogorovSmirnovTest` in `mllib.stat`. ## How was this patch tested? Test suite added. Author: WeichenXu Author: jkbradley Closes #19108 from WeichenXu123/ml-ks-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f5e8aa2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f5e8aa2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f5e8aa2 Branch: refs/heads/master Commit: 7f5e8aa2606b0ee0297ceb6f4603bd368e3b0291 Parents: 5e7bc2a Author: WeichenXu Authored: Tue Mar 20 11:14:34 2018 -0700 Committer: Joseph K. Bradley Committed: Tue Mar 20 11:14:34 2018 -0700 -- .../spark/ml/stat/KolmogorovSmirnovTest.scala | 113 +++ .../ml/stat/JavaKolmogorovSmirnovTestSuite.java | 84 +++ .../ml/stat/KolmogorovSmirnovTestSuite.scala| 140 +++ 3 files changed, 337 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7f5e8aa2/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala new file mode 100644 index 000..8d80e77 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import scala.annotation.varargs + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.api.java.function.Function +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.mllib.stat.{Statistics => OldStatistics} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions.col + +/** + * :: Experimental :: + * + * Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. 
+ * For more information on KS Test: + * @see https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test";> + * Kolmogorov-Smirnov test (Wikipedia) + */ +@Experimental +@Since("2.4.0") +object KolmogorovSmirnovTest { + + /** Used to construct output schema of test */ + private case class KolmogorovSmirnovTestResult( + pValue: Double, + statistic: Double) + + private def getSampleRDD(dataset: DataFrame, sampleCol: String): RDD[Double] = { +SchemaUtils.checkNumericType(dataset.schema, sampleCol) +import dataset.sparkSession.implicits._ +dataset.select(col(sampleCol).cast("double")).as[Double].rdd + } + + /** + * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * + * @param dataset a `DataFrame` containing the sample of data to test + * @param sampleCol Name of sample column in dataset, of any numerical type + * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value + * @return DataFrame containing the test result for the input sampled data. + * This DataFrame will contain a single Row wit
[GitHub] spark issue #19108: [SPARK-21898][ML] Feature parity for KolmogorovSmirnovTe...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19108 LGTM. Thanks for the PR! Merging with master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
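For readers of the API added by the [SPARK-21898] commit above, a hedged usage sketch of the DataFrame-based test (assumes an active SparkSession named `spark`; the sample values are arbitrary):

import org.apache.commons.math3.distribution.NormalDistribution
import org.apache.spark.ml.stat.KolmogorovSmirnovTest
import org.apache.spark.sql.Row

import spark.implicits._

val sampleDF = Seq(0.1, 0.15, 0.2, 0.3, 0.25, -0.05, 0.4).toDF("sample")
val stdNormal = new NormalDistribution(0, 1)

// Test the null hypothesis that the sample was drawn from a standard normal distribution,
// supplying the theoretical CDF as a Double => Double function.
val Row(pValue: Double, statistic: Double) =
  KolmogorovSmirnovTest
    .test(sampleDF, "sample", (x: Double) => stdNormal.cumulativeProbability(x))
    .select("pValue", "statistic")
    .head()

println(s"p-value = $pValue, KS statistic = $statistic")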
spark git commit: [SPARK-23728][BRANCH-2.3] Fix ML tests with expected exceptions running streaming tests
Repository: spark Updated Branches: refs/heads/branch-2.3 80e79430f -> 920493949 [SPARK-23728][BRANCH-2.3] Fix ML tests with expected exceptions running streaming tests ## What changes were proposed in this pull request? The testTransformerByInterceptingException failed to catch the expected message on 2.3 during streaming tests as the feature generated message is not at the direct caused by exception but even one level deeper. ## How was this patch tested? Running the unit tests. Author: âattilapirosâ Closes #20852 from attilapiros/SPARK-23728. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92049394 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92049394 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92049394 Branch: refs/heads/branch-2.3 Commit: 920493949eba77befd67e32f9e6ede5d594bcd56 Parents: 80e7943 Author: âattilapirosâ Authored: Mon Mar 19 10:42:12 2018 -0700 Committer: Joseph K. Bradley Committed: Mon Mar 19 10:42:12 2018 -0700 -- .../src/test/scala/org/apache/spark/ml/util/MLTest.scala | 10 -- 1 file changed, 8 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/92049394/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala index 795fd0e..23e05ac 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala @@ -119,9 +119,15 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite => expectedMessagePart : String, firstResultCol: String) { +def hasExpectedMessageDirectly(exception: Throwable): Boolean = + exception.getMessage.contains(expectedMessagePart) + def hasExpectedMessage(exception: Throwable): Boolean = - exception.getMessage.contains(expectedMessagePart) || -(exception.getCause != null && exception.getCause.getMessage.contains(expectedMessagePart)) + hasExpectedMessageDirectly(exception) || ( +exception.getCause != null && ( + hasExpectedMessageDirectly(exception.getCause) || ( +exception.getCause.getCause != null && +hasExpectedMessageDirectly(exception.getCause.getCause withClue(s"""Expected message part "${expectedMessagePart}" is not found in DF test.""") { val exceptionOnDf = intercept[Throwable] { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] spark issue #20852: [SPARK-23728][BRANCH-2.3] Fix ML tests with expected exc...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20852 Apologies for breaking it! This LGTM. I'll go ahead and merge it to fix the build, but please comment further on this PR as needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20852: [SPARK-23728][BRANCH-2.3] Fix ML tests with expected exc...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20852 Merging with branch-2.3 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20852: [SPARK-23728][BRANCH-2.3] Fix ML tests with expec...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20852#discussion_r175524288 --- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala --- @@ -119,9 +119,15 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite => expectedMessagePart : String, firstResultCol: String) { +def hasExpectedMessageDirectly(exception: Throwable): Boolean = + exception.getMessage.contains(expectedMessagePart) + def hasExpectedMessage(exception: Throwable): Boolean = - exception.getMessage.contains(expectedMessagePart) || -(exception.getCause != null && exception.getCause.getMessage.contains(expectedMessagePart)) + hasExpectedMessageDirectly(exception) || ( +exception.getCause != null && ( + hasExpectedMessageDirectly(exception.getCause) || ( +exception.getCause.getCause != null && +hasExpectedMessageDirectly(exception.getCause.getCause --- End diff -- I don't think we need to loop further. If the real message is buried this deep, that could be considered a problem in and of itself. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19108: [SPARK-21898][ML] Feature parity for KolmogorovSm...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19108#discussion_r175521718 --- Diff: mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import scala.annotation.varargs + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.mllib.stat.{Statistics => OldStatistics} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions.col + +/** + * :: Experimental :: + * + * Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: + * @see https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test";> + * Kolmogorov-Smirnov test (Wikipedia) + */ +@Experimental +@Since("2.4.0") +object KolmogorovSmirnovTest { + + /** Used to construct output schema of test */ + private case class KolmogorovSmirnovTestResult( + pValues: Double, --- End diff -- pValues -> pValue --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19108: [SPARK-21898][ML] Feature parity for KolmogorovSm...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19108#discussion_r175521761 --- Diff: mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import scala.annotation.varargs + +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.mllib.stat.{Statistics => OldStatistics} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.functions.col + +/** + * :: Experimental :: + * + * Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: + * @see https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test";> + * Kolmogorov-Smirnov test (Wikipedia) + */ +@Experimental +@Since("2.4.0") +object KolmogorovSmirnovTest { + + /** Used to construct output schema of test */ + private case class KolmogorovSmirnovTestResult( + pValues: Double, + statistics: Double) --- End diff -- statistics -> statistic --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19108: [SPARK-21898][ML] Feature parity for KolmogorovSm...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19108#discussion_r174977495 --- Diff: mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import org.apache.commons.math3.distribution.{ExponentialDistribution, NormalDistribution} +import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => Math3KSTest} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.Row + +class KolmogorovSmirnovTestSuite + extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + + import testImplicits._ + + test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") { +// Create theoretical distributions +val stdNormalDist = new NormalDistribution(0, 1) +val expDist = new ExponentialDistribution(0.6) + +// set seeds +val seed = 10L +stdNormalDist.reseedRandomGenerator(seed) +expDist.reseedRandomGenerator(seed) + +// Sample data from the distributions and parallelize it +val n = 10 +val sampledNormArray = stdNormalDist.sample(n) +val sampledNormDF = sc.parallelize(sampledNormArray, 10).toDF("sample") +val sampledExpArray = expDist.sample(n) +val sampledExpDF = sc.parallelize(sampledExpArray, 10).toDF("sample") + +// Use a apache math commons local KS test to verify calculations +val ksTest = new Math3KSTest() +val pThreshold = 0.05 + +// Comparing a standard normal sample to a standard normal distribution --- End diff -- Can this and the next set of code lines be combined into a single helper method? That could help with adding in the test for the uniform distribution as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
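A hedged sketch of the kind of shared helper being suggested for the commons-math3 equivalence checks (names are illustrative; it assumes the suite's existing imports, including TestingUtils._ for ~== and the Math3KSTest alias, plus sc and testImplicits from MLlibTestSparkContext):

import org.apache.commons.math3.distribution.RealDistribution

def verifyAgainstCommonsMath3(
    sampleArray: Array[Double],
    dist: RealDistribution): Unit = {
  val sampleDF = sc.parallelize(sampleArray, 10).toDF("sample")
  // Spark's result: a single row with columns (pValue, statistic)
  val Row(pValue: Double, statistic: Double) =
    KolmogorovSmirnovTest
      .test(sampleDF, "sample", (x: Double) => dist.cumulativeProbability(x))
      .head()
  // local commons-math3 reference values
  val localKS = new Math3KSTest()
  assert(pValue ~== localKS.kolmogorovSmirnovTest(dist, sampleArray) relTol 1e-4)
  assert(statistic ~== localKS.kolmogorovSmirnovStatistic(dist, sampleArray) relTol 1e-4)
}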
[GitHub] spark pull request #19108: [SPARK-21898][ML] Feature parity for KolmogorovSm...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19108#discussion_r174977489 --- Diff: mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.stat + +import org.apache.commons.math3.distribution.{ExponentialDistribution, NormalDistribution} +import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => Math3KSTest} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.ml.util.TestingUtils._ +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.sql.Row + +class KolmogorovSmirnovTestSuite + extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { + + import testImplicits._ + + test("1 sample Kolmogorov-Smirnov test: apache commons math3 implementation equivalence") { +// Create theoretical distributions +val stdNormalDist = new NormalDistribution(0, 1) +val expDist = new ExponentialDistribution(0.6) + --- End diff -- In the spark.mllib tests, it looks like they meant to test against the uniform distribution but forgot to. Would you mind adding that test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
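The uniform-distribution case being requested could then reuse such a helper. A sketch only, assuming the seed and n values from the surrounding test and the verifyAgainstCommonsMath3 helper sketched above:

import org.apache.commons.math3.distribution.UniformRealDistribution

val uniformDist = new UniformRealDistribution(0.0, 1.0)
uniformDist.reseedRandomGenerator(seed)
val sampledUniformArray = uniformDist.sample(n)
verifyAgainstCommonsMath3(sampledUniformArray, uniformDist)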
[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20786#discussion_r174921341 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -84,35 +86,73 @@ private[ml] object Node { /** * Create a new Node from the old Node format, recursively creating child nodes as needed. */ - def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node = { + def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int], +isClassification: Boolean): Node = { if (oldNode.isLeaf) { // TODO: Once the implementation has been moved to this API, then include sufficient // statistics here. - new LeafNode(prediction = oldNode.predict.predict, -impurity = oldNode.impurity, impurityStats = null) + if (isClassification) { +new ClassificationLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } else { +new RegressionLeafNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, impurityStats = null) + } } else { val gain = if (oldNode.stats.nonEmpty) { oldNode.stats.get.gain } else { 0.0 } - new InternalNode(prediction = oldNode.predict.predict, impurity = oldNode.impurity, -gain = gain, leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures), -rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures), -split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + if (isClassification) { +new ClassificationInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, true) +.asInstanceOf[ClassificationNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } else { +new RegressionInternalNode(prediction = oldNode.predict.predict, + impurity = oldNode.impurity, gain = gain, + leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, false) +.asInstanceOf[RegressionNode], + split = Split.fromOld(oldNode.split.get, categoricalFeatures), impurityStats = null) + } } } } -/** - * Decision tree leaf node. - * @param prediction Prediction this node makes - * @param impurity Impurity measure at this node (for training data) - */ -class LeafNode private[ml] ( --- End diff -- While it would be nice to have this be a trait instead of a class, I am worried about breaking public APIs. However, one could argue that this isn't a public API since the constructor is private (though the class is public). I'll CC people on https://issues.apache.org/jira/browse/SPARK-7131 where these classes were made public to get feedback. Let's give a couple of days for feedback before proceeding. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19381: [SPARK-10884][ML] Support prediction on single in...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19381#discussion_r174862550 --- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala --- @@ -109,4 +110,14 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite => testTransformerOnDF(dataframe, transformer, firstResultCol, otherResultCols: _*)(globalCheckFunction) } + + def testPredictorModelSinglePrediction(model: PredictionModel[Vector, _], --- End diff -- Thanks for the updates! Just this comment: Can we please call this method testPredictionModelSinglePrediction to match the name of the class it is testing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml.featur...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20686 Merging to branch-2.3 too --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[1/2] spark git commit: [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z
Repository: spark Updated Branches: refs/heads/master 1098933b0 -> 279b3db89 http://git-wip-us.apache.org/repos/asf/spark/blob/279b3db8/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala index 775a04d..df24367 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala @@ -17,17 +17,14 @@ package org.apache.spark.ml.feature -import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.attribute.{Attribute, NominalAttribute} import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} -import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.sql.Row import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} -class StringIndexerSuite - extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class StringIndexerSuite extends MLTest with DefaultReadWriteTest { import testImplicits._ @@ -46,19 +43,23 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") val indexerModel = indexer.fit(df) - MLTestingUtils.checkCopyAndUids(indexer, indexerModel) - -val transformed = indexerModel.transform(df) -val attr = Attribute.fromStructField(transformed.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attr.values.get === Array("a", "c", "b")) -val output = transformed.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet // a -> 0, b -> 2, c -> 1 -val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 1.0)) -assert(output === expected) +val expected = Seq( + (0, 0.0), + (1, 2.0), + (2, 1.0), + (3, 0.0), + (4, 0.0), + (5, 1.0) +).toDF("id", "labelIndex") + +testTransformerByGlobalCheckFunc[(Int, String)](df, indexerModel, "id", "labelIndex") { rows => + val attr = Attribute.fromStructField(rows.head.schema("labelIndex")) +.asInstanceOf[NominalAttribute] + assert(attr.values.get === Array("a", "c", "b")) + assert(rows.seq === expected.collect().toSeq) +} } test("StringIndexerUnseen") { @@ -70,36 +71,38 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") .fit(df) + // Verify we throw by default with unseen values -intercept[SparkException] { - indexer.transform(df2).collect() -} +testTransformerByInterceptingException[(Int, String)]( + df2, + indexer, + "Unseen label:", + "labelIndex") -indexer.setHandleInvalid("skip") // Verify that we skip the c record -val transformedSkip = indexer.transform(df2) -val attrSkip = Attribute.fromStructField(transformedSkip.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attrSkip.values.get === Array("b", "a")) -val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet // a -> 1, b -> 0 -val expectedSkip = Set((0, 1.0), (1, 0.0)) -assert(outputSkip === expectedSkip) +indexer.setHandleInvalid("skip") + +val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF() +testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows => + val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex")) 
+.asInstanceOf[NominalAttribute] + assert(attrSkip.values.get === Array("b", "a")) + assert(rows.seq === expectedSkip.collect().toSeq) +} indexer.setHandleInvalid("keep") -// Verify that we keep the unseen records -val transformedKeep = indexer.transform(df2) -val attrKeep = Attribute.fromStructField(transformedKeep.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attrKeep.values.get === Array("b", "a", "__unknown")) -val outputKeep = transformedKeep.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet + // a -> 1, b -> 0, c -> 2, d -> 3 -val expectedKeep = Set((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)) -assert(outputKeep === expectedKeep) +val expectedKeep = Seq((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)).toDF() + +// Verify that we keep the unseen records +testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows => + val attrKeep = Attribute.fromStructField(rows.head.schema("labelIn
[2/2] spark git commit: [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z
[SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z # What changes were proposed in this pull request? Adds structured streaming tests using testTransformer for these suites: - NGramSuite - NormalizerSuite - OneHotEncoderEstimatorSuite - OneHotEncoderSuite - PCASuite - PolynomialExpansionSuite - QuantileDiscretizerSuite - RFormulaSuite - SQLTransformerSuite - StandardScalerSuite - StopWordsRemoverSuite - StringIndexerSuite - TokenizerSuite - RegexTokenizerSuite - VectorAssemblerSuite - VectorIndexerSuite - VectorSizeHintSuite - VectorSlicerSuite - Word2VecSuite # How was this patch tested? They are unit tests. Author: attilapiros Closes #20686 from attilapiros/SPARK-22915. (cherry picked from commit 279b3db8970809104c30941254e57e3d62da5041) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0663b611 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0663b611 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0663b611 Branch: refs/heads/branch-2.3 Commit: 0663b61193b37094b9d00c7f2cbb0268ad946e25 Parents: f3efbfa Author: attilapiros Authored: Wed Mar 14 18:36:31 2018 -0700 Committer: Joseph K. Bradley Committed: Wed Mar 14 18:36:41 2018 -0700 -- .../apache/spark/ml/feature/NGramSuite.scala| 23 +-- .../spark/ml/feature/NormalizerSuite.scala | 57 ++ .../feature/OneHotEncoderEstimatorSuite.scala | 193 +- .../spark/ml/feature/OneHotEncoderSuite.scala | 124 ++- .../org/apache/spark/ml/feature/PCASuite.scala | 14 +- .../ml/feature/PolynomialExpansionSuite.scala | 62 +++--- .../ml/feature/QuantileDiscretizerSuite.scala | 198 ++ .../apache/spark/ml/feature/RFormulaSuite.scala | 158 +++--- .../spark/ml/feature/SQLTransformerSuite.scala | 35 ++-- .../spark/ml/feature/StandardScalerSuite.scala | 33 +-- .../ml/feature/StopWordsRemoverSuite.scala | 37 ++-- .../spark/ml/feature/StringIndexerSuite.scala | 204 ++- .../spark/ml/feature/TokenizerSuite.scala | 30 +-- .../spark/ml/feature/VectorIndexerSuite.scala | 183 + .../spark/ml/feature/VectorSizeHintSuite.scala | 88 +--- .../spark/ml/feature/VectorSlicerSuite.scala| 27 +-- .../apache/spark/ml/feature/Word2VecSuite.scala | 28 +-- .../scala/org/apache/spark/ml/util/MLTest.scala | 33 ++- 18 files changed, 809 insertions(+), 718 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0663b611/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala index d4975c0..e5956ee 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala @@ -19,17 +19,15 @@ package org.apache.spark.ml.feature import scala.beans.BeanInfo -import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.util.DefaultReadWriteTest -import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest} +import org.apache.spark.sql.{DataFrame, Row} + @BeanInfo case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String]) -class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class NGramSuite extends MLTest with DefaultReadWriteTest { - import org.apache.spark.ml.feature.NGramSuite._ import testImplicits._ test("default behavior yields bigram 
features") { @@ -83,16 +81,11 @@ class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRe .setN(3) testDefaultReadWrite(t) } -} - -object NGramSuite extends SparkFunSuite { - def testNGram(t: NGram, dataset: Dataset[_]): Unit = { -t.transform(dataset) - .select("nGrams", "wantedNGrams") - .collect() - .foreach { case Row(actualNGrams, wantedNGrams) => + def testNGram(t: NGram, dataFrame: DataFrame): Unit = { +testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") { + case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) => assert(actualNGrams === wantedNGrams) - } +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/0663b611/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/
[1/2] spark git commit: [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z
Repository: spark Updated Branches: refs/heads/branch-2.3 f3efbfa4b -> 0663b6119 http://git-wip-us.apache.org/repos/asf/spark/blob/0663b611/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala index 775a04d..df24367 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala @@ -17,17 +17,14 @@ package org.apache.spark.ml.feature -import org.apache.spark.{SparkException, SparkFunSuite} import org.apache.spark.ml.attribute.{Attribute, NominalAttribute} import org.apache.spark.ml.param.ParamsSuite -import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} -import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils} import org.apache.spark.sql.Row import org.apache.spark.sql.functions.col import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} -class StringIndexerSuite - extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class StringIndexerSuite extends MLTest with DefaultReadWriteTest { import testImplicits._ @@ -46,19 +43,23 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") val indexerModel = indexer.fit(df) - MLTestingUtils.checkCopyAndUids(indexer, indexerModel) - -val transformed = indexerModel.transform(df) -val attr = Attribute.fromStructField(transformed.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attr.values.get === Array("a", "c", "b")) -val output = transformed.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet // a -> 0, b -> 2, c -> 1 -val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 1.0)) -assert(output === expected) +val expected = Seq( + (0, 0.0), + (1, 2.0), + (2, 1.0), + (3, 0.0), + (4, 0.0), + (5, 1.0) +).toDF("id", "labelIndex") + +testTransformerByGlobalCheckFunc[(Int, String)](df, indexerModel, "id", "labelIndex") { rows => + val attr = Attribute.fromStructField(rows.head.schema("labelIndex")) +.asInstanceOf[NominalAttribute] + assert(attr.values.get === Array("a", "c", "b")) + assert(rows.seq === expected.collect().toSeq) +} } test("StringIndexerUnseen") { @@ -70,36 +71,38 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") .fit(df) + // Verify we throw by default with unseen values -intercept[SparkException] { - indexer.transform(df2).collect() -} +testTransformerByInterceptingException[(Int, String)]( + df2, + indexer, + "Unseen label:", + "labelIndex") -indexer.setHandleInvalid("skip") // Verify that we skip the c record -val transformedSkip = indexer.transform(df2) -val attrSkip = Attribute.fromStructField(transformedSkip.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attrSkip.values.get === Array("b", "a")) -val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet // a -> 1, b -> 0 -val expectedSkip = Set((0, 1.0), (1, 0.0)) -assert(outputSkip === expectedSkip) +indexer.setHandleInvalid("skip") + +val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF() +testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows => + val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex")) 
+.asInstanceOf[NominalAttribute] + assert(attrSkip.values.get === Array("b", "a")) + assert(rows.seq === expectedSkip.collect().toSeq) +} indexer.setHandleInvalid("keep") -// Verify that we keep the unseen records -val transformedKeep = indexer.transform(df2) -val attrKeep = Attribute.fromStructField(transformedKeep.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attrKeep.values.get === Array("b", "a", "__unknown")) -val outputKeep = transformedKeep.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet + // a -> 1, b -> 0, c -> 2, d -> 3 -val expectedKeep = Set((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)) -assert(outputKeep === expectedKeep) +val expectedKeep = Seq((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)).toDF() + +// Verify that we keep the unseen records +testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows => + val attrKeep = Attribute.fromStructField(rows.head.schema("lab
[2/2] spark git commit: [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z
[SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z # What changes were proposed in this pull request? Adds structured streaming tests using testTransformer for these suites: - NGramSuite - NormalizerSuite - OneHotEncoderEstimatorSuite - OneHotEncoderSuite - PCASuite - PolynomialExpansionSuite - QuantileDiscretizerSuite - RFormulaSuite - SQLTransformerSuite - StandardScalerSuite - StopWordsRemoverSuite - StringIndexerSuite - TokenizerSuite - RegexTokenizerSuite - VectorAssemblerSuite - VectorIndexerSuite - VectorSizeHintSuite - VectorSlicerSuite - Word2VecSuite # How was this patch tested? They are unit tests. Author: attilapiros Closes #20686 from attilapiros/SPARK-22915. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/279b3db8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/279b3db8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/279b3db8 Branch: refs/heads/master Commit: 279b3db8970809104c30941254e57e3d62da5041 Parents: 1098933 Author: attilapiros Authored: Wed Mar 14 18:36:31 2018 -0700 Committer: Joseph K. Bradley Committed: Wed Mar 14 18:36:31 2018 -0700 -- .../apache/spark/ml/feature/NGramSuite.scala| 23 +-- .../spark/ml/feature/NormalizerSuite.scala | 57 ++ .../feature/OneHotEncoderEstimatorSuite.scala | 193 +- .../spark/ml/feature/OneHotEncoderSuite.scala | 124 ++- .../org/apache/spark/ml/feature/PCASuite.scala | 14 +- .../ml/feature/PolynomialExpansionSuite.scala | 62 +++--- .../ml/feature/QuantileDiscretizerSuite.scala | 198 ++ .../apache/spark/ml/feature/RFormulaSuite.scala | 158 +++--- .../spark/ml/feature/SQLTransformerSuite.scala | 35 ++-- .../spark/ml/feature/StandardScalerSuite.scala | 33 +-- .../ml/feature/StopWordsRemoverSuite.scala | 37 ++-- .../spark/ml/feature/StringIndexerSuite.scala | 204 ++- .../spark/ml/feature/TokenizerSuite.scala | 30 +-- .../spark/ml/feature/VectorIndexerSuite.scala | 183 + .../spark/ml/feature/VectorSizeHintSuite.scala | 88 +--- .../spark/ml/feature/VectorSlicerSuite.scala| 27 +-- .../apache/spark/ml/feature/Word2VecSuite.scala | 28 +-- .../scala/org/apache/spark/ml/util/MLTest.scala | 33 ++- 18 files changed, 809 insertions(+), 718 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/279b3db8/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala index d4975c0..e5956ee 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala @@ -19,17 +19,15 @@ package org.apache.spark.ml.feature import scala.beans.BeanInfo -import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.util.DefaultReadWriteTest -import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest} +import org.apache.spark.sql.{DataFrame, Row} + @BeanInfo case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String]) -class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class NGramSuite extends MLTest with DefaultReadWriteTest { - import org.apache.spark.ml.feature.NGramSuite._ import testImplicits._ test("default behavior yields bigram features") { @@ -83,16 +81,11 @@ class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRe
.setN(3) testDefaultReadWrite(t) } -} - -object NGramSuite extends SparkFunSuite { - def testNGram(t: NGram, dataset: Dataset[_]): Unit = { -t.transform(dataset) - .select("nGrams", "wantedNGrams") - .collect() - .foreach { case Row(actualNGrams, wantedNGrams) => + def testNGram(t: NGram, dataFrame: DataFrame): Unit = { +testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") { + case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) => assert(actualNGrams === wantedNGrams) - } +} } } http://git-wip-us.apache.org/repos/asf/spark/blob/279b3db8/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala index c75027f..eff57f1 100644 --- a/mllib/src/test/scala/org/apache/spark/m
[GitHub] spark issue #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml.featur...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20686 Thanks for the updates & all the work this PR took, @attilapiros, and for the review, @WeichenXu123! LGTM. Merging with master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r174656617 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorSlicerSuite.scala --- @@ -84,26 +84,29 @@ class VectorSlicerSuite extends SparkFunSuite with MLlibTestSparkContext with De val vectorSlicer = new VectorSlicer().setInputCol("features").setOutputCol("result") -def validateResults(df: DataFrame): Unit = { - df.select("result", "expected").collect().foreach { case Row(vec1: Vector, vec2: Vector) => +def validateResults(rows: Seq[Row]): Unit = { + rows.foreach { case Row(vec1: Vector, vec2: Vector) => assert(vec1 === vec2) } - val resultMetadata = AttributeGroup.fromStructField(df.schema("result")) - val expectedMetadata = AttributeGroup.fromStructField(df.schema("expected")) + val resultMetadata = AttributeGroup.fromStructField(rows.head.schema("result")) + val expectedMetadata = AttributeGroup.fromStructField(rows.head.schema("expected")) assert(resultMetadata.numAttributes === expectedMetadata.numAttributes) resultMetadata.attributes.get.zip(expectedMetadata.attributes.get).foreach { case (a, b) => assert(a === b) } } vectorSlicer.setIndices(Array(1, 4)).setNames(Array.empty) -validateResults(vectorSlicer.transform(df)) +testTransformerByGlobalCheckFunc[(Vector, Vector)](df, vectorSlicer, "result", "expected")( --- End diff -- I see, makes sense. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r174535931 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -299,18 +310,17 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") -val expected = Seq(Set((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 1.0), (5, 0.0)), - Set((0, 2.0), (1, 2.0), (2, 0.0), (3, 1.0), (4, 1.0), (5, 2.0)), - Set((0, 1.0), (1, 1.0), (2, 0.0), (3, 2.0), (4, 2.0), (5, 1.0)), - Set((0, 1.0), (1, 1.0), (2, 2.0), (3, 0.0), (4, 0.0), (5, 1.0))) +val expected = Seq(Seq((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 1.0), (5, 0.0)), --- End diff -- I agree that's correct. The problem is that people tend to see these patterns and copy them without thinking. It's best to follow patterns which help other contributors to avoid making mistakes. I'm OK with leaving it since this issue is scattered throughout MLlib tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173594072 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -299,18 +310,17 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") -val expected = Seq(Set((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 1.0), (5, 0.0)), - Set((0, 2.0), (1, 2.0), (2, 0.0), (3, 1.0), (4, 1.0), (5, 2.0)), - Set((0, 1.0), (1, 1.0), (2, 0.0), (3, 2.0), (4, 2.0), (5, 1.0)), - Set((0, 1.0), (1, 1.0), (2, 2.0), (3, 0.0), (4, 0.0), (5, 1.0))) +val expected = Seq(Seq((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 1.0), (5, 0.0)), --- End diff -- I'd keep using Set. The point is to be row-order-agnostic since DataFrames do not guarantee stable row ordering in general. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
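For readers following the Set-vs-Seq thread above, here is a minimal sketch of the row-order-agnostic pattern being defended, assuming the `testTransformerByGlobalCheckFunc` helper from `MLTest` quoted elsewhere in this thread; the DataFrame and column names are illustrative, not the exact suite code.

```scala
// Sketch only: assumes a df with (id: Int, label: String) columns and a fitted StringIndexerModel.
// Comparing Sets keeps the assertion independent of row order, which neither batch DataFrames
// nor the streaming sink behind MLTest guarantee.
val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 1.0))
testTransformerByGlobalCheckFunc[(Int, String)](df, indexerModel, "id", "labelIndex") { rows =>
  val actual = rows.map(r => (r.getInt(0), r.getDouble(1))).toSet
  assert(actual === expected)
}
```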
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173592896 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -109,16 +111,14 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") .fit(df) -val transformed = indexer.transform(df) -val attr = Attribute.fromStructField(transformed.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attr.values.get === Array("100", "300", "200")) -val output = transformed.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet -// 100 -> 0, 200 -> 2, 300 -> 1 -val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 1.0)) -assert(output === expected) +testTransformerByGlobalCheckFunc[(Int, String)](df, indexer, "id", "labelIndex") { rows => + val attr = Attribute.fromStructField(rows.head.schema("labelIndex")) +.asInstanceOf[NominalAttribute] + assert(attr.values.get === Array("100", "300", "200")) + // 100 -> 0, 200 -> 2, 300 -> 1 + val expected = Seq((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 1.0)).toDF() --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173600685 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala --- @@ -128,18 +126,29 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext MLTestingUtils.checkCopyAndUids(vectorIndexer, model) -model.transform(densePoints1) // should work -model.transform(sparsePoints1) // should work +// should work +testTransformer[FeatureData](densePoints1, model, "indexed") { _ => } +// should work +testTransformer[FeatureData](sparsePoints1, model, "indexed") { _ => } + // If the data is local Dataset, it throws AssertionError directly. -intercept[AssertionError] { - model.transform(densePoints2).collect() - logInfo("Did not throw error when fit, transform were called on vectors of different lengths") +withClue("Did not found expected error message when fit, " + + "transform were called on vectors of different lengths") { + testTransformerByInterceptingException[FeatureData]( +densePoints2, +model, +"VectorIndexerModel expected vector of length 3 but found length 4", +"indexed") } // If the data is distributed Dataset, it throws SparkException // which is the wrapper of AssertionError. -intercept[SparkException] { - model.transform(densePoints2.repartition(2)).collect() - logInfo("Did not throw error when fit, transform were called on vectors of different lengths") +withClue("Did not found expected error message when fit, " + --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173593795 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -247,14 +253,18 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") .fit(df) -val transformed = indexer.transform(df) +val expected1 = Seq(0.0, 2.0, 1.0, 0.0, 0.0, 1.0).map(Tuple1(_)).toDF("labelIndex") --- End diff -- Not needed; this isn't what this unit test is testing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173600331 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala --- @@ -86,16 +94,19 @@ class RFormulaSuite extends MLTest with DefaultReadWriteTest { } } - test("label column already exists but is not numeric type") { + ignore("label column already exists but is not numeric type") { --- End diff -- testTransformerByInterceptingException should use ```(Int, Boolean)``` (not Double) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173600416 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -58,14 +57,16 @@ class VectorAssemblerSuite assert(v2.isInstanceOf[DenseVector]) } - test("VectorAssembler") { + ignore("VectorAssembler") { --- End diff -- Don't ignore; just test on batch. This case is solved with VectorSizeHint. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
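A hedged sketch of what "solved with VectorSizeHint" means in practice: declaring the vector size up front lets `VectorAssembler` build its output schema without inspecting any rows, which is what streaming requires. The column names, size, and `streamingDf` below are illustrative assumptions, not code from the suite.

```scala
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// Sketch only: `streamingDf` is any streaming DataFrame with a Vector column "userFeatures"
// and a numeric column "clicks".
val sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setSize(3) // declare the vector length so no data inspection is needed
val assembler = new VectorAssembler()
  .setInputCols(Array("userFeatures", "clicks"))
  .setOutputCol("features")
val assembled = assembler.transform(sizeHint.transform(streamingDf))
```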
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173584784 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -324,19 +352,24 @@ class QuantileDiscretizerSuite .setStages(Array(discretizerForCol1, discretizerForCol2, discretizerForCol3)) .fit(df) -val resultForMultiCols = plForMultiCols.transform(df) - .select("result1", "result2", "result3") - .collect() +val expected = plForSingleCol.transform(df).select("result1", "result2", "result3").collect() -val resultForSingleCol = plForSingleCol.transform(df) - .select("result1", "result2", "result3") - .collect() +testTransformerByGlobalCheckFunc[(Double, Double, Double)]( + df, + plForMultiCols, + "result1", + "result2", + "result3") { rows => +assert(rows === expected) + } -resultForSingleCol.zip(resultForMultiCols).foreach { - case (rowForSingle, rowForMultiCols) => -assert(rowForSingle.getDouble(0) == rowForMultiCols.getDouble(0) && - rowForSingle.getDouble(1) == rowForMultiCols.getDouble(1) && - rowForSingle.getDouble(2) == rowForMultiCols.getDouble(2)) +testTransformerByGlobalCheckFunc[(Double, Double, Double)]( --- End diff -- I'd remove this. Testing vs. multiCol is already testing batch vs streaming. No need to test singleCol against itself. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173582122 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala --- @@ -90,23 +96,29 @@ class OneHotEncoderSuite val encoder = new OneHotEncoder() .setInputCol("size") .setOutputCol("encoded") -val output = encoder.transform(df) -val group = AttributeGroup.fromStructField(output.schema("encoded")) -assert(group.size === 2) -assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0)) -assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1)) +testTransformerByGlobalCheckFunc[(Double)](df, encoder, "encoded") { rows => + val group = AttributeGroup.fromStructField(rows.head.schema("encoded")) + assert(group.size === 2) + assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0)) + assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1)) +} } - test("input column without ML attribute") { + + ignore("input column without ML attribute") { --- End diff -- Let's keep the test but limit it to batch. People should switch to OneHotEncoderEstimator anyways. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173593049 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -109,16 +111,14 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") .fit(df) -val transformed = indexer.transform(df) -val attr = Attribute.fromStructField(transformed.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attr.values.get === Array("100", "300", "200")) -val output = transformed.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet -// 100 -> 0, 200 -> 2, 300 -> 1 -val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 1.0)) -assert(output === expected) +testTransformerByGlobalCheckFunc[(Int, String)](df, indexer, "id", "labelIndex") { rows => + val attr = Attribute.fromStructField(rows.head.schema("labelIndex")) +.asInstanceOf[NominalAttribute] + assert(attr.values.get === Array("100", "300", "200")) + // 100 -> 0, 200 -> 2, 300 -> 1 + val expected = Seq((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 1.0)).toDF() --- End diff -- And elsewhere below --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173600642 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala --- @@ -128,18 +126,29 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext MLTestingUtils.checkCopyAndUids(vectorIndexer, model) -model.transform(densePoints1) // should work -model.transform(sparsePoints1) // should work +// should work +testTransformer[FeatureData](densePoints1, model, "indexed") { _ => } +// should work +testTransformer[FeatureData](sparsePoints1, model, "indexed") { _ => } + // If the data is local Dataset, it throws AssertionError directly. -intercept[AssertionError] { - model.transform(densePoints2).collect() - logInfo("Did not throw error when fit, transform were called on vectors of different lengths") +withClue("Did not found expected error message when fit, " + --- End diff -- Or just use the original text --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173587524 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala --- @@ -313,13 +306,14 @@ class RFormulaSuite extends MLTest with DefaultReadWriteTest { Seq(("male", "foo", 4), ("female", "bar", 4), ("female", "bar", 5), ("male", "baz", 5)) .toDF("id", "a", "b") val model = formula.fit(original) +val attr = NominalAttribute.defaultAttr val expected = Seq( ("male", "foo", 4, Vectors.dense(0.0, 1.0, 4.0), 1.0), ("female", "bar", 4, Vectors.dense(1.0, 0.0, 4.0), 0.0), ("female", "bar", 5, Vectors.dense(1.0, 0.0, 5.0), 0.0), ("male", "baz", 5, Vectors.dense(0.0, 0.0, 5.0), 1.0) ).toDF("id", "a", "b", "features", "label") -// assert(result.schema.toString == resultSchema.toString) + .select($"id", $"a", $"b", $"features", $"label".as("label", attr.toMetadata())) --- End diff -- I'd just keep what you have now. There isn't a great solution here, and what you have fits other code examples in MLlib. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173592635 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -70,36 +71,37 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") .fit(df) + // Verify we throw by default with unseen values -intercept[SparkException] { - indexer.transform(df2).collect() -} +testTransformerByInterceptingException[(Int, String)]( + df2, + indexer, + "Unseen label:", + "labelIndex") indexer.setHandleInvalid("skip") -// Verify that we skip the c record -val transformedSkip = indexer.transform(df2) -val attrSkip = Attribute.fromStructField(transformedSkip.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attrSkip.values.get === Array("b", "a")) -val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet -// a -> 1, b -> 0 -val expectedSkip = Set((0, 1.0), (1, 0.0)) -assert(outputSkip === expectedSkip) + +testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows => + val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex")) +.asInstanceOf[NominalAttribute] + assert(attrSkip.values.get === Array("b", "a")) + // Verify that we skip the c record + // a -> 1, b -> 0 + val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF() --- End diff -- This can be moved outside of the testTransformerByGlobalCheckFunc method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
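A small sketch of the refactor being asked for, assuming the surrounding StringIndexerSuite with `import testImplicits._` in scope: materialize the expected data once, outside the check function, and let the check function only compare.

```scala
// Sketch only: the check function may run for both the batch and the streaming code paths,
// so it should compare precomputed data rather than re-create it on every invocation.
val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF("id", "labelIndex").collect().toSeq

testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows =>
  assert(rows === expectedSkip)
}
```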
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173594378 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -328,7 +338,12 @@ class StringIndexerSuite .setOutputCol("CITYIndexed") .fit(dfNoBristol) -val dfWithIndex = model.transform(dfNoBristol) -assert(dfWithIndex.filter($"CITYIndexed" === 1.0).count == 1) +testTransformerByGlobalCheckFunc[(String, String, String)]( + dfNoBristol, + model, + "CITYIndexed") { rows => + val transformed = rows.map { r => r.getDouble(0) }.toDF("CITYIndexed") --- End diff -- It's probably easier to avoid going through a DataFrame here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
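A sketch of the simplification suggested above, assuming the `dfNoBristol` fixture and fitted `model` from the diff: count matching rows directly instead of wrapping them back into a DataFrame first.

```scala
// Sketch only: operate on the collected rows directly rather than re-wrapping them
// in a DataFrame just to run a filter and count.
testTransformerByGlobalCheckFunc[(String, String, String)](
  dfNoBristol, model, "CITYIndexed") { rows =>
  assert(rows.count(_.getDouble(0) == 1.0) === 1)
}
```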
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173600463 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala --- @@ -76,16 +77,18 @@ class VectorAssemblerSuite val assembler = new VectorAssembler() .setInputCols(Array("a", "b", "c")) .setOutputCol("features") -val thrown = intercept[IllegalArgumentException] { - assembler.transform(df) -} -assert(thrown.getMessage contains +testTransformerByInterceptingException[(String, String, String)]( + df, + assembler, "Data type StringType of column a is not supported.\n" + "Data type StringType of column b is not supported.\n" + - "Data type StringType of column c is not supported.") + "Data type StringType of column c is not supported.", + "features") } - test("ML attributes") { + ignore("ML attributes") { --- End diff -- ditto: do not ignore --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173876598 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorSlicerSuite.scala --- @@ -84,26 +84,29 @@ class VectorSlicerSuite extends SparkFunSuite with MLlibTestSparkContext with De val vectorSlicer = new VectorSlicer().setInputCol("features").setOutputCol("result") -def validateResults(df: DataFrame): Unit = { - df.select("result", "expected").collect().foreach { case Row(vec1: Vector, vec2: Vector) => +def validateResults(rows: Seq[Row]): Unit = { + rows.foreach { case Row(vec1: Vector, vec2: Vector) => assert(vec1 === vec2) } - val resultMetadata = AttributeGroup.fromStructField(df.schema("result")) - val expectedMetadata = AttributeGroup.fromStructField(df.schema("expected")) + val resultMetadata = AttributeGroup.fromStructField(rows.head.schema("result")) + val expectedMetadata = AttributeGroup.fromStructField(rows.head.schema("expected")) assert(resultMetadata.numAttributes === expectedMetadata.numAttributes) resultMetadata.attributes.get.zip(expectedMetadata.attributes.get).foreach { case (a, b) => assert(a === b) } } vectorSlicer.setIndices(Array(1, 4)).setNames(Array.empty) -validateResults(vectorSlicer.transform(df)) +testTransformerByGlobalCheckFunc[(Vector, Vector)](df, vectorSlicer, "result", "expected")( --- End diff -- Avoid using a global check function when you don't need to. It'd be better to use testTransformer() since the test is per-row. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
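A sketch of the per-row alternative being suggested, assuming the `vectorSlicer` and `df` from the diff above; the metadata assertions, which need the schema only once, would stay in a separate global check.

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Sketch only: the element-wise comparison runs per row via testTransformer,
// so no global check function is needed for this part of the test.
testTransformer[(Vector, Vector)](df, vectorSlicer, "result", "expected") {
  case Row(vec1: Vector, vec2: Vector) => assert(vec1 === vec2)
}
```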
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173593885 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -247,14 +253,18 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") .fit(df) -val transformed = indexer.transform(df) +val expected1 = Seq(0.0, 2.0, 1.0, 0.0, 0.0, 1.0).map(Tuple1(_)).toDF("labelIndex") +testTransformerByGlobalCheckFunc[(Int, String)](df, indexer, "labelIndex") { rows => + assert(rows == expected1.collect().seq) +} + val idx2str = new IndexToString() .setInputCol("labelIndex") .setOutputCol("sameLabel") .setLabels(indexer.labels) -idx2str.transform(transformed).select("label", "sameLabel").collect().foreach { - case Row(a: String, b: String) => -assert(a === b) + +testTransformerByGlobalCheckFunc[(Double)](expected1, idx2str, "sameLabel") { rows => --- End diff -- You should be able to test per-row, rather than using a global check function. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173592811 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala --- @@ -70,36 +71,37 @@ class StringIndexerSuite .setInputCol("label") .setOutputCol("labelIndex") .fit(df) + // Verify we throw by default with unseen values -intercept[SparkException] { - indexer.transform(df2).collect() -} +testTransformerByInterceptingException[(Int, String)]( + df2, + indexer, + "Unseen label:", + "labelIndex") indexer.setHandleInvalid("skip") -// Verify that we skip the c record -val transformedSkip = indexer.transform(df2) -val attrSkip = Attribute.fromStructField(transformedSkip.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attrSkip.values.get === Array("b", "a")) -val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet -// a -> 1, b -> 0 -val expectedSkip = Set((0, 1.0), (1, 0.0)) -assert(outputSkip === expectedSkip) + +testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows => + val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex")) +.asInstanceOf[NominalAttribute] + assert(attrSkip.values.get === Array("b", "a")) + // Verify that we skip the c record + // a -> 1, b -> 0 + val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF() + assert(rows.seq === expectedSkip.collect().toSeq) +} indexer.setHandleInvalid("keep") + // Verify that we keep the unseen records -val transformedKeep = indexer.transform(df2) -val attrKeep = Attribute.fromStructField(transformedKeep.schema("labelIndex")) - .asInstanceOf[NominalAttribute] -assert(attrKeep.values.get === Array("b", "a", "__unknown")) -val outputKeep = transformedKeep.select("id", "labelIndex").rdd.map { r => - (r.getInt(0), r.getDouble(1)) -}.collect().toSet -// a -> 1, b -> 0, c -> 2, d -> 3 -val expectedKeep = Set((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)) -assert(outputKeep === expectedKeep) +testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows => + val attrKeep = Attribute.fromStructField(rows.head.schema("labelIndex")) +.asInstanceOf[NominalAttribute] + assert(attrKeep.values.get === Array("b", "a", "__unknown")) + // a -> 1, b -> 0, c -> 2, d -> 3 + val expectedKeep = Seq((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)).toDF() --- End diff -- ditto: move outside checkFunc --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173600557 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala --- @@ -128,18 +126,29 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext MLTestingUtils.checkCopyAndUids(vectorIndexer, model) -model.transform(densePoints1) // should work -model.transform(sparsePoints1) // should work +// should work +testTransformer[FeatureData](densePoints1, model, "indexed") { _ => } +// should work +testTransformer[FeatureData](sparsePoints1, model, "indexed") { _ => } + // If the data is local Dataset, it throws AssertionError directly. -intercept[AssertionError] { - model.transform(densePoints2).collect() - logInfo("Did not throw error when fit, transform were called on vectors of different lengths") +withClue("Did not found expected error message when fit, " + --- End diff -- found -> find --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173584864 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala --- @@ -364,18 +397,26 @@ class QuantileDiscretizerSuite .setOutputCols(Array("result1", "result2", "result3")) .setNumBucketsArray(Array(10, 10, 10)) -val result1 = discretizerSingleNumBuckets.fit(df).transform(df) - .select("result1", "result2", "result3") - .collect() -val result2 = discretizerNumBucketsArray.fit(df).transform(df) - .select("result1", "result2", "result3") - .collect() - -result1.zip(result2).foreach { - case (row1, row2) => -assert(row1.getDouble(0) == row2.getDouble(0) && - row1.getDouble(1) == row2.getDouble(1) && - row1.getDouble(2) == row2.getDouble(2)) +val model = discretizerSingleNumBuckets.fit(df) +val expected = model.transform(df).select("result1", "result2", "result3").collect() + + +testTransformerByGlobalCheckFunc[(Double, Double, Double)]( + df, + model, + "result1", + "result2", + "result3") { rows => + assert(rows === expected) +} + +testTransformerByGlobalCheckFunc[(Double, Double, Double)]( --- End diff -- Is this a repeat of the test just above? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173600517 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala --- @@ -128,18 +126,29 @@ class VectorIndexerSuite extends SparkFunSuite with MLlibTestSparkContext MLTestingUtils.checkCopyAndUids(vectorIndexer, model) -model.transform(densePoints1) // should work -model.transform(sparsePoints1) // should work +// should work --- End diff -- We can remove "should work" comments : P --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml.featur...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20686 I'll do a complete review now! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173582018 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala --- @@ -90,23 +96,29 @@ class OneHotEncoderSuite val encoder = new OneHotEncoder() .setInputCol("size") .setOutputCol("encoded") -val output = encoder.transform(df) -val group = AttributeGroup.fromStructField(output.schema("encoded")) -assert(group.size === 2) -assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0)) -assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1)) +testTransformerByGlobalCheckFunc[(Double)](df, encoder, "encoded") { rows => + val group = AttributeGroup.fromStructField(rows.head.schema("encoded")) + assert(group.size === 2) + assert(group.getAttr(0) === BinaryAttribute.defaultAttr.withName("small").withIndex(0)) + assert(group.getAttr(1) === BinaryAttribute.defaultAttr.withName("medium").withIndex(1)) +} } - test("input column without ML attribute") { + + ignore("input column without ML attribute") { --- End diff -- Let's keep the test but limit it to batch. People should switch to OneHotEncoderEstimator anyways. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173580629 --- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala --- @@ -108,5 +111,29 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite => otherResultCols: _*)(globalCheckFunction) testTransformerOnDF(dataframe, transformer, firstResultCol, otherResultCols: _*)(globalCheckFunction) +} + + def testTransformerByInterceptingException[A : Encoder]( +dataframe: DataFrame, +transformer: Transformer, +expectedMessagePart : String, +firstResultCol: String) { + +def hasExpectedMessage(exception: Throwable): Boolean = --- End diff -- Just curious: Did you have to add the getCause case because of streaming throwing wrapped exceptions? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173580016 --- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala --- @@ -108,5 +111,29 @@ trait MLTest extends StreamTest with TempDirectory { self: Suite => otherResultCols: _*)(globalCheckFunction) testTransformerOnDF(dataframe, transformer, firstResultCol, otherResultCols: _*)(globalCheckFunction) +} + + def testTransformerByInterceptingException[A : Encoder]( +dataframe: DataFrame, +transformer: Transformer, +expectedMessagePart : String, +firstResultCol: String) { + +def hasExpectedMessage(exception: Throwable): Boolean = --- End diff -- Since most other tests check parts of the message, I'm OK with this setup. When we don't think the message will remain stable, we can pass an empty string for expectedMessagePart. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
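For context on the `getCause` question above: in the streaming path the user-level error surfaces wrapped (for example inside a streaming query exception), so the message check has to walk the cause chain. A minimal sketch of that idea, not the exact helper in `MLTest`:

```scala
// Sketch only: recursively search the exception and its causes for the expected
// message fragment, since streaming wraps the original error.
def hasExpectedMessage(exception: Throwable, expectedMessagePart: String): Boolean =
  (exception.getMessage != null && exception.getMessage.contains(expectedMessagePart)) ||
    (exception.getCause != null && hasExpectedMessage(exception.getCause, expectedMessagePart))
```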
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173555129 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala --- @@ -17,94 +17,72 @@ package org.apache.spark.ml.feature -import org.apache.spark.SparkFunSuite import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} -import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest} import org.apache.spark.ml.util.TestingUtils._ -import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Row} -class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class NormalizerSuite extends MLTest with DefaultReadWriteTest { import testImplicits._ - @transient var data: Array[Vector] = _ - @transient var dataFrame: DataFrame = _ - @transient var normalizer: Normalizer = _ - @transient var l1Normalized: Array[Vector] = _ - @transient var l2Normalized: Array[Vector] = _ + @transient val data: Seq[Vector] = Seq( +Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), +Vectors.dense(0.0, 0.0, 0.0), +Vectors.dense(0.6, -1.1, -3.0), +Vectors.sparse(3, Seq((1, 0.91), (2, 3.2))), +Vectors.sparse(3, Seq((0, 5.7), (1, 0.72), (2, 2.7))), +Vectors.sparse(3, Seq())) --- End diff -- I'd prefer to revert these changes. As far as I know, nothing is broken, and this is a common pattern used in many parts of MLlib tests. I think the main reason to move data around would be to have actual + expected values side-by-side for easier reading. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173554643 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala --- @@ -19,61 +19,59 @@ package org.apache.spark.ml.feature import scala.beans.BeanInfo -import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.util.DefaultReadWriteTest -import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.{Dataset, Row} +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest} +import org.apache.spark.sql.{DataFrame, Row} + @BeanInfo case class NGramTestData(inputTokens: Array[String], wantedNGrams: Array[String]) -class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class NGramSuite extends MLTest with DefaultReadWriteTest { - import org.apache.spark.ml.feature.NGramSuite._ import testImplicits._ test("default behavior yields bigram features") { val nGram = new NGram() .setInputCol("inputTokens") .setOutputCol("nGrams") -val dataset = Seq(NGramTestData( +val dataFrame = Seq(NGramTestData( --- End diff -- These kinds of changes are not necessary and make the PR a lot longer. Would you mind reverting them? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20686#discussion_r173556190 --- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala --- @@ -17,94 +17,72 @@ package org.apache.spark.ml.feature -import org.apache.spark.SparkFunSuite import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} -import org.apache.spark.ml.util.DefaultReadWriteTest +import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest} import org.apache.spark.ml.util.TestingUtils._ -import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Row} -class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest { +class NormalizerSuite extends MLTest with DefaultReadWriteTest { import testImplicits._ - @transient var data: Array[Vector] = _ - @transient var dataFrame: DataFrame = _ - @transient var normalizer: Normalizer = _ --- End diff -- I will say, though, that I'm happy with moving Normalizer into individual tests. It's weird how it is shared here since it's mutated within tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml.featur...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20686 Thanks for the PR @attilapiros and @WeichenXu123 for the review! I'll take a look now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19381: [SPARK-10884][ML] Support prediction on single instance ...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19381 Thanks for the PR! Can you please add Since annotations? Also, can the test code be consolidated? Maybe you can have a helper function taking a model and dataset, usable with all Predictors. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
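One possible shape for the consolidated helper being requested here; the name and signature are hypothetical sketches, not the code that was eventually merged. The idea is to check that single-instance `predict()` agrees with `transform()` on the same rows for any fitted Predictor.

```scala
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{Dataset, Row}

// Hypothetical helper sketch, reusable across regression and classification models.
def testPredictionModelSinglePrediction(
    model: PredictionModel[Vector, _],
    dataset: Dataset[_]): Unit = {
  model.transform(dataset).select(model.getFeaturesCol, model.getPredictionCol)
    .collect().foreach { case Row(features: Vector, prediction: Double) =>
      // transform() and single-instance predict() should agree
      assert(prediction === model.predict(features))
    }
}
```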
[GitHub] spark issue #17466: [SPARK-14681][ML] Added getter for impurityStats
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/17466 @shaynativ Sorry for the inactivity here. Btw, for the JIRA & PR title question above, I'd recommend checking out http://spark.apache.org/contributing.html. Since @WeichenXu123 opened a fresh PR for this, would you mind working with him on it? We can close this issue / PR for now. Thank you! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
spark git commit: [SPARK-18630][PYTHON][ML] Move del method from JavaParams to JavaWrapper; add tests
Repository: spark Updated Branches: refs/heads/master 508573958 -> 7706eea6a [SPARK-18630][PYTHON][ML] Move del method from JavaParams to JavaWrapper; add tests The `__del__` method that explicitly detaches the object was moved from `JavaParams` to the `JavaWrapper` class; this way, model summaries could also be garbage collected in Java. A test case was added to make sure that relevant error messages are thrown after the objects are deleted. I ran pyspark tests against the `pyspark-ml` module: `./python/run-tests --python-executables=$(which python) --modules=pyspark-ml` Author: Yogesh Garg Closes #20724 from yogeshg/java_wrapper_memory. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7706eea6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7706eea6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7706eea6 Branch: refs/heads/master Commit: 7706eea6a8bdcd73e9dde5212368f8825e2f1801 Parents: 5085739 Author: Yogesh Garg Authored: Mon Mar 5 15:53:10 2018 -0800 Committer: Joseph K. Bradley Committed: Mon Mar 5 15:53:10 2018 -0800 -- python/pyspark/ml/tests.py | 39 +++ python/pyspark/ml/wrapper.py | 8 2 files changed, 43 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7706eea6/python/pyspark/ml/tests.py -- diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 1168859..6dee693 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -173,6 +173,45 @@ class MockModel(MockTransformer, Model, HasFake): pass +class JavaWrapperMemoryTests(SparkSessionTestCase): + +def test_java_object_gets_detached(self): +df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], []))], +["label", "weight", "features"]) +lr = LinearRegression(maxIter=1, regParam=0.0, solver="normal", weightCol="weight", + fitIntercept=False) + +model = lr.fit(df) +summary = model.summary + +self.assertIsInstance(model, JavaWrapper) +self.assertIsInstance(summary, JavaWrapper) +self.assertIsInstance(model, JavaParams) +self.assertNotIsInstance(summary, JavaParams) + +error_no_object = 'Target Object ID does not exist for this gateway' + +self.assertIn("LinearRegression_", model._java_obj.toString()) +self.assertIn("LinearRegressionTrainingSummary", summary._java_obj.toString()) + +model.__del__() + +with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object): +model._java_obj.toString() +self.assertIn("LinearRegressionTrainingSummary", summary._java_obj.toString()) + +try: +summary.__del__() +except: +pass + +with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object): +model._java_obj.toString() +with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object): +summary._java_obj.toString() + + class ParamTypeConversionTests(PySparkTestCase): """ Test that param type conversion happens. 
http://git-wip-us.apache.org/repos/asf/spark/blob/7706eea6/python/pyspark/ml/wrapper.py
--
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 0f846fb..5061f64 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -36,6 +36,10 @@ class JavaWrapper(object):
         super(JavaWrapper, self).__init__()
         self._java_obj = java_obj
 
+    def __del__(self):
+        if SparkContext._active_spark_context and self._java_obj is not None:
+            SparkContext._active_spark_context._gateway.detach(self._java_obj)
+
     @classmethod
     def _create_from_java_class(cls, java_class, *args):
         """
@@ -100,10 +104,6 @@ class JavaParams(JavaWrapper, Params):
 
     __metaclass__ = ABCMeta
 
-    def __del__(self):
-        if SparkContext._active_spark_context:
-            SparkContext._active_spark_context._gateway.detach(self._java_obj)
-
     def _make_java_param_pair(self, param, value):
         """
         Makes a Java param pair.
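In practical terms, the move means that every `JavaWrapper` subclass, not just `JavaParams`, now drops its JVM-side handle when the Python object is collected, so model summaries no longer pin their Java counterparts. Below is a minimal, self-contained sketch of that detach-on-delete pattern; the `FakeJavaObject`, `FakeGateway`, and `Wrapper` classes are invented stand-ins for illustration only, not PySpark's API.

```python
# Toy illustration of the detach-on-__del__ pattern (not PySpark code).
class FakeJavaObject:
    def __init__(self, object_id):
        self.object_id = object_id


class FakeGateway:
    """Stands in for the Py4J gateway that keeps server-side references alive."""
    def __init__(self):
        self.live_objects = {}

    def register(self, obj):
        self.live_objects[obj.object_id] = obj

    def detach(self, obj):
        # Dropping the server-side reference lets the "JVM" reclaim the object.
        self.live_objects.pop(obj.object_id, None)


class Wrapper:
    """Base wrapper: every instance (models and summaries alike) detaches on delete."""
    def __init__(self, gateway, java_obj):
        self.gateway = gateway
        self.java_obj = java_obj
        gateway.register(java_obj)

    def __del__(self):
        # Guard against a gateway that has already been torn down.
        if self.gateway is not None and self.java_obj is not None:
            self.gateway.detach(self.java_obj)


gateway = FakeGateway()
model = Wrapper(gateway, FakeJavaObject("model-1"))
summary = Wrapper(gateway, FakeJavaObject("summary-1"))
del model, summary           # CPython's refcounting triggers __del__ immediately
print(gateway.live_objects)  # {} -- both server-side handles were released
```

The real `wrapper.py` guard additionally checks `SparkContext._active_spark_context`, since at interpreter shutdown the gateway may already be gone.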
[GitHub] spark issue #20724: [SPARK-18630][PYTHON][ML] Move del method from JavaParam...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/20724

Thanks @yogeshg and @WeichenXu123! I verified that the new test fails without the wrapper fix. LGTM. Merging with master. I won't bother backporting this, since it's a small issue.
[GitHub] spark pull request #20724: [SPARK-18630][PYTHON][ML] Move del method from Ja...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20724#discussion_r172367567

--- Diff: python/pyspark/ml/tests.py ---
@@ -173,6 +173,45 @@ class MockModel(MockTransformer, Model, HasFake):
     pass
 
 
+class JavaWrapperMemoryTests(SparkSessionTestCase):
+
+    def test_java_object_gets_detached(self):
+        df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+                                         (0.0, 2.0, Vectors.sparse(1, [], []))],
+                                        ["label", "weight", "features"])
+        lr = LinearRegression(maxIter=1, regParam=0.0, solver="normal", weightCol="weight",
+                              fitIntercept=False)
+
+        model = lr.fit(df)
+        summary = model.summary
+
+        self.assertIsInstance(model, JavaWrapper)
+        self.assertIsInstance(summary, JavaWrapper)
+        self.assertIsInstance(model, JavaParams)
+        self.assertNotIsInstance(summary, JavaParams)
+
+        error_no_object = 'Target Object ID does not exist for this gateway'
+
+        self.assertIn("LinearRegression_", model._java_obj.toString())
+        self.assertIn("LinearRegressionTrainingSummary", summary._java_obj.toString())
+
+        model.__del__()
+
+        with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object):
+            model._java_obj.toString()
+        self.assertIn("LinearRegressionTrainingSummary", summary._java_obj.toString())
+
+        try:
+            summary.__del__()
+        except:
--- End diff --

Clarified offline: this was needed in order to run the test before the fix in wrapper.py and verify that it failed without the fix.
spark git commit: [SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification
Repository: spark
Updated Branches:
  refs/heads/master 4586eada4 -> 98a5c0a35


[SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification

## What changes were proposed in this pull request?

Adding Structured Streaming tests for all Models/Transformers in spark.ml.classification.

## How was this patch tested?

N/A

Author: WeichenXu

Closes #20121 from WeichenXu123/ml_stream_test_classification.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98a5c0a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98a5c0a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98a5c0a3

Branch: refs/heads/master
Commit: 98a5c0a35f0a24730f5074522939acf57ef95422
Parents: 4586ead
Author: WeichenXu
Authored: Mon Mar 5 10:50:00 2018 -0800
Committer: Joseph K. Bradley
Committed: Mon Mar 5 10:50:00 2018 -0800

--
 .../DecisionTreeClassifierSuite.scala          |  29 ++-
 .../ml/classification/GBTClassifierSuite.scala |  77 ++-
 .../ml/classification/LinearSVCSuite.scala     |  15 +-
 .../LogisticRegressionSuite.scala              | 229 +++
 .../MultilayerPerceptronClassifierSuite.scala  |  44 ++--
 .../ml/classification/NaiveBayesSuite.scala    |  47 ++--
 .../ml/classification/OneVsRestSuite.scala     |  21 +-
 .../ProbabilisticClassifierSuite.scala         |  29 +--
 .../RandomForestClassifierSuite.scala          |  16 +-
 9 files changed, 202 insertions(+), 305 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/98a5c0a3/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
index 38b265d..eeb0324 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
@@ -23,15 +23,14 @@ import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.tree.{CategoricalSplit, InternalNode, LeafNode}
 import org.apache.spark.ml.tree.impl.TreeTests
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
-import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, DecisionTreeSuite => OldDecisionTreeSuite}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree,
+  DecisionTreeSuite => OldDecisionTreeSuite}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}
 
-class DecisionTreeClassifierSuite
-  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class DecisionTreeClassifierSuite extends MLTest with DefaultReadWriteTest {
 
   import DecisionTreeClassifierSuite.compareAPIs
   import testImplicits._
@@ -251,20 +250,18 @@ class DecisionTreeClassifierSuite
 
     MLTestingUtils.checkCopyAndUids(dt, newTree)
 
-    val predictions = newTree.transform(newData)
-      .select(newTree.getPredictionCol, newTree.getRawPredictionCol, newTree.getProbabilityCol)
-      .collect()
-
-    predictions.foreach { case Row(pred: Double, rawPred: Vector, probPred: Vector) =>
-      assert(pred === rawPred.argmax,
-        s"Expected prediction $pred but calculated ${rawPred.argmax} from rawPrediction.")
-      val sum = rawPred.toArray.sum
-      assert(Vectors.dense(rawPred.toArray.map(_ / sum)) === probPred,
-        "probability prediction mismatch")
+    testTransformer[(Vector, Double)](newData, newTree,
+      "prediction", "rawPrediction", "probability") {
+      case Row(pred: Double, rawPred: Vector, probPred: Vector) =>
+        assert(pred === rawPred.argmax,
+          s"Expected prediction $pred but calculated ${rawPred.argmax} from rawPrediction.")
+        val sum = rawPred.toArray.sum
+        assert(Vectors.dense(rawPred.toArray.map(_ / sum)) === probPred,
+          "probability prediction mismatch")
     }
 
     ProbabilisticClassifierSuite.testPredictMethods[
-      Vector, DecisionTreeClassificationModel](newTree, newData)
+      Vector, DecisionTreeClassificationModel](this, newTree, newData)
   }
 
   test("training with 1-category categorical feature") {


http://git-wip-us.apache.org/repos/asf/spark/blob/98a5c0a3/mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala
---
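The pattern of the change is visible in the hunk above: direct `transform(...).collect()` assertions are replaced by the `testTransformer` helper from `MLTest`, which, as used in these suites, exercises the transformer against streaming input as well as the usual batch DataFrame. A rough PySpark analogue of that batch-vs-streaming comparison is sketched below; it is only an illustration under assumptions: it substitutes a parquet-backed file stream for the Scala `MemoryStream`, and the helper name `check_streaming_transform` is invented.

```python
import shutil
import tempfile

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession


def check_streaming_transform(spark, model, batch_df):
    """Check that model.transform gives the same predictions on batch and streaming input."""
    batch_preds = model.transform(batch_df).select("prediction").collect()
    expected = sorted(row.prediction for row in batch_preds)

    data_dir = tempfile.mkdtemp()
    try:
        # Materialize the batch data, then replay it as a file-based stream.
        batch_df.write.mode("overwrite").parquet(data_dir)
        stream_df = spark.readStream.schema(batch_df.schema).parquet(data_dir)

        query = (model.transform(stream_df)
                 .select("prediction")
                 .writeStream
                 .format("memory")
                 .queryName("stream_preds")
                 .outputMode("append")
                 .start())
        query.processAllAvailable()
        actual = sorted(row.prediction for row in spark.table("stream_preds").collect())
        query.stop()
    finally:
        shutil.rmtree(data_dir, ignore_errors=True)

    # Row order is not guaranteed for the streaming output, so compare sorted values.
    assert actual == expected, (actual, expected)


if __name__ == "__main__":
    spark = (SparkSession.builder
             .master("local[2]")
             .appName("batch-vs-streaming-check")
             .getOrCreate())
    df = spark.createDataFrame(
        [(Vectors.dense(0.0, 1.0), 0.0), (Vectors.dense(1.0, 0.0), 1.0)],
        ["features", "label"])
    model = LogisticRegression(maxIter=5).fit(df)
    check_streaming_transform(spark, model, df)
    spark.stop()
```

Writing to a memory sink named `stream_preds` and calling `processAllAvailable()` keeps the comparison deterministic: all buffered input is processed before the streaming predictions are read back and compared with the batch output.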
[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/20121#discussion_r172288890

--- Diff: mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala ---
@@ -2567,10 +2504,13 @@ class LogisticRegressionSuite
     val model1 = lr.fit(smallBinaryDataset)
     val lr2 = new LogisticRegression().setInitialModel(model1).setMaxIter(5).setFamily("binomial")
     val model2 = lr2.fit(smallBinaryDataset)
-    val predictions1 = model1.transform(smallBinaryDataset).select("prediction").collect()
-    val predictions2 = model2.transform(smallBinaryDataset).select("prediction").collect()
-    predictions1.zip(predictions2).foreach { case (Row(p1: Double), Row(p2: Double)) =>
-      assert(p1 === p2)
+    val binaryExpected = model1.transform(smallBinaryDataset).select("prediction").collect()
+      .map(_.getDouble(0))
+    for (model <- Seq(model1, model2)) {
--- End diff --

My thought is that testing binaryExpected (from model1) against model2 would already test the 2 things we care about:
* batch vs streaming prediction
* initial model

I'll just merge this though, since it's not a big deal (just a bit longer testing time).