[GitHub] spark pull request #20875: [MINOR] Fix Java lint from new JavaKolmogorovSmir...

2018-03-21 Thread jkbradley
GitHub user jkbradley opened a pull request:

https://github.com/apache/spark/pull/20875

[MINOR] Fix Java lint from new JavaKolmogorovSmirnovTestSuite

## What changes were proposed in this pull request?

Fix lint-java errors from the addition of JavaKolmogorovSmirnovTestSuite in
https://github.com/apache/spark/pull/19108.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jkbradley/spark kstest-lint-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20875.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20875


commit d3236cce0113177c0807937c8795daf51524dd6a
Author: Joseph K. Bradley 
Date:   2018-03-21T19:36:31Z

fix java lint




---




spark git commit: [SPARK-10884][ML] Support prediction on single instance for regression and classification related models

2018-03-21 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 500b21c3d -> bf09f2f71


[SPARK-10884][ML] Support prediction on single instance for regression and 
classification related models

## What changes were proposed in this pull request?

Support prediction on a single instance for regression and classification
related models (i.e., PredictionModel, ClassificationModel, and their subclasses).
Add corresponding test cases.

## How was this patch tested?

Test cases added.

Author: WeichenXu 

Closes #19381 from WeichenXu123/single_prediction.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf09f2f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf09f2f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf09f2f7

Branch: refs/heads/master
Commit: bf09f2f71276d3b3a84a8f89109bd785a066c3e6
Parents: 500b21c
Author: WeichenXu 
Authored: Wed Mar 21 09:39:14 2018 -0700
Committer: Joseph K. Bradley 
Committed: Wed Mar 21 09:39:14 2018 -0700

--
 .../scala/org/apache/spark/ml/Predictor.scala   |  5 ++--
 .../spark/ml/classification/Classifier.scala|  6 ++---
 .../classification/DecisionTreeClassifier.scala |  2 +-
 .../spark/ml/classification/GBTClassifier.scala |  2 +-
 .../spark/ml/classification/LinearSVC.scala |  2 +-
 .../ml/classification/LogisticRegression.scala  |  2 +-
 .../MultilayerPerceptronClassifier.scala|  2 +-
 .../ml/regression/DecisionTreeRegressor.scala   |  2 +-
 .../spark/ml/regression/GBTRegressor.scala  |  2 +-
 .../GeneralizedLinearRegression.scala   |  2 +-
 .../spark/ml/regression/LinearRegression.scala  |  2 +-
 .../ml/regression/RandomForestRegressor.scala   |  2 +-
 .../DecisionTreeClassifierSuite.scala   | 17 +-
 .../ml/classification/GBTClassifierSuite.scala  |  9 
 .../ml/classification/LinearSVCSuite.scala  |  6 +
 .../LogisticRegressionSuite.scala   |  9 
 .../MultilayerPerceptronClassifierSuite.scala   | 12 ++
 .../ml/classification/NaiveBayesSuite.scala | 22 ++
 .../RandomForestClassifierSuite.scala   | 16 +
 .../regression/DecisionTreeRegressorSuite.scala | 15 
 .../spark/ml/regression/GBTRegressorSuite.scala |  8 +++
 .../GeneralizedLinearRegressionSuite.scala  |  8 +++
 .../ml/regression/LinearRegressionSuite.scala   |  7 ++
 .../regression/RandomForestRegressorSuite.scala | 24 
 .../scala/org/apache/spark/ml/util/MLTest.scala | 15 ++--
 25 files changed, 176 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/bf09f2f7/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index 08b0cb9..d8f3dfa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -219,7 +219,8 @@ abstract class PredictionModel[FeaturesType, M <: 
PredictionModel[FeaturesType,
 
   /**
* Predict label for the given features.
-   * This internal method is used to implement `transform()` and output 
[[predictionCol]].
+   * This method is used to implement `transform()` and output 
[[predictionCol]].
*/
-  protected def predict(features: FeaturesType): Double
+  @Since("2.4.0")
+  def predict(features: FeaturesType): Double
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/bf09f2f7/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index 9d1d5aa..7e5790a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.ml.classification
 
 import org.apache.spark.SparkException
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}
 import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.{Vector, VectorUDT}
@@ -192,12 +192,12 @@ abstract class ClassificationModel[FeaturesType, M <: 
ClassificationModel[Featur
 
   /**
* Predict label for the given features.
-   * This internal method is used to implement `transform()` and output 
[[predictionCol]].
+   * This method is used to implement `transform()` and output 
[[predictionCol]].
*
 

[GitHub] spark issue #19381: [SPARK-10884][ML] Support prediction on single instance ...

2018-03-21 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19381
  
Merging with master


---




[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r175965924
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala ---
@@ -169,4 +179,54 @@ class DefaultReadWriteSuite extends SparkFunSuite with 
MLlibTestSparkContext
 val myParams = new MyParams("my_params")
 testDefaultReadWrite(myParams)
   }
+
+  test("default param shouldn't become user-supplied param after 
persistence") {
+val myParams = new MyParams("my_params")
+myParams.set(myParams.shouldNotSetIfSetintParamWithDefault, 1)
+myParams.checkExclusiveParams()
+val loadedMyParams = testDefaultReadWrite(myParams)
+loadedMyParams.checkExclusiveParams()
+assert(loadedMyParams.getDefault(loadedMyParams.intParamWithDefault) ==
+  myParams.getDefault(myParams.intParamWithDefault))
+
+loadedMyParams.set(myParams.intParamWithDefault, 1)
+intercept[SparkException] {
+  loadedMyParams.checkExclusiveParams()
+}
+  }
+
+  test("User-suppiled value for default param should be kept after 
persistence") {
--- End diff --

suppiled -> supplied


---




[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r175965272
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---
@@ -351,27 +359,88 @@ private[ml] object DefaultParamsReader {
   timestamp: Long,
   sparkVersion: String,
   params: JValue,
+  defaultParams: JValue,
   metadata: JValue,
   metadataJson: String) {
 
+
+private def getValueFromParams(params: JValue): Seq[(String, JValue)] 
= {
+  params match {
+case JObject(pairs) => pairs
+case _ =>
+  throw new IllegalArgumentException(
+s"Cannot recognize JSON metadata: $metadataJson.")
+  }
+}
+
 /**
  * Get the JSON value of the [[org.apache.spark.ml.param.Param]] of 
the given name.
  * This can be useful for getting a Param value before an instance of 
`Params`
- * is available.
+ * is available. This will look up `params` first, if not existing 
then looking up
+ * `defaultParams`.
  */
 def getParamValue(paramName: String): JValue = {
   implicit val format = DefaultFormats
-  params match {
+
+  // Looking up for `params` first.
+  var pairs = getValueFromParams(params)
+  var foundPairs = pairs.filter { case (pName, jsonValue) =>
+pName == paramName
+  }
+  if (foundPairs.length == 0) {
+// Looking up for `defaultParams` then.
+pairs = getValueFromParams(defaultParams)
+foundPairs = pairs.filter { case (pName, jsonValue) =>
+  pName == paramName
+}
+  }
+  assert(foundPairs.length == 1, s"Expected one instance of Param 
'$paramName' but found" +
+s" ${foundPairs.length} in JSON Params: " + 
pairs.map(_.toString).mkString(", "))
+
+  foundPairs.map(_._2).head
+}
+
+/**
+ * Extract Params from metadata, and set them in the instance.
+ * This works if all Params (except params included by `skipParams` 
list) implement
+ * [[org.apache.spark.ml.param.Param.jsonDecode()]].
+ *
+ * @param skipParams The params included in `skipParams` won't be set. 
This is useful if some
+ *   params don't implement 
[[org.apache.spark.ml.param.Param.jsonDecode()]]
+ *   and need special handling.
+ */
+def getAndSetParams(
+instance: Params,
+skipParams: Option[List[String]] = None): Unit = {
+  setParams(instance, false, skipParams)
+  setParams(instance, true, skipParams)
+}
+
+private def setParams(
+instance: Params,
+isDefault: Boolean,
+skipParams: Option[List[String]]): Unit = {
+  implicit val format = DefaultFormats
+  val (major, minor) = VersionUtils.majorMinorVersion(sparkVersion)
+  val paramsToSet = if (isDefault) defaultParams else params
+  paramsToSet match {
 case JObject(pairs) =>
-  val values = pairs.filter { case (pName, jsonValue) =>
-pName == paramName
-  }.map(_._2)
-  assert(values.length == 1, s"Expected one instance of Param 
'$paramName' but found" +
-s" ${values.length} in JSON Params: " + 
pairs.map(_.toString).mkString(", "))
-  values.head
+  pairs.foreach { case (paramName, jsonValue) =>
+if (skipParams == None || !skipParams.get.contains(paramName)) 
{
+  val param = instance.getParam(paramName)
+  val value = param.jsonDecode(compact(render(jsonValue)))
+  if (isDefault) {
+instance.setDefault(param, value)
+  } else {
+instance.set(param, value)
+  }
+}
+  }
+// For metadata file prior to Spark 2.4, there is no default 
section.
+case JNothing if isDefault && (major == 2 && minor < 4 || major < 
2) =>
--- End diff --

This logic would be simpler if this check were put in the getAndSetParams
method, which could just skip calling setParams(instance, true, skipParams)
for Spark 2.3 and earlier.
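
Roughly what I have in mind (a sketch only; the exact version check and
wording are whatever you prefer):

```
def getAndSetParams(
    instance: Params,
    skipParams: Option[List[String]] = None): Unit = {
  setParams(instance, isDefault = false, skipParams)

  // Metadata written by Spark 2.3 and earlier has no default-params section,
  // so skip the second pass entirely for those versions.
  val (major, minor) = VersionUtils.majorMinorVersion(sparkVersion)
  if (major > 2 || (major == 2 && minor >= 4)) {
    setParams(instance, isDefault = true, skipParams)
  }
}
```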


---




[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r175961795
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/param/params.scala ---
@@ -791,7 +791,7 @@ trait Params extends Identifiable with Serializable {
*   this method gets called.
* @param value  the default value
*/
-  protected final def setDefault[T](param: Param[T], value: T): this.type 
= {
+  private[ml] final def setDefault[T](param: Param[T], value: T): 
this.type = {
--- End diff --

We should leave this as protected.  It's important that 3rd-party libraries 
be able to extend MLlib APIs, and this is one they need.
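
For context, a minimal sketch of the kind of 3rd-party code that depends on it
(class and param names here are hypothetical):

```
import org.apache.spark.ml.param.{DoubleParam, ParamMap, Params}
import org.apache.spark.ml.util.Identifiable

// Hypothetical library class living outside org.apache.spark.*, so it can only
// reach protected/public members of Params.
class MyThresholdParams(override val uid: String) extends Params {
  def this() = this(Identifiable.randomUID("myThresholdParams"))

  final val threshold = new DoubleParam(this, "threshold", "binarization threshold")

  // Compiles only while setDefault stays protected (or public); a private[ml]
  // member would not be visible from here.
  setDefault(threshold, 0.5)

  override def copy(extra: ParamMap): MyThresholdParams = defaultCopy(extra)
}
```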


---




[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r175966231
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/util/DefaultReadWriteTest.scala ---
@@ -169,4 +179,54 @@ class DefaultReadWriteSuite extends SparkFunSuite with 
MLlibTestSparkContext
 val myParams = new MyParams("my_params")
 testDefaultReadWrite(myParams)
   }
+
+  test("default param shouldn't become user-supplied param after 
persistence") {
+val myParams = new MyParams("my_params")
+myParams.set(myParams.shouldNotSetIfSetintParamWithDefault, 1)
+myParams.checkExclusiveParams()
+val loadedMyParams = testDefaultReadWrite(myParams)
+loadedMyParams.checkExclusiveParams()
+assert(loadedMyParams.getDefault(loadedMyParams.intParamWithDefault) ==
+  myParams.getDefault(myParams.intParamWithDefault))
+
+loadedMyParams.set(myParams.intParamWithDefault, 1)
+intercept[SparkException] {
+  loadedMyParams.checkExclusiveParams()
+}
+  }
+
+  test("User-suppiled value for default param should be kept after 
persistence") {
+val myParams = new MyParams("my_params")
+myParams.set(myParams.intParamWithDefault, 100)
+val loadedMyParams = testDefaultReadWrite(myParams)
+assert(loadedMyParams.get(myParams.intParamWithDefault).get == 100)
+  }
+
+  test("Read metadata without default field prior to 2.4") {
--- End diff --

Nice!  I like this setup.


---




[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r175963464
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---
@@ -351,27 +359,88 @@ private[ml] object DefaultParamsReader {
   timestamp: Long,
   sparkVersion: String,
   params: JValue,
+  defaultParams: JValue,
   metadata: JValue,
   metadataJson: String) {
 
+
+private def getValueFromParams(params: JValue): Seq[(String, JValue)] 
= {
+  params match {
+case JObject(pairs) => pairs
+case _ =>
+  throw new IllegalArgumentException(
+s"Cannot recognize JSON metadata: $metadataJson.")
+  }
+}
+
 /**
  * Get the JSON value of the [[org.apache.spark.ml.param.Param]] of 
the given name.
  * This can be useful for getting a Param value before an instance of 
`Params`
- * is available.
+ * is available. This will look up `params` first, if not existing 
then looking up
+ * `defaultParams`.
  */
 def getParamValue(paramName: String): JValue = {
   implicit val format = DefaultFormats
-  params match {
+
+  // Looking up for `params` first.
+  var pairs = getValueFromParams(params)
+  var foundPairs = pairs.filter { case (pName, jsonValue) =>
+pName == paramName
+  }
+  if (foundPairs.length == 0) {
+// Looking up for `defaultParams` then.
+pairs = getValueFromParams(defaultParams)
+foundPairs = pairs.filter { case (pName, jsonValue) =>
+  pName == paramName
+}
+  }
+  assert(foundPairs.length == 1, s"Expected one instance of Param 
'$paramName' but found" +
+s" ${foundPairs.length} in JSON Params: " + 
pairs.map(_.toString).mkString(", "))
+
+  foundPairs.map(_._2).head
+}
+
+/**
+ * Extract Params from metadata, and set them in the instance.
+ * This works if all Params (except params included by `skipParams` 
list) implement
+ * [[org.apache.spark.ml.param.Param.jsonDecode()]].
+ *
+ * @param skipParams The params included in `skipParams` won't be set. 
This is useful if some
+ *   params don't implement 
[[org.apache.spark.ml.param.Param.jsonDecode()]]
+ *   and need special handling.
+ */
+def getAndSetParams(
+instance: Params,
+skipParams: Option[List[String]] = None): Unit = {
+  setParams(instance, false, skipParams)
--- End diff --

style nit: It's nice to pass boolean args by name
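
i.e., something like:

```
setParams(instance, isDefault = false, skipParams)
setParams(instance, isDefault = true, skipParams)
```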


---




[GitHub] spark pull request #20633: [SPARK-23455][ML] Default Params in ML should be ...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20633#discussion_r175962188
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---
@@ -296,14 +297,19 @@ private[ml] object DefaultParamsWriter {
   paramMap: Option[JValue] = None): String = {
 val uid = instance.uid
 val cls = instance.getClass.getName
-val params = 
instance.extractParamMap().toSeq.asInstanceOf[Seq[ParamPair[Any]]]
+val params = instance.paramMap.toSeq
+val defaultParams = instance.defaultParamMap.toSeq
 val jsonParams = paramMap.getOrElse(render(params.map { case 
ParamPair(p, v) =>
   p.name -> parse(p.jsonEncode(v))
 }.toList))
+val jsonDefaultParams = render(defaultParams.map { case ParamPair(p, 
v) =>
+  p.name -> parse(p.jsonEncode(v))
+}.toList)
 val basicMetadata = ("class" -> cls) ~
   ("timestamp" -> System.currentTimeMillis()) ~
   ("sparkVersion" -> sc.version) ~
   ("uid" -> uid) ~
+  ("defaultParamMap" -> jsonDefaultParams) ~
--- End diff --

nit: How about putting this below paramMap since that's nicer for viewing 
the JSON?
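
i.e., just reorder the fields in the builder, roughly (assuming the existing
"paramMap" field stays as-is):

```
val basicMetadata = ("class" -> cls) ~
  ("timestamp" -> System.currentTimeMillis()) ~
  ("sparkVersion" -> sc.version) ~
  ("uid" -> uid) ~
  ("paramMap" -> jsonParams) ~
  ("defaultParamMap" -> jsonDefaultParams)
```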


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175957326
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+trait ClassificationNode extends Node {
+
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
--- End diff --

Add doc string to this (same for regression node methods)
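
e.g., one possible wording (adjust as you see fit):

```
/**
 * Count of training instances with the given label that reached this node,
 * taken from the node's sufficient statistics.
 * @param label  label index, in [0, numClasses)
 */
@Since("2.4.0")
def getLabelCount(label: Int): Double = {
  require(label >= 0 && label < impurityStats.stats.length)
  impurityStats.stats(label)
}
```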


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175957767
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -135,32 +175,59 @@ class LeafNode private[ml] (
 
   override private[ml] def maxSplitFeatureIndex(): Int = -1
 
+}
+
+/**
+ * Decision tree leaf node for classification.
+ * @param prediction  Prediction this node makes
--- End diff --

I think you don't need to add these Param doc strings since you're 
overriding methods with doc strings.  The base trait doc string should show up. 
 (here and elsewhere)


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175955388
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
--- End diff --

Please fix scala style  (here and elsewhere)
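
i.e., the usual Spark convention of one parameter per line with a 4-space
indent when the signature doesn't fit on one line, roughly:

```
def fromOld(
    oldNode: OldNode,
    categoricalFeatures: Map[Int, Int],
    isClassification: Boolean): Node = {
  // ... body unchanged
}
```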


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175958393
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -240,6 +302,69 @@ private object InternalNode {
   }
 }
 
+/**
+ * Internal Decision Tree node for regression.
+ * @param prediction  Prediction this node would make if it were a leaf 
node
+ * @param impurity  Impurity measure at this node (for training data)
+ * @param gain Information gain value. Values less than 0 indicate missing 
values;
+ * this quirk will be removed with future updates.
+ * @param leftChild  Left-hand child node
+ * @param rightChild  Right-hand child node
+ * @param split  Information about the test used to split to the left or 
right child.
+ */
+class ClassificationInternalNode private[ml] (
+override val prediction: Double,
+override val impurity: Double,
+override val gain: Double,
+override val leftChild: ClassificationNode,
+override val rightChild: ClassificationNode,
+val split: Split,
--- End diff --

override?


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175957538
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+trait ClassificationNode extends Node {
+
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
+require(label >= 0 && label < impurityStats.stats.length)
+impurityStats.stats(label)
+  }
+}
+
+trait RegressionNode extends Node {
+
+  @Since("2.4.0")
+  def getCount(): Double = impurityStats.stats(0)
+
+  @Since("2.4.0")
+  def getSum(): Double = impurityStats.stats(1)
+
+  @Since("2.4.0")
+  def getSquareSum(): Double = impurityStats.stats(2)
+}
+
+trait LeafNode extends Node {
+
+  /** Prediction this node make. */
--- End diff --

make -> makes


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175959353
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala ---
@@ -212,15 +212,13 @@ private[ml] object TreeEnsembleModel {
   def computeFeatureImportance(
   node: Node,
   importances: OpenHashMap[Int, Double]): Unit = {
-node match {
-  case n: InternalNode =>
-val feature = n.split.featureIndex
-val scaledGain = n.gain * n.impurityStats.count
-importances.changeValue(feature, scaledGain, _ + scaledGain)
-computeFeatureImportance(n.leftChild, importances)
-computeFeatureImportance(n.rightChild, importances)
-  case n: LeafNode =>
-  // do nothing
+if (node.isInstanceOf[InternalNode]) {
--- End diff --

Why change this?  It may be better to use match-case since that will give 
warnings if we ever add new types of nodes.
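
i.e., keep roughly the original shape:

```
node match {
  case n: InternalNode =>
    val feature = n.split.featureIndex
    val scaledGain = n.gain * n.impurityStats.count
    importances.changeValue(feature, scaledGain, _ + scaledGain)
    computeFeatureImportance(n.leftChild, importances)
    computeFeatureImportance(n.rightChild, importances)
  case _: LeafNode =>
    // do nothing
}
```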


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175957188
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+trait ClassificationNode extends Node {
--- End diff --

Please add Since annotations to the traits and classes.


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175959372
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala ---
@@ -306,17 +304,21 @@ private[ml] object DecisionTreeModelReadWrite {
  * The nodes are returned in pre-order traversal (root first) 
so that it is easy to
  * get the ID of the subtree's root node.
  */
-def build(node: Node, id: Int): (Seq[NodeData], Int) = node match {
-  case n: InternalNode =>
+def build(node: Node, id: Int): (Seq[NodeData], Int) = {
--- End diff --

ditto: why change this?


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175957487
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+trait ClassificationNode extends Node {
+
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
+require(label >= 0 && label < impurityStats.stats.length)
+impurityStats.stats(label)
+  }
+}
+
+trait RegressionNode extends Node {
+
+  @Since("2.4.0")
+  def getCount(): Double = impurityStats.stats(0)
+
+  @Since("2.4.0")
+  def getSum(): Double = impurityStats.stats(1)
+
+  @Since("2.4.0")
+  def getSquareSum(): Double = impurityStats.stats(2)
--- End diff --

rename: getSumOfSquares
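
i.e., something like:

```
@Since("2.4.0")
def getSumOfSquares(): Double = impurityStats.stats(2)
```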


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175957245
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+trait ClassificationNode extends Node {
+
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
+require(label >= 0 && label < impurityStats.stats.length)
--- End diff --

Add error message with more info
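
e.g., something along these lines (exact wording up to you):

```
require(label >= 0 && label < impurityStats.stats.length,
  s"Classification node has only ${impurityStats.stats.length} label statistics," +
  s" but getLabelCount was called with label = $label" +
  s" (valid range: [0, ${impurityStats.stats.length})).")
```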


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175956585
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/DecisionTreeClassifier.scala
 ---
@@ -276,8 +276,9 @@ object DecisionTreeClassificationModel extends 
MLReadable[DecisionTreeClassifica
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
   val numFeatures = (metadata.metadata \ "numFeatures").extract[Int]
   val numClasses = (metadata.metadata \ "numClasses").extract[Int]
-  val root = loadTreeNodes(path, metadata, sparkSession)
-  val model = new DecisionTreeClassificationModel(metadata.uid, root, 
numFeatures, numClasses)
+  val root = loadTreeNodes(path, metadata, sparkSession, true)
--- End diff --

scalastyle: use named parameter for last arg "true" since it's unclear what 
the value means
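
i.e., roughly (assuming the new boolean parameter is named isClassification):

```
val root = loadTreeNodes(path, metadata, sparkSession, isClassification = true)
```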


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175957437
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
-override val prediction: Double,
-override val impurity: Double,
-override private[ml] val impurityStats: ImpurityCalculator) extends 
Node {
+trait ClassificationNode extends Node {
+
+  @Since("2.4.0")
+  def getLabelCount(label: Int): Double = {
+require(label >= 0 && label < impurityStats.stats.length)
+impurityStats.stats(label)
+  }
+}
+
+trait RegressionNode extends Node {
+
+  @Since("2.4.0")
+  def getCount(): Double = impurityStats.stats(0)
--- End diff --

nit: Let's remove the parentheses for these methods
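
i.e.:

```
@Since("2.4.0")
def getCount: Double = impurityStats.stats(0)

@Since("2.4.0")
def getSum: Double = impurityStats.stats(1)
```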


---




[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r175955065
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
--- End diff --

I guess it's fine; let's proceed!  I'll keep reviewing.


---




[GitHub] spark issue #19381: [SPARK-10884][ML] Support prediction on single instance ...

2018-03-20 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19381
  
LGTM
Merging after fresh tests run
Thanks a lot @WeichenXu123 for the PR and others for reviews!


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175908732
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
--- End diff --

make more explicit: + " when keepInvalid = true"


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175907866
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -37,24 +37,26 @@ class VectorAssemblerSuite
 
   test("assemble") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-assert(assemble(0.0) === Vectors.sparse(1, Array.empty, Array.empty))
-assert(assemble(0.0, 1.0) === Vectors.sparse(2, Array(1), Array(1.0)))
+assert(assemble(Seq(1), true)(0.0) === Vectors.sparse(1, Array.empty, 
Array.empty))
+assert(assemble(Seq(1, 1), true)(0.0, 1.0) === Vectors.sparse(2, 
Array(1), Array(1.0)))
 val dv = Vectors.dense(2.0, 0.0)
-assert(assemble(0.0, dv, 1.0) === Vectors.sparse(4, Array(1, 3), 
Array(2.0, 1.0)))
+assert(assemble(Seq(1, 2, 1), true)(0.0, dv, 1.0) ===
+  Vectors.sparse(4, Array(1, 3), Array(2.0, 1.0)))
 val sv = Vectors.sparse(2, Array(0, 1), Array(3.0, 4.0))
-assert(assemble(0.0, dv, 1.0, sv) ===
+assert(assemble(Seq(1, 2, 1, 2), true)(0.0, dv, 1.0, sv) ===
   Vectors.sparse(6, Array(1, 3, 4, 5), Array(2.0, 1.0, 3.0, 4.0)))
-for (v <- Seq(1, "a", null)) {
-  intercept[SparkException](assemble(v))
-  intercept[SparkException](assemble(1.0, v))
+for (v <- Seq(1, "a")) {
+  intercept[SparkException](assemble(Seq(1), true)(v))
+  intercept[SparkException](assemble(Seq(1, 1), true)(1.0, v))
 }
   }
 
   test("assemble should compress vectors") {
 import org.apache.spark.ml.feature.VectorAssembler.assemble
-val v1 = assemble(0.0, 0.0, 0.0, Vectors.dense(4.0))
+val v1 = assemble(Seq(1, 1, 1, 4), true)(0.0, 0.0, 0.0, 
Vectors.dense(4.0))
--- End diff --

We probably want this to fail, right?  It expects a Vector of length 4 but 
is given a Vector of length 1.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175904796
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala ---
@@ -234,7 +234,7 @@ class StringIndexerModel (
 val metadata = NominalAttribute.defaultAttr
   .withName($(outputCol)).withValues(filteredLabels).toMetadata()
 // If we are skipping invalid records, filter them out.
-val (filteredDataset, keepInvalid) = getHandleInvalid match {
--- End diff --

For the record, in general, I would not bother making changes like this.  
The one exception I do make is IntelliJ style complaints since those can be 
annoying for developers.


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175913440
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
--- End diff --

This is great that you're testing this carefully, but I recommend we make 
sure to pass better exceptions to users.  E.g., they won't know what to do with 
a NullPointerException, so we could instead tell them something like: "Column x 
in the first row of the dataset has a null entry, but VectorAssembler expected 
a non-null entry.  This can be fixed by explicitly specifying the expected size 
using VectorSizeHint."
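
For example, getLengthsFromFirst could rethrow with that kind of context
(column list and wording here are just a sketch):

```
case e: NullPointerException =>
  throw new SparkException(
    s"Encountered a null value in the first row of columns" +
    s" ${columns.mkString("[", ", ", "]")} while inferring vector sizes." +
    s" VectorAssembler expected a non-null entry; consider specifying the expected" +
    s" size explicitly with VectorSizeHint, or setting handleInvalid to 'keep' or 'skip'.",
    e)
```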


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175912462
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
--- End diff --

I think you can do a direct comparison:
```
assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")) === Map("y" -> 2))
```


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175910846
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
--- End diff --

Also, it's unclear what this method does until I read the code.  Possibly 
make the method name more explicit ("getVectorLengthsFromFirstRow"), and 
definitely document that "columns" must all be of Vector type.
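
Something along these lines, for instance (the name and doc wording are just
suggestions; body as in the current patch):

```
/**
 * Infers the sizes of the given Vector columns by looking at the first row of
 * the dataset. All of `columns` must be of Vector type; a null entry or an
 * empty dataset results in an exception.
 */
private[feature] def getVectorLengthsFromFirstRow(
    dataset: Dataset[_],
    columns: Seq[String]): Map[String, Int] = {
  try {
    val firstRow = dataset.toDF().select(columns.map(col): _*).first()
    columns.zip(firstRow.toSeq).map {
      case (c, x) => c -> x.asInstanceOf[Vector].size
    }.toMap
  } catch {
    case e: NullPointerException => throw new NullPointerException(
      "Saw null value on the first row: " + e.toString)
    case e: NoSuchElementException => throw new NoSuchElementException(
      "Cannot infer vector size from all empty DataFrame: " + e.toString)
  }
}
```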


---




[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175909539
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): 
Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+"Saw null value on the first row: " + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+"Cannot infer vector size from all empty DataFrame" + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getLengthsFromFirst(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getLengthsFromFirst(dataset.na.drop, missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new 
RuntimeException(
+"Consider using VectorSizeHint for columns: " + 
missing_columns.mkString("[", ",", "]"))
+  case (_, _) => Map.empty
+}
+group_sizes ++ first_sizes
+  }
+
+
   @Since("1.6.0")
   override def load(path: String): VectorAssembler = super.load(path)
 
-  private[feature] def assemble(vv: Any*): Vector = {
+  private[feature] def assemble(lengths: Seq[Int], keepInvalid: 
Boolean)(vv: Any*): Vector = {
--- End diff --

Also, I'd add doc explaining requirements, especially that this assumes 
that lengths and vv have the same length.
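E.g., a doc along these lines (wording is only a suggestion):

```
  /**
   * Assembles the input values `vv` into a single Vector.
   *
   * @param lengths expected size of each input value; assumes `lengths` and `vv` have the
   *                same number of elements, with lengths(i) describing vv(i)
   * @param keepInvalid if true, a null input is padded with NaN (lengths(i) NaNs for a null
   *                    Vector column); if false, a null input causes an exception
   * @param vv the values (numeric, Vector, or null) to assemble into one Vector
   */
```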


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175910402
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): 
Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
--- End diff --

first -> first()


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175908930
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
--- End diff --

No need to compare with anything; just call assemble()
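i.e., the intercept alone is enough:

```
intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null))
```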


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175914154
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

Also, if there are "trash" columns not used by VectorAssembler, maybe name 
them as such and add a few null values in them for better testing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175913755
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

Since this is shared across multiple tests, just make it a shared value.  
See e.g. 
https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala#L55
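E.g., something like this (a sketch, assuming the usual imports; `dfWithNulls` is just a 
placeholder name):

```
  @transient var dfWithNulls: DataFrame = _  // shared by the null-handling tests below

  override def beforeAll(): Unit = {
    super.beforeAll()
    dfWithNulls = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, Long)](
      (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 7L),
      (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
      (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, Array(1), Array(3.0)), 8L),
      (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
    ).toDF("id1", "id2", "x", "y", "name", "z", "n")
  }
```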


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175915187
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
+
intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect())
+
+// numeric column is all null
--- End diff --

Did you want to test:
* extraction of metadata from the first row (which is what this is testing, 
I believe), or
* transformation on an all-null column (which this never reaches)?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175908248
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -49,32 +53,57 @@ class VectorAssembler @Since("1.4.0") (@Since("1.4.0") 
override val uid: String)
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  /** @group setParam */
+  @Since("2.4.0")
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
+
+  /**
+   * Param for how to handle invalid data (NULL values). Options are 
'skip' (filter out rows with
--- End diff --

It would be good to expand this doc to explain the behavior: how various 
types of invalid values are treated (null, NaN, incorrect Vector length) and 
how computationally expensive different options can be.
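For instance, roughly (wording only a suggestion; the cost caveat should match whatever the 
final implementation does):

```
  /**
   * Param for how to handle invalid data (NULL and NaN values). Options are:
   *  - 'error' (default): throw an error.
   *  - 'skip': filter out rows containing invalid values in any of the input columns.
   *  - 'keep': keep the rows, outputting NaN in place of each invalid value.
   * Note: if a Vector input column carries no size information in its metadata, its size
   * must be inferred from the data (or declared via VectorSizeHint), which can require an
   * extra pass over the dataset.
   * @group param
   */
```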


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175914695
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
--- End diff --

Should this fail?  I thought it should pad with NaNs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175915257
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+assert(VectorAssembler.getLengthsFromFirst(df, Seq("y")).exists(_ == 
"y" -> 2))
+
intercept[NullPointerException](VectorAssembler.getLengthsFromFirst(df.sort("id2"),
 Seq("y")))
+intercept[NoSuchElementException](
+  VectorAssembler.getLengthsFromFirst(df.filter("id1 > 4"), Seq("y")))
+
+assert(VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.SKIP_INVALID).exists(_ == 
"y" -> 2))
+intercept[NullPointerException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.ERROR_INVALID))
+intercept[RuntimeException](VectorAssembler.getLengths(
+  df.sort("id2"), Seq("y"), VectorAssembler.KEEP_INVALID))
+  }
+
+  test("Handle Invalid should behave properly") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
+  (1, 2, 0.0, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 7L),
+  (2, 1, 0.0, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 6L),
+  (3, 3, null, Vectors.dense(1.0, 2.0), "a", Vectors.sparse(2, 
Array(1), Array(3.0)), 8L),
+  (4, 4, null, null, "a", Vectors.sparse(2, Array(1), Array(3.0)), 9L)
+).toDF("id1", "id2", "x", "y", "name", "z", "n")
+
+val assembler = new VectorAssembler()
+  .setInputCols(Array("x", "y", "z", "n"))
+  .setOutputCol("features")
+
+// behavior when first row has information
+assert(assembler.setHandleInvalid("skip").transform(df).count() == 1)
+
intercept[RuntimeException](assembler.setHandleInvalid("keep").transform(df).collect())
+
intercept[SparkException](assembler.setHandleInvalid("error").transform(df).collect())
+
+// numeric column is all null
+intercept[RuntimeException](
+  
assembler.setHandleInvalid("keep").transform(df.filter("id1==3")).count() == 1)
+
+// vector column is all null
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175909880
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+intercept[SparkException](assemble(Seq(1, 1), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN))
+intercept[SparkException](assemble(Seq(1, 2), false)(1.0, null) ===
+  Vectors.dense(1.0, Double.NaN, Double.NaN))
+intercept[SparkException](assemble(Seq(1), false)(null) === 
Vectors.dense(Double.NaN))
+intercept[SparkException](assemble(Seq(2), false)(null) ===
+  Vectors.dense(Double.NaN, Double.NaN))
+  }
+
+  test("get lengths function") {
+val df = Seq[(Long, Long, java.lang.Double, Vector, String, Vector, 
Long)](
--- End diff --

Why java.lang.Double instead of Double?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175911314
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): 
Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+"Saw null value on the first row: " + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+"Cannot infer vector size from all empty DataFrame" + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getLengthsFromFirst(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getLengthsFromFirst(dataset.na.drop, missing_columns)
--- End diff --

This will drop Rows with NA values in extraneous columns.  I.e., even if 
the VectorAssembler is only assembling columns A and B, if there is a NaN in 
column C, this will drop that row.  Pass the list of columns you care about to 
the drop() method.
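i.e., roughly:

```
// only consider nulls in the assembler's own input columns
getLengthsFromFirst(dataset.na.drop(columns), missing_columns)
```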


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175910352
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): 
Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
--- End diff --

Fix (new) IntelliJ style warnings: "toDF" -> "toDF()"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175910188
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
--- End diff --

scala style: For multiline class and method headers, put the first argument 
on the next line, with +4 space indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175906193
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala ---
@@ -136,34 +172,88 @@ class VectorAssembler @Since("1.4.0") 
(@Since("1.4.0") override val uid: String)
 @Since("1.6.0")
 object VectorAssembler extends DefaultParamsReadable[VectorAssembler] {
 
+  private[feature] val SKIP_INVALID: String = "skip"
+  private[feature] val ERROR_INVALID: String = "error"
+  private[feature] val KEEP_INVALID: String = "keep"
+  private[feature] val supportedHandleInvalids: Array[String] =
+Array(SKIP_INVALID, ERROR_INVALID, KEEP_INVALID)
+
+
+  private[feature] def getLengthsFromFirst(dataset: Dataset[_],
+   columns: Seq[String]): 
Map[String, Int] = {
+try {
+  val first_row = dataset.toDF.select(columns.map(col): _*).first
+  columns.zip(first_row.toSeq).map {
+case (c, x) => c -> x.asInstanceOf[Vector].size
+  }.toMap
+} catch {
+  case e: NullPointerException => throw new NullPointerException(
+"Saw null value on the first row: " + e.toString)
+  case e: NoSuchElementException => throw new NoSuchElementException(
+"Cannot infer vector size from all empty DataFrame" + e.toString)
+}
+  }
+
+  private[feature] def getLengths(dataset: Dataset[_], columns: 
Seq[String],
+  handleInvalid: String) = {
+val group_sizes = columns.map { c =>
+  c -> AttributeGroup.fromStructField(dataset.schema(c)).size
+}.toMap
+val missing_columns: Seq[String] = group_sizes.filter(_._2 == 
-1).keys.toSeq
+val first_sizes: Map[String, Int] = (missing_columns.nonEmpty, 
handleInvalid) match {
+  case (true, VectorAssembler.ERROR_INVALID) =>
+getLengthsFromFirst(dataset, missing_columns)
+  case (true, VectorAssembler.SKIP_INVALID) =>
+getLengthsFromFirst(dataset.na.drop, missing_columns)
+  case (true, VectorAssembler.KEEP_INVALID) => throw new 
RuntimeException(
+"Consider using VectorSizeHint for columns: " + 
missing_columns.mkString("[", ",", "]"))
+  case (_, _) => Map.empty
+}
+group_sizes ++ first_sizes
+  }
+
+
   @Since("1.6.0")
   override def load(path: String): VectorAssembler = super.load(path)
 
-  private[feature] def assemble(vv: Any*): Vector = {
+  private[feature] def assemble(lengths: Seq[Int], keepInvalid: 
Boolean)(vv: Any*): Vector = {
--- End diff --

nit: Use Array[Int] for faster access
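i.e.:

```
private[feature] def assemble(lengths: Array[Int], keepInvalid: Boolean)(vv: Any*): Vector
```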


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20829: [SPARK-23690][ML] Add handleinvalid to VectorAsse...

2018-03-20 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20829#discussion_r175908774
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -147,4 +149,72 @@ class VectorAssemblerSuite
   .filter(vectorUDF($"features") > 1)
   .count() == 1)
   }
+
+  test("assemble should keep nulls") {
+import org.apache.spark.ml.feature.VectorAssembler.assemble
+assert(assemble(Seq(1, 1), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN))
+assert(assemble(Seq(1, 2), true)(1.0, null) === Vectors.dense(1.0, 
Double.NaN, Double.NaN))
+assert(assemble(Seq(1), true)(null) === Vectors.dense(Double.NaN))
+assert(assemble(Seq(2), true)(null) === Vectors.dense(Double.NaN, 
Double.NaN))
+  }
+
+  test("assemble should throw errors") {
--- End diff --

similarly: make more explicit: + " when keepInvalid = false"


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20829: [SPARK-23690][ML] Add handleinvalid to VectorAssembler

2018-03-20 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20829
  
@hootoconnor Please refrain from making non-constructive comments.  If you 
did not intend to leave the comment here, please remove it.  Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



spark git commit: [SPARK-21898][ML] Feature parity for KolmogorovSmirnovTest in MLlib

2018-03-20 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 5e7bc2ace -> 7f5e8aa26


[SPARK-21898][ML] Feature parity for KolmogorovSmirnovTest in MLlib

## What changes were proposed in this pull request?

Feature parity for KolmogorovSmirnovTest in MLlib.
Implement `DataFrame` interface for `KolmogorovSmirnovTest` in `mllib.stat`.

## How was this patch tested?

Test suite added.

Author: WeichenXu 
Author: jkbradley 

Closes #19108 from WeichenXu123/ml-ks-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f5e8aa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f5e8aa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f5e8aa2

Branch: refs/heads/master
Commit: 7f5e8aa2606b0ee0297ceb6f4603bd368e3b0291
Parents: 5e7bc2a
Author: WeichenXu 
Authored: Tue Mar 20 11:14:34 2018 -0700
Committer: Joseph K. Bradley 
Committed: Tue Mar 20 11:14:34 2018 -0700

--
 .../spark/ml/stat/KolmogorovSmirnovTest.scala   | 113 +++
 .../ml/stat/JavaKolmogorovSmirnovTestSuite.java |  84 +++
 .../ml/stat/KolmogorovSmirnovTestSuite.scala| 140 +++
 3 files changed, 337 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f5e8aa2/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
new file mode 100644
index 000..8d80e77
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import scala.annotation.varargs
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.api.java.function.Function
+import org.apache.spark.ml.util.SchemaUtils
+import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.col
+
+/**
+ * :: Experimental ::
+ *
+ * Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled from a
+ * continuous distribution. By comparing the largest difference between the 
empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test:
+ * @see https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test";>
+ * Kolmogorov-Smirnov test (Wikipedia)
+ */
+@Experimental
+@Since("2.4.0")
+object KolmogorovSmirnovTest {
+
+  /** Used to construct output schema of test */
+  private case class KolmogorovSmirnovTestResult(
+  pValue: Double,
+  statistic: Double)
+
+  private def getSampleRDD(dataset: DataFrame, sampleCol: String): RDD[Double] 
= {
+SchemaUtils.checkNumericType(dataset.schema, sampleCol)
+import dataset.sparkSession.implicits._
+dataset.select(col(sampleCol).cast("double")).as[Double].rdd
+  }
+
+  /**
+   * Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
+   * continuous distribution. By comparing the largest difference between the 
empirical cumulative
+   * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+   * the null hypothesis that the sample data comes from that theoretical 
distribution.
+   *
+   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param sampleCol Name of sample column in dataset, of any numerical type
+   * @param cdf a `Double => Double` function to calculate the theoretical CDF 
at a given value
+   * @return DataFrame containing the test result for the input sampled data.
+   * This DataFrame will contain a single Row wit

[GitHub] spark issue #19108: [SPARK-21898][ML] Feature parity for KolmogorovSmirnovTe...

2018-03-20 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19108
  
LGTM
Thanks for the PR!
Merging with master


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



spark git commit: [SPARK-23728][BRANCH-2.3] Fix ML tests with expected exceptions running streaming tests

2018-03-19 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 80e79430f -> 920493949


[SPARK-23728][BRANCH-2.3] Fix ML tests with expected exceptions running 
streaming tests

## What changes were proposed in this pull request?

The testTransformerByInterceptingException helper failed to catch the expected message 
on 2.3 during streaming tests because the feature-generated message is not in the 
exception's direct cause but one level deeper.

## How was this patch tested?

Running the unit tests.

Author: “attilapiros” 

Closes #20852 from attilapiros/SPARK-23728.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92049394
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92049394
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92049394

Branch: refs/heads/branch-2.3
Commit: 920493949eba77befd67e32f9e6ede5d594bcd56
Parents: 80e7943
Author: “attilapiros” 
Authored: Mon Mar 19 10:42:12 2018 -0700
Committer: Joseph K. Bradley 
Committed: Mon Mar 19 10:42:12 2018 -0700

--
 .../src/test/scala/org/apache/spark/ml/util/MLTest.scala  | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/92049394/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala 
b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
index 795fd0e..23e05ac 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala
@@ -119,9 +119,15 @@ trait MLTest extends StreamTest with TempDirectory { self: 
Suite =>
 expectedMessagePart : String,
 firstResultCol: String) {
 
+def hasExpectedMessageDirectly(exception: Throwable): Boolean =
+  exception.getMessage.contains(expectedMessagePart)
+
 def hasExpectedMessage(exception: Throwable): Boolean =
-  exception.getMessage.contains(expectedMessagePart) ||
-(exception.getCause != null && 
exception.getCause.getMessage.contains(expectedMessagePart))
+  hasExpectedMessageDirectly(exception) || (
+exception.getCause != null && (
+  hasExpectedMessageDirectly(exception.getCause) || (
+exception.getCause.getCause != null &&
+hasExpectedMessageDirectly(exception.getCause.getCause
 
 withClue(s"""Expected message part "${expectedMessagePart}" is not found 
in DF test.""") {
   val exceptionOnDf = intercept[Throwable] {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[GitHub] spark issue #20852: [SPARK-23728][BRANCH-2.3] Fix ML tests with expected exc...

2018-03-19 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20852
  
Apologies for breaking it!

This LGTM

I'll go ahead and merge it to fix the build, but please comment further on 
this PR as needed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20852: [SPARK-23728][BRANCH-2.3] Fix ML tests with expected exc...

2018-03-19 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20852
  
Merging with branch-2.3


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20852: [SPARK-23728][BRANCH-2.3] Fix ML tests with expec...

2018-03-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20852#discussion_r175524288
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala ---
@@ -119,9 +119,15 @@ trait MLTest extends StreamTest with TempDirectory { 
self: Suite =>
 expectedMessagePart : String,
 firstResultCol: String) {
 
+def hasExpectedMessageDirectly(exception: Throwable): Boolean =
+  exception.getMessage.contains(expectedMessagePart)
+
 def hasExpectedMessage(exception: Throwable): Boolean =
-  exception.getMessage.contains(expectedMessagePart) ||
-(exception.getCause != null && 
exception.getCause.getMessage.contains(expectedMessagePart))
+  hasExpectedMessageDirectly(exception) || (
+exception.getCause != null && (
+  hasExpectedMessageDirectly(exception.getCause) || (
+exception.getCause.getCause != null &&
+hasExpectedMessageDirectly(exception.getCause.getCause
--- End diff --

I don't think we need to loop further.  If the real message is buried this 
deep, that could be considered a problem in and of itself.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19108: [SPARK-21898][ML] Feature parity for KolmogorovSm...

2018-03-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19108#discussion_r175521718
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import scala.annotation.varargs
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.util.SchemaUtils
+import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.col
+
+/**
+ * :: Experimental ::
+ *
+ * Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test:
+ * @see <a href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">
+ * Kolmogorov-Smirnov test (Wikipedia)</a>
+ */
+@Experimental
+@Since("2.4.0")
+object KolmogorovSmirnovTest {
+
+  /** Used to construct output schema of test */
+  private case class KolmogorovSmirnovTestResult(
+  pValues: Double,
--- End diff --

pValues -> pValue


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19108: [SPARK-21898][ML] Feature parity for KolmogorovSm...

2018-03-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19108#discussion_r175521761
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import scala.annotation.varargs
+
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.util.SchemaUtils
+import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.col
+
+/**
+ * :: Experimental ::
+ *
+ * Conduct the two-sided Kolmogorov Smirnov (KS) test for data sampled 
from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test:
+ * @see <a href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">
+ * Kolmogorov-Smirnov test (Wikipedia)</a>
+ */
+@Experimental
+@Since("2.4.0")
+object KolmogorovSmirnovTest {
+
+  /** Used to construct output schema of test */
+  private case class KolmogorovSmirnovTestResult(
+  pValues: Double,
+  statistics: Double)
--- End diff --

statistics -> statistic


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19108: [SPARK-21898][ML] Feature parity for KolmogorovSm...

2018-03-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19108#discussion_r174977495
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala 
---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
NormalDistribution}
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => 
Math3KSTest}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.Row
+
+class KolmogorovSmirnovTestSuite
+  extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
+
+  import testImplicits._
+
+  test("1 sample Kolmogorov-Smirnov test: apache commons math3 
implementation equivalence") {
+// Create theoretical distributions
+val stdNormalDist = new NormalDistribution(0, 1)
+val expDist = new ExponentialDistribution(0.6)
+
+// set seeds
+val seed = 10L
+stdNormalDist.reseedRandomGenerator(seed)
+expDist.reseedRandomGenerator(seed)
+
+// Sample data from the distributions and parallelize it
+val n = 10
+val sampledNormArray = stdNormalDist.sample(n)
+val sampledNormDF = sc.parallelize(sampledNormArray, 10).toDF("sample")
+val sampledExpArray = expDist.sample(n)
+val sampledExpDF = sc.parallelize(sampledExpArray, 10).toDF("sample")
+
+// Use a apache math commons local KS test to verify calculations
+val ksTest = new Math3KSTest()
+val pThreshold = 0.05
+
+// Comparing a standard normal sample to a standard normal distribution
--- End diff --

Can this and the next set of code lines be combined into a single helper 
method?  That could help with adding in the test for the uniform distribution 
as well.
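For example (a rough sketch; the name and tolerance are placeholders, and it assumes the 
cdf-based `test` overload plus the usual commons-math3 / TestingUtils imports):

```
  // sketch: compares the spark.ml result for one sampled DataFrame against commons-math3
  private def checkAgainstCommonsMath3(
      sampledDF: DataFrame,
      sampledArray: Array[Double],
      dist: RealDistribution): Unit = {
    val Row(pValue: Double, statistic: Double) = KolmogorovSmirnovTest
      .test(sampledDF, "sample", (x: Double) => dist.cumulativeProbability(x))
      .select("pValue", "statistic")
      .first()
    val ksTest = new Math3KSTest()
    assert(statistic ~== ksTest.kolmogorovSmirnovStatistic(dist, sampledArray) relTol 1e-4)
    assert(pValue ~== ksTest.kolmogorovSmirnovTest(dist, sampledArray) relTol 1e-4)
  }
```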


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19108: [SPARK-21898][ML] Feature parity for KolmogorovSm...

2018-03-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19108#discussion_r174977489
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTestSuite.scala 
---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.stat
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
NormalDistribution}
+import org.apache.commons.math3.stat.inference.{KolmogorovSmirnovTest => 
Math3KSTest}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.sql.Row
+
+class KolmogorovSmirnovTestSuite
+  extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
+
+  import testImplicits._
+
+  test("1 sample Kolmogorov-Smirnov test: apache commons math3 
implementation equivalence") {
+// Create theoretical distributions
+val stdNormalDist = new NormalDistribution(0, 1)
+val expDist = new ExponentialDistribution(0.6)
+
--- End diff --

In the spark.mllib tests, it looks like they meant to test against the 
uniform distribution but forgot to.  Would you mind adding that test?
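For example, roughly (a sketch; `UniformRealDistribution` is from commons-math3, and the 
checks mirror the normal/exponential cases above):

```
  test("1 sample Kolmogorov-Smirnov test: uniform distribution equivalence") {
    val unifDist = new UniformRealDistribution(0.0, 1.0)
    unifDist.reseedRandomGenerator(10L)
    val sampledArray = unifDist.sample(1000)
    val sampledDF = sc.parallelize(sampledArray, 10).toDF("sample")
    // compare the spark.ml statistic and p-value against the commons-math3 reference
    val Row(pValue: Double, statistic: Double) = KolmogorovSmirnovTest
      .test(sampledDF, "sample", (x: Double) => unifDist.cumulativeProbability(x))
      .select("pValue", "statistic")
      .first()
    val ksTest = new Math3KSTest()
    assert(statistic ~== ksTest.kolmogorovSmirnovStatistic(unifDist, sampledArray) relTol 1e-4)
    assert(pValue ~== ksTest.kolmogorovSmirnovTest(unifDist, sampledArray) relTol 1e-4)
  }
```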


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20786: [SPARK-14681][ML] Provide label/impurity stats fo...

2018-03-15 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20786#discussion_r174921341
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -84,35 +86,73 @@ private[ml] object Node {
   /**
* Create a new Node from the old Node format, recursively creating 
child nodes as needed.
*/
-  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int]): Node 
= {
+  def fromOld(oldNode: OldNode, categoricalFeatures: Map[Int, Int],
+isClassification: Boolean): Node = {
 if (oldNode.isLeaf) {
   // TODO: Once the implementation has been moved to this API, then 
include sufficient
   //   statistics here.
-  new LeafNode(prediction = oldNode.predict.predict,
-impurity = oldNode.impurity, impurityStats = null)
+  if (isClassification) {
+new ClassificationLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  } else {
+new RegressionLeafNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, impurityStats = null)
+  }
 } else {
   val gain = if (oldNode.stats.nonEmpty) {
 oldNode.stats.get.gain
   } else {
 0.0
   }
-  new InternalNode(prediction = oldNode.predict.predict, impurity = 
oldNode.impurity,
-gain = gain, leftChild = fromOld(oldNode.leftNode.get, 
categoricalFeatures),
-rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures),
-split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  if (isClassification) {
+new ClassificationInternalNode(prediction = 
oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
true)
+.asInstanceOf[ClassificationNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  } else {
+new RegressionInternalNode(prediction = oldNode.predict.predict,
+  impurity = oldNode.impurity, gain = gain,
+  leftChild = fromOld(oldNode.leftNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  rightChild = fromOld(oldNode.rightNode.get, categoricalFeatures, 
false)
+.asInstanceOf[RegressionNode],
+  split = Split.fromOld(oldNode.split.get, categoricalFeatures), 
impurityStats = null)
+  }
 }
   }
 }
 
-/**
- * Decision tree leaf node.
- * @param prediction  Prediction this node makes
- * @param impurity  Impurity measure at this node (for training data)
- */
-class LeafNode private[ml] (
--- End diff --

While it would be nice to have this be a trait instead of a class, I am 
worried about breaking public APIs.  However, one could argue that this isn't a 
public API since the constructor is private (though the class is public).  I'll 
CC people on https://issues.apache.org/jira/browse/SPARK-7131 where these 
classes were made public to get feedback.  Let's give a couple of days for 
feedback before proceeding.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19381: [SPARK-10884][ML] Support prediction on single in...

2018-03-15 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19381#discussion_r174862550
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala ---
@@ -109,4 +110,14 @@ trait MLTest extends StreamTest with TempDirectory { 
self: Suite =>
 testTransformerOnDF(dataframe, transformer, firstResultCol,
   otherResultCols: _*)(globalCheckFunction)
   }
+
+  def testPredictorModelSinglePrediction(model: PredictionModel[Vector, _],
--- End diff --

Thanks for the updates!  Just this comment: Can we please call this method 
testPredictionModelSinglePrediction to match the name of the class it is 
testing?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml.featur...

2018-03-14 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20686
  
Merging to branch-2.3 too


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[1/2] spark git commit: [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z

2018-03-14 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 1098933b0 -> 279b3db89


http://git-wip-us.apache.org/repos/asf/spark/blob/279b3db8/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index 775a04d..df24367 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -17,17 +17,14 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{DoubleType, StringType, StructField, 
StructType}
 
-class StringIndexerSuite
-  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class StringIndexerSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -46,19 +43,23 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
 val indexerModel = indexer.fit(df)
-
 MLTestingUtils.checkCopyAndUids(indexer, indexerModel)
-
-val transformed = indexerModel.transform(df)
-val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attr.values.get === Array("a", "c", "b"))
-val output = transformed.select("id", "labelIndex").rdd.map { r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
 // a -> 0, b -> 2, c -> 1
-val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 
1.0))
-assert(output === expected)
+val expected = Seq(
+  (0, 0.0),
+  (1, 2.0),
+  (2, 1.0),
+  (3, 0.0),
+  (4, 0.0),
+   (5, 1.0)
+).toDF("id", "labelIndex")
+
+testTransformerByGlobalCheckFunc[(Int, String)](df, indexerModel, "id", 
"labelIndex") { rows =>
+  val attr = Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attr.values.get === Array("a", "c", "b"))
+  assert(rows.seq === expected.collect().toSeq)
+}
   }
 
   test("StringIndexerUnseen") {
@@ -70,36 +71,38 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
   .fit(df)
+
 // Verify we throw by default with unseen values
-intercept[SparkException] {
-  indexer.transform(df2).collect()
-}
+testTransformerByInterceptingException[(Int, String)](
+  df2,
+  indexer,
+  "Unseen label:",
+  "labelIndex")
 
-indexer.setHandleInvalid("skip")
 // Verify that we skip the c record
-val transformedSkip = indexer.transform(df2)
-val attrSkip = 
Attribute.fromStructField(transformedSkip.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attrSkip.values.get === Array("b", "a"))
-val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
 // a -> 1, b -> 0
-val expectedSkip = Set((0, 1.0), (1, 0.0))
-assert(outputSkip === expectedSkip)
+indexer.setHandleInvalid("skip")
+
+val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF()
+testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", 
"labelIndex") { rows =>
+  val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attrSkip.values.get === Array("b", "a"))
+  assert(rows.seq === expectedSkip.collect().toSeq)
+}
 
 indexer.setHandleInvalid("keep")
-// Verify that we keep the unseen records
-val transformedKeep = indexer.transform(df2)
-val attrKeep = 
Attribute.fromStructField(transformedKeep.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attrKeep.values.get === Array("b", "a", "__unknown"))
-val outputKeep = transformedKeep.select("id", "labelIndex").rdd.map { r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
+
 // a -> 1, b -> 0, c -> 2, d -> 3
-val expectedKeep = Set((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0))
-assert(outputKeep === expectedKeep)
+val expectedKeep = Seq((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)).toDF()
+
+// Verify that we keep the unseen records
+testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", 
"labelIndex") { rows =>
+  val attrKeep = Attribute.fromStructField(rows.head.schema("labelIn

[2/2] spark git commit: [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z

2018-03-14 Thread jkbradley
[SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z

# What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:

- NGramSuite
- NormalizerSuite
- OneHotEncoderEstimatorSuite
- OneHotEncoderSuite
- PCASuite
- PolynomialExpansionSuite
- QuantileDiscretizerSuite
- RFormulaSuite
- SQLTransformerSuite
- StandardScalerSuite
- StopWordsRemoverSuite
- StringIndexerSuite
- TokenizerSuite
- RegexTokenizerSuite
- VectorAssemblerSuite
- VectorIndexerSuite
- VectorSizeHintSuite
- VectorSlicerSuite
- Word2VecSuite

# How was this patch tested?

They are unit tests.

Author: “attilapiros” 

Closes #20686 from attilapiros/SPARK-22915.

(cherry picked from commit 279b3db8970809104c30941254e57e3d62da5041)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0663b611
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0663b611
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0663b611

Branch: refs/heads/branch-2.3
Commit: 0663b61193b37094b9d00c7f2cbb0268ad946e25
Parents: f3efbfa
Author: “attilapiros” 
Authored: Wed Mar 14 18:36:31 2018 -0700
Committer: Joseph K. Bradley 
Committed: Wed Mar 14 18:36:41 2018 -0700

--
 .../apache/spark/ml/feature/NGramSuite.scala|  23 +--
 .../spark/ml/feature/NormalizerSuite.scala  |  57 ++
 .../feature/OneHotEncoderEstimatorSuite.scala   | 193 +-
 .../spark/ml/feature/OneHotEncoderSuite.scala   | 124 ++-
 .../org/apache/spark/ml/feature/PCASuite.scala  |  14 +-
 .../ml/feature/PolynomialExpansionSuite.scala   |  62 +++---
 .../ml/feature/QuantileDiscretizerSuite.scala   | 198 ++
 .../apache/spark/ml/feature/RFormulaSuite.scala | 158 +++---
 .../spark/ml/feature/SQLTransformerSuite.scala  |  35 ++--
 .../spark/ml/feature/StandardScalerSuite.scala  |  33 +--
 .../ml/feature/StopWordsRemoverSuite.scala  |  37 ++--
 .../spark/ml/feature/StringIndexerSuite.scala   | 204 ++-
 .../spark/ml/feature/TokenizerSuite.scala   |  30 +--
 .../spark/ml/feature/VectorIndexerSuite.scala   | 183 +
 .../spark/ml/feature/VectorSizeHintSuite.scala  |  88 +---
 .../spark/ml/feature/VectorSlicerSuite.scala|  27 +--
 .../apache/spark/ml/feature/Word2VecSuite.scala |  28 +--
 .../scala/org/apache/spark/ml/util/MLTest.scala |  33 ++-
 18 files changed, 809 insertions(+), 718 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0663b611/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
index d4975c0..e5956ee 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
@@ -19,17 +19,15 @@ package org.apache.spark.ml.feature
 
 import scala.beans.BeanInfo
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
+import org.apache.spark.sql.{DataFrame, Row}
+
 
 @BeanInfo
 case class NGramTestData(inputTokens: Array[String], wantedNGrams: 
Array[String])
 
-class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
+class NGramSuite extends MLTest with DefaultReadWriteTest {
 
-  import org.apache.spark.ml.feature.NGramSuite._
   import testImplicits._
 
   test("default behavior yields bigram features") {
@@ -83,16 +81,11 @@ class NGramSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRe
   .setN(3)
 testDefaultReadWrite(t)
   }
-}
-
-object NGramSuite extends SparkFunSuite {
 
-  def testNGram(t: NGram, dataset: Dataset[_]): Unit = {
-t.transform(dataset)
-  .select("nGrams", "wantedNGrams")
-  .collect()
-  .foreach { case Row(actualNGrams, wantedNGrams) =>
+  def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
+testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", 
"wantedNGrams") {
+  case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) =>
 assert(actualNGrams === wantedNGrams)
-  }
+}
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0663b611/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/

[1/2] spark git commit: [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z

2018-03-14 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f3efbfa4b -> 0663b6119


http://git-wip-us.apache.org/repos/asf/spark/blob/0663b611/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index 775a04d..df24367 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -17,17 +17,14 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{DoubleType, StringType, StructField, 
StructType}
 
-class StringIndexerSuite
-  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class StringIndexerSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -46,19 +43,23 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
 val indexerModel = indexer.fit(df)
-
 MLTestingUtils.checkCopyAndUids(indexer, indexerModel)
-
-val transformed = indexerModel.transform(df)
-val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attr.values.get === Array("a", "c", "b"))
-val output = transformed.select("id", "labelIndex").rdd.map { r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
 // a -> 0, b -> 2, c -> 1
-val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), (5, 
1.0))
-assert(output === expected)
+val expected = Seq(
+  (0, 0.0),
+  (1, 2.0),
+  (2, 1.0),
+  (3, 0.0),
+  (4, 0.0),
+   (5, 1.0)
+).toDF("id", "labelIndex")
+
+testTransformerByGlobalCheckFunc[(Int, String)](df, indexerModel, "id", 
"labelIndex") { rows =>
+  val attr = Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attr.values.get === Array("a", "c", "b"))
+  assert(rows.seq === expected.collect().toSeq)
+}
   }
 
   test("StringIndexerUnseen") {
@@ -70,36 +71,38 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
   .fit(df)
+
 // Verify we throw by default with unseen values
-intercept[SparkException] {
-  indexer.transform(df2).collect()
-}
+testTransformerByInterceptingException[(Int, String)](
+  df2,
+  indexer,
+  "Unseen label:",
+  "labelIndex")
 
-indexer.setHandleInvalid("skip")
 // Verify that we skip the c record
-val transformedSkip = indexer.transform(df2)
-val attrSkip = 
Attribute.fromStructField(transformedSkip.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attrSkip.values.get === Array("b", "a"))
-val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
 // a -> 1, b -> 0
-val expectedSkip = Set((0, 1.0), (1, 0.0))
-assert(outputSkip === expectedSkip)
+indexer.setHandleInvalid("skip")
+
+val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF()
+testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", 
"labelIndex") { rows =>
+  val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attrSkip.values.get === Array("b", "a"))
+  assert(rows.seq === expectedSkip.collect().toSeq)
+}
 
 indexer.setHandleInvalid("keep")
-// Verify that we keep the unseen records
-val transformedKeep = indexer.transform(df2)
-val attrKeep = 
Attribute.fromStructField(transformedKeep.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attrKeep.values.get === Array("b", "a", "__unknown"))
-val outputKeep = transformedKeep.select("id", "labelIndex").rdd.map { r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
+
 // a -> 1, b -> 0, c -> 2, d -> 3
-val expectedKeep = Set((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0))
-assert(outputKeep === expectedKeep)
+val expectedKeep = Seq((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)).toDF()
+
+// Verify that we keep the unseen records
+testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", 
"labelIndex") { rows =>
+  val attrKeep = Attribute.fromStructField(rows.head.schema("lab

[2/2] spark git commit: [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z

2018-03-14 Thread jkbradley
[SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z

# What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites:

- NGramSuite
- NormalizerSuite
- OneHotEncoderEstimatorSuite
- OneHotEncoderSuite
- PCASuite
- PolynomialExpansionSuite
- QuantileDiscretizerSuite
- RFormulaSuite
- SQLTransformerSuite
- StandardScalerSuite
- StopWordsRemoverSuite
- StringIndexerSuite
- TokenizerSuite
- RegexTokenizerSuite
- VectorAssemblerSuite
- VectorIndexerSuite
- VectorSizeHintSuite
- VectorSlicerSuite
- Word2VecSuite

# How was this patch tested?

They are unit tests.

Author: “attilapiros” 

Closes #20686 from attilapiros/SPARK-22915.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/279b3db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/279b3db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/279b3db8

Branch: refs/heads/master
Commit: 279b3db8970809104c30941254e57e3d62da5041
Parents: 1098933
Author: “attilapiros” 
Authored: Wed Mar 14 18:36:31 2018 -0700
Committer: Joseph K. Bradley 
Committed: Wed Mar 14 18:36:31 2018 -0700

--
 .../apache/spark/ml/feature/NGramSuite.scala|  23 +--
 .../spark/ml/feature/NormalizerSuite.scala  |  57 ++
 .../feature/OneHotEncoderEstimatorSuite.scala   | 193 +-
 .../spark/ml/feature/OneHotEncoderSuite.scala   | 124 ++-
 .../org/apache/spark/ml/feature/PCASuite.scala  |  14 +-
 .../ml/feature/PolynomialExpansionSuite.scala   |  62 +++---
 .../ml/feature/QuantileDiscretizerSuite.scala   | 198 ++
 .../apache/spark/ml/feature/RFormulaSuite.scala | 158 +++---
 .../spark/ml/feature/SQLTransformerSuite.scala  |  35 ++--
 .../spark/ml/feature/StandardScalerSuite.scala  |  33 +--
 .../ml/feature/StopWordsRemoverSuite.scala  |  37 ++--
 .../spark/ml/feature/StringIndexerSuite.scala   | 204 ++-
 .../spark/ml/feature/TokenizerSuite.scala   |  30 +--
 .../spark/ml/feature/VectorIndexerSuite.scala   | 183 +
 .../spark/ml/feature/VectorSizeHintSuite.scala  |  88 +---
 .../spark/ml/feature/VectorSlicerSuite.scala|  27 +--
 .../apache/spark/ml/feature/Word2VecSuite.scala |  28 +--
 .../scala/org/apache/spark/ml/util/MLTest.scala |  33 ++-
 18 files changed, 809 insertions(+), 718 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/279b3db8/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
--
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
index d4975c0..e5956ee 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
@@ -19,17 +19,15 @@ package org.apache.spark.ml.feature
 
 import scala.beans.BeanInfo
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
+import org.apache.spark.sql.{DataFrame, Row}
+
 
 @BeanInfo
 case class NGramTestData(inputTokens: Array[String], wantedNGrams: 
Array[String])
 
-class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
+class NGramSuite extends MLTest with DefaultReadWriteTest {
 
-  import org.apache.spark.ml.feature.NGramSuite._
   import testImplicits._
 
   test("default behavior yields bigram features") {
@@ -83,16 +81,11 @@ class NGramSuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRe
   .setN(3)
 testDefaultReadWrite(t)
   }
-}
-
-object NGramSuite extends SparkFunSuite {
 
-  def testNGram(t: NGram, dataset: Dataset[_]): Unit = {
-t.transform(dataset)
-  .select("nGrams", "wantedNGrams")
-  .collect()
-  .foreach { case Row(actualNGrams, wantedNGrams) =>
+  def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
+testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", 
"wantedNGrams") {
+  case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) =>
 assert(actualNGrams === wantedNGrams)
-  }
+}
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/279b3db8/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
index c75027f..eff57f1 100644
--- a/mllib/src/test/scala/org/apache/spark/m

[GitHub] spark issue #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml.featur...

2018-03-14 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20686
  
Thanks for the updates and for all the work this PR took, @attilapiros, and 
thanks for the review, @WeichenXu123!
LGTM
Merging with master


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-14 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r174656617
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorSlicerSuite.scala ---
@@ -84,26 +84,29 @@ class VectorSlicerSuite extends SparkFunSuite with 
MLlibTestSparkContext with De
 
 val vectorSlicer = new 
VectorSlicer().setInputCol("features").setOutputCol("result")
 
-def validateResults(df: DataFrame): Unit = {
-  df.select("result", "expected").collect().foreach { case Row(vec1: 
Vector, vec2: Vector) =>
+def validateResults(rows: Seq[Row]): Unit = {
+  rows.foreach { case Row(vec1: Vector, vec2: Vector) =>
 assert(vec1 === vec2)
   }
-  val resultMetadata = 
AttributeGroup.fromStructField(df.schema("result"))
-  val expectedMetadata = 
AttributeGroup.fromStructField(df.schema("expected"))
+  val resultMetadata = 
AttributeGroup.fromStructField(rows.head.schema("result"))
+  val expectedMetadata = 
AttributeGroup.fromStructField(rows.head.schema("expected"))
   assert(resultMetadata.numAttributes === 
expectedMetadata.numAttributes)
   
resultMetadata.attributes.get.zip(expectedMetadata.attributes.get).foreach { 
case (a, b) =>
 assert(a === b)
   }
 }
 
 vectorSlicer.setIndices(Array(1, 4)).setNames(Array.empty)
-validateResults(vectorSlicer.transform(df))
+testTransformerByGlobalCheckFunc[(Vector, Vector)](df, vectorSlicer, 
"result", "expected")(
--- End diff --

I see, makes sense.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-14 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r174535931
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -299,18 +310,17 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
 
-val expected = Seq(Set((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 
1.0), (5, 0.0)),
-  Set((0, 2.0), (1, 2.0), (2, 0.0), (3, 1.0), (4, 1.0), (5, 2.0)),
-  Set((0, 1.0), (1, 1.0), (2, 0.0), (3, 2.0), (4, 2.0), (5, 1.0)),
-  Set((0, 1.0), (1, 1.0), (2, 2.0), (3, 0.0), (4, 0.0), (5, 1.0)))
+val expected = Seq(Seq((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 
1.0), (5, 0.0)),
--- End diff --

I agree that's correct.  The problem is that people tend to see these 
patterns and copy them without thinking.  It's best to follow patterns which 
help other contributors to avoid making mistakes.
I'm OK with leaving it since this issue is scattered throughout MLlib tests.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173594072
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -299,18 +310,17 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
 
-val expected = Seq(Set((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 
1.0), (5, 0.0)),
-  Set((0, 2.0), (1, 2.0), (2, 0.0), (3, 1.0), (4, 1.0), (5, 2.0)),
-  Set((0, 1.0), (1, 1.0), (2, 0.0), (3, 2.0), (4, 2.0), (5, 1.0)),
-  Set((0, 1.0), (1, 1.0), (2, 2.0), (3, 0.0), (4, 0.0), (5, 1.0)))
+val expected = Seq(Seq((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 
1.0), (5, 0.0)),
--- End diff --

I'd keep using Set.  The point is to be row-order-agnostic since DataFrames 
do not guarantee stable row ordering in general.
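
As a standalone illustration of the order-agnostic pattern (using one of the expected sets from the diff above; `df` and `indexer` stand for the test's DataFrame and fitted model):

```scala
// Collecting to a Set keeps the assertion independent of row order,
// which DataFrames do not guarantee.
val expected = Set((0, 0.0), (1, 0.0), (2, 2.0), (3, 1.0), (4, 1.0), (5, 0.0))
testTransformerByGlobalCheckFunc[(Int, String)](df, indexer, "id", "labelIndex") { rows =>
  val actual = rows.map(r => (r.getInt(0), r.getDouble(1))).toSet
  assert(actual === expected)
}
```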


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173592896
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -109,16 +111,14 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
   .fit(df)
-val transformed = indexer.transform(df)
-val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attr.values.get === Array("100", "300", "200"))
-val output = transformed.select("id", "labelIndex").rdd.map { r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
-// 100 -> 0, 200 -> 2, 300 -> 1
-val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), 
(5, 1.0))
-assert(output === expected)
+testTransformerByGlobalCheckFunc[(Int, String)](df, indexer, "id", 
"labelIndex") { rows =>
+  val attr = Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attr.values.get === Array("100", "300", "200"))
+  // 100 -> 0, 200 -> 2, 300 -> 1
+  val expected = Seq((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), 
(5, 1.0)).toDF()
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173600685
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala ---
@@ -128,18 +126,29 @@ class VectorIndexerSuite extends SparkFunSuite with 
MLlibTestSparkContext
 
 MLTestingUtils.checkCopyAndUids(vectorIndexer, model)
 
-model.transform(densePoints1) // should work
-model.transform(sparsePoints1) // should work
+// should work
+testTransformer[FeatureData](densePoints1, model, "indexed") { _ => }
+// should work
+testTransformer[FeatureData](sparsePoints1, model, "indexed") { _ => }
+
 // If the data is local Dataset, it throws AssertionError directly.
-intercept[AssertionError] {
-  model.transform(densePoints2).collect()
-  logInfo("Did not throw error when fit, transform were called on 
vectors of different lengths")
+withClue("Did not found expected error message when fit, " +
+  "transform were called on vectors of different lengths") {
+  testTransformerByInterceptingException[FeatureData](
+densePoints2,
+model,
+"VectorIndexerModel expected vector of length 3 but found length 
4",
+"indexed")
 }
 // If the data is distributed Dataset, it throws SparkException
 // which is the wrapper of AssertionError.
-intercept[SparkException] {
-  model.transform(densePoints2.repartition(2)).collect()
-  logInfo("Did not throw error when fit, transform were called on 
vectors of different lengths")
+withClue("Did not found expected error message when fit, " +
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173593795
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -247,14 +253,18 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
   .fit(df)
-val transformed = indexer.transform(df)
+val expected1 = Seq(0.0, 2.0, 1.0, 0.0, 0.0, 
1.0).map(Tuple1(_)).toDF("labelIndex")
--- End diff --

Not needed; this isn't what this unit test is testing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173600331
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala ---
@@ -86,16 +94,19 @@ class RFormulaSuite extends MLTest with 
DefaultReadWriteTest {
 }
   }
 
-  test("label column already exists but is not numeric type") {
+  ignore("label column already exists but is not numeric type") {
--- End diff --

testTransformerByInterceptingException should use ```(Int, Boolean)``` (not 
Double)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173600416
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -58,14 +57,16 @@ class VectorAssemblerSuite
 assert(v2.isInstanceOf[DenseVector])
   }
 
-  test("VectorAssembler") {
+  ignore("VectorAssembler") {
--- End diff --

Don't ignore; just test on batch.  This case is solved with VectorSizeHint.
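
A rough sketch of the VectorSizeHint route (column names here are illustrative, not the test's actual columns):

```scala
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

// VectorSizeHint declares the vector column's size up front, so VectorAssembler
// can build its output metadata without peeking at the first row -- which is
// what makes the assembly usable on a streaming DataFrame.
val withSize = new VectorSizeHint()
  .setInputCol("vec")
  .setSize(2)
  .setHandleInvalid("error")
val assembler = new VectorAssembler()
  .setInputCols(Array("x", "vec"))
  .setOutputCol("features")
val assembled = assembler.transform(withSize.transform(df))
```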


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173584784
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -324,19 +352,24 @@ class QuantileDiscretizerSuite
   .setStages(Array(discretizerForCol1, discretizerForCol2, 
discretizerForCol3))
   .fit(df)
 
-val resultForMultiCols = plForMultiCols.transform(df)
-  .select("result1", "result2", "result3")
-  .collect()
+val expected = plForSingleCol.transform(df).select("result1", 
"result2", "result3").collect()
 
-val resultForSingleCol = plForSingleCol.transform(df)
-  .select("result1", "result2", "result3")
-  .collect()
+testTransformerByGlobalCheckFunc[(Double, Double, Double)](
+  df,
+  plForMultiCols,
+  "result1",
+  "result2",
+  "result3") { rows =>
+assert(rows === expected)
+  }
 
-resultForSingleCol.zip(resultForMultiCols).foreach {
-  case (rowForSingle, rowForMultiCols) =>
-assert(rowForSingle.getDouble(0) == rowForMultiCols.getDouble(0) &&
-  rowForSingle.getDouble(1) == rowForMultiCols.getDouble(1) &&
-  rowForSingle.getDouble(2) == rowForMultiCols.getDouble(2))
+testTransformerByGlobalCheckFunc[(Double, Double, Double)](
--- End diff --

I'd remove this.  Testing vs. multiCol is already testing batch vs 
streaming.  No need to test singleCol against itself.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173582122
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala ---
@@ -90,23 +96,29 @@ class OneHotEncoderSuite
 val encoder = new OneHotEncoder()
   .setInputCol("size")
   .setOutputCol("encoded")
-val output = encoder.transform(df)
-val group = AttributeGroup.fromStructField(output.schema("encoded"))
-assert(group.size === 2)
-assert(group.getAttr(0) === 
BinaryAttribute.defaultAttr.withName("small").withIndex(0))
-assert(group.getAttr(1) === 
BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
+testTransformerByGlobalCheckFunc[(Double)](df, encoder, "encoded") { 
rows =>
+  val group = 
AttributeGroup.fromStructField(rows.head.schema("encoded"))
+  assert(group.size === 2)
+  assert(group.getAttr(0) === 
BinaryAttribute.defaultAttr.withName("small").withIndex(0))
+  assert(group.getAttr(1) === 
BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
+}
   }
 
-  test("input column without ML attribute") {
+
+  ignore("input column without ML attribute") {
--- End diff --

Let's keep the test but limit it to batch. People should switch to 
OneHotEncoderEstimator anyways.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173593049
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -109,16 +111,14 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
   .fit(df)
-val transformed = indexer.transform(df)
-val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attr.values.get === Array("100", "300", "200"))
-val output = transformed.select("id", "labelIndex").rdd.map { r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
-// 100 -> 0, 200 -> 2, 300 -> 1
-val expected = Set((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), 
(5, 1.0))
-assert(output === expected)
+testTransformerByGlobalCheckFunc[(Int, String)](df, indexer, "id", 
"labelIndex") { rows =>
+  val attr = Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attr.values.get === Array("100", "300", "200"))
+  // 100 -> 0, 200 -> 2, 300 -> 1
+  val expected = Seq((0, 0.0), (1, 2.0), (2, 1.0), (3, 0.0), (4, 0.0), 
(5, 1.0)).toDF()
--- End diff --

And elsewhere below


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173600642
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala ---
@@ -128,18 +126,29 @@ class VectorIndexerSuite extends SparkFunSuite with 
MLlibTestSparkContext
 
 MLTestingUtils.checkCopyAndUids(vectorIndexer, model)
 
-model.transform(densePoints1) // should work
-model.transform(sparsePoints1) // should work
+// should work
+testTransformer[FeatureData](densePoints1, model, "indexed") { _ => }
+// should work
+testTransformer[FeatureData](sparsePoints1, model, "indexed") { _ => }
+
 // If the data is local Dataset, it throws AssertionError directly.
-intercept[AssertionError] {
-  model.transform(densePoints2).collect()
-  logInfo("Did not throw error when fit, transform were called on 
vectors of different lengths")
+withClue("Did not found expected error message when fit, " +
--- End diff --

Or just use the original text


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173587524
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala ---
@@ -313,13 +306,14 @@ class RFormulaSuite extends MLTest with 
DefaultReadWriteTest {
   Seq(("male", "foo", 4), ("female", "bar", 4), ("female", "bar", 5), 
("male", "baz", 5))
 .toDF("id", "a", "b")
 val model = formula.fit(original)
+val attr = NominalAttribute.defaultAttr
 val expected = Seq(
 ("male", "foo", 4, Vectors.dense(0.0, 1.0, 4.0), 1.0),
 ("female", "bar", 4, Vectors.dense(1.0, 0.0, 4.0), 0.0),
 ("female", "bar", 5, Vectors.dense(1.0, 0.0, 5.0), 0.0),
 ("male", "baz", 5, Vectors.dense(0.0, 0.0, 5.0), 1.0)
 ).toDF("id", "a", "b", "features", "label")
-// assert(result.schema.toString == resultSchema.toString)
+  .select($"id", $"a", $"b", $"features", $"label".as("label", 
attr.toMetadata()))
--- End diff --

I'd just keep what you have now.  There isn't a great solution here, and 
what you have fits other code examples in MLlib.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173592635
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -70,36 +71,37 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
   .fit(df)
+
 // Verify we throw by default with unseen values
-intercept[SparkException] {
-  indexer.transform(df2).collect()
-}
+testTransformerByInterceptingException[(Int, String)](
+  df2,
+  indexer,
+  "Unseen label:",
+  "labelIndex")
 
 indexer.setHandleInvalid("skip")
-// Verify that we skip the c record
-val transformedSkip = indexer.transform(df2)
-val attrSkip = 
Attribute.fromStructField(transformedSkip.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attrSkip.values.get === Array("b", "a"))
-val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { 
r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
-// a -> 1, b -> 0
-val expectedSkip = Set((0, 1.0), (1, 0.0))
-assert(outputSkip === expectedSkip)
+
+testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", 
"labelIndex") { rows =>
+  val attrSkip = 
Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attrSkip.values.get === Array("b", "a"))
+  // Verify that we skip the c record
+  // a -> 1, b -> 0
+  val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF()
--- End diff --

This can be moved outside of the testTransformerByGlobalCheckFunc method.
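
For example, along these lines (a sketch based on the diff above, using the suite's existing imports):

```scala
// Build the expected answer once, outside the check function; the closure
// then only runs the assertions.
val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF().collect().toSeq

testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", "labelIndex") { rows =>
  val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex"))
    .asInstanceOf[NominalAttribute]
  assert(attrSkip.values.get === Array("b", "a"))
  assert(rows === expectedSkip)
}
```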


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173594378
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -328,7 +338,12 @@ class StringIndexerSuite
   .setOutputCol("CITYIndexed")
   .fit(dfNoBristol)
 
-val dfWithIndex = model.transform(dfNoBristol)
-assert(dfWithIndex.filter($"CITYIndexed" === 1.0).count == 1)
+testTransformerByGlobalCheckFunc[(String, String, String)](
+  dfNoBristol,
+  model,
+  "CITYIndexed") { rows =>
+  val transformed = rows.map { r => r.getDouble(0) 
}.toDF("CITYIndexed")
--- End diff --

It's probably easier to avoid going through a DataFrame here.
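
For instance, a sketch that keeps the original count assertion without rebuilding a DataFrame (names taken from the diff above):

```scala
testTransformerByGlobalCheckFunc[(String, String, String)](
  dfNoBristol,
  model,
  "CITYIndexed") { rows =>
  // Check the collected rows directly: exactly one row should be indexed to 1.0.
  assert(rows.count(_.getDouble(0) == 1.0) === 1)
}
```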


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173600463
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala ---
@@ -76,16 +77,18 @@ class VectorAssemblerSuite
 val assembler = new VectorAssembler()
   .setInputCols(Array("a", "b", "c"))
   .setOutputCol("features")
-val thrown = intercept[IllegalArgumentException] {
-  assembler.transform(df)
-}
-assert(thrown.getMessage contains
+testTransformerByInterceptingException[(String, String, String)](
+  df,
+  assembler,
   "Data type StringType of column a is not supported.\n" +
   "Data type StringType of column b is not supported.\n" +
-  "Data type StringType of column c is not supported.")
+  "Data type StringType of column c is not supported.",
+  "features")
   }
 
-  test("ML attributes") {
+  ignore("ML attributes") {
--- End diff --

ditto: do not ignore


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173876598
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorSlicerSuite.scala ---
@@ -84,26 +84,29 @@ class VectorSlicerSuite extends SparkFunSuite with 
MLlibTestSparkContext with De
 
 val vectorSlicer = new 
VectorSlicer().setInputCol("features").setOutputCol("result")
 
-def validateResults(df: DataFrame): Unit = {
-  df.select("result", "expected").collect().foreach { case Row(vec1: 
Vector, vec2: Vector) =>
+def validateResults(rows: Seq[Row]): Unit = {
+  rows.foreach { case Row(vec1: Vector, vec2: Vector) =>
 assert(vec1 === vec2)
   }
-  val resultMetadata = 
AttributeGroup.fromStructField(df.schema("result"))
-  val expectedMetadata = 
AttributeGroup.fromStructField(df.schema("expected"))
+  val resultMetadata = 
AttributeGroup.fromStructField(rows.head.schema("result"))
+  val expectedMetadata = 
AttributeGroup.fromStructField(rows.head.schema("expected"))
   assert(resultMetadata.numAttributes === 
expectedMetadata.numAttributes)
   
resultMetadata.attributes.get.zip(expectedMetadata.attributes.get).foreach { 
case (a, b) =>
 assert(a === b)
   }
 }
 
 vectorSlicer.setIndices(Array(1, 4)).setNames(Array.empty)
-validateResults(vectorSlicer.transform(df))
+testTransformerByGlobalCheckFunc[(Vector, Vector)](df, vectorSlicer, 
"result", "expected")(
--- End diff --

Avoid using a global check function when you don't need to.  It'd be better 
to use testTransformer() since the test is per-row.
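
A sketch of the per-row variant for the value check (`df` and `vectorSlicer` are the test's objects); the metadata assertions on the result/expected fields are not per-row properties, so they would stay in a schema-based global check:

```scala
testTransformer[(Vector, Vector)](df, vectorSlicer, "result", "expected") {
  case Row(vec1: Vector, vec2: Vector) => assert(vec1 === vec2)
}
```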


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173593885
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -247,14 +253,18 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
   .fit(df)
-val transformed = indexer.transform(df)
+val expected1 = Seq(0.0, 2.0, 1.0, 0.0, 0.0, 
1.0).map(Tuple1(_)).toDF("labelIndex")
+testTransformerByGlobalCheckFunc[(Int, String)](df, indexer, 
"labelIndex") { rows =>
+  assert(rows == expected1.collect().seq)
+}
+
 val idx2str = new IndexToString()
   .setInputCol("labelIndex")
   .setOutputCol("sameLabel")
   .setLabels(indexer.labels)
-idx2str.transform(transformed).select("label", 
"sameLabel").collect().foreach {
-  case Row(a: String, b: String) =>
-assert(a === b)
+
+testTransformerByGlobalCheckFunc[(Double)](expected1, idx2str, 
"sameLabel") { rows =>
--- End diff --

You should be able to test per-row, rather than using a global check 
function.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173592811
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala ---
@@ -70,36 +71,37 @@ class StringIndexerSuite
   .setInputCol("label")
   .setOutputCol("labelIndex")
   .fit(df)
+
 // Verify we throw by default with unseen values
-intercept[SparkException] {
-  indexer.transform(df2).collect()
-}
+testTransformerByInterceptingException[(Int, String)](
+  df2,
+  indexer,
+  "Unseen label:",
+  "labelIndex")
 
 indexer.setHandleInvalid("skip")
-// Verify that we skip the c record
-val transformedSkip = indexer.transform(df2)
-val attrSkip = 
Attribute.fromStructField(transformedSkip.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attrSkip.values.get === Array("b", "a"))
-val outputSkip = transformedSkip.select("id", "labelIndex").rdd.map { 
r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
-// a -> 1, b -> 0
-val expectedSkip = Set((0, 1.0), (1, 0.0))
-assert(outputSkip === expectedSkip)
+
+testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", 
"labelIndex") { rows =>
+  val attrSkip = 
Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attrSkip.values.get === Array("b", "a"))
+  // Verify that we skip the c record
+  // a -> 1, b -> 0
+  val expectedSkip = Seq((0, 1.0), (1, 0.0)).toDF()
+  assert(rows.seq === expectedSkip.collect().toSeq)
+}
 
 indexer.setHandleInvalid("keep")
+
 // Verify that we keep the unseen records
-val transformedKeep = indexer.transform(df2)
-val attrKeep = 
Attribute.fromStructField(transformedKeep.schema("labelIndex"))
-  .asInstanceOf[NominalAttribute]
-assert(attrKeep.values.get === Array("b", "a", "__unknown"))
-val outputKeep = transformedKeep.select("id", "labelIndex").rdd.map { 
r =>
-  (r.getInt(0), r.getDouble(1))
-}.collect().toSet
-// a -> 1, b -> 0, c -> 2, d -> 3
-val expectedKeep = Set((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0))
-assert(outputKeep === expectedKeep)
+testTransformerByGlobalCheckFunc[(Int, String)](df2, indexer, "id", 
"labelIndex") { rows =>
+  val attrKeep = 
Attribute.fromStructField(rows.head.schema("labelIndex"))
+.asInstanceOf[NominalAttribute]
+  assert(attrKeep.values.get === Array("b", "a", "__unknown"))
+  // a -> 1, b -> 0, c -> 2, d -> 3
+  val expectedKeep = Seq((0, 1.0), (1, 0.0), (2, 2.0), (3, 2.0)).toDF()
--- End diff --

ditto: move outside checkFunc


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173600557
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala ---
@@ -128,18 +126,29 @@ class VectorIndexerSuite extends SparkFunSuite with 
MLlibTestSparkContext
 
 MLTestingUtils.checkCopyAndUids(vectorIndexer, model)
 
-model.transform(densePoints1) // should work
-model.transform(sparsePoints1) // should work
+// should work
+testTransformer[FeatureData](densePoints1, model, "indexed") { _ => }
+// should work
+testTransformer[FeatureData](sparsePoints1, model, "indexed") { _ => }
+
 // If the data is local Dataset, it throws AssertionError directly.
-intercept[AssertionError] {
-  model.transform(densePoints2).collect()
-  logInfo("Did not throw error when fit, transform were called on 
vectors of different lengths")
+withClue("Did not found expected error message when fit, " +
--- End diff --

found -> find


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173584864
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
@@ -364,18 +397,26 @@ class QuantileDiscretizerSuite
   .setOutputCols(Array("result1", "result2", "result3"))
   .setNumBucketsArray(Array(10, 10, 10))
 
-val result1 = discretizerSingleNumBuckets.fit(df).transform(df)
-  .select("result1", "result2", "result3")
-  .collect()
-val result2 = discretizerNumBucketsArray.fit(df).transform(df)
-  .select("result1", "result2", "result3")
-  .collect()
-
-result1.zip(result2).foreach {
-  case (row1, row2) =>
-assert(row1.getDouble(0) == row2.getDouble(0) &&
-  row1.getDouble(1) == row2.getDouble(1) &&
-  row1.getDouble(2) == row2.getDouble(2))
+val model = discretizerSingleNumBuckets.fit(df)
+val expected = model.transform(df).select("result1", "result2", 
"result3").collect()
+
+
+testTransformerByGlobalCheckFunc[(Double, Double, Double)](
+  df,
+  model,
+  "result1",
+  "result2",
+  "result3") { rows =>
+  assert(rows === expected)
+}
+
+testTransformerByGlobalCheckFunc[(Double, Double, Double)](
--- End diff --

Is this a repeat of the test just above?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-12 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173600517
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala ---
@@ -128,18 +126,29 @@ class VectorIndexerSuite extends SparkFunSuite with 
MLlibTestSparkContext
 
 MLTestingUtils.checkCopyAndUids(vectorIndexer, model)
 
-model.transform(densePoints1) // should work
-model.transform(sparsePoints1) // should work
+// should work
--- End diff --

We can remove "should work" comments  : P


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml.featur...

2018-03-09 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20686
  
I'll do a complete review now!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-09 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173582018
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala ---
@@ -90,23 +96,29 @@ class OneHotEncoderSuite
 val encoder = new OneHotEncoder()
   .setInputCol("size")
   .setOutputCol("encoded")
-val output = encoder.transform(df)
-val group = AttributeGroup.fromStructField(output.schema("encoded"))
-assert(group.size === 2)
-assert(group.getAttr(0) === 
BinaryAttribute.defaultAttr.withName("small").withIndex(0))
-assert(group.getAttr(1) === 
BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
+testTransformerByGlobalCheckFunc[(Double)](df, encoder, "encoded") { 
rows =>
+  val group = 
AttributeGroup.fromStructField(rows.head.schema("encoded"))
+  assert(group.size === 2)
+  assert(group.getAttr(0) === 
BinaryAttribute.defaultAttr.withName("small").withIndex(0))
+  assert(group.getAttr(1) === 
BinaryAttribute.defaultAttr.withName("medium").withIndex(1))
+}
   }
 
-  test("input column without ML attribute") {
+
+  ignore("input column without ML attribute") {
--- End diff --

Let's keep the test but limit it to batch.  People should switch to 
OneHotEncoderEstimator anyways.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-09 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173580629
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala ---
@@ -108,5 +111,29 @@ trait MLTest extends StreamTest with TempDirectory { 
self: Suite =>
   otherResultCols: _*)(globalCheckFunction)
 testTransformerOnDF(dataframe, transformer, firstResultCol,
   otherResultCols: _*)(globalCheckFunction)
+}
+
+  def testTransformerByInterceptingException[A : Encoder](
+dataframe: DataFrame,
+transformer: Transformer,
+expectedMessagePart : String,
+firstResultCol: String) {
+
+def hasExpectedMessage(exception: Throwable): Boolean =
--- End diff --

Just curious: Did you have to add the getCause case because of streaming 
throwing wrapped exceptions?
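
Roughly the shape of the check being asked about (a sketch; the actual helper may differ):

```scala
// Accept the expected text either on the exception itself (batch path) or on
// its cause, since the streaming path surfaces the original error wrapped in
// another exception.
def hasExpectedMessage(exception: Throwable): Boolean =
  exception.getMessage.contains(expectedMessagePart) ||
    (exception.getCause != null &&
      exception.getCause.getMessage.contains(expectedMessagePart))
```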


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-09 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173580016
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala ---
@@ -108,5 +111,29 @@ trait MLTest extends StreamTest with TempDirectory { 
self: Suite =>
   otherResultCols: _*)(globalCheckFunction)
 testTransformerOnDF(dataframe, transformer, firstResultCol,
   otherResultCols: _*)(globalCheckFunction)
+}
+
+  def testTransformerByInterceptingException[A : Encoder](
+dataframe: DataFrame,
+transformer: Transformer,
+expectedMessagePart : String,
+firstResultCol: String) {
+
+def hasExpectedMessage(exception: Throwable): Boolean =
--- End diff --

Since most other tests check parts of the message, I'm OK with this setup.  
When we don't think the message will remain stable, we can pass an empty string 
for expectedMessagePart.
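
For example (names taken from the StringIndexer diff above):

```scala
// If the exact wording is not considered stable, an empty expectedMessagePart
// still exercises the failure path on both batch and streaming inputs,
// since every message trivially contains "".
testTransformerByInterceptingException[(Int, String)](df2, indexer, "", "labelIndex")
```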


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-09 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173555129
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala ---
@@ -17,94 +17,72 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, 
Vectors}
-import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Row}
 
 
-class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext 
with DefaultReadWriteTest {
+class NormalizerSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
-  @transient var data: Array[Vector] = _
-  @transient var dataFrame: DataFrame = _
-  @transient var normalizer: Normalizer = _
-  @transient var l1Normalized: Array[Vector] = _
-  @transient var l2Normalized: Array[Vector] = _
+  @transient val data: Seq[Vector] = Seq(
+Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
+Vectors.dense(0.0, 0.0, 0.0),
+Vectors.dense(0.6, -1.1, -3.0),
+Vectors.sparse(3, Seq((1, 0.91), (2, 3.2))),
+Vectors.sparse(3, Seq((0, 5.7), (1, 0.72), (2, 2.7))),
+Vectors.sparse(3, Seq()))
--- End diff --

I'd prefer to revert these changes.  As far as I know, nothing is broken, 
and this is a common pattern used in many parts of MLlib tests.

I think the main reason to move data around would be to have actual + 
expected values side-by-side for easier reading.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-09 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173554643
  
--- Diff: mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala 
---
@@ -19,61 +19,59 @@ package org.apache.spark.ml.feature
 
 import scala.beans.BeanInfo
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
+import org.apache.spark.sql.{DataFrame, Row}
+
 
 @BeanInfo
 case class NGramTestData(inputTokens: Array[String], wantedNGrams: 
Array[String])
 
-class NGramSuite extends SparkFunSuite with MLlibTestSparkContext with 
DefaultReadWriteTest {
+class NGramSuite extends MLTest with DefaultReadWriteTest {
 
-  import org.apache.spark.ml.feature.NGramSuite._
   import testImplicits._
 
   test("default behavior yields bigram features") {
 val nGram = new NGram()
   .setInputCol("inputTokens")
   .setOutputCol("nGrams")
-val dataset = Seq(NGramTestData(
+val dataFrame = Seq(NGramTestData(
--- End diff --

These kinds of changes are not necessary and make the PR a lot longer.  
Would you mind reverting them?


---




[GitHub] spark pull request #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml...

2018-03-09 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20686#discussion_r173556190
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala ---
@@ -17,94 +17,72 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, 
Vectors}
-import org.apache.spark.ml.util.DefaultReadWriteTest
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
 import org.apache.spark.ml.util.TestingUtils._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.{DataFrame, Row}
 
 
-class NormalizerSuite extends SparkFunSuite with MLlibTestSparkContext 
with DefaultReadWriteTest {
+class NormalizerSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
-  @transient var data: Array[Vector] = _
-  @transient var dataFrame: DataFrame = _
-  @transient var normalizer: Normalizer = _
--- End diff --

I will say, though, that I'm happy with moving Normalizer into individual 
tests.  It's weird how it is shared here since it's mutated within tests.
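
Concretely, the per-test construction would look roughly like this (a sketch only, 
assuming the MLTest helpers and the column names already used in this suite):

    test("Normalization keeps unit L2 norm (per-test Normalizer)") {
      // Sketch only: construct the Normalizer inside the test instead of mutating a shared var.
      val localDF = data.map(Tuple1.apply).toDF("features")
      val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalized")
      testTransformer[Tuple1[Vector]](localDF, normalizer, "features", "normalized") {
        case Row(original: Vector, normalized: Vector) =>
          // Zero vectors stay zero; everything else should come out with unit L2 norm.
          if (Vectors.norm(original, 2.0) > 0.0) {
            assert(math.abs(Vectors.norm(normalized, 2.0) - 1.0) < 1e-9)
          }
      }
    }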


---




[GitHub] spark issue #20686: [SPARK-22915][MLlib] Streaming tests for spark.ml.featur...

2018-03-09 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20686
  
Thanks for the PR, @attilapiros, and thanks for the review, @WeichenXu123!  I'll take 
a look now.


---




[GitHub] spark issue #19381: [SPARK-10884][ML] Support prediction on single instance ...

2018-03-09 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19381
  
Thanks for the PR!
Can you please add Since annotations?
Also, can the test code be consolidated?  Maybe you could add a helper 
function that takes a model and a dataset and works with all Predictors.
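
For example, something along these lines could serve as the shared helper (a rough 
sketch only, assuming this PR makes predict callable from test code; names and 
placement are up to you):

    // Hypothetical helper (sketch): single-instance predict should agree with the
    // prediction column produced by a batch transform over the same dataset.
    def validateSinglePrediction[M <: PredictionModel[Vector, M]](
        model: M, dataset: Dataset[_]): Unit = {
      model.transform(dataset)
        .select(model.getFeaturesCol, model.getPredictionCol)
        .collect()
        .foreach { case Row(features: Vector, prediction: Double) =>
          assert(model.predict(features) === prediction)
        }
    }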


---




[GitHub] spark issue #17466: [SPARK-14681][ML] Added getter for impurityStats

2018-03-08 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/17466
  
@shaynativ Sorry for the inactivity here.  Btw, for the JIRA & PR title 
question above, I'd recommend checking out 
http://spark.apache.org/contributing.html

Since @WeichenXu123 opened a fresh PR for this, would you mind working with 
him on it?
We can close this issue / PR for now.  Thank you!


---




spark git commit: [SPARK-18630][PYTHON][ML] Move del method from JavaParams to JavaWrapper; add tests

2018-03-05 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 508573958 -> 7706eea6a


[SPARK-18630][PYTHON][ML] Move del method from JavaParams to JavaWrapper; add 
tests

The `__del__` method that explicitly detaches the Java object was moved from the 
`JavaParams` class to the `JavaWrapper` class, so that model summaries can also be 
garbage collected on the Java side. A test case was added to make sure that the 
relevant error messages are thrown after the objects are deleted.

I ran the PySpark tests against the `pyspark-ml` module:
`./python/run-tests --python-executables=$(which python) --modules=pyspark-ml`

Author: Yogesh Garg 

Closes #20724 from yogeshg/java_wrapper_memory.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7706eea6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7706eea6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7706eea6

Branch: refs/heads/master
Commit: 7706eea6a8bdcd73e9dde5212368f8825e2f1801
Parents: 5085739
Author: Yogesh Garg 
Authored: Mon Mar 5 15:53:10 2018 -0800
Committer: Joseph K. Bradley 
Committed: Mon Mar 5 15:53:10 2018 -0800

--
 python/pyspark/ml/tests.py   | 39 +++
 python/pyspark/ml/wrapper.py |  8 
 2 files changed, 43 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7706eea6/python/pyspark/ml/tests.py
--
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 1168859..6dee693 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -173,6 +173,45 @@ class MockModel(MockTransformer, Model, HasFake):
 pass
 
 
+class JavaWrapperMemoryTests(SparkSessionTestCase):
+
+def test_java_object_gets_detached(self):
+df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+ (0.0, 2.0, Vectors.sparse(1, [], 
[]))],
+["label", "weight", "features"])
+lr = LinearRegression(maxIter=1, regParam=0.0, solver="normal", 
weightCol="weight",
+  fitIntercept=False)
+
+model = lr.fit(df)
+summary = model.summary
+
+self.assertIsInstance(model, JavaWrapper)
+self.assertIsInstance(summary, JavaWrapper)
+self.assertIsInstance(model, JavaParams)
+self.assertNotIsInstance(summary, JavaParams)
+
+error_no_object = 'Target Object ID does not exist for this gateway'
+
+self.assertIn("LinearRegression_", model._java_obj.toString())
+self.assertIn("LinearRegressionTrainingSummary", 
summary._java_obj.toString())
+
+model.__del__()
+
+with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object):
+model._java_obj.toString()
+self.assertIn("LinearRegressionTrainingSummary", 
summary._java_obj.toString())
+
+try:
+summary.__del__()
+except:
+pass
+
+with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object):
+model._java_obj.toString()
+with self.assertRaisesRegexp(py4j.protocol.Py4JError, error_no_object):
+summary._java_obj.toString()
+
+
 class ParamTypeConversionTests(PySparkTestCase):
 """
 Test that param type conversion happens.

http://git-wip-us.apache.org/repos/asf/spark/blob/7706eea6/python/pyspark/ml/wrapper.py
--
diff --git a/python/pyspark/ml/wrapper.py b/python/pyspark/ml/wrapper.py
index 0f846fb..5061f64 100644
--- a/python/pyspark/ml/wrapper.py
+++ b/python/pyspark/ml/wrapper.py
@@ -36,6 +36,10 @@ class JavaWrapper(object):
 super(JavaWrapper, self).__init__()
 self._java_obj = java_obj
 
+def __del__(self):
+if SparkContext._active_spark_context and self._java_obj is not None:
+SparkContext._active_spark_context._gateway.detach(self._java_obj)
+
 @classmethod
 def _create_from_java_class(cls, java_class, *args):
 """
@@ -100,10 +104,6 @@ class JavaParams(JavaWrapper, Params):
 
 __metaclass__ = ABCMeta
 
-def __del__(self):
-if SparkContext._active_spark_context:
-SparkContext._active_spark_context._gateway.detach(self._java_obj)
-
 def _make_java_param_pair(self, param, value):
 """
 Makes a Java param pair.





[GitHub] spark issue #20724: [SPARK-18630][PYTHON][ML] Move del method from JavaParam...

2018-03-05 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/20724
  
Thanks, @yogeshg and @WeichenXu123!  I verified that the new test fails without 
the wrapper fix.
LGTM
Merging with master.

I won't bother backporting this since it's a small issue.


---




[GitHub] spark pull request #20724: [SPARK-18630][PYTHON][ML] Move del method from Ja...

2018-03-05 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20724#discussion_r172367567
  
--- Diff: python/pyspark/ml/tests.py ---
@@ -173,6 +173,45 @@ class MockModel(MockTransformer, Model, HasFake):
 pass
 
 
+class JavaWrapperMemoryTests(SparkSessionTestCase):
+
+def test_java_object_gets_detached(self):
+df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+ (0.0, 2.0, Vectors.sparse(1, [], 
[]))],
+["label", "weight", "features"])
+lr = LinearRegression(maxIter=1, regParam=0.0, solver="normal", 
weightCol="weight",
+  fitIntercept=False)
+
+model = lr.fit(df)
+summary = model.summary
+
+self.assertIsInstance(model, JavaWrapper)
+self.assertIsInstance(summary, JavaWrapper)
+self.assertIsInstance(model, JavaParams)
+self.assertNotIsInstance(summary, JavaParams)
+
+error_no_object = 'Target Object ID does not exist for this 
gateway'
+
+self.assertIn("LinearRegression_", model._java_obj.toString())
+self.assertIn("LinearRegressionTrainingSummary", 
summary._java_obj.toString())
+
+model.__del__()
+
+with self.assertRaisesRegexp(py4j.protocol.Py4JError, 
error_no_object):
+model._java_obj.toString()
+self.assertIn("LinearRegressionTrainingSummary", 
summary._java_obj.toString())
+
+try:
+summary.__del__()
+except:
--- End diff --

Clarified offline: this try/except was needed so the test could be run against 
wrapper.py without the fix, to verify that it failed before the fix was applied.


---




spark git commit: [SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification

2018-03-05 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 4586eada4 -> 98a5c0a35


[SPARK-22882][ML][TESTS] ML test for structured streaming: ml.classification

## What changes were proposed in this pull request?

adding Structured Streaming tests for all Models/Transformers in 
spark.ml.classification

## How was this patch tested?

N/A

Author: WeichenXu 

Closes #20121 from WeichenXu123/ml_stream_test_classification.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98a5c0a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98a5c0a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98a5c0a3

Branch: refs/heads/master
Commit: 98a5c0a35f0a24730f5074522939acf57ef95422
Parents: 4586ead
Author: WeichenXu 
Authored: Mon Mar 5 10:50:00 2018 -0800
Committer: Joseph K. Bradley 
Committed: Mon Mar 5 10:50:00 2018 -0800

--
 .../DecisionTreeClassifierSuite.scala   |  29 ++-
 .../ml/classification/GBTClassifierSuite.scala  |  77 ++-
 .../ml/classification/LinearSVCSuite.scala  |  15 +-
 .../LogisticRegressionSuite.scala   | 229 +++
 .../MultilayerPerceptronClassifierSuite.scala   |  44 ++--
 .../ml/classification/NaiveBayesSuite.scala |  47 ++--
 .../ml/classification/OneVsRestSuite.scala  |  21 +-
 .../ProbabilisticClassifierSuite.scala  |  29 +--
 .../RandomForestClassifierSuite.scala   |  16 +-
 9 files changed, 202 insertions(+), 305 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/98a5c0a3/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
index 38b265d..eeb0324 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/classification/DecisionTreeClassifierSuite.scala
@@ -23,15 +23,14 @@ import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.tree.{CategoricalSplit, InternalNode, LeafNode}
 import org.apache.spark.ml.tree.impl.TreeTests
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
-import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree, 
DecisionTreeSuite => OldDecisionTreeSuite}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.tree.{DecisionTree => OldDecisionTree,
+  DecisionTreeSuite => OldDecisionTreeSuite}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}
 
-class DecisionTreeClassifierSuite
-  extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class DecisionTreeClassifierSuite extends MLTest with DefaultReadWriteTest {
 
   import DecisionTreeClassifierSuite.compareAPIs
   import testImplicits._
@@ -251,20 +250,18 @@ class DecisionTreeClassifierSuite
 
 MLTestingUtils.checkCopyAndUids(dt, newTree)
 
-val predictions = newTree.transform(newData)
-  .select(newTree.getPredictionCol, newTree.getRawPredictionCol, 
newTree.getProbabilityCol)
-  .collect()
-
-predictions.foreach { case Row(pred: Double, rawPred: Vector, probPred: 
Vector) =>
-  assert(pred === rawPred.argmax,
-s"Expected prediction $pred but calculated ${rawPred.argmax} from 
rawPrediction.")
-  val sum = rawPred.toArray.sum
-  assert(Vectors.dense(rawPred.toArray.map(_ / sum)) === probPred,
-"probability prediction mismatch")
+testTransformer[(Vector, Double)](newData, newTree,
+  "prediction", "rawPrediction", "probability") {
+  case Row(pred: Double, rawPred: Vector, probPred: Vector) =>
+assert(pred === rawPred.argmax,
+  s"Expected prediction $pred but calculated ${rawPred.argmax} from 
rawPrediction.")
+val sum = rawPred.toArray.sum
+assert(Vectors.dense(rawPred.toArray.map(_ / sum)) === probPred,
+  "probability prediction mismatch")
 }
 
 ProbabilisticClassifierSuite.testPredictMethods[
-  Vector, DecisionTreeClassificationModel](newTree, newData)
+  Vector, DecisionTreeClassificationModel](this, newTree, newData)
   }
 
   test("training with 1-category categorical feature") {

http://git-wip-us.apache.org/repos/asf/spark/blob/98a5c0a3/mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala
---

[GitHub] spark pull request #20121: [SPARK-22882][ML][TESTS] ML test for structured s...

2018-03-05 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/20121#discussion_r172288890
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
 ---
@@ -2567,10 +2504,13 @@ class LogisticRegressionSuite
 val model1 = lr.fit(smallBinaryDataset)
 val lr2 = new 
LogisticRegression().setInitialModel(model1).setMaxIter(5).setFamily("binomial")
 val model2 = lr2.fit(smallBinaryDataset)
-val predictions1 = 
model1.transform(smallBinaryDataset).select("prediction").collect()
-val predictions2 = 
model2.transform(smallBinaryDataset).select("prediction").collect()
-predictions1.zip(predictions2).foreach { case (Row(p1: Double), 
Row(p2: Double)) =>
-  assert(p1 === p2)
+val binaryExpected = 
model1.transform(smallBinaryDataset).select("prediction").collect()
+  .map(_.getDouble(0))
+for (model <- Seq(model1, model2)) {
--- End diff --

My thought is that testing binaryExpected (from model1) against model2 
would already cover the two things we care about:
* batch vs. streaming prediction
* the initial model

I'll just merge this, though, since it's not a big deal (it only adds a bit of 
testing time).
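
Concretely, the shorter variant I had in mind would look roughly like this (a sketch 
only, assuming the testTransformerByGlobalCheckFunc helper used elsewhere in this PR):

    // Sketch: compare model2 (trained from the initial model) against model1's batch
    // predictions, covering the initial-model path and batch-vs-streaming in one check.
    testTransformerByGlobalCheckFunc[(Double, Vector)](smallBinaryDataset.toDF(), model2,
        "prediction") { rows =>
      assert(rows.map(_.getDouble(0)) === binaryExpected.toSeq)
    }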


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


