Repository: spark
Updated Branches:
  refs/heads/master 3cb82047f -> 75a183071


[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M

## What changes were proposed in this pull request?

Adds Structured Streaming tests using `testTransformer` for the following suites (see the sketch after this list):
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram
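
For reviewers, here is a minimal sketch of the `testTransformer` pattern the suites above adopt, modeled on the MaxAbsScaler changes in the diff. The suite name and sample data are illustrative only and are not part of this patch:

```scala
package org.apache.spark.ml.feature

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.Row

// Illustrative suite; the real migrated suites are listed above and shown in the diff.
class ExampleMaxAbsScalerStreamingSuite extends MLTest with DefaultReadWriteTest {

  import testImplicits._

  test("MaxAbsScaler output matches expected values (batch and streaming)") {
    // Each row carries its input alongside the value expected after scaling by the
    // per-column max absolute value (2.0 for column 0, 1.0 for column 1).
    val df = Seq(
      (Vectors.dense(1.0, -1.0), Vectors.dense(0.5, -1.0)),
      (Vectors.dense(2.0, 0.0), Vectors.dense(1.0, 0.0))
    ).toDF("features", "expected")

    val model = new MaxAbsScaler()
      .setInputCol("features")
      .setOutputCol("scaled")
      .fit(df)

    // testTransformer (from MLTest) replaces transform(df).collect().foreach: it runs
    // the fitted model over the data as a batch DataFrame and as a streaming source,
    // applying the row check to each result.
    testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
      case Row(expected: Vector, actual: Vector) =>
        assert(expected === actual, s"Expected $expected but got $actual")
    }
  }
}
```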

## How was this patch tested?

This patch consists entirely of new and updated tests; no main source code is changed.

Author: Joseph K. Bradley <jos...@databricks.com>

Closes #20964 from jkbradley/SPARK-22883-part2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75a18307
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75a18307
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75a18307

Branch: refs/heads/master
Commit: 75a183071c4ed2e407c930edfdf721779662b3ee
Parents: 3cb8204
Author: Joseph K. Bradley <jos...@databricks.com>
Authored: Wed Apr 11 09:59:38 2018 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Wed Apr 11 09:59:38 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ml/feature/IDFSuite.scala  | 14 +++---
 .../apache/spark/ml/feature/ImputerSuite.scala  | 31 ++++++++++---
 .../spark/ml/feature/InteractionSuite.scala     | 46 ++++++++++----------
 .../spark/ml/feature/MaxAbsScalerSuite.scala    | 14 +++---
 .../spark/ml/feature/MinHashLSHSuite.scala      | 25 ++++++++---
 .../spark/ml/feature/MinMaxScalerSuite.scala    | 14 +++---
 .../apache/spark/ml/feature/NGramSuite.scala    |  2 +-
 7 files changed, 89 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
index 005edf7..cdd62be 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.sql.Row
 
-class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class IDFSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       Vectors.dense(0.0, 1.0, 2.0, 3.0),
       Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
     )
-    val numOfData = data.size
+    val numOfData = data.length
     val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       math.log((numOfData + 1.0) / (x + 1.0))
     })
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
     MLTestingUtils.checkCopyAndUids(idfEst, idfModel)
 
-    idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+    testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
       case Row(x: Vector, y: Vector) =>
        assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
     }
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       Vectors.dense(0.0, 1.0, 2.0, 3.0),
       Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
     )
-    val numOfData = data.size
+    val numOfData = data.length
     val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
       if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
     })
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       .setMinDocFreq(1)
       .fit(df)
 
-    idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+    testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
       case Row(x: Vector, y: Vector) =>
        assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
index c08b35b..75f63a6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/ImputerSuite.scala
@@ -16,13 +16,12 @@
  */
 package org.apache.spark.ml.feature
 
-import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.SparkException
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.sql.{DataFrame, Row}
 
-class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class ImputerSuite extends MLTest with DefaultReadWriteTest {
 
   test("Imputer for Double with default missing Value NaN") {
     val df = spark.createDataFrame( Seq(
@@ -76,6 +75,28 @@ class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with Default
     ImputerSuite.iterateStrategyTest(imputer, df)
   }
 
+  test("Imputer should work with Structured Streaming") {
+    val localSpark = spark
+    import localSpark.implicits._
+    val df = Seq[(java.lang.Double, Double)](
+      (4.0, 4.0),
+      (10.0, 10.0),
+      (10.0, 10.0),
+      (Double.NaN, 8.0),
+      (null, 8.0)
+    ).toDF("value", "expected_mean_value")
+    val imputer = new Imputer()
+      .setInputCols(Array("value"))
+      .setOutputCols(Array("out"))
+      .setStrategy("mean")
+    val model = imputer.fit(df)
+    testTransformer[(java.lang.Double, Double)](df, model, "expected_mean_value", "out") {
+      case Row(exp: java.lang.Double, out: Double) =>
+        assert((exp.isNaN && out.isNaN) || (exp == out),
+          s"Imputed values differ. Expected: $exp, actual: $out")
+    }
+  }
+
   test("Imputer throws exception when surrogate cannot be computed") {
     val df = spark.createDataFrame( Seq(
       (0, Double.NaN, 1.0, 1.0),
@@ -164,8 +185,6 @@ object ImputerSuite {
   * @param df DataFrame with columns "id", "value", "expected_mean", "expected_median"
    */
   def iterateStrategyTest(imputer: Imputer, df: DataFrame): Unit = {
-    val inputCols = imputer.getInputCols
-
     Seq("mean", "median").foreach { strategy =>
       imputer.setStrategy(strategy)
       val model = imputer.fit(df)

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
index 54f059e..eea31fc 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InteractionSuite.scala
@@ -19,15 +19,15 @@ package org.apache.spark.ml.feature
 
 import scala.collection.mutable.ArrayBuilder
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.col
 
-class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class InteractionSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -63,9 +63,9 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
 
   test("numeric interaction") {
     val data = Seq(
-      (2, Vectors.dense(3.0, 4.0)),
-      (1, Vectors.dense(1.0, 5.0))
-    ).toDF("a", "b")
+      (2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
+      (1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
+    ).toDF("a", "b", "expected")
     val groupAttr = new AttributeGroup(
       "b",
       Array[Attribute](
@@ -73,14 +73,15 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
         NumericAttribute.defaultAttr.withName("bar")))
     val df = data.select(
       col("a").as("a", NumericAttribute.defaultAttr.toMetadata()),
-      col("b").as("b", groupAttr.toMetadata()))
+      col("b").as("b", groupAttr.toMetadata()),
+      col("expected"))
     val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
+    testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
+      case Row(features: Vector, expected: Vector) =>
+        assert(features === expected)
+    }
+
     val res = trans.transform(df)
-    val expected = Seq(
-      (2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
-      (1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
-    ).toDF("a", "b", "features")
-    assert(res.collect() === expected.collect())
     val attrs = AttributeGroup.fromStructField(res.schema("features"))
     val expectedAttrs = new AttributeGroup(
       "features",
@@ -92,9 +93,9 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
 
   test("nominal interaction") {
     val data = Seq(
-      (2, Vectors.dense(3.0, 4.0)),
-      (1, Vectors.dense(1.0, 5.0))
-    ).toDF("a", "b")
+      (2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
+      (1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
+    ).toDF("a", "b", "expected")
     val groupAttr = new AttributeGroup(
       "b",
       Array[Attribute](
@@ -103,14 +104,15 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
     val df = data.select(
       col("a").as(
         "a", NominalAttribute.defaultAttr.withValues(Array("up", "down", 
"left")).toMetadata()),
-      col("b").as("b", groupAttr.toMetadata()))
+      col("b").as("b", groupAttr.toMetadata()),
+      col("expected"))
     val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
+    testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
+      case Row(features: Vector, expected: Vector) =>
+        assert(features === expected)
+    }
+
     val res = trans.transform(df)
-    val expected = Seq(
-      (2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
-      (1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
-    ).toDF("a", "b", "features")
-    assert(res.collect() === expected.collect())
     val attrs = AttributeGroup.fromStructField(res.schema("features"))
     val expectedAttrs = new AttributeGroup(
       "features",

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
index 918da4f..8dd0f0c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MaxAbsScalerSuite.scala
@@ -14,15 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.sql.Row
 
-class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class MaxAbsScalerSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -45,9 +44,10 @@ class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
       .setOutputCol("scaled")
 
     val model = scaler.fit(df)
-    model.transform(df).select("expected", "scaled").collect()
-      .foreach { case Row(vector1: Vector, vector2: Vector) =>
-      assert(vector1.equals(vector2), s"MaxAbsScaler ut error: $vector2 should be $vector1")
+    testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+      case Row(expectedVec: Vector, actualVec: Vector) =>
+        assert(expectedVec === actualVec,
+          s"MaxAbsScaler error: Expected $expectedVec but computed $actualVec")
     }
 
     MLTestingUtils.checkCopyAndUids(scaler, model)

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
index 3da0fb7..1c2956c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinHashLSHSuite.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
+import org.apache.spark.sql.{Dataset, Row}
 
-class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+
+class MinHashLSHSuite extends MLTest with DefaultReadWriteTest {
 
   @transient var dataset: Dataset[_] = _
 
@@ -175,4 +174,20 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     assert(precision == 1.0)
     assert(recall >= 0.7)
   }
+
+  test("MinHashLSHModel.transform should work with Structured Streaming") {
+    val localSpark = spark
+    import localSpark.implicits._
+
+    val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0)))
+    model.set(model.inputCol, "keys")
+    testTransformer[Tuple1[Vector]](dataset.toDF(), model, "keys", model.getOutputCol) {
+      case Row(_: Vector, output: Seq[_]) =>
+        assert(output.length === model.randCoefficients.length)
+        // no AND-amplification yet: SPARK-18450, so each hash output is of length 1
+        output.foreach {
+          case hashOutput: Vector => assert(hashOutput.size === 1)
+        }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
index 51db74e..2d965f2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/MinMaxScalerSuite.scala
@@ -17,13 +17,11 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
 import org.apache.spark.sql.Row
 
-class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class MinMaxScalerSuite extends MLTest with DefaultReadWriteTest {
 
   import testImplicits._
 
@@ -48,9 +46,9 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
       .setMax(5)
 
     val model = scaler.fit(df)
-    model.transform(df).select("expected", "scaled").collect()
-      .foreach { case Row(vector1: Vector, vector2: Vector) =>
-        assert(vector1.equals(vector2), "Transformed vector is different with expected.")
+    testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+      case Row(vector1: Vector, vector2: Vector) =>
+        assert(vector1 === vector2, "Transformed vector is different with expected.")
     }
 
     MLTestingUtils.checkCopyAndUids(scaler, model)
@@ -114,7 +112,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
     val model = scaler.fit(df)
     model.transform(df).select("expected", "scaled").collect()
       .foreach { case Row(vector1: Vector, vector2: Vector) =>
-        assert(vector1.equals(vector2), "Transformed vector is different with expected.")
+        assert(vector1 === vector2, "Transformed vector is different with expected.")
       }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/75a18307/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
index e5956ee..201a335 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala
@@ -84,7 +84,7 @@ class NGramSuite extends MLTest with DefaultReadWriteTest {
 
   def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
    testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") {
-      case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) =>
+      case Row(actualNGrams : Seq[_], wantedNGrams: Seq[_]) =>
         assert(actualNGrams === wantedNGrams)
     }
   }

