spark git commit: [SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline

2018-01-11 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f891ee324 -> 2ec302658


[SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline

## What changes were proposed in this pull request?

Including VectorSizeHint in RFormula pipelines will allow them to be applied
to streaming DataFrames.

## How was this patch tested?

Unit tests.

Author: Bago Amirbekian 

Closes #20238 from MrBago/rFormulaVectorSize.

(cherry picked from commit 186bf8fb2e9ff8a80f3f6bcb5f2a0327fa79a1c9)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2ec30265
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2ec30265
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2ec30265

Branch: refs/heads/branch-2.3
Commit: 2ec302658c98038962c9b7a90fd2cff751a35ffa
Parents: f891ee3
Author: Bago Amirbekian 
Authored: Thu Jan 11 13:57:15 2018 -0800
Committer: Joseph K. Bradley 
Committed: Thu Jan 11 13:57:27 2018 -0800

--
 R/pkg/R/mllib_utils.R   |  1 +
 .../org/apache/spark/ml/feature/RFormula.scala  | 18 --
 .../apache/spark/ml/feature/RFormulaSuite.scala | 37 +---
 3 files changed, 48 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/R/pkg/R/mllib_utils.R
--
diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
index a53c92c..23dda42 100644
--- a/R/pkg/R/mllib_utils.R
+++ b/R/pkg/R/mllib_utils.R
@@ -130,3 +130,4 @@ read.ml <- function(path) {
 stop("Unsupported model: ", jobj)
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index 7da3339..f384ffb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, 
PipelineStage, Transformer}
 import org.apache.spark.ml.attribute.AttributeGroup
-import org.apache.spark.ml.linalg.VectorUDT
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
 import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, 
ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, 
HasLabelCol}
 import org.apache.spark.ml.util._
@@ -210,8 +210,8 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override 
val uid: String)
 
 // First we index each string column referenced by the input terms.
 val indexed: Map[String, String] = 
resolvedFormula.terms.flatten.distinct.map { term =>
-  dataset.schema(term) match {
-case column if column.dataType == StringType =>
+  dataset.schema(term).dataType match {
+case _: StringType =>
   val indexCol = tmpColumn("stridx")
   encoderStages += new StringIndexer()
 .setInputCol(term)
@@ -220,6 +220,18 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override 
val uid: String)
 .setHandleInvalid($(handleInvalid))
   prefixesToRewrite(indexCol + "_") = term + "_"
   (term, indexCol)
+case _: VectorUDT =>
+  val group = AttributeGroup.fromStructField(dataset.schema(term))
+  val size = if (group.size < 0) {
+dataset.select(term).first().getAs[Vector](0).size
+  } else {
+group.size
+  }
+  encoderStages += new VectorSizeHint(uid)
+.setHandleInvalid("optimistic")
+.setInputCol(term)
+.setSize(size)
+  (term, term)
 case _ =>
   (term, term)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2ec30265/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
index 5d09c90..f3f4b5a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
@@ -17,15 +17,15 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import 

spark git commit: [SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline

2018-01-11 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 6f7aaed80 -> 186bf8fb2


[SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pipeline

## What changes were proposed in this pull request?

Including VectorSizeHint in RFormula pipelines will allow them to be applied
to streaming DataFrames.

## How was this patch tested?

Unit tests.

Author: Bago Amirbekian 

Closes #20238 from MrBago/rFormulaVectorSize.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/186bf8fb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/186bf8fb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/186bf8fb

Branch: refs/heads/master
Commit: 186bf8fb2e9ff8a80f3f6bcb5f2a0327fa79a1c9
Parents: 6f7aaed
Author: Bago Amirbekian 
Authored: Thu Jan 11 13:57:15 2018 -0800
Committer: Joseph K. Bradley 
Committed: Thu Jan 11 13:57:15 2018 -0800

--
 R/pkg/R/mllib_utils.R   |  1 +
 .../org/apache/spark/ml/feature/RFormula.scala  | 18 --
 .../apache/spark/ml/feature/RFormulaSuite.scala | 37 +---
 3 files changed, 48 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/R/pkg/R/mllib_utils.R
--
diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
index a53c92c..23dda42 100644
--- a/R/pkg/R/mllib_utils.R
+++ b/R/pkg/R/mllib_utils.R
@@ -130,3 +130,4 @@ read.ml <- function(path) {
 stop("Unsupported model: ", jobj)
   }
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
index 7da3339..f384ffb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model, Pipeline, PipelineModel, 
PipelineStage, Transformer}
 import org.apache.spark.ml.attribute.AttributeGroup
-import org.apache.spark.ml.linalg.VectorUDT
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
 import org.apache.spark.ml.param.{BooleanParam, Param, ParamMap, 
ParamValidators}
 import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasHandleInvalid, 
HasLabelCol}
 import org.apache.spark.ml.util._
@@ -210,8 +210,8 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override 
val uid: String)
 
 // First we index each string column referenced by the input terms.
 val indexed: Map[String, String] = 
resolvedFormula.terms.flatten.distinct.map { term =>
-  dataset.schema(term) match {
-case column if column.dataType == StringType =>
+  dataset.schema(term).dataType match {
+case _: StringType =>
   val indexCol = tmpColumn("stridx")
   encoderStages += new StringIndexer()
 .setInputCol(term)
@@ -220,6 +220,18 @@ class RFormula @Since("1.5.0") (@Since("1.5.0") override 
val uid: String)
 .setHandleInvalid($(handleInvalid))
   prefixesToRewrite(indexCol + "_") = term + "_"
   (term, indexCol)
+case _: VectorUDT =>
+  val group = AttributeGroup.fromStructField(dataset.schema(term))
+  val size = if (group.size < 0) {
+dataset.select(term).first().getAs[Vector](0).size
+  } else {
+group.size
+  }
+  encoderStages += new VectorSizeHint(uid)
+.setHandleInvalid("optimistic")
+.setInputCol(term)
+.setSize(size)
+  (term, term)
 case _ =>
   (term, term)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/186bf8fb/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
index 5d09c90..f3f4b5a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala
@@ -17,15 +17,15 @@
 
 package org.apache.spark.ml.feature
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
 import org.apache.spark.ml.attribute._
-import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.linalg.{Vector, Vectors}
 import