This is an automated email from the ASF dual-hosted git repository. meng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 88542bc [SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays 88542bc is described below commit 88542bc3d9e506b1a0e852f3e9c632920d3fe553 Author: WeichenXu <weichen...@databricks.com> AuthorDate: Mon Jan 6 16:18:51 2020 -0800 [SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays ### What changes were proposed in this pull request? PySpark UDF to convert MLlib vectors to dense arrays. Example: ``` from pyspark.ml.functions import vector_to_array df.select(vector_to_array(col("features"))) ``` ### Why are the changes needed? If a PySpark user wants to convert MLlib sparse/dense vectors in a DataFrame into dense arrays, an efficient approach is to do that in the JVM. However, it requires the PySpark user to write Scala code and register it as a UDF. Often this is infeasible for a pure Python project. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Unit tests. Closes #26910 from WeichenXu123/vector_to_array. 
Authored-by: WeichenXu <weichen...@databricks.com> Signed-off-by: Xiangrui Meng <m...@databricks.com> --- dev/sparktestsupport/modules.py | 1 + .../main/scala/org/apache/spark/ml/functions.scala | 48 +++++++++++++++ .../scala/org/apache/spark/ml/FunctionsSuite.scala | 65 +++++++++++++++++++++ python/docs/pyspark.ml.rst | 8 +++ python/pyspark/ml/functions.py | 68 ++++++++++++++++++++++ 5 files changed, 190 insertions(+) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 1443584..4179359 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -460,6 +460,7 @@ pyspark_ml = Module( "pyspark.ml.evaluation", "pyspark.ml.feature", "pyspark.ml.fpm", + "pyspark.ml.functions", "pyspark.ml.image", "pyspark.ml.linalg.__init__", "pyspark.ml.recommendation", diff --git a/mllib/src/main/scala/org/apache/spark/ml/functions.scala b/mllib/src/main/scala/org/apache/spark/ml/functions.scala new file mode 100644 index 0000000..1faf562 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/functions.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml + +import org.apache.spark.annotation.Since +import org.apache.spark.ml.linalg.Vector +import org.apache.spark.mllib.linalg.{Vector => OldVector} +import org.apache.spark.sql.Column +import org.apache.spark.sql.functions.udf + +// scalastyle:off +@Since("3.0.0") +object functions { +// scalastyle:on + + private val vectorToArrayUdf = udf { vec: Any => + vec match { + case v: Vector => v.toArray + case v: OldVector => v.toArray + case v => throw new IllegalArgumentException( + "function vector_to_array requires a non-null input argument and input type must be " + + "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " + + s"but got ${ if (v == null) "null" else v.getClass.getName }.") + } + }.asNonNullable() + + /** + * Converts a column of MLlib sparse/dense vectors into a column of dense arrays. + * + * @since 3.0.0 + */ + def vector_to_array(v: Column): Column = vectorToArrayUdf(v) +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala new file mode 100644 index 0000000..2f5062c --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml + +import org.apache.spark.SparkException +import org.apache.spark.ml.functions.vector_to_array +import org.apache.spark.ml.linalg.Vectors +import org.apache.spark.ml.util.MLTest +import org.apache.spark.mllib.linalg.{Vectors => OldVectors} +import org.apache.spark.sql.functions.col + +class FunctionsSuite extends MLTest { + + import testImplicits._ + + test("test vector_to_array") { + val df = Seq( + (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)), + (Vectors.sparse(3, Seq((0, 2.0), (2, 3.0))), OldVectors.sparse(3, Seq((0, 20.0), (2, 30.0)))) + ).toDF("vec", "oldVec") + + val result = df.select(vector_to_array('vec), vector_to_array('oldVec)) + .as[(Seq[Double], Seq[Double])] + .collect().toSeq + + val expected = Seq( + (Seq(1.0, 2.0, 3.0), Seq(10.0, 20.0, 30.0)), + (Seq(2.0, 0.0, 3.0), Seq(20.0, 0.0, 30.0)) + ) + assert(result === expected) + + val df2 = Seq( + (Vectors.dense(1.0, 2.0, 3.0), + OldVectors.dense(10.0, 20.0, 30.0), 1), + (null, null, 0) + ).toDF("vec", "oldVec", "label") + + + for ((colName, valType) <- Seq( + ("vec", "null"), ("oldVec", "null"), ("label", "java.lang.Integer"))) { + val thrown1 = intercept[SparkException] { + df2.select(vector_to_array(col(colName))).count + } + assert(thrown1.getCause.getMessage.contains( + "function vector_to_array requires a non-null input argument and input type must be " + + "`org.apache.spark.ml.linalg.Vector` or `org.apache.spark.mllib.linalg.Vector`, " + + s"but got ${valType}")) + } + } +} diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst index 6a5d817..e31dfdd 100644 --- a/python/docs/pyspark.ml.rst +++ b/python/docs/pyspark.ml.rst @@ -41,6 +41,14 @@ pyspark.ml.clustering module :undoc-members: :inherited-members: +pyspark.ml.functions module +---------------------------- + +.. 
automodule:: pyspark.ml.functions + :members: + :undoc-members: + :inherited-members: + pyspark.ml.linalg module ---------------------------- diff --git a/python/pyspark/ml/functions.py b/python/pyspark/ml/functions.py new file mode 100644 index 0000000..2b4d8dd --- /dev/null +++ b/python/pyspark/ml/functions.py @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark import since, SparkContext +from pyspark.sql.column import Column, _to_java_column + + +@since(3.0) +def vector_to_array(col): + """ + Converts a column of MLlib sparse/dense vectors into a column of dense arrays. + + >>> from pyspark.ml.linalg import Vectors + >>> from pyspark.ml.functions import vector_to_array + >>> from pyspark.mllib.linalg import Vectors as OldVectors + >>> df = spark.createDataFrame([ + ... (Vectors.dense(1.0, 2.0, 3.0), OldVectors.dense(10.0, 20.0, 30.0)), + ... (Vectors.sparse(3, [(0, 2.0), (2, 3.0)]), + ... OldVectors.sparse(3, [(0, 20.0), (2, 30.0)]))], + ... ["vec", "oldVec"]) + >>> df.select(vector_to_array("vec").alias("vec"), + ... 
vector_to_array("oldVec").alias("oldVec")).collect() + [Row(vec=[1.0, 2.0, 3.0], oldVec=[10.0, 20.0, 30.0]), + Row(vec=[2.0, 0.0, 3.0], oldVec=[20.0, 0.0, 30.0])] + """ + sc = SparkContext._active_spark_context + return Column( + sc._jvm.org.apache.spark.ml.functions.vector_to_array(_to_java_column(col))) + + +def _test(): + import doctest + from pyspark.sql import SparkSession + import pyspark.ml.functions + import sys + globs = pyspark.ml.functions.__dict__.copy() + spark = SparkSession.builder \ + .master("local[2]") \ + .appName("ml.functions tests") \ + .getOrCreate() + sc = spark.sparkContext + globs['sc'] = sc + globs['spark'] = spark + + (failure_count, test_count) = doctest.testmod( + pyspark.ml.functions, globs=globs, + optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE) + spark.stop() + if failure_count: + sys.exit(-1) + + +if __name__ == "__main__": + _test() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org