[spark] branch master updated: [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions

2019-06-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5ad1053  [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty 
partitions
5ad1053 is described below

commit 5ad1053f3e8b7acab58e07e7548e7f14e192e5b4
Author: Bryan Cutler 
AuthorDate: Sat Jun 22 11:20:35 2019 +0900

[SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions

## What changes were proposed in this pull request?

When running FlatMapGroupsInPandasExec or AggregateInPandasExec, the shuffle
uses the default of 200 partitions set by "spark.sql.shuffle.partitions". If
the data is small, e.g. in testing, many of those partitions will be empty but
are still processed all the same.
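
For context, the relevant setting can be inspected or lowered from the active
SparkSession; a generic Scala sketch, not part of this change:

```scala
// spark.sql.shuffle.partitions drives the post-groupBy shuffle and defaults to 200,
// so a tiny input fans out into mostly-empty partitions unless it is lowered.
println(spark.conf.get("spark.sql.shuffle.partitions"))  // "200" unless overridden
spark.conf.set("spark.sql.shuffle.partitions", "8")      // e.g. shrink it for small or test data
```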

This PR checks that the `mapPartitionsInternal` iterator is non-empty before
calling `ArrowPythonRunner` to start computation on it.
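
Conceptually the guard looks like the sketch below; this is not the exact
executor code, and `runArrowPythonRunner` is a hypothetical stand-in for the
grouping and `ArrowPythonRunner` invocation inside `AggregateInPandasExec` /
`FlatMapGroupsInPandasExec`:

```scala
// Sketch only: skip the Python worker round-trip for empty shuffle partitions.
inputRDD.mapPartitionsInternal { iter =>
  if (iter.isEmpty) {
    Iterator.empty              // nothing to process, so never start a Python runner
  } else {
    runArrowPythonRunner(iter)  // hypothetical helper standing in for the real per-partition work
  }
}
```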

## How was this patch tested?

Existing tests. Ran the following benchmark, a simple example where most
partitions are empty:

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

df.groupby("id").apply(normalize).count()
```

**Before**
```
In [4]: %timeit df.groupby("id").apply(normalize).count()
1.58 s ± 62.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [5]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [6]: %timeit df.groupby("id").apply(normalize).count()
1.52 s ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

**After this Change**
```
In [2]: %timeit df.groupby("id").apply(normalize).count()
646 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [3]: %timeit df.groupby("id").apply(normalize).count()
408 ms ± 84.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [4]: %timeit df.groupby("id").apply(normalize).count()
381 ms ± 29.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
```

Closes #24926 from 
BryanCutler/pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128.

Authored-by: Bryan Cutler 
Signed-off-by: HyukjinKwon 
---
 python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py | 13 +
 python/pyspark/sql/tests/test_pandas_udf_grouped_map.py | 12 
 .../spark/sql/execution/python/AggregateInPandasExec.scala  |  5 +++--
 .../sql/execution/python/FlatMapGroupsInPandasExec.scala|  5 +++--
 4 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py 
b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
index 9eda1aa..f5fd725 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -18,6 +18,7 @@
 import unittest
 
 from pyspark.rdd import PythonEvalType
+from pyspark.sql import Row
 from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
 udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
@@ -461,6 +462,18 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         expected = [1, 5]
         self.assertEqual(actual, expected)
 
+    def test_grouped_with_empty_partition(self):
+        data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
+        expected = [Row(id=1, sum=5), Row(id=2, sum=4)]
+        num_parts = len(data) + 1
+        df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts))
+
+        f = pandas_udf(lambda x: x.sum(),
+                       'int', PandasUDFType.GROUPED_AGG)
+
+        result = df.groupBy('id').agg(f(df['x']).alias('sum')).collect()
+        self.assertEqual(result, expected)
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_udf_grouped_agg import *
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py 
b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index 1d87c63..32d6720 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -504,6 +504,18 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 
         self.assertEquals(result.collect()[0]['sum'], 165)
 
+    def test_grouped_with_empty_partition(self):
+        data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
+        expected = [Row(id=1, x=5), Row(id=1, x=5), Row(id=2, x=4)]
+        num_parts = len(data) + 1

[spark] branch branch-2.3 updated: [SPARK-28093][SPARK-28109][SQL][2.3] Fix TRIM/LTRIM/RTRIM function parameter order issue

2019-06-21 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new 1201a0a  [SPARK-28093][SPARK-28109][SQL][2.3] Fix TRIM/LTRIM/RTRIM 
function parameter order issue
1201a0a is described below

commit 1201a0a7580a8c41d0501c95826c900f84e1db45
Author: Yuming Wang 
AuthorDate: Fri Jun 21 18:40:23 2019 -0700

[SPARK-28093][SPARK-28109][SQL][2.3] Fix TRIM/LTRIM/RTRIM function 
parameter order issue

## What changes were proposed in this pull request?

This PR backports #24902 and #24911 to branch-2.3.
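
For reference, the SQL-standard trim syntax that the restored `#trim` grammar
rule accepts; an illustrative spark-shell sketch with arbitrary literals, not
taken from the patch:

```scala
// Illustrative only: TRIM with an explicit trim string and direction.
spark.sql("SELECT TRIM(BOTH 'x' FROM 'xxSparkxx')").show()      // Spark
spark.sql("SELECT TRIM(LEADING 'x' FROM 'xxSparkxx')").show()   // Sparkxx
spark.sql("SELECT TRIM(TRAILING 'x' FROM 'xxSparkxx')").show()  // xxSpark
```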

## How was this patch tested?

unit tests

Closes #24908 from wangyum/SPARK-28093-branch-2.3.

Authored-by: Yuming Wang 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/sql/catalyst/parser/SqlBase.g4|  6 +-
 .../catalyst/expressions/stringExpressions.scala   |  6 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala | 43 +++---
 .../expressions/StringExpressionsSuite.scala   | 11 
 .../sql/catalyst/parser/PlanParserSuite.scala  | 24 +---
 .../parser/TableIdentifierParserSuite.scala|  2 +-
 .../sql-tests/inputs/string-functions.sql  | 12 +++-
 .../sql-tests/results/string-functions.sql.out | 66 +-
 8 files changed, 130 insertions(+), 40 deletions(-)

diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 5fa75fe..15fd48b 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -582,12 +582,12 @@ primaryExpression
     | '(' query ')'                                                                            #subqueryExpression
     | qualifiedName '(' (setQuantifier? argument+=expression (',' argument+=expression)*)? ')'
        (OVER windowSpec)?                                                                      #functionCall
-    | qualifiedName '(' trimOption=(BOTH | LEADING | TRAILING) argument+=expression
-      FROM argument+=expression ')'                                                            #functionCall
     | value=primaryExpression '[' index=valueExpression ']'                                    #subscript
     | identifier                                                                               #columnReference
     | base=primaryExpression '.' fieldName=identifier                                          #dereference
     | '(' expression ')'                                                                       #parenthesizedExpression
+    | TRIM '(' trimOption=(BOTH | LEADING | TRAILING) (trimStr=valueExpression)?
+       FROM srcStr=valueExpression ')'                                                         #trim
     ;
 
 constant
@@ -735,6 +735,7 @@ nonReserved
 | VIEW | REPLACE
 | IF
 | POSITION
+| TRIM
 | NO | DATA
 | START | TRANSACTION | COMMIT | ROLLBACK | IGNORE
 | SORT | CLUSTER | DISTRIBUTE | UNSET | TBLPROPERTIES | SKEWED | STORED | 
DIRECTORIES | LOCATION
@@ -872,6 +873,7 @@ TRAILING: 'TRAILING';
 
 IF: 'IF';
 POSITION: 'POSITION';
+TRIM: 'TRIM';
 
 EQ  : '=' | '==';
 NSEQ: '<=>';
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index c855581..1166e77 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -714,7 +714,7 @@ case class StringTrim(
 trimStr: Option[Expression] = None)
   extends String2TrimExpression {
 
-  def this(trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr))
+  def this(srcStr: Expression, trimStr: Expression) = this(srcStr, Option(trimStr))
 
   def this(srcStr: Expression) = this(srcStr, None)
 
@@ -814,7 +814,7 @@ case class StringTrimLeft(
 trimStr: Option[Expression] = None)
   extends String2TrimExpression {
 
-  def this(trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr))
+  def this(srcStr: Expression, trimStr: Expression) = this(srcStr, Option(trimStr))
 
   def this(srcStr: Expression) = this(srcStr, None)
 
@@ -917,7 +917,7 @@ case class StringTrimRight(
 trimStr: Option[Expression] = None)
   extends String2TrimExpression {
 
-  def this(trimStr: Expression, srcStr: Expression) = this(srcStr, Option(trimStr))
+  def this(srcStr: Expression, trimStr: Expression) = this(srcStr, Option(trimStr))
 
   def this(srcStr: Expression) = this(srcStr, None)
 
diff --git 

[spark] branch master updated (9b9d81b -> 113f8c8)

2019-06-21 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 9b9d81b  [SPARK-28131][PYTHON] Update document type conversion between 
Python data and SQL types in normal UDFs (Python 3.7)
 add 113f8c8  [SPARK-28132][PYTHON] Update document type conversion for 
Pandas UDFs (pyarrow 0.13.0, pandas 0.24.2, Python 3.7)

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/functions.py | 47 +++--
 1 file changed, 22 insertions(+), 25 deletions(-)





[spark] branch master updated (54da3bb -> 9b9d81b)

2019-06-21 Thread cutlerb
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.


from 54da3bb  [SPARK-28127][SQL] Micro optimization on TreeNode's 
mapChildren method
 add 9b9d81b  [SPARK-28131][PYTHON] Update document type conversion between 
Python data and SQL types in normal UDFs (Python 3.7)

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/functions.py | 43 -
 1 file changed, 21 insertions(+), 22 deletions(-)





[spark] branch branch-2.4 updated: [SPARK-26038][BRANCH-2.4] Decimal toScalaBigInt/toJavaBigInteger for decimals not fitting in long

2019-06-21 Thread joshrosen
This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new a71e90a  [SPARK-26038][BRANCH-2.4] Decimal 
toScalaBigInt/toJavaBigInteger for decimals not fitting in long
a71e90a is described below

commit a71e90a76a982dde09d3b60bb2cf4548c62f57a1
Author: Juliusz Sompolski 
AuthorDate: Fri Jun 21 07:56:49 2019 -0700

[SPARK-26038][BRANCH-2.4] Decimal toScalaBigInt/toJavaBigInteger for 
decimals not fitting in long

This is a Spark 2.4.x backport of #23022. Original description follows 
below:

## What changes were proposed in this pull request?

Fix Decimal `toScalaBigInt` and `toJavaBigInteger`, which used to work only
for decimals that fit in a long.
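
An illustrative sketch of the fixed behaviour (arbitrary values, assuming a
build that includes this patch):

```scala
import org.apache.spark.sql.types.Decimal

// Before this fix both conversions went through toLong, which can only hold
// values that fit in a Long, so a wider decimal came back mangled.
val wide = Decimal("12345678901234567890123456789.5")
assert(wide.toScalaBigInt == BigInt("12345678901234567890123456789"))
assert(wide.toJavaBigInteger == new java.math.BigInteger("12345678901234567890123456789"))
```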

## How was this patch tested?

Added test to DecimalSuite.

Closes #24928 from JoshRosen/joshrosen/SPARK-26038-backport.

Authored-by: Juliusz Sompolski 
Signed-off-by: Josh Rosen 
---
 .../main/scala/org/apache/spark/sql/types/Decimal.scala  | 16 ++--
 .../scala/org/apache/spark/sql/types/DecimalSuite.scala  | 11 +++
 2 files changed, 25 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 9eed2eb..12182324 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -185,9 +185,21 @@ final class Decimal extends Ordered[Decimal] with Serializable {
     }
   }
 
-  def toScalaBigInt: BigInt = BigInt(toLong)
+  def toScalaBigInt: BigInt = {
+    if (decimalVal.ne(null)) {
+      decimalVal.toBigInt()
+    } else {
+      BigInt(toLong)
+    }
+  }
 
-  def toJavaBigInteger: java.math.BigInteger = java.math.BigInteger.valueOf(toLong)
+  def toJavaBigInteger: java.math.BigInteger = {
+    if (decimalVal.ne(null)) {
+      decimalVal.underlying().toBigInteger()
+    } else {
+      java.math.BigInteger.valueOf(toLong)
+    }
+  }
 
   def toUnscaledLong: Long = {
     if (decimalVal.ne(null)) {
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 10de90c..8abd762 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -228,4 +228,15 @@ class DecimalSuite extends SparkFunSuite with PrivateMethodTester {
     val decimal = Decimal.apply(bigInt)
     assert(decimal.toJavaBigDecimal.unscaledValue.toString === "9223372036854775808")
   }
+
+  test("SPARK-26038: toScalaBigInt/toJavaBigInteger") {
+    // not fitting long
+    val decimal = Decimal("1234568790123456789012348790.1234879012345678901234568790")
+    assert(decimal.toScalaBigInt == scala.math.BigInt("1234568790123456789012348790"))
+    assert(decimal.toJavaBigInteger == new java.math.BigInteger("1234568790123456789012348790"))
+    // fitting long
+    val decimalLong = Decimal(123456789123456789L, 18, 9)
+    assert(decimalLong.toScalaBigInt == scala.math.BigInt("123456789"))
+    assert(decimalLong.toJavaBigInteger == new java.math.BigInteger("123456789"))
+  }
 }

