[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18732 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143819790 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`. +The returned `pandas.DataFrame` can be arbitrary length and its schema must match the +returnType of the pandas udf. + +This function does not support partial aggregation, and requires shuffling all the data in +the `DataFrame`. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` --- End diff -- Yeah, that's ok. I just want to make sure it's absolutely clear what the input is because the user doesn't know or even care what type of object `pandas_udf` returns. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
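To make the wording question concrete: `pandas_udf` does not hand back the user's original Python function but a wrapper around it, and that wrapper is what `GroupedData.apply` expects. Below is a minimal sketch, assuming the API shape used in this PR; the `.func` attribute is the accessor the PR's own tests use to reach the underlying function, and later Spark releases may differ.

```python
# Sketch only: the kind of object pandas_udf hands back in this PR.
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

def identity(pdf):
    # plain Python function: pandas.DataFrame -> pandas.DataFrame
    return pdf

identity_udf = pandas_udf(identity, StructType([StructField("id", LongType()),
                                                StructField("v", DoubleType())]))

# identity_udf is the "wrapped udf" the docstring refers to:
#   df.groupby("id").apply(identity_udf)          # what Spark executes
#   pdf.groupby("id").apply(identity_udf.func)    # plain pandas, as in the PR's tests
```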
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143813642 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.StructType + +/** + * Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]] + * + * Rows in each group are passed to the python worker as a Arrow record batch. --- End diff -- Fixed "a Arrow -> an Arrow". Fixed "Python and Java capitalization". I am actually leaning toward keeping `pandas.DataFrame`. The preferred casing for `pandas` is usually lower case: https://pandas.pydata.org/pandas-docs/stable/ --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143812619 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex .map { case (attr, i) => attr.withName(s"_$i") }) +val batchSize = conf.arrowMaxRecordsPerBatch +// DO NOT use iter.grouped(). See BatchIterator. --- End diff -- Yes. The reason is explained in the docstring of BatchIterator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
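For readers without the Scala source at hand, the distinction under discussion is roughly the following: `grouped()` materializes every batch into a collection, copying the rows, whereas a batch iterator only hands out a lazy view over the underlying row iterator, which is what the Arrow serialization path wants. The sketch below is a Python analogy of that idea, not the actual `BatchIterator` added in this PR; the function names are illustrative.

```python
# Rough Python analogy of eager vs. lazy batching.
from itertools import chain, islice
from typing import Iterator, List, TypeVar

T = TypeVar("T")

def eager_batches(it: Iterator[T], size: int) -> Iterator[List[T]]:
    """Analogue of Scala's iter.grouped(size): each batch is copied into a list."""
    batch = list(islice(it, size))
    while batch:
        yield batch
        batch = list(islice(it, size))

def lazy_batches(it: Iterator[T], size: int) -> Iterator[Iterator[T]]:
    """Analogue of the BatchIterator idea: each batch is a lazy sub-iterator over
    the source, so no intermediate copy is built. Caveat: each batch must be
    fully consumed before the next one is requested."""
    while True:
        try:
            first = next(it)
        except StopIteration:
            return
        yield chain([first], islice(it, size - 1))

for batch in lazy_batches(iter(range(10)), 4):
    print(list(batch))   # [0, 1, 2, 3], then [4, 5, 6, 7], then [8, 9]
```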
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143812311 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + /** + * Applies a vectorized python user-defined function to each group of data. + * The user-defined function defines a transformation: `Pandas.DataFrame` -> `Pandas.DataFrame`. + * For each group, all elements in the group are passed as a `Pandas.DataFrame` and the results + * for all groups are combined into a new `DataFrame`. + * + * This function does not support partial aggregation, and requires shuffling all the data in + * the `DataFrame`. --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143810948 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`. +The returned `pandas.DataFrame` can be arbitrary length and its schema must match the +returnType of the pandas udf. + +This function does not support partial aggregation, and requires shuffling all the data in +the `DataFrame`. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` --- End diff -- I think ``` A pandas_udf returned by :meth:`pyspark.sql.functions.pandas_udf` ``` is redundant, how about ``` A function object returned by :meth:`pyspark.sql.functions.pandas_udf` ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143810736 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`. +The returned `pandas.DataFrame` can be arbitrary length and its schema must match the --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143810539 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`. --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143810355 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. --- End diff -- I think "pandas udf" as a word is fine. `pandas_udf` is the function name. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143809711 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143803982 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.StructType + +/** + * Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]] + * + * Rows in each group are passed to the python worker as a Arrow record batch. --- End diff -- a Arrow -> an Arrow minor nits: capitalize Python and Java, and change to `Pandas.DataFrame` in these paragraphs --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143802697 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex .map { case (attr, i) => attr.withName(s"_$i") }) +val batchSize = conf.arrowMaxRecordsPerBatch +// DO NOT use iter.grouped(). See BatchIterator. --- End diff -- Maybe put the reason not to use `grouped()` in the comment. I think it can make copies of the rows, is that right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143802019 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + /** + * Applies a vectorized python user-defined function to each group of data. + * The user-defined function defines a transformation: `Pandas.DataFrame` -> `Pandas.DataFrame`. + * For each group, all elements in the group are passed as a `Pandas.DataFrame` and the results + * for all groups are combined into a new `DataFrame`. + * + * This function does not support partial aggregation, and requires shuffling all the data in + * the `DataFrame`. --- End diff -- I believe for scaladoc the Spark DataFrame should be enclosed in brackets -> [[DataFrame]] --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143800589 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`. +The returned `pandas.DataFrame` can be arbitrary length and its schema must match the +returnType of the pandas udf. + +This function does not support partial aggregation, and requires shuffling all the data in +the `DataFrame`. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` --- End diff -- I think "A wrapped udf" might be confusing to the user, how about just saying "A `pandas_udf` returned by..."? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143800072 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`. +The returned `pandas.DataFrame` can be arbitrary length and its schema must match the --- End diff -- should be "can have an arbitrary length" --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143799780 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`. --- End diff -- I think this should be plural -> `pandas.DataFrame`s are combined `DataFrame`. -> :class:`DataFrame`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143799187 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a `DataFrame`. --- End diff -- "pandas udf" to "pandas_udf" `DataFrame` to :class:`DataFrame` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143744197 --- Diff: python/pyspark/sql/functions.py --- @@ -2181,30 +2187,66 @@ def udf(f=None, returnType=StringType()): @since(2.3) def pandas_udf(f=None, returnType=StringType()): """ -Creates a :class:`Column` expression representing a user defined function (UDF) that accepts -`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length. +Creates a vectorized user defined function (UDF). -:param f: python function if used as a standalone function +:param f: user-defined function. A python function if used as a standalone function :param returnType: a :class:`pyspark.sql.types.DataType` object ->>> from pyspark.sql.types import IntegerType, StringType ->>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) ->>> @pandas_udf(returnType=StringType()) -... def to_upper(s): -... return s.str.upper() -... ->>> @pandas_udf(returnType="integer") -... def add_one(x): -... return x + 1 -... ->>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) ->>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\ -... .show() # doctest: +SKIP -+--+--++ -|slen(name)|to_upper(name)|add_one(age)| -+--+--++ -| 8| JOHN DOE| 22| -+--+--++ +The user-defined function can define one of the following transformations: + +1. One or more `pandas.Series` -> A `pandas.Series` + + This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and + :meth:`pyspark.sql.DataFrame.select`. + The returnType should be a primitive data type, e.g., `DoubleType()`. + The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`. + + >>> from pyspark.sql.types import IntegerType, StringType + >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) + >>> @pandas_udf(returnType=StringType()) + ... def to_upper(s): + ... return s.str.upper() + ... + >>> @pandas_udf(returnType="integer") + ... def add_one(x): + ... return x + 1 + ... + >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) + >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\ + ... .show() # doctest: +SKIP + +--+--++ + |slen(name)|to_upper(name)|add_one(age)| + +--+--++ + | 8| JOHN DOE| 22| + +--+--++ + +2. A `pandas.DataFrame` -> A `pandas.DataFrame` + + This udf is used with :meth:`pyspark.sql.GroupedData.apply`. --- End diff -- Change to `This udf is only used with` and added `note`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
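The quoted docstring cuts off before an example of the second transformation. For context, the grouped-map pattern exercised by this PR's tests looks roughly like the sketch below; it assumes an existing SparkSession named `spark`, and the `pandas_udf` signature shown is the one used in this PR, which may differ in later Spark releases.

```python
# Sketch of the pandas.DataFrame -> pandas.DataFrame case, modeled on the
# normalize test in this PR. `spark` is assumed to be an existing SparkSession.
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf(StructType([StructField("id", LongType()),
                        StructField("v", DoubleType()),
                        StructField("norm", DoubleType())]))
def normalize(pdf):
    # pdf holds all rows of one group as a single pandas.DataFrame
    v = pdf.v
    return pdf.assign(norm=(v - v.mean()) / v.std())

df.groupby("id").apply(normalize).show()
```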
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143741944 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex .map { case (attr, i) => attr.withName(s"_$i") }) +val batchSize = conf.arrowMaxRecordsPerBatch +// DO NOT use iter.grouped(). See BatchIterator. +val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter) + val columnarBatchIter = new ArrowPythonRunner( -funcs, conf.arrowMaxRecordsPerBatch, bufferSize, reuseWorker, +funcs, bufferSize, reuseWorker, PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema) - .compute(iter, context.partitionId(), context) + .compute(batchIter, context.partitionId(), context) new Iterator[InternalRow] { - var currentIter = if (columnarBatchIter.hasNext) { + private var currentIter = if (columnarBatchIter.hasNext) { --- End diff -- I think so. The variable is reassigned for each columnar batch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740882 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression} + +/** + * Logical nodes specific to PySpark. + */ + +/** + * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame. --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740773 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression} + +/** + * Logical nodes specific to PySpark. + */ --- End diff -- Removed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740636 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,4 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + --- End diff -- Reverted --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740157 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo = pandas_udf( +lambda df: df, +StructType([StructField('id', LongType()), StructField('v', DoubleType())])) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +expected = expected.assign(v=expected.v.astype('float64')) +self.assertFramesEqual(expected, result) + +def test_complex_groupby(self): +from pyspark.sql.functions import pandas_udf, col +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('norm', DoubleType())])) +def normalize(pdf): +v = pdf.v +return pdf.assign(norm=(v - v.mean()) / v.std()) + +result = df.groupby(col('id') % 2 == 0).apply(normalize).sort('id', 'v').toPandas() +pdf = df.toPandas() +expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func) +expected = expected.sort_values(['id', 'v']).reset_index(drop=True) +expected = expected.assign(norm=expected.norm.astype('float64')) 
+self.assertFramesEqual(expected, result) + +def test_empty_groupby(self): +from pyspark.sql.functions import pandas_udf, col +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), +
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740129 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo = pandas_udf( +lambda df: df, --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740078 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143646922 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression} + +/** + * Logical nodes specific to PySpark. + */ + +/** + * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame. --- End diff -- tiny typo: a -> an. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143646526 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression} + +/** + * Logical nodes specific to PySpark. + */ + +/** + * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame. + * This is used by DataFrame.groupby().apply(). + */ +case class FlatMapGroupsInPandas( --- End diff -- Actually it is moved out `object.scala` by a previous comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143640334 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression} + +/** + * Logical nodes specific to PySpark. + */ + +/** + * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame. + * This is used by DataFrame.groupby().apply(). + */ +case class FlatMapGroupsInPandas( --- End diff -- shall we move this class to `object.scala`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143640650 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex .map { case (attr, i) => attr.withName(s"_$i") }) +val batchSize = conf.arrowMaxRecordsPerBatch +// DO NOT use iter.grouped(). See BatchIterator. +val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter) + val columnarBatchIter = new ArrowPythonRunner( -funcs, conf.arrowMaxRecordsPerBatch, bufferSize, reuseWorker, +funcs, bufferSize, reuseWorker, PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema) - .compute(iter, context.partitionId(), context) + .compute(batchIter, context.partitionId(), context) new Iterator[InternalRow] { - var currentIter = if (columnarBatchIter.hasNext) { + private var currentIter = if (columnarBatchIter.hasNext) { --- End diff -- does it need to be var? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143630635 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo = pandas_udf( +lambda df: df, +StructType([StructField('id', LongType()), StructField('v', DoubleType())])) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +expected = expected.assign(v=expected.v.astype('float64')) +self.assertFramesEqual(expected, result) + +def test_complex_groupby(self): +from pyspark.sql.functions import pandas_udf, col +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('norm', DoubleType())])) +def normalize(pdf): +v = pdf.v +return pdf.assign(norm=(v - v.mean()) / v.std()) + +result = df.groupby(col('id') % 2 == 0).apply(normalize).sort('id', 'v').toPandas() +pdf = df.toPandas() +expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func) +expected = expected.sort_values(['id', 'v']).reset_index(drop=True) +expected = expected.assign(norm=expected.norm.astype('float64')) 
+self.assertFramesEqual(expected, result) + +def test_empty_groupby(self): +from pyspark.sql.functions import pandas_udf, col +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), +
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143630813 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,4 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + --- End diff -- little nit: let's remove other changes here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143630505 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo = pandas_udf( +lambda df: df, --- End diff -- ditto: `df` -> `pdf` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143629848 --- Diff: python/pyspark/sql/functions.py --- @@ -2181,30 +2187,66 @@ def udf(f=None, returnType=StringType()): @since(2.3) def pandas_udf(f=None, returnType=StringType()): """ -Creates a :class:`Column` expression representing a user defined function (UDF) that accepts -`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length. +Creates a vectorized user defined function (UDF). -:param f: python function if used as a standalone function +:param f: user-defined function. A python function if used as a standalone function :param returnType: a :class:`pyspark.sql.types.DataType` object ->>> from pyspark.sql.types import IntegerType, StringType ->>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) ->>> @pandas_udf(returnType=StringType()) -... def to_upper(s): -... return s.str.upper() -... ->>> @pandas_udf(returnType="integer") -... def add_one(x): -... return x + 1 -... ->>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) ->>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\ -... .show() # doctest: +SKIP -+--+--++ -|slen(name)|to_upper(name)|add_one(age)| -+--+--++ -| 8| JOHN DOE| 22| -+--+--++ +The user-defined function can define one of the following transformations: + +1. One or more `pandas.Series` -> A `pandas.Series` + + This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and + :meth:`pyspark.sql.DataFrame.select`. + The returnType should be a primitive data type, e.g., `DoubleType()`. + The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`. + + >>> from pyspark.sql.types import IntegerType, StringType + >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) + >>> @pandas_udf(returnType=StringType()) + ... def to_upper(s): + ... return s.str.upper() + ... + >>> @pandas_udf(returnType="integer") + ... def add_one(x): + ... return x + 1 + ... + >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) + >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\ + ... .show() # doctest: +SKIP + +--+--++ + |slen(name)|to_upper(name)|add_one(age)| + +--+--++ + | 8| JOHN DOE| 22| + +--+--++ + +2. A `pandas.DataFrame` -> A `pandas.DataFrame` + + This udf is used with :meth:`pyspark.sql.GroupedData.apply`. --- End diff -- Maybe, `This udf is used with` -> `This udf is only used with` or .. probably we should add a `note` here. If I didn't know the context here, I'd wonder why it does not work as normal pandas udf .. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143630469 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) --- End diff -- little nit: I'd call it `pdf` partly to avoid shadowing `df` and partly to say `pd.DataFrame`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143630939 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression} + +/** + * Logical nodes specific to PySpark. + */ --- End diff -- little nit: I'd remove this comment. I think the name already implies what this file contains. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143622623 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + /** + * Applies a vectorized python use-defined function to each group of data. --- End diff -- Thanks! Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143622617 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.StructType + +/** + * Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]] + * + * Rows in each group are passed to the python worker as a Arrow record batch. + * The python worker turns the record batch to a pandas.DataFrame, invoke the + * use-defined function, and passes the resulting pandas.DataFrame --- End diff -- Thanks! Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143614190 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + /** + * Applies a vectorized python use-defined function to each group of data. --- End diff -- nit: `use-defined` -> `user-defined` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143614283 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} +import org.apache.spark.sql.types.StructType + +/** + * Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]] + * + * Rows in each group are passed to the python worker as a Arrow record batch. + * The python worker turns the record batch to a pandas.DataFrame, invoke the + * use-defined function, and passes the resulting pandas.DataFrame --- End diff -- nit: `use-defined` -> `user-defined` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143507748 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( --- End diff -- @HyukjinKwon Thanks for catching this. I added docs for `FlatMapGroupsInPandas` (function), `FlatMapGroupsInPandas` (logical node), and `FlatMapGroupsInPandasExec`, and cross-referenced them. @rxin I created the file `pythonLogicalOperators` under `org.apache.spark.sql.catalyst.plans.logical` and moved `FlatMapGroupsInPandas` into that file. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143506845 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a +:class:`DataFrame`. The returned `pandas.DataFrame` can be arbitrary length and its schema +must match the returnType of the pandas udf. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` + +>>> from pyspark.sql.functions import pandas_udf +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = self._df +func = udf.func +returnType = udf.returnType --- End diff -- I actually like it because I think it's more readable this way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143340681 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( --- End diff -- I'd also put this somewhere else that's not object.scala --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143338477 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( --- End diff -- Hm.. looks like the doc for this `case class` is missing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143313284 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +to the user-function and the returned `pandas.DataFrame` are combined as a +:class:`DataFrame`. The returned `pandas.DataFrame` can be arbitrary length and its schema +must match the returnType of the pandas udf. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` + +>>> from pyspark.sql.functions import pandas_udf +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = self._df +func = udf.func +returnType = udf.returnType --- End diff -- is it necessary to make all these copies? I could understand maybe copying `func` and `columns` because they are in the wrapped function, but not sure if `df` and `returnType` need to be copied --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
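One side effect of pulling the attributes into locals, beyond readability (an observation, not something claimed in the thread): the nested `wrapped` function then closes over plain locals rather than over `self` and `udf`. A small standalone illustration with toy stand-ins, not PySpark classes:

```python
class FakeUdf(object):
    def __init__(self, func):
        self.func = func

class FakeGrouped(object):
    def __init__(self, columns):
        self.columns = columns

    def build_direct(self, udf):
        def wrapped(*cols):
            # closure captures `self` and `udf`
            return udf.func(list(cols), self.columns)
        return wrapped

    def build_with_locals(self, udf):
        func = udf.func          # local copies, as in the PR's apply()
        columns = self.columns
        def wrapped(*cols):
            # closure captures only `func` and `columns`
            return func(list(cols), columns)
        return wrapped

g = FakeGrouped(['id', 'v'])
direct = g.build_direct(FakeUdf(lambda cols, names: names))
local = g.build_with_locals(FakeUdf(lambda cols, names: names))
print(sorted(direct.__code__.co_freevars))   # ['self', 'udf']
print(sorted(local.__code__.co_freevars))    # ['columns', 'func']
```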
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143263694 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): --- End diff -- @rxin just to recap our discussion regarding naming: You asked: > What's the difference between this one and the transform function you also proposed? I'm trying to see if all the naming makes sense when considered together. The answer is: `transform` takes a function: pd.Series -> pd.Series and applies it to each column (or a subset of columns). The input and output Series are of the same length. `apply` takes a function: pd.DataFrame -> pd.DataFrame and applies it to each group, similar to `flatMapGroups`. Does this make sense to you? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
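For readers less familiar with the pandas side, the same distinction in plain pandas (illustrative data, not from the PR):

```python
import pandas as pd

pdf = pd.DataFrame({'id': [1, 1, 2, 2, 2],
                    'v':  [1.0, 2.0, 3.0, 5.0, 10.0]})

# transform-style: pd.Series -> pd.Series of the same length, applied column-wise
normalized = pdf.groupby('id')['v'].transform(lambda s: (s - s.mean()) / s.std())

# apply-style: pd.DataFrame -> pd.DataFrame per group, result length is arbitrary
top_per_group = pdf.groupby('id', group_keys=False).apply(lambda g: g.nlargest(1, 'v'))

print(normalized)
print(top_per_group)
```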
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143213000 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` + +>>> from pyspark.sql.functions import pandas_udf +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = self._df +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) + +wrapped_udf_obj = pandas_udf(wrapped, returnType) +udf_column = wrapped_udf_obj(*[df[col] for col in df.columns]) --- End diff -- Added --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143198047 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` + +>>> from pyspark.sql.functions import pandas_udf +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = self._df +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) + +wrapped_udf_obj = pandas_udf(wrapped, returnType) +udf_column = wrapped_udf_obj(*[df[col] for col in df.columns]) --- End diff -- I see. Yeah I can make it more clear in the doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143086739 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- ok, I'll try but if this PR is ready to merge then no need to hold up for this, we can address later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143083397 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- Feel free! The tricky bit is that the wrapped function is unknown when calling pandas_udf so there needs to be a way to change it later. Also the function chaining makes it a bit complicated too. I still think we should do it, but needs to be more careful. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143082816 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- I was thinking it should simplify things. Mind if I give it a try? If does only complicate then we can forget it.. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143081782 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- @BryanCutler I got something kind of working here: https://github.com/icexelloss/spark/pull/5 But I think the change is not trivial and I don't want to complicate this PR. (It is already pretty large). Should we maybe make the wrapper refactoring it's own PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143081592 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` + +>>> from pyspark.sql.functions import pandas_udf +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = self._df +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) + +wrapped_udf_obj = pandas_udf(wrapped, returnType) +udf_column = wrapped_udf_obj(*[df[col] for col in df.columns]) --- End diff -- Do you mean sending all columns? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143080889 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` + +>>> from pyspark.sql.functions import pandas_udf +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = self._df +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) + +wrapped_udf_obj = pandas_udf(wrapped, returnType) +udf_column = wrapped_udf_obj(*[df[col] for col in df.columns]) --- End diff -- This sends the entire Spark `DataFrame` to Python, is that what users would expect for this operation? Should that be mentioned in the doc? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
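A practical usage note following from that point (my illustration, not part of the PR): since every column of the grouped DataFrame is shipped to the Python workers, projecting to the needed columns before grouping limits what gets converted and sent. Using the `normalize` UDF from the quoted docstring:

```python
# Hypothetical usage: select only the columns the UDF needs before grouping,
# so only those columns are converted to Arrow and sent to the Python workers.
subset = df.select('id', 'v')
result = subset.groupby('id').apply(normalize)
```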
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143032320 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Ahh neat. Thanks @HyukjinKwon @BryanCutler doctest passes now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143033289 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): --- End diff -- Added doc. The check for `returnType == StructType` is done earlier: https://github.com/icexelloss/spark/blob/groupby-apply-SPARK-20396/python/pyspark/sql/group.py#L238 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143008730 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- I see, yes it does strip the names from the columns. How about the opposite way then, move the conversion from DataFrame to Series out of `worker.py` `wrap_pandas_udf` and put it here? I think you could then use `wrap_pandas_udf` as-is. I also think doing this wrapping would be better off in `functions.py` `_create_udf`, is that possible? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143005943 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Ah, cool. I misunderstood. That's the answer to the question. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143004584 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- @icexelloss , this works for me to run doctests locally `SPARK_TESTING=1 bin/pyspark pyspark.sql.group` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142961120 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): --- End diff -- Yes will do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142957552 --- Diff: python/pyspark/sql/functions.py --- @@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, vectorized=False): self._name = name or ( func.__name__ if hasattr(func, '__name__') else func.__class__.__name__) -self._vectorized = vectorized +self.vectorized = vectorized --- End diff -- Are we ok with `vectorized` being a public field? I am fine with either public or private, but I do think the fields of the function returned by `UserDefinedFunction._wrapped()` should have the same field names as `UserDefinedFunction` to avoid confusion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142956597 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- @BryanCutler yeah I was trying to do that earlier that but unfortunately the column names are lost on the worker so we cannot construct the `Pandas.DataFrame` on the worker. I think the best plcae to define the wrap function is probably on the pyspark driver side because we have the most information there. However, that requires some refactoring. I will give it a try and see how that goes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
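A pandas-only illustration of why the column names matter here: the Series arriving at the worker are positional and effectively unnamed, and `keys=columns` (captured from the driver-side `df.columns`) is what restores the names before the user function sees the frame.

```python
import pandas as pd

columns = ['id', 'v']                       # known on the driver
cols = [pd.Series([1, 1, 2]),               # how the data effectively arrives:
        pd.Series([1.0, 2.0, 3.0])]         # positional, unnamed Series

print(pd.concat(cols, axis=1).columns.tolist())                 # [0, 1]
print(pd.concat(cols, axis=1, keys=columns).columns.tolist())   # ['id', 'v']
```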
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142952213 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Not sure.. I think what you know is what I usually do. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142949557 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): --- End diff -- Yes, I will change the name to something more descriptive. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
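For context, the quoted diff cuts off inside the new `StructType` branch. A rough sketch of what such a branch needs to do (the helper name, signature, and error messages below are placeholders, not the PR's actual worker.py code): call the user function, check that it returned a `pandas.DataFrame`, and pair each output column with its Arrow type so the worker can write one record batch per group.

```python
import pandas as pd

def wrap_grouped_pandas_udf(f, return_type, to_arrow_type):
    # Sketch only; the real wrap_pandas_udf in worker.py differs in details.
    arrow_return_types = [to_arrow_type(field.dataType) for field in return_type]

    def wrapped(*series):
        result = f(*series)
        if not isinstance(result, pd.DataFrame):
            raise TypeError("Return value of the user-defined function should be a "
                            "pandas.DataFrame when the return type is StructType")
        if len(result.columns) != len(arrow_return_types):
            raise RuntimeError("Number of columns of the returned pandas.DataFrame "
                               "does not match the specified schema")
        # one (column, arrow type) pair per field of the StructType
        return [(result[result.columns[i]], t)
                for i, t in enumerate(arrow_return_types)]

    return wrapped
```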
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142949179 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): --- End diff -- Yea, let's add some comments and throw a better exception. For example, I think the exception message should clarify that `StructType` should be used only for a grouping udf. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142948551 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Ahh..Thanks! Will give it a try. Still, is there a easier way to run the pyspark tests locally (the way jenkins runs them)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142948307 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): --- End diff -- Yea, but `fn` looks like a no-no .. do you maybe have an idea for a better name? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142947514 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Also, it looks this file does not define `spark` as a global that is used in doctests. I think we should add something like ... ```diff sc = spark.sparkContext globs['sc'] = sc + globs['spark'] = spark globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \ ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
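For context, the `globs` dict in the diff above is filled in the module's `_test()` doctest driver; a minimal, generic sketch of that pattern (not the file's actual `_test()`), with the suggested `spark` entry added:

```python
def _test():
    import doctest
    import pyspark.sql.group
    from pyspark.sql import SparkSession

    globs = pyspark.sql.group.__dict__.copy()
    spark = SparkSession.builder \
        .master("local[2]") \
        .appName("sql.group doctests") \
        .getOrCreate()
    globs['sc'] = spark.sparkContext
    globs['spark'] = spark   # the entry the review suggests adding
    failure_count, test_count = doctest.testmod(
        pyspark.sql.group, globs=globs, optionflags=doctest.ELLIPSIS)
    spark.stop()
    if failure_count:
        exit(-1)
```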
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142946504 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Probably, importing `pandas_udf` should solve the problem I guess. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142946430 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- I think the problem is, `pandas_udf` is unimportable in this doctest. Up to my knowledge, `# doctest: +SKIP` is per line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
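A toy illustration of the point above (a hypothetical docstring, not the real one): the `+SKIP` directive only applies to the example it annotates, so the decorator example still executes and raises NameError when `pandas_udf` is not available in the doctest globals.
```python
def _doctest_skip_illustration():
    """
    >>> @pandas_udf(returnType=df.schema)         # still executed by doctest
    ... def normalize(pdf):
    ...     return pdf
    >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP

    Only the .show() example is skipped; either make `pandas_udf` importable in
    the doctest globals or mark every example in the block with `# doctest: +SKIP`.
    """
```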
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142945465 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- I have been using ``` bin/pyspark pyspark.sql.tests GroupbyApplyTests ``` But this doesn't seem to do doctest. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142944123 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Seems this is still not skipped by doc test. What's the best way to run pyspark test locally? I tried ``` ./run-tests --modules=pyspark-sql --parallelism=4 ``` But it's giving me a different failure. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142845456 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Fixed. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142841543 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- nit: the spaces around `#` are wrong. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142840490 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( --- End diff -- Doc added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142839010 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private class BatchIterator[T](iter: Iterator[T], batchSize: Int) + extends Iterator[Iterator[T]] { + + override def hasNext: Boolean = iter.hasNext + + override def next(): Iterator[T] = { --- End diff -- Sorry I pushed a bit late. The comment is added now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142836611 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private class BatchIterator[T](iter: Iterator[T], batchSize: Int) + extends Iterator[Iterator[T]] { + + override def hasNext: Boolean = iter.hasNext + + override def next(): Iterator[T] = { --- End diff -- I didn't see the comment added? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142836297 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( +groupingAttributes: Seq[Attribute], +functionExpr: Expression, +output: Seq[Attribute], +child: LogicalPlan) extends UnaryNode { + /** + * This is needed because output attributes is considered `reference` when + * passed through the constructor. + * + * Without this, catalyst will complain that output attributes are missing + * from the input. + */ + override val producedAttributes = AttributeSet(output) --- End diff -- Ok. I see. Should be fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142836245 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( +groupingAttributes: Seq[Attribute], +functionExpr: Expression, +output: Seq[Attribute], +child: LogicalPlan) extends UnaryNode { + /** + * This is needed because output attributes is considered `reference` when --- End diff -- nit: `reference` -> `references`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142835260 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self) --- End diff -- `return GroupedData(jgd, self)` -> `return GroupedData(jgd, self._df)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142801623 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private class BatchIterator[T](iter: Iterator[T], batchSize: Int) + extends Iterator[Iterator[T]] { + + override def hasNext: Boolean = iter.hasNext + + override def next(): Iterator[T] = { --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142796899 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( +groupingAttributes: Seq[Attribute], +functionExpr: Expression, +output: Seq[Attribute], +child: LogicalPlan) extends UnaryNode { + /** + * This is needed because output attributes is considered `reference` when + * passed through the constructor. + * + * Without this, catalyst will complain that output attributes are missing + * from the input. + */ + override val producedAttributes = AttributeSet(output) --- End diff -- This is one of the tricky bits. It's because of this code: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L135 Because `productIterator` returns all member variables, including `output`, the `references` of the tree node will include all output attributes, and it will complain about missing input: ``` def missingInput: AttributeSet = references -- inputSet -- producedAttributes ``` I think my solution here isn't great, but I don't know the best way to deal with this. If someone with deeper Catalyst knowledge can suggest a better approach, I am happy to get rid of this bit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142770337 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} + +case class FlatMapGroupsInPandasExec( +groupingAttributes: Seq[Attribute], +func: Expression, +output: Seq[Attribute], +child: SparkPlan) + extends UnaryExecNode { + + private val pandasFunction = func.asInstanceOf[PythonUDF].func + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def producedAttributes: AttributeSet = AttributeSet(output) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val inputRDD = child.execute() + +val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) +val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) +val argOffsets = Array((0 until child.schema.length).toArray) + +inputRDD.mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) + val context = TaskContext.get() + + val columnarBatchIter = new ArrowPythonRunner( +chainedFunc, bufferSize, reuseWorker, +PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema) +.compute(grouped.map(_._2), context.partitionId(), context) + + val rowIter = new Iterator[InternalRow] { +private var currentIter = if (columnarBatchIter.hasNext) { + val batch = columnarBatchIter.next() + batch.rowIterator.asScala --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142740947 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = { +require(expr.vectorized, "Must pass a vectorized python udf") + +val output = expr.dataType match { + case s: StructType => s.map { +case StructField(name, dataType, nullable, metadata) => + AttributeReference(name, dataType, nullable, metadata)() + } +} + +val groupingAttributes: Seq[Attribute] = groupingExprs.map { + case ne: NamedExpression => ne.toAttribute +} + +val plan = FlatMapGroupsInPandas( + groupingAttributes, + expr, + output, + df.logicalPlan +) + +Dataset.ofRows( --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142735696 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = DataFrame(self._jgd.df(), self.sql_ctx) +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) --- End diff -- That's fine, we can leave the serializer as is, but it seems like a waste to unwrap the pandas_udf only to then wrap it again as another pandas_udf. @icexelloss , how about changing the Series to a DataFrame in `worker.py` `wrap_pandas_udf` right before the user function is called? That way both transformations are in one place and `wrap_pandas_udf` would work like this: Changes Series to DataFrame; Call user function with DataFrame; Get result DataFrame; Change DataFrame to Series. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
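A pandas-only sketch of the flow described above, with both conversions kept in one worker-side wrapper: change the incoming Series to a DataFrame, call the user function with the DataFrame, then change the result DataFrame back to Series. The wrapper name and the way the column names are passed in are assumptions for illustration; the real worker.py code may differ.
```python
import pandas as pd

def wrap_grouped_udf(user_func, columns):
    def wrapped(*cols):
        # 1. Change the incoming Series into a single DataFrame.
        pdf = pd.concat(cols, axis=1, keys=columns)
        # 2. Call the user function with the DataFrame.
        result = user_func(pdf)
        # 3. Change the result DataFrame back into per-column Series.
        return [result[c] for c in result.columns]
    return wrapped

# Tiny usage example with the normalize function from the docstring:
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

f = wrap_grouped_udf(normalize, ["id", "v"])
out = f(pd.Series([1, 1, 2]), pd.Series([1.0, 2.0, 3.0]))
print([s.tolist() for s in out])
```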
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142720877 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} + +case class FlatMapGroupsInPandasExec( +groupingAttributes: Seq[Attribute], +func: Expression, +output: Seq[Attribute], +child: SparkPlan) + extends UnaryExecNode { + + private val pandasFunction = func.asInstanceOf[PythonUDF].func + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def producedAttributes: AttributeSet = AttributeSet(output) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val inputRDD = child.execute() + +val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) +val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) +val argOffsets = Array((0 until child.schema.length).toArray) + +inputRDD.mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) + val context = TaskContext.get() + + val columnarBatchIter = new ArrowPythonRunner( +chainedFunc, bufferSize, reuseWorker, +PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema) +.compute(grouped.map(_._2), context.partitionId(), context) + + val rowIter = new Iterator[InternalRow] { +private var currentIter = if (columnarBatchIter.hasNext) { + val batch = columnarBatchIter.next() + batch.rowIterator.asScala --- End diff -- If we don't need to check the schema, we can simplify the iterator as: ```scala val rowIter = columnarBatchIter.flatMap(_.rowIterator.asScala) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142704642 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142704126 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = { +require(expr.vectorized, "Must pass a vectorized python udf") + +val output = expr.dataType match { + case s: StructType => s.map { +case StructField(name, dataType, nullable, metadata) => + AttributeReference(name, dataType, nullable, metadata)() + } +} + +val groupingAttributes: Seq[Attribute] = groupingExprs.map { + case ne: NamedExpression => ne.toAttribute --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142703829 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = { +require(expr.vectorized, "Must pass a vectorized python udf") + +val output = expr.dataType match { + case s: StructType => s.map { --- End diff -- Fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142703487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = { +require(expr.vectorized, "Must pass a vectorized python udf") + +val output = expr.dataType match { + case s: StructType => s.map { +case StructField(name, dataType, nullable, metadata) => + AttributeReference(name, dataType, nullable, metadata)() + } +} + +val groupingAttributes: Seq[Attribute] = groupingExprs.map { + case ne: NamedExpression => ne.toAttribute +} + +val plan = FlatMapGroupsInPandas( + groupingAttributes, + expr, + output, + df.logicalPlan +) --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142697418 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): --- End diff -- A normal pandas udf doesn't support `StructType` as the returnType, which is why this works. However, I agree the way we distinguish a grouping udf from a normal udf is not clean. Ideally we should have a cleaner way of defining such wrapping functions for different pandas_udf use cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
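For context, a sketch of the two declaration styles that the `isinstance(return_type, StructType)` check distinguishes, assuming the API as it stands in this PR; the local SparkSession and the toy `df` are only there to make the example self-contained.
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

# Scalar pandas udf: atomic return type; receives and returns pandas.Series.
plus_one = pandas_udf(lambda v: v + 1, DoubleType())

# Grouped pandas udf: StructType return type (here df.schema); receives and
# returns a pandas.DataFrame -- the case the StructType branch in worker.py
# is meant to pick out.
@pandas_udf(returnType=df.schema)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

# Both calls assume a build that includes this PR plus pandas/pyarrow installed.
df.select(plus_one(df['v'])).show()
df.groupby('id').apply(normalize).show()
```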
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142695929 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, NamedExpression, SortOrder, UnsafeProjection} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} + +case class FlatMapGroupsInPandasExec( +grouping: Seq[Expression], +func: Expression, +override val output: Seq[Attribute], +override val child: SparkPlan +) extends UnaryExecNode { + + val groupingAttributes: Seq[Attribute] = grouping.map { +case ne: NamedExpression => ne.toAttribute + } + + private val pandasFunction = func.asInstanceOf[PythonUDF].func + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def producedAttributes: AttributeSet = AttributeSet(output) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val inputRDD = child.execute() + +val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) +val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) +val argOffsets = Array((0 until child.schema.length).toArray) + +inputRDD.mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) --- End diff -- Yes thanks much! I will take a look now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142695843 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala --- @@ -111,6 +111,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper { } def apply(plan: SparkPlan): SparkPlan = plan transformUp { +// FlatMapGroupsInPandas and be evaluated in python worker --- End diff -- Good catch. Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142695501 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private class BatchIterator[T](iter: Iterator[T], batchSize: Int) + extends Iterator[Iterator[T]] { + + override def hasNext: Boolean = iter.hasNext + + override def next(): Iterator[T] = { --- End diff -- +1. Let me add that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142695129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( --- End diff -- Yes agreed. I will fix that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142694835 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): +import pandas as pd +out = f(*a) +assert isinstance(out, pd.DataFrame), \ +'Return value from the user function is not a pandas.DataFrame.' +assert len(out.columns) == len(arrow_return_types), \ +'Number of columns of the returned pd.DataFrame doesn\'t match output schema. ' \ --- End diff -- Good catch. Fixed. (Btw thanks for catching these small things) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142694484 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): +import pandas as pd +out = f(*a) +assert isinstance(out, pd.DataFrame), \ +'Return value from the user function is not a pandas.DataFrame.' --- End diff -- Good catch. Yeah let's keep such terms consistent. Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142694381 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): +import pandas as pd +out = f(*a) --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142693686 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): --- End diff -- `verify_result_type` is kind of a misnomer because this function does two things: 1. converts the output of the user-defined function (a pandas.DataFrame) into the form the serializer takes (a list of (pd.Series, DataType) pairs); 2. validates the return value of the user-defined function. Part of the result-type verification is done in the serializer as part of the coercion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142693843 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): +import pandas as pd +out = f(*a) --- End diff -- Yes that is more consistent. Let me change that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142692448 --- Diff: python/pyspark/sql/tests.py --- @@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, df_without): self.assertTrue(df_without.equals(df_with_arrow), msg=msg) def test_unsupported_datatype(self): -schema = StructType([StructField("dt", DateType(), True)]) -df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], schema=schema) --- End diff -- Reverted. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org