spark git commit: [SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline
Repository: spark Updated Branches: refs/heads/branch-2.3 f891ee324 -> 2ec302658 [SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline ## What changes were proposed in this pull request? Including VectorSizeHint in RFormula pipelines will allow them to be applied to streaming dataframes. ## How was this patch tested? Unit tests. Author: Bago Amirbekian Closes #20238 from MrBago/rFormulaVectorSize. (cherry picked from commit 186bf8fb2e9ff8a80f3f6bcb5f2a0327fa79a1c9) Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ec30265 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ec30265 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ec30265 Branch: refs/heads/branch-2.3 Commit: 2ec302658c98038962c9b7a90fd2cff751a35ffa Parents: f891ee3 Author: Bago Amirbekian Authored: Thu Jan 11 13:57:15 2018 -0800 Committer: Joseph K. Bradley Committed: Thu Jan 11 13:57:27 2018 -0800 -- R/pkg/R/mllib_utils.R | 1 + .../org/apache/spark/ml/feature/RFormula.scala | 18 -- .../apache/spark/ml/feature/RFormulaSuite.scala | 37 +--- 3 files changed, 48 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/R/pkg/R/mllib_utils.R -- diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R index a53c92c..23dda42 100644 --- a/R/pkg/R/mllib_utils.R +++ b/R/pkg/R/mllib_utils.R @@ -130,3 +130,4 @@ read.ml <- function(path) { stop("Unsupported model: ", jobj) } } + http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index 7da3339..f384ffb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -25,7 +25,7 @@ import 
org.apache.hadoop.fs.Path import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, PipelineStage, Transformer} import org.apache.spark.ml.attribute.AttributeGroup -import org.apache.spark.ml.linalg.VectorUDT +import org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, HasLabelCol} import org.apache.spark.ml.util._ @@ -210,8 +210,8 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) // First we index each string column referenced by the input terms. val indexed: Map[String, String] = resolvedFormula.terms.flatten.distinct.map { term => - dataset.schema(term) match { -case column if column.dataType == StringType => + dataset.schema(term).dataType match { +case _: StringType => val indexCol = tmpColumn("stridx") encoderStages += new StringIndexer() .setInputCol(term) @@ -220,6 +220,18 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) .setHandleInvalid($(handleInvalid)) prefixesToRewrite(indexCol + "_") = term + "_" (term, indexCol) +case _: VectorUDT => + val group = AttributeGroup.fromStructField(dataset.schema(term)) + val size = if (group.size < 0) { +dataset.select(term).first().getAs[Vector](0).size + } else { +group.size + } + encoderStages += new VectorSizeHint(uid) +.setHandleInvalid("optimistic") +.setInputCol(term) +.setSize(size) + (term, term) case _ => (term, term) } http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala index 5d09c90..f3f4b5a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala +++ 
b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala @@ -17,15 +17,15 @@ package org.apache.spark.ml.feature -import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.SparkException import
spark git commit: [SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline
Repository: spark Updated Branches: refs/heads/master 6f7aaed80 -> 186bf8fb2 [SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline ## What changes were proposed in this pull request? Including VectorSizeHint in RFormula pipelines will allow them to be applied to streaming dataframes. ## How was this patch tested? Unit tests. Author: Bago Amirbekian Closes #20238 from MrBago/rFormulaVectorSize. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/186bf8fb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/186bf8fb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/186bf8fb Branch: refs/heads/master Commit: 186bf8fb2e9ff8a80f3f6bcb5f2a0327fa79a1c9 Parents: 6f7aaed Author: Bago Amirbekian Authored: Thu Jan 11 13:57:15 2018 -0800 Committer: Joseph K. Bradley Committed: Thu Jan 11 13:57:15 2018 -0800 -- R/pkg/R/mllib_utils.R | 1 + .../org/apache/spark/ml/feature/RFormula.scala | 18 -- .../apache/spark/ml/feature/RFormulaSuite.scala | 37 +--- 3 files changed, 48 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/R/pkg/R/mllib_utils.R -- diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R index a53c92c..23dda42 100644 --- a/R/pkg/R/mllib_utils.R +++ b/R/pkg/R/mllib_utils.R @@ -130,3 +130,4 @@ read.ml <- function(path) { stop("Unsupported model: ", jobj) } } + http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index 7da3339..f384ffb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.{Experimental, Since} import 
org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, PipelineStage, Transformer} import org.apache.spark.ml.attribute.AttributeGroup -import org.apache.spark.ml.linalg.VectorUDT +import org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, ParamValidators} import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, HasLabelCol} import org.apache.spark.ml.util._ @@ -210,8 +210,8 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) // First we index each string column referenced by the input terms. val indexed: Map[String, String] = resolvedFormula.terms.flatten.distinct.map { term => - dataset.schema(term) match { -case column if column.dataType == StringType => + dataset.schema(term).dataType match { +case _: StringType => val indexCol = tmpColumn("stridx") encoderStages += new StringIndexer() .setInputCol(term) @@ -220,6 +220,18 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override val uid: String) .setHandleInvalid($(handleInvalid)) prefixesToRewrite(indexCol + "_") = term + "_" (term, indexCol) +case _: VectorUDT => + val group = AttributeGroup.fromStructField(dataset.schema(term)) + val size = if (group.size < 0) { +dataset.select(term).first().getAs[Vector](0).size + } else { +group.size + } + encoderStages += new VectorSizeHint(uid) +.setHandleInvalid("optimistic") +.setInputCol(term) +.setSize(size) + (term, term) case _ => (term, term) } http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala index 5d09c90..f3f4b5a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala @@ -17,15 +17,15 @@ package 
org.apache.spark.ml.feature -import org.apache.spark.{SparkException, SparkFunSuite} +import org.apache.spark.SparkException import org.apache.spark.ml.attribute._ -import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.linalg.{Vector, Vectors} import