This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5c55812 [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type 5c55812 is described below commit 5c55812400e1e0a8aaeb50a50be106e80c916c86 Author: HyukjinKwon <gurwls...@apache.org> AuthorDate: Fri Jul 5 09:22:41 2019 +0900 [SPARK-28198][PYTHON][FOLLOW-UP] Rename mapPartitionsInPandas to mapInPandas with a separate evaluation type ## What changes were proposed in this pull request? This PR proposes to rename `mapPartitionsInPandas` to `mapInPandas` with a separate evaluation type. Had an offline discussion with rxin, mengxr and cloud-fan. The reasons are basically: 1. `SCALAR_ITER` doesn't make sense with `mapPartitionsInPandas`. 2. It cannot share the same Pandas UDF, for instance, at `select` and `mapPartitionsInPandas` unlike `GROUPED_AGG` because the iterator's return type is different. 3. `mapPartitionsInPandas` -> `mapInPandas` - see https://github.com/apache/spark/pull/25044#issuecomment-508298552 and https://github.com/apache/spark/pull/25044#issuecomment-508299764 Renaming `SCALAR_ITER` to `MAP_ITER` was abandoned due to reason 2. For `XXX_ITER`, it might have to have a different interface in the future if we happen to add other versions of them. But this is a topic orthogonal to `mapPartitionsInPandas`. ## How was this patch tested? Existing tests should cover. Closes #25044 from HyukjinKwon/SPARK-28198. 
Authored-by: HyukjinKwon <gurwls...@apache.org> Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- python/pyspark/sql/dataframe.py | 13 +++++------- python/pyspark/sql/functions.py | 5 ++++- python/pyspark/sql/tests/test_pandas_udf_iter.py | 24 +++++++++++----------- python/pyspark/sql/udf.py | 20 +++++++++++++++--- python/pyspark/worker.py | 7 +++++-- .../plans/logical/pythonLogicalOperators.scala | 4 ++-- .../main/scala/org/apache/spark/sql/Dataset.scala | 10 ++++----- .../spark/sql/execution/SparkStrategies.scala | 4 ++-- ...onsInPandasExec.scala => MapInPandasExec.scala} | 2 +- 9 files changed, 52 insertions(+), 37 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 3f5d1ff..e666973 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2193,7 +2193,7 @@ class DataFrame(object): _check_series_convert_timestamps_local_tz(pdf[field.name], timezone) return pdf - def mapPartitionsInPandas(self, udf): + def mapInPandas(self, udf): """ Maps each partition of the current :class:`DataFrame` using a pandas udf and returns the result as a `DataFrame`. @@ -2215,7 +2215,7 @@ class DataFrame(object): ... def filter_func(iterator): ... for pdf in iterator: ... 
yield pdf[pdf.id == 1] - >>> df.mapPartitionsInPandas(filter_func).show() # doctest: +SKIP + >>> df.mapInPandas(filter_func).show() # doctest: +SKIP +---+---+ | id|age| +---+---+ @@ -2227,15 +2227,12 @@ class DataFrame(object): """ # Columns are special because hasattr always return True if isinstance(udf, Column) or not hasattr(udf, 'func') \ - or udf.evalType != PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF: + or udf.evalType != PythonEvalType.SQL_MAP_PANDAS_ITER_UDF: raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type " - "SCALAR_ITER.") - - if not isinstance(udf.returnType, StructType): - raise ValueError("The returnType of the pandas_udf must be a StructType") + "MAP_ITER.") udf_column = udf(*[self[col] for col in self.columns]) - jdf = self._jdf.mapPartitionsInPandas(udf_column._jc.expr()) + jdf = self._jdf.mapInPandas(udf_column._jc.expr()) return DataFrame(jdf, self.sql_ctx) def _collectAsArrow(self): diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 5d1e69e..bf33b9a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2802,6 +2802,8 @@ class PandasUDFType(object): GROUPED_AGG = PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF + MAP_ITER = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF + @since(1.3) def udf(f=None, returnType=StringType()): @@ -3278,7 +3280,8 @@ def pandas_udf(f=None, returnType=None, functionType=None): if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, - PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]: + PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, + PythonEvalType.SQL_MAP_PANDAS_ITER_UDF]: raise ValueError("Invalid functionType: " "functionType must be one the values from PandasUDFType") diff --git a/python/pyspark/sql/tests/test_pandas_udf_iter.py b/python/pyspark/sql/tests/test_pandas_udf_iter.py index c27cc5a..2a5709e 100644 --- 
a/python/pyspark/sql/tests/test_pandas_udf_iter.py +++ b/python/pyspark/sql/tests/test_pandas_udf_iter.py @@ -57,7 +57,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase): ReusedSQLTestCase.tearDownClass() def test_map_partitions_in_pandas(self): - @pandas_udf('id long', PandasUDFType.SCALAR_ITER) + @pandas_udf('id long', PandasUDFType.MAP_ITER) def func(iterator): for pdf in iterator: assert isinstance(pdf, pd.DataFrame) @@ -65,7 +65,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase): yield pdf df = self.spark.range(10) - actual = df.mapPartitionsInPandas(func).collect() + actual = df.mapInPandas(func).collect() expected = df.collect() self.assertEquals(actual, expected) @@ -73,45 +73,45 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase): data = [(1, "foo"), (2, None), (3, "bar"), (4, "bar")] df = self.spark.createDataFrame(data, "a int, b string") - @pandas_udf(df.schema, PandasUDFType.SCALAR_ITER) + @pandas_udf(df.schema, PandasUDFType.MAP_ITER) def func(iterator): for pdf in iterator: assert isinstance(pdf, pd.DataFrame) assert [d.name for d in list(pdf.dtypes)] == ['int32', 'object'] yield pdf - actual = df.mapPartitionsInPandas(func).collect() + actual = df.mapInPandas(func).collect() expected = df.collect() self.assertEquals(actual, expected) def test_different_output_length(self): - @pandas_udf('a long', PandasUDFType.SCALAR_ITER) + @pandas_udf('a long', PandasUDFType.MAP_ITER) def func(iterator): for _ in iterator: yield pd.DataFrame({'a': list(range(100))}) df = self.spark.range(10) - actual = df.repartition(1).mapPartitionsInPandas(func).collect() + actual = df.repartition(1).mapInPandas(func).collect() self.assertEquals(set((r.a for r in actual)), set(range(100))) def test_empty_iterator(self): - @pandas_udf('a int, b string', PandasUDFType.SCALAR_ITER) + @pandas_udf('a int, b string', PandasUDFType.MAP_ITER) def empty_iter(_): return iter([]) self.assertEqual( - self.spark.range(10).mapPartitionsInPandas(empty_iter).count(), 0) + 
self.spark.range(10).mapInPandas(empty_iter).count(), 0) def test_empty_rows(self): - @pandas_udf('a int', PandasUDFType.SCALAR_ITER) + @pandas_udf('a int', PandasUDFType.MAP_ITER) def empty_rows(_): return iter([pd.DataFrame({'a': []})]) self.assertEqual( - self.spark.range(10).mapPartitionsInPandas(empty_rows).count(), 0) + self.spark.range(10).mapInPandas(empty_rows).count(), 0) def test_chain_map_partitions_in_pandas(self): - @pandas_udf('id long', PandasUDFType.SCALAR_ITER) + @pandas_udf('id long', PandasUDFType.MAP_ITER) def func(iterator): for pdf in iterator: assert isinstance(pdf, pd.DataFrame) @@ -119,7 +119,7 @@ class ScalarPandasIterUDFTests(ReusedSQLTestCase): yield pdf df = self.spark.range(10) - actual = df.mapPartitionsInPandas(func).mapPartitionsInPandas(func).collect() + actual = df.mapInPandas(func).mapInPandas(func).collect() expected = df.collect() self.assertEquals(actual, expected) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 84be2d2..0944c87 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -42,7 +42,8 @@ def _create_udf(f, returnType, evalType): if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, - PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF): + PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, + PythonEvalType.SQL_MAP_PANDAS_ITER_UDF): from pyspark.sql.utils import require_minimum_pyarrow_version require_minimum_pyarrow_version() @@ -135,6 +136,17 @@ class UserDefinedFunction(object): else: raise TypeError("Invalid returnType for grouped map Pandas " "UDFs: returnType must be a StructType.") + elif self.evalType == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF: + if isinstance(self._returnType_placeholder, StructType): + try: + to_arrow_type(self._returnType_placeholder) + except TypeError: + raise NotImplementedError( + "Invalid returnType with map iterator Pandas UDFs: " + "%s is not supported" % 
str(self._returnType_placeholder)) + else: + raise TypeError("Invalid returnType for map iterator Pandas " + "UDFs: returnType must be a StructType.") elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF: try: # StructType is not yet allowed as a return type, explicitly check here to fail fast @@ -328,10 +340,12 @@ class UDFRegistration(object): if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF, PythonEvalType.SQL_SCALAR_PANDAS_UDF, PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF, - PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF]: + PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF, + PythonEvalType.SQL_MAP_PANDAS_ITER_UDF]: raise ValueError( "Invalid f: f must be SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF, " - "SQL_SCALAR_PANDAS_ITER_UDF, or SQL_GROUPED_AGG_PANDAS_UDF") + "SQL_SCALAR_PANDAS_ITER_UDF, SQL_GROUPED_AGG_PANDAS_UDF or " + "SQL_MAP_PANDAS_ITER_UDF.") register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name, evalType=f.evalType, deterministic=f.deterministic) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 568902f..b34abd0 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -295,7 +295,10 @@ def read_udfs(pickleSer, infile, eval_type): is_map_iter = eval_type == PythonEvalType.SQL_MAP_PANDAS_ITER_UDF if is_scalar_iter or is_map_iter: - assert num_udfs == 1, "One SCALAR_ITER UDF expected here." + if is_scalar_iter: + assert num_udfs == 1, "One SCALAR_ITER UDF expected here." + if is_map_iter: + assert num_udfs == 1, "One MAP_ITER UDF expected here." arg_offsets, udf = read_single_udf( pickleSer, infile, eval_type, runner_conf, udf_index=0) @@ -318,7 +321,7 @@ def read_udfs(pickleSer, infile, eval_type): for result_batch, result_type in result_iter: num_output_rows += len(result_batch) assert is_map_iter or num_output_rows <= num_input_rows[0], \ - "Pandas SCALAR_ITER UDF outputted more rows than input rows." + "Pandas MAP_ITER UDF outputted more rows than input rows." 
yield (result_batch, result_type) if is_scalar_iter: diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index 757e46a..83695e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -41,9 +41,9 @@ case class FlatMapGroupsInPandas( /** * Map partitions using an udf: iter(pandas.Dataframe) -> iter(pandas.DataFrame). - * This is used by DataFrame.mapPartitionsInPandas() + * This is used by DataFrame.mapInPandas() */ -case class MapPartitionsInPandas( +case class MapInPandas( functionExpr: Expression, output: Seq[Attribute], child: LogicalPlan) extends UnaryNode { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index fe5b15c..147222c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2651,14 +2651,12 @@ class Dataset[T] private[sql]( * This function uses Apache Arrow as serialization format between Java executors and Python * workers. */ - private[sql] def mapPartitionsInPandas(f: PythonUDF): DataFrame = { + private[sql] def mapInPandas(func: PythonUDF): DataFrame = { Dataset.ofRows( sparkSession, - MapPartitionsInPandas( - // Here, the evalType is SQL_SCALAR_PANDAS_ITER_UDF since we share the - // same Pandas type. To avoid conflicts, it sets SQL_MAP_PANDAS_ITER_UDF here. 
- f.copy(evalType = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF), - f.dataType.asInstanceOf[StructType].toAttributes, + MapInPandas( + func, + func.dataType.asInstanceOf[StructType].toAttributes, logicalPlan)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index ea0c970..c4d5a2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -682,8 +682,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { f, p, b, is, ot, planLater(child)) :: Nil case logical.FlatMapGroupsInPandas(grouping, func, output, child) => execution.python.FlatMapGroupsInPandasExec(grouping, func, output, planLater(child)) :: Nil - case logical.MapPartitionsInPandas(func, output, child) => - execution.python.MapPartitionsInPandasExec(func, output, planLater(child)) :: Nil + case logical.MapInPandas(func, output, child) => + execution.python.MapInPandasExec(func, output, planLater(child)) :: Nil case logical.MapElements(f, _, _, objAttr, child) => execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil case logical.AppendColumns(f, _, _, in, out, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala similarity index 99% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala index 814366c..2bb8081 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapPartitionsInPandasExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala @@ -38,7 +38,7 @@ import 
org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch} * `org.apache.spark.sql.catalyst.plans.logical.MapPartitionsInRWithArrow` * */ -case class MapPartitionsInPandasExec( +case class MapInPandasExec( func: Expression, output: Seq[Attribute], child: SparkPlan) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org