[3/4] spark git commit: [SPARK-14615][ML] Use the new ML Vector and Matrix in the ML pipeline based algorithms
http://git-wip-us.apache.org/repos/asf/spark/blob/ff1cfce1/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index 626e97e..9d084b5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -21,11 +21,14 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml._ +import org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.mllib.feature -import org.apache.spark.mllib.linalg.{Vector, VectorUDT} +import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} +import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StructField, StructType} @@ -93,7 +96,9 @@ class StandardScaler(override val uid: String) extends Estimator[StandardScalerM @Since("2.0.0") override def fit(dataset: Dataset[_]): StandardScalerModel = { transformSchema(dataset.schema, logging = true) -val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v } +val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map { + case Row(v: Vector) => OldVectors.fromML(v) +} val scaler = new feature.StandardScaler(withMean = $(withMean), withStd = $(withStd)) val scalerModel = scaler.fit(input) copyValues(new StandardScalerModel(uid, scalerModel.std, scalerModel.mean).setParent(this)) @@ -145,7 +150,11 @@ class StandardScalerModel private[ml] ( override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean)) -val scale = udf { scaler.transform _ } + +// TODO: Make the transformer natively in ml framework to avoid extra conversion. +val transformer: Vector => Vector = v => scaler.transform(OldVectors.fromML(v)).asML + +val scale = udf(transformer) dataset.withColumn($(outputCol), scale(col($(inputCol } http://git-wip-us.apache.org/repos/asf/spark/blob/ff1cfce1/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala index 4d3e46e..1bc2420 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala @@ -23,10 +23,10 @@ import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute, UnresolvedAttribute} +import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ -import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/ff1cfce1/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index 68b699d..2bc9d22 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -27,10 +27,10 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.attribute._ +import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, VectorUDT} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.udf import org.apache.spark.sql.types.{StructField, StructType}
[3/4] spark git commit: [SPARK-14615][ML] Use the new ML Vector and Matrix in the ML pipeline based algorithms
http://git-wip-us.apache.org/repos/asf/spark/blob/e2efe052/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index 626e97e..9d084b5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -21,11 +21,14 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml._ +import org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.mllib.feature -import org.apache.spark.mllib.linalg.{Vector, VectorUDT} +import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} +import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StructField, StructType} @@ -93,7 +96,9 @@ class StandardScaler(override val uid: String) extends Estimator[StandardScalerM @Since("2.0.0") override def fit(dataset: Dataset[_]): StandardScalerModel = { transformSchema(dataset.schema, logging = true) -val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => v } +val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map { + case Row(v: Vector) => OldVectors.fromML(v) +} val scaler = new feature.StandardScaler(withMean = $(withMean), withStd = $(withStd)) val scalerModel = scaler.fit(input) copyValues(new StandardScalerModel(uid, scalerModel.std, scalerModel.mean).setParent(this)) @@ -145,7 +150,11 @@ class StandardScalerModel private[ml] ( override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean)) -val scale = udf { scaler.transform _ } + +// TODO: Make the transformer natively in ml framework to avoid extra conversion. +val transformer: Vector => Vector = v => scaler.transform(OldVectors.fromML(v)).asML + +val scale = udf(transformer) dataset.withColumn($(outputCol), scale(col($(inputCol } http://git-wip-us.apache.org/repos/asf/spark/blob/e2efe052/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala index 4d3e46e..1bc2420 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala @@ -23,10 +23,10 @@ import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.Transformer import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute, UnresolvedAttribute} +import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ -import org.apache.spark.mllib.linalg.{Vector, Vectors, VectorUDT} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/e2efe052/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index 68b699d..2bc9d22 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -27,10 +27,10 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} import org.apache.spark.ml.attribute._ +import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, VectorUDT} import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.udf import org.apache.spark.sql.types.{StructField, StructType}