[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
GitHub user ludatabricks opened a pull request:

    https://github.com/apache/spark/pull/21195

    [Spark-23975][ML] Add support of array input for all clustering methods

## What changes were proposed in this pull request?

Add support of array input (in addition to Vector input) for all of the clustering methods: BisectingKMeans, GaussianMixture, and LDA.

## How was this patch tested?

Unit tests added.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ludatabricks/spark-1 SPARK-23975-1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21195.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21195

commit 31226b4b8e5aa5fc016f61ec86c42683c452a696
Author: Lu WANG
Date:   2018-04-26T17:46:49Z

    add Array input support for BisectingKMeans

commit 45e6e96e974607ed0526401d0fdbb4f1c8161dd6
Author: Lu WANG
Date:   2018-04-30T17:14:41Z

    add support of array input for all clustering methods
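For readers skimming the thread, a minimal usage sketch of what this change enables: clustering estimators accept an `ArrayType` features column directly, with no manual conversion to `Vector`. The data and the `spark` session below are hypothetical, and the behavior assumed is the PR as merged.

~~~scala
import org.apache.spark.ml.clustering.BisectingKMeans

// Hypothetical toy data: the features column is ArrayType(DoubleType), not VectorUDT.
val df = spark.createDataFrame(Seq(
  Tuple1(Array(0.0, 0.0)),
  Tuple1(Array(0.1, 0.1)),
  Tuple1(Array(9.0, 9.0)),
  Tuple1(Array(9.1, 9.1))
)).toDF("features")

// With this PR, fit() accepts the array column directly.
val model = new BisectingKMeans().setK(2).setSeed(1L).fit(df)
model.transform(df).show()
~~~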
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r185984500

--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala ---
@@ -182,6 +184,40 @@ class BisectingKMeansSuite
     model.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
   }
+
+  test("BisectingKMeans with Array input") {
+    val featuresColNameD = "array_double_features"
+    val featuresColNameF = "array_float_features"
+    val doubleUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Double](features.size)(0.0)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
+      featureArray
+    }
+    val floatUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Float](features.size)(0.0f)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
+      featureArray
+    }
+    val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
+      .drop("features")
+    val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features")))
+      .drop("features")
+    assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
+    assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))
+
+    val bkmD = new BisectingKMeans()
+      .setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1)
+    val bkmF = new BisectingKMeans()
+      .setK(k).setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1)
+    val modelD = bkmD.fit(newdatasetD)
+    val modelF = bkmF.fit(newdatasetF)
+    val transformedD = modelD.transform(newdatasetD)
+    val transformedF = modelF.transform(newdatasetF)
+    val predictDifference = transformedD.select("prediction")
+      .except(transformedF.select("prediction"))
+    assert(predictDifference.count() == 0)
--- End diff --

This only verifies that it handles `Array[Double]` and `Array[Float]` the same way; it doesn't guarantee that the result is correct. We can define a method that takes a dataset, applies one iteration, and returns the cost:

~~~scala
def trainAndComputeCost(dataset: DataFrame): Double = {
  val model = new BisectingKMeans()
    .setK(k).setMaxIter(1).setSeed(1)
    .fit(dataset)
  model.computeCost(dataset)
}

val trueCost = trainAndComputeCost(dataset)
val floatArrayCost = trainAndComputeCost(newDatasetF)
assert(floatArrayCost === trueCost)
val doubleArrayCost = trainAndComputeCost(newDatasetD)
assert(doubleArrayCost === trueCost)
~~~

We can map the original dataset to single precision to get an exact match. Or we can test equality with a threshold. See https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/mllib/util/TestingUtils.scala
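If the exact-match route proves brittle, the threshold alternative mengxr links to could look like the sketch below. It reuses the names from the snippet above, and the `~==` syntax is the tolerance comparison provided by the linked TestingUtils.

~~~scala
import org.apache.spark.mllib.util.TestingUtils._

// Single- vs. double-precision rounding can break exact equality, so compare
// the costs within a small absolute tolerance instead.
val floatArrayCost = trainAndComputeCost(newDatasetF)
assert(floatArrayCost ~== trueCost absTol 1e-6)
~~~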
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r185971385

--- Diff: mllib/src/main/scala/org/apache/spark/ml/util/SchemaUtils.scala ---
@@ -101,4 +102,17 @@ private[spark] object SchemaUtils {
     require(!schema.fieldNames.contains(col.name), s"Column ${col.name} already exists.")
     StructType(schema.fields :+ col)
   }
+
+  /**
+   * Check whether the given column in the schema is one of the supporting vector type: Vector,
+   * Array[Dloat]. Array[Double]
--- End diff --

nit: Float
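As an aside for readers, the helper under review can be pictured as roughly the following standalone sketch. This is hypothetical, not the exact PR code; `SQLDataTypes.VectorType` is the public handle for the ML vector SQL type.

~~~scala
import org.apache.spark.ml.linalg.SQLDataTypes
import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, StructType}

// Sketch: accept a Vector column or an Array[Float] / Array[Double] column.
def validateVectorCompatibleColumn(schema: StructType, colName: String): Unit = {
  val actualDataType = schema(colName).dataType
  val isSupported = actualDataType match {
    case dt if dt == SQLDataTypes.VectorType => true
    case ArrayType(FloatType, _) | ArrayType(DoubleType, _) => true
    case _ => false
  }
  require(isSupported, s"Column $colName must be of type Vector, Array[Float], " +
    s"or Array[Double], but was actually $actualDataType.")
}
~~~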
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r185983646

--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala ---
@@ -182,6 +184,40 @@ class BisectingKMeansSuite
     model.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
   }
+
+  test("BisectingKMeans with Array input") {
+    val featuresColNameD = "array_double_features"
+    val featuresColNameF = "array_float_features"
+    val doubleUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Double](features.size)(0.0)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
+      featureArray
+    }
+    val floatUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Float](features.size)(0.0f)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
+      featureArray
+    }
+    val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
+      .drop("features")
--- End diff --

* Unnecessary to drop `features`. Or you can simply replace the features column:

~~~scala
val newdatasetD = dataset.withColumn(FEATURES, doubleUDF(col(FEATURES)))
~~~
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r185984527

--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -256,6 +258,42 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     val expectedMatrix = GaussianMixture.unpackUpperTriangularMatrix(4, triangularValues)
     assert(symmetricMatrix === expectedMatrix)
   }
+
+  test("GaussianMixture with Array input") {
+    val featuresColNameD = "array_double_features"
+    val featuresColNameF = "array_float_features"
+    val doubleUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Double](features.size)(0.0)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
+      featureArray
+    }
+    val floatUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Float](features.size)(0.0f)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
+      featureArray
+    }
+    val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
+      .drop("features")
+    val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features")))
+      .drop("features")
+    assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
+    assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))
+
+    val gmD = new GaussianMixture().setK(k).setMaxIter(1)
+      .setFeaturesCol(featuresColNameD).setSeed(1)
+    val gmF = new GaussianMixture().setK(k).setMaxIter(1)
+      .setFeaturesCol(featuresColNameF).setSeed(1)
+    val modelD = gmD.fit(newdatasetD)
+    val modelF = gmF.fit(newdatasetF)
+    val transformedD = modelD.transform(newdatasetD)
+    val transformedF = modelF.transform(newdatasetF)
+    val predictDifference = transformedD.select("prediction")
+      .except(transformedF.select("prediction"))
+    assert(predictDifference.count() == 0)
+    val probabilityDifference = transformedD.select("probability")
+      .except(transformedF.select("probability"))
+    assert(probabilityDifference.count() == 0)
--- End diff --

ditto
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r185971647

--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala ---
@@ -182,6 +184,40 @@ class BisectingKMeansSuite
     model.clusterCenters.forall(Vectors.norm(_, 2) == 1.0)
   }
+
+  test("BisectingKMeans with Array input") {
+    val featuresColNameD = "array_double_features"
+    val featuresColNameF = "array_float_features"
+    val doubleUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Double](features.size)(0.0)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
--- End diff --

* If `.toFloat` is to keep the same precision, we should leave an inline comment.
* `features.toArray.map(_.toFloat.toDouble)` should do the work.
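Why the round-trip through `Float` matters: it snaps each value to the nearest single-precision number, which is what makes results on the `Double` and `Float` columns comparable. A quick REPL-style check:

~~~scala
val x = 0.1 // a Double that is not exactly representable in single precision
val truncated = x.toFloat.toDouble
// truncated is 0.10000000149011612: the nearest Float, widened back to Double.
assert(truncated != x)
assert(truncated.toFloat == x.toFloat) // but both agree at single precision
~~~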
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r185984894

--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala ---
@@ -323,4 +324,44 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       assert(model.getOptimizer === optimizer)
     }
   }
+
+  test("LDA with Array input") {
+    val featuresColNameD = "array_double_features"
+    val featuresColNameF = "array_float_features"
+    val doubleUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Double](features.size)(0.0)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
+      featureArray
+    }
+    val floatUDF = udf { (features: Vector) =>
+      val featureArray = Array.fill[Float](features.size)(0.0f)
+      features.foreachActive((idx, value) => featureArray(idx) = value.toFloat)
+      featureArray
+    }
+    val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features")))
+      .drop("features")
+    val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features")))
+      .drop("features")
+    assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false)))
+    assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false)))
+
+    val ldaD = new LDA().setK(k).setOptimizer("online")
+      .setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1)
+    val ldaF = new LDA().setK(k).setOptimizer("online").
+      setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1)
+    val modelD = ldaD.fit(newdatasetD)
+    val modelF = ldaF.fit(newdatasetF)
+
+    // logLikelihood, logPerplexity
+    val llD = modelD.logLikelihood(newdatasetD)
+    val llF = modelF.logLikelihood(newdatasetF)
+    // assert(llD == llF)
+    assert(llD <= 0.0 && llD != Double.NegativeInfinity)
+    assert(llF <= 0.0 && llF != Double.NegativeInfinity)
+    val lpD = modelD.logPerplexity(newdatasetD)
+    val lpF = modelF.logPerplexity(newdatasetF)
+    // assert(lpD == lpF)
+    assert(lpD >= 0.0 && lpD != Double.NegativeInfinity)
+    assert(lpF >= 0.0 && lpF != Double.NegativeInfinity)
--- End diff --

ditto
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r186556119

--- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala ---
@@ -247,4 +247,21 @@ object MLTestingUtils extends SparkFunSuite {
     }
     models.sliding(2).foreach { case Seq(m1, m2) => modelEquals(m1, m2)}
   }
+
+  /**
+   * Helper function for testing different input types for features. Given a DataFrame, generate
+   * three output DataFrames: one having vector feature column with float precision, one having
--- End diff --

minor: should say `features` column to make the contract clear.
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r186555908

--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala ---
@@ -323,4 +324,21 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       assert(model.getOptimizer === optimizer)
     }
   }
+
+  test("LDA with Array input") {
+    def trainAndLogLikelihoodAndPerplexity(dataset: Dataset[_]): (Double, Double) = {
+      val model = new LDA().setK(k).setOptimizer("online").setMaxIter(1).setSeed(1).fit(dataset)
+      (model.logLikelihood(dataset), model.logPerplexity(dataset))
+    }
+
+    val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
--- End diff --

minor: the outputs are not used. I expect they will be used once we fix SPARK-22210.
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r186556808

--- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala ---
@@ -247,4 +247,21 @@ object MLTestingUtils extends SparkFunSuite {
     }
     models.sliding(2).foreach { case Seq(m1, m2) => modelEquals(m1, m2)}
   }
+
+  /**
+   * Helper function for testing different input types for features. Given a DataFrame, generate
+   * three output DataFrames: one having vector feature column with float precision, one having
+   * double array feature column with float precision, and one having float array feature column.
+   */
+  def generateArrayFeatureDataset(dataset: Dataset[_]): (Dataset[_], Dataset[_], Dataset[_]) = {
+    val toFloatVectorUDF = udf { (features: Vector) => features.toArray.map(_.toFloat).toVector}
+    val toDoubleArrayUDF = udf { (features: Vector) => features.toArray}
+    val toFloatArrayUDF = udf { (features: Vector) => features.toArray.map(_.toFloat)}
+    val newDataset = dataset.withColumn("features", toFloatVectorUDF(col("features")))
+    val newDatasetD = dataset.withColumn("features", toDoubleArrayUDF(col("features")))
--- End diff --

This doesn't truncate the precision to single. Did you want to use `newDataset` instead of `dataset`?
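A sketch of the corrected helper, under the reading mengxr suggests: truncate once, then derive both array variants from the already-truncated `newDataset` so all three datasets carry identical single-precision values. Illustrative only, not the exact code that was merged.

~~~scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{col, udf}

def generateArrayFeatureDataset(dataset: Dataset[_]): (Dataset[_], Dataset[_], Dataset[_]) = {
  val featuresColName = "features" // per the review comment below, keep the name in one place
  val toFloatVectorUDF = udf { (features: Vector) =>
    Vectors.dense(features.toArray.map(_.toFloat.toDouble))
  }
  val toDoubleArrayUDF = udf { (features: Vector) => features.toArray }
  val toFloatArrayUDF = udf { (features: Vector) => features.toArray.map(_.toFloat) }
  // Truncate to single precision once; both array columns inherit the same values.
  val newDataset = dataset.withColumn(featuresColName, toFloatVectorUDF(col(featuresColName)))
  val newDatasetD = newDataset.withColumn(featuresColName, toDoubleArrayUDF(col(featuresColName)))
  val newDatasetF = newDataset.withColumn(featuresColName, toFloatArrayUDF(col(featuresColName)))
  (newDataset, newDatasetD, newDatasetF)
}
~~~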
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r186556798

--- Diff: mllib/src/test/scala/org/apache/spark/ml/util/MLTestingUtils.scala ---
@@ -247,4 +247,21 @@ object MLTestingUtils extends SparkFunSuite {
     }
     models.sliding(2).foreach { case Seq(m1, m2) => modelEquals(m1, m2)}
   }
+
+  /**
+   * Helper function for testing different input types for features. Given a DataFrame, generate
+   * three output DataFrames: one having vector feature column with float precision, one having
+   * double array feature column with float precision, and one having float array feature column.
+   */
+  def generateArrayFeatureDataset(dataset: Dataset[_]): (Dataset[_], Dataset[_], Dataset[_]) = {
+    val toFloatVectorUDF = udf { (features: Vector) => features.toArray.map(_.toFloat).toVector}
+    val toDoubleArrayUDF = udf { (features: Vector) => features.toArray}
+    val toFloatArrayUDF = udf { (features: Vector) => features.toArray.map(_.toFloat)}
+    val newDataset = dataset.withColumn("features", toFloatVectorUDF(col("features")))
--- End diff --

minor: maybe useful to define `"features"` as a constant at the beginning of the function
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r186555425

--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala ---
@@ -256,6 +257,22 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     val expectedMatrix = GaussianMixture.unpackUpperTriangularMatrix(4, triangularValues)
     assert(symmetricMatrix === expectedMatrix)
   }
+
+  test("GaussianMixture with Array input") {
+    def trainAndComputlogLikelihood(dataset: Dataset[_]): Double = {
+      val model = new GaussianMixture().setK(k).setMaxIter(1).setSeed(1).fit(dataset)
+      model.summary.logLikelihood
+    }
+
+    val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val trueLikelihood = trainAndComputlogLikelihood(newDataset)
+    val doubleLikelihood = trainAndComputlogLikelihood(newDatasetD)
+    val floatLikelihood = trainAndComputlogLikelihood(newDatasetF)
+
+    // checking the cost is fine enough as a sanity check
+    assert(trueLikelihood == doubleLikelihood)
--- End diff --

minor: should use `===` instead of `==` for assertions; the former gives a better error message. (Not necessary to update this PR.)
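Concretely, the difference only shows up on failure; with ScalaTest's `===` the message names both operands. The values below are illustrative.

~~~scala
assert(trueLikelihood == doubleLikelihood)  // on failure: "assertion failed"
assert(trueLikelihood === doubleLikelihood) // on failure: "-264.33 did not equal -264.21"
~~~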
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user ludatabricks commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21195#discussion_r186566521

--- Diff: mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala ---
@@ -323,4 +324,21 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
       assert(model.getOptimizer === optimizer)
     }
   }
+
+  test("LDA with Array input") {
+    def trainAndLogLikelihoodAndPerplexity(dataset: Dataset[_]): (Double, Double) = {
+      val model = new LDA().setK(k).setOptimizer("online").setMaxIter(1).setSeed(1).fit(dataset)
+      (model.logLikelihood(dataset), model.logPerplexity(dataset))
+    }
+
+    val (newDataset, newDatasetD, newDatasetF) = MLTestingUtils.generateArrayFeatureDataset(dataset)
+    val (ll, lp) = trainAndLogLikelihoodAndPerplexity(newDataset)
--- End diff --

Yes. I want to use this as the base for the comparison after we fix SPARK-22210.
[GitHub] spark pull request #21195: [Spark-23975][ML] Add support of array input for ...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21195