This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 074712e [SPARK-30762] Add dtype=float32 support to vector_to_array UDF 074712e is described below commit 074712e329b347f769f8c009949c7845e95b3212 Author: Liang Zhang <liang.zh...@databricks.com> AuthorDate: Thu Feb 13 23:55:13 2020 +0800 [SPARK-30762] Add dtype=float32 support to vector_to_array UDF ### What changes were proposed in this pull request? In this PR, we add a `dtype` parameter to the Python function vector_to_array(col) (and its Scala counterpart) that allows converting to a column of arrays of Float (32-bit) in Scala, which is mapped to a numpy array of dtype=float32. ### Why are the changes needed? In downstream ML training, using float32 instead of float64 (the default) allows a larger batch size, i.e., allows more data to fit in memory. ### Does this PR introduce any user-facing change? Yes. Old: `vector_to_array()` only takes one parameter: ``` df.select(vector_to_array("colA"), ...) ``` New: `vector_to_array()` can take an additional optional parameter, `dtype`, which is either "float32" or "float64" (the default): ``` df.select(vector_to_array("colA", "float32"), ...) ``` ### How was this patch tested? Unit tests in Scala; doctests in Python. Closes #27522 from liangz1/udf-float32. 
Authored-by: Liang Zhang <liang.zh...@databricks.com> Signed-off-by: WeichenXu <weichen...@databricks.com> (cherry picked from commit 82d0aa37ae521231d8067e473c6ea79a118a115a) Signed-off-by: WeichenXu <weichen...@databricks.com> --- .../main/scala/org/apache/spark/ml/functions.scala | 34 +++++++++++++++++++--- .../scala/org/apache/spark/ml/FunctionsSuite.scala | 33 ++++++++++++++++++--- python/pyspark/ml/functions.py | 27 +++++++++++++---- 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/functions.scala b/mllib/src/main/scala/org/apache/spark/ml/functions.scala index 1faf562..0f03231 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/functions.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/functions.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml import org.apache.spark.annotation.Since -import org.apache.spark.ml.linalg.Vector +import org.apache.spark.ml.linalg.{SparseVector, Vector} import org.apache.spark.mllib.linalg.{Vector => OldVector} import org.apache.spark.sql.Column import org.apache.spark.sql.functions.udf @@ -27,7 +27,6 @@ import org.apache.spark.sql.functions.udf @Since("3.0.0") object functions { // scalastyle:on - private val vectorToArrayUdf = udf { vec: Any => vec match { case v: Vector => v.toArray @@ -39,10 +38,37 @@ object functions { } }.asNonNullable() + private val vectorToArrayFloatUdf = udf { vec: Any => + vec match { + case v: SparseVector => + val data = new Array[Float](v.size) + v.foreachActive { (index, value) => data(index) = value.toFloat } + data + case v: Vector => v.toArray.map(_.toFloat) + case v: OldVector => v.toArray.map(_.toFloat) + case v => throw new IllegalArgumentException( + "function vector_to_array requires a non-null input argument and input type must be " + + "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " + + s"but got ${ if (v == null) "null" else v.getClass.getName }.") + } + }.asNonNullable() + /** * Converts a 
column of MLlib sparse/dense vectors into a column of dense arrays. - * + * @param v: the column of MLlib sparse/dense vectors + * @param dtype: the desired underlying data type in the returned array + * @return an array<float> if dtype is float32, or array<double> if dtype is float64 * @since 3.0.0 */ - def vector_to_array(v: Column): Column = vectorToArrayUdf(v) + def vector_to_array(v: Column, dtype: String = "float64"): Column = { + if (dtype == "float64") { + vectorToArrayUdf(v) + } else if (dtype == "float32") { + vectorToArrayFloatUdf(v) + } else { + throw new IllegalArgumentException( + s"Unsupported dtype: $dtype. Valid values: float64, float32." + ) + } + } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala index 2f5062c..3dd9a7d 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala @@ -34,9 +34,8 @@ class FunctionsSuite extends MLTest { (Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0)))) ).toDF("vec", "oldVec") - val result = df.select(vector_to_array('vec), vector_to_array('oldVec)) - .as[(Seq[Double], Seq[Double])] - .collect().toSeq + val result = df.select(vector_to_array('vec), vector_to_array('oldVec)) + .as[(Seq[Double], Seq[Double])].collect().toSeq val expected = Seq( (Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0, 30.0)), @@ -50,7 +49,6 @@ class FunctionsSuite extends MLTest { (null, null, 0) ).toDF("vec", "oldVec", "label") - for ((colName, valType) <- Seq( ("vec", "null"), ("oldVec", "null"), ("label", "java.lang.Integer"))) { val thrown1 = intercept[SparkException] { @@ -61,5 +59,32 @@ class FunctionsSuite extends MLTest { "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " + s"but got ${valType}")) } + + val df3 = Seq( + (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)), + 
(Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0)))) + ).toDF("vec", "oldVec") + val dfArrayFloat = df3.select( + vector_to_array('vec, dtype = "float32"), vector_to_array('oldVec, dtype = "float32")) + + // Check values are correct + val result3 = dfArrayFloat.as[(Seq[Float], Seq[Float])].collect().toSeq + + val expected3 = Seq( + (Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0, 30.0)), + (Seq(2.0, 0.0, 3.0), Seq(20.0, 0.0, 30.0)) + ) + assert(result3 === expected3) + + // Check data types are correct + assert(dfArrayFloat.schema.simpleString === + "struct<UDF(vec):array<float>,UDF(oldVec):array<float>>") + + val thrown2 = intercept[IllegalArgumentException] { + df3.select( + vector_to_array('vec, dtype = "float16"), vector_to_array('oldVec, dtype = "float16")) + } + assert(thrown2.getMessage.contains( + s"Unsupported dtype: float16. Valid values: float64, float32.")) } } diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py index 2b4d8dd..ec164f3 100644 --- a/python/pyspark/ml/functions.py +++ b/python/pyspark/ml/functions.py @@ -19,10 +19,15 @@ from pyspark import since, SparkContext from pyspark.sql.column import Column, _to_java_column -@since(3.0) -def vector_to_array(col): +@since("3.0.0") +def vector_to_array(col, dtype="float64"): """ Converts a column of MLlib sparse/dense vectors into a column of dense arrays. + :param col: A string of the column name or a Column + :param dtype: The data type of the output array. Valid values: "float64" or "float32". + :return: The converted column of dense arrays. + + .. versionadded:: 3.0.0 >>> from pyspark.ml.linalg import Vectors >>> from pyspark.ml.functions import vector_to_array @@ -32,14 +37,26 @@ def vector_to_array(col): ... (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]), ... OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))], ... ["vec", "oldVec"]) - >>> df.select(vector_to_array("vec").alias("vec"), - ... 
vector_to_array("oldVec").alias("oldVec")).collect() + >>> df1 = df.select(vector_to_array("vec").alias("vec"), + ... vector_to_array("oldVec").alias("oldVec")) + >>> df1.collect() + [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]), + Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])] + >>> df2 = df.select(vector_to_array("vec", "float32").alias("vec"), + ... vector_to_array("oldVec", "float32").alias("oldVec")) + >>> df2.collect() [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]), Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])] + >>> df1.schema.fields + [StructField(vec,ArrayType(DoubleType,false),false), + StructField(oldVec,ArrayType(DoubleType,false),false)] + >>> df2.schema.fields + [StructField(vec,ArrayType(FloatType,false),false), + StructField(oldVec,ArrayType(FloatType,false),false)] """ sc = SparkContext._active_spark_context return Column( - sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col))) + sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col), dtype)) def _test(): --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org