[spark] branch master updated: [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5ad1053  [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions
5ad1053 is described below

commit 5ad1053f3e8b7acab58e07e7548e7f14e192e5b4
Author: Bryan Cutler
AuthorDate: Sat Jun 22 11:20:35 2019 +0900

[SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions

## What changes were proposed in this pull request?

When running FlatMapGroupsInPandasExec or AggregateInPandasExec, the shuffle uses the default of 200 partitions from "spark.sql.shuffle.partitions". If the data is small, e.g. in testing, many of the partitions will be empty but are treated just the same. This PR checks that the `mapPartitionsInternal` iterator is non-empty before calling `ArrowPythonRunner` to start computation on the iterator.

## How was this patch tested?

Existing tests. Ran the following benchmark on a simple example where most partitions are empty:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

df.groupby("id").apply(normalize).count()
```

**Before**
```
In [4]: %timeit df.groupby("id").apply(normalize).count()
1.58 s ± 62.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [5]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

**After this Change**
```
In [2]: %timeit df.groupby("id").apply(normalize).count()
646 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [3]: %timeit df.groupby("id").apply(normalize).count()
408 ms ± 84.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [4]: %timeit df.groupby("id").apply(normalize).count()
381 ms ± 29.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

Closes #24926 from BryanCutler/pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128.
Authored-by: Bryan Cutler
Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py    | 13 +
 python/pyspark/sql/tests/test_pandas_udf_grouped_map.py    | 12 
 .../spark/sql/execution/python/AggregateInPandasExec.scala |  5 +++--
 .../sql/execution/python/FlatMapGroupsInPandasExec.scala   |  5 +++--
 4 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
index 9eda1aa..f5fd725 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -18,6 +18,7 @@
 import unittest

 from pyspark.rdd import PythonEvalType
+from pyspark.sql import Row
 from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
     udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
@@ -461,6 +462,18 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         expected = [1, 5]
         self.assertEqual(actual, expected)

+    def test_grouped_with_empty_partition(self):
+        data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
+        expected = [Row(id=1, sum=5), Row(id=2, x=4)]
+        num_parts = len(data) + 1
+        df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts))
+
+        f = pandas_udf(lambda x: x.sum(),
+                       'int', PandasUDFType.GROUPED_AGG)
+
+        result = df.groupBy('id').agg(f(df['x']).alias('sum')).collect()
+        self.assertEqual(result, expected)
+
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_udf_grouped_agg import *
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index 1d87c63..32d6720 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -504,6 +504,18 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):

         self.assertEquals(result.collect()[0]['sum'], 165)

+    def test_grouped_with_empty_partition(self):
+        data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
+        expected = [Row(id=1, x=5), Row(id=1, x=5), Row(id=2, x=4)]
+        num_parts = len(data) + 1
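The guard this patch adds follows a common Spark pattern: test the partition iterator before paying any per-partition setup cost. Below is a minimal, self-contained sketch of that pattern on a plain RDD; it uses the public `mapPartitions` and an illustrative doubling function, whereas the patch itself applies the same check inside `mapPartitionsInternal` before constructing `ArrowPythonRunner`, so dataset, names, and the work done per partition here are stand-ins.

```scala
import org.apache.spark.sql.SparkSession

object SkipEmptyPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("skip-empty").getOrCreate()
    val sc = spark.sparkContext

    // A tiny dataset spread over far more partitions than rows, so most partitions
    // are empty (this mirrors what a 200-partition shuffle does to a small grouped DataFrame).
    val rdd = sc.parallelize(Seq(1, 1, 2, 2, 2), numSlices = 50)

    val result = rdd.mapPartitions { iter =>
      if (iter.isEmpty) {
        // Empty partition: return immediately instead of doing per-partition setup
        // (in the Pandas UDF operators this is where ArrowPythonRunner would otherwise start).
        Iterator.empty
      } else {
        // Only partitions that actually hold data do the expensive work.
        iter.map(_ * 2)
      }
    }

    println(result.collect().toSeq)
    spark.stop()
  }
}
```

Because `Iterator.isEmpty` only consults `hasNext`, the non-empty branch still sees every row of the partition.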
[spark] branch branch-2.3 updated: [SPARK-28093][SPARK-28109][SQL][2.3] Fix TRIM/LTRIM/RTRIM function parameter order issue
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 1201a0a  [SPARK-28093][SPARK-28109][SQL][2.3] Fix TRIM/LTRIM/RTRIM function parameter order issue
1201a0a is described below

commit 1201a0a7580a8c41d0501c95826c900f84e1db45
Author: Yuming Wang
AuthorDate: Fri Jun 21 18:40:23 2019 -0700

[SPARK-28093][SPARK-28109][SQL][2.3] Fix TRIM/LTRIM/RTRIM function parameter order issue

## What changes were proposed in this pull request?

This PR backports #24902 and #24911 to branch-2.3.

## How was this patch tested?

Unit tests.

Closes #24908 from wangyum/SPARK-28093-branch-2.3.

Authored-by: Yuming Wang
Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/sql/catalyst/parser/SqlBase.g4     |  6 +-
 .../catalyst/expressions/stringExpressions.scala    |  6 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala      | 43 +++---
 .../expressions/StringExpressionsSuite.scala        | 11 
 .../sql/catalyst/parser/PlanParserSuite.scala       | 24 +---
 .../parser/TableIdentifierParserSuite.scala         |  2 +-
 .../sql-tests/inputs/string-functions.sql           | 12 +++-
 .../sql-tests/results/string-functions.sql.out      | 66 +-
 8 files changed, 130 insertions(+), 40 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 5fa75fe..15fd48b 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -582,12 +582,12 @@ primaryExpression
     | '(' query ')'                                                 #subqueryExpression
     | qualifiedName '(' (setQuantifier? argument+=expression (',' argument+=expression)*)? ')'
       (OVER windowSpec)?                                            #functionCall
-    | qualifiedName '(' trimOption=(BOTH | LEADING | TRAILING) argument+=expression
-      FROM argument+=expression ')'                                 #functionCall
     | value=primaryExpression '[' index=valueExpression ']'         #subscript
     | identifier                                                    #columnReference
     | base=primaryExpression '.' fieldName=identifier               #dereference
     | '(' expression ')'                                            #parenthesizedExpression
+    | TRIM '(' trimOption=(BOTH | LEADING | TRAILING) (trimStr=valueExpression)?
+      FROM srcStr=valueExpression ')'                               #trim
     ;

 constant
@@ -735,6 +735,7 @@ nonReserved
     | VIEW | REPLACE
     | IF
     | POSITION
+    | TRIM
     | NO | DATA
     | START | TRANSACTION | COMMIT | ROLLBACK | IGNORE
     | SORT | CLUSTER | DISTRIBUTE | UNSET | TBLPROPERTIES | SKEWED | STORED | DIRECTORIES | LOCATION
@@ -872,6 +873,7 @@ TRAILING: 'TRAILING';

 IF: 'IF';
 POSITION: 'POSITION';
+TRIM: 'TRIM';

 EQ  : '=' | '==';
 NSEQ: '<=>';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index c855581..1166e77 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -714,7 +714,7 @@ case class StringTrim(
     trimStr: Option[Expression] = None)
   extends String2TrimExpression {

-  def this(trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr))
+  def this(srcStr: Expression, trimStr: Expression) = this(srcStr, Option(trimStr))

   def this(srcStr: Expression) = this(srcStr, None)

@@ -814,7 +814,7 @@ case class StringTrimLeft(
     trimStr: Option[Expression] = None)
   extends String2TrimExpression {

-  def this(trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr))
+  def this(srcStr: Expression, trimStr: Expression) = this(srcStr, Option(trimStr))

   def this(srcStr: Expression) = this(srcStr, None)

@@ -917,7 +917,7 @@ case class StringTrimRight(
     trimStr: Option[Expression] = None)
   extends String2TrimExpression {

-  def this(trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr))
+  def this(srcStr: Expression, trimStr: Expression) = this(srcStr, Option(trimStr))

   def this(srcStr: Expression) = this(srcStr, None)

diff --git
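For reference, and not taken from the diff itself, the SQL-standard syntax that the new `#trim` grammar rule accepts looks like the sketch below; the expected outputs in the comments assume the corrected ordering in which `srcStr` is the string being trimmed and `trimStr` is the set of characters to remove, and the session setup is illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object TrimSyntaxExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("trim-syntax").getOrCreate()

    // SQL-standard TRIM forms: characters in the trim string ('xy') are stripped
    // from the source string ('yxTomxx') at the requested ends.
    spark.sql("SELECT TRIM(BOTH 'xy' FROM 'yxTomxx')").show()     // Tom
    spark.sql("SELECT TRIM(LEADING 'xy' FROM 'yxTomxx')").show()  // Tomxx
    spark.sql("SELECT TRIM(TRAILING 'xy' FROM 'yxTomxx')").show() // yxTom

    spark.stop()
  }
}
```

The swapped auxiliary constructors in stringExpressions.scala make the two-argument Scala form read as source string first, trim string second, matching what these SQL forms produce.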
[spark] branch master updated (9b9d81b -> 113f8c8)
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 9b9d81b  [SPARK-28131][PYTHON] Update document type conversion between Python data and SQL types in normal UDFs (Python 3.7)
     add 113f8c8  [SPARK-28132][PYTHON] Update document type conversion for Pandas UDFs (pyarrow 0.13.0, pandas 0.24.2, Python 3.7)

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/functions.py | 47 +++++--
 1 file changed, 22 insertions(+), 25 deletions(-)
[spark] branch master updated (54da3bb -> 9b9d81b)
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

    from 54da3bb  [SPARK-28127][SQL] Micro optimization on TreeNode's mapChildren method
     add 9b9d81b  [SPARK-28131][PYTHON] Update document type conversion between Python data and SQL types in normal UDFs (Python 3.7)

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/functions.py | 43 -
 1 file changed, 21 insertions(+), 22 deletions(-)
[spark] branch branch-2.4 updated: [SPARK-26038][BRANCH-2.4] Decimal toScalaBigInt/toJavaBigInteger for decimals not fitting in long
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a71e90a  [SPARK-26038][BRANCH-2.4] Decimal toScalaBigInt/toJavaBigInteger for decimals not fitting in long
a71e90a is described below

commit a71e90a76a982dde09d3b60bb2cf4548c62f57a1
Author: Juliusz Sompolski
AuthorDate: Fri Jun 21 07:56:49 2019 -0700

[SPARK-26038][BRANCH-2.4] Decimal toScalaBigInt/toJavaBigInteger for decimals not fitting in long

This is a Spark 2.4.x backport of #23022. Original description follows below:

## What changes were proposed in this pull request?

Fix Decimal `toScalaBigInt` and `toJavaBigInteger`, which previously worked only for decimals that fit in a long.

## How was this patch tested?

Added a test to DecimalSuite.

Closes #24928 from JoshRosen/joshrosen/SPARK-26038-backport.

Authored-by: Juliusz Sompolski
Signed-off-by: Josh Rosen
---
 .../main/scala/org/apache/spark/sql/types/Decimal.scala      | 16 ++--
 .../scala/org/apache/spark/sql/types/DecimalSuite.scala      | 11 +++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 9eed2eb..12182324 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -185,9 +185,21 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     }
   }

-  def toScalaBigInt: BigInt = BigInt(toLong)
+  def toScalaBigInt: BigInt = {
+    if (decimalVal.ne(null)) {
+      decimalVal.toBigInt()
+    } else {
+      BigInt(toLong)
+    }
+  }

-  def toJavaBigInteger: java.math.BigInteger = java.math.BigInteger.valueOf(toLong)
+  def toJavaBigInteger: java.math.BigInteger = {
+    if (decimalVal.ne(null)) {
+      decimalVal.underlying().toBigInteger()
+    } else {
+      java.math.BigInteger.valueOf(toLong)
+    }
+  }

   def toUnscaledLong: Long = {
     if (decimalVal.ne(null)) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 10de90c..8abd762 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -228,4 +228,15 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester {
     val decimal = Decimal.apply(bigInt)
     assert(decimal.toJavaBigDecimal.unscaledValue.toString === "9223372036854775808")
   }
+
+  test("SPARK-26038: toScalaBigInt/toJavaBigInteger") {
+    // not fitting long
+    val decimal = Decimal("1234568790123456789012348790.1234879012345678901234568790")
+    assert(decimal.toScalaBigInt == scala.math.BigInt("1234568790123456789012348790"))
+    assert(decimal.toJavaBigInteger == new java.math.BigInteger("1234568790123456789012348790"))
+    // fitting long
+    val decimalLong = Decimal(123456789123456789L, 18, 9)
+    assert(decimalLong.toScalaBigInt == scala.math.BigInt("123456789"))
+    assert(decimalLong.toJavaBigInteger == new java.math.BigInteger("123456789"))
+  }
 }
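As a standalone sketch, not part of the patch, the behavior pinned down by the new DecimalSuite test can be exercised directly with the same values; before this change both conversions went through `toLong`, truncating any value that does not fit in a long, and the object name below is illustrative only.

```scala
import org.apache.spark.sql.types.Decimal

object DecimalBigIntExample {
  def main(args: Array[String]): Unit = {
    // A decimal whose integral part does not fit in a Long (greater than 2^63 - 1).
    val big = Decimal("1234568790123456789012348790.1234879012345678901234568790")

    // With the fix, these return the full integral part rather than a value
    // truncated through Decimal.toLong.
    println(big.toScalaBigInt)    // 1234568790123456789012348790
    println(big.toJavaBigInteger) // 1234568790123456789012348790

    // Decimals that fit in a long still take the fast Long-based path.
    val small = Decimal(123456789123456789L, 18, 9) // unscaled value, precision, scale => 123456789.123456789
    println(small.toScalaBigInt)  // 123456789
  }
}
```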