svn commit: r63350 - /dev/spark/KEYS

2023-08-03 Thread yumwang
Author: yumwang
Date: Fri Aug  4 05:56:48 2023
New Revision: 63350

Log:
Update KEYS

Modified:
dev/spark/KEYS

Modified: dev/spark/KEYS
==
--- dev/spark/KEYS (original)
+++ dev/spark/KEYS Fri Aug  4 05:56:48 2023
@@ -1108,61 +1108,61 @@ kR7loYvuYi9fxvlaW0kc3Bd10JCHCEmobHno5Gfl
 =l4ux
 -END PGP PUBLIC KEY BLOCK-
 
-pub   rsa4096 2023-08-01 [SC]
-  3A1377B1A8B067354AB2A590C94CC8441B16BB6E
+pub   rsa4096 2023-08-04 [SC]
+  64FD17541B72D286FF9C7F5BA352F9C17B8DABD8
 uid  Yuming Wang (CODE SIGNING KEY) 
-sub   rsa4096 2023-08-01 [E]
+sub   rsa4096 2023-08-04 [E]
 -BEGIN PGP PUBLIC KEY BLOCK-
 
-mQINBGTIaYgBEAC9jDvoAq+A4PeFCy7oSLE5csGV64YMstUQS1Wz/LdBLFTd9lpD
-Ik7Tmd7ykiKtDGXHKzNen8ClwRcN3fn6xFhCG9cc7V7LCyLFaPdAmuNv3IYsNR4+
-hLpVNl86t2f2Ns/yULjswZ+yrqx0P8lZq62Wnsa2vJYyVqQCh9LsRmcxUcHRDJ9T
-5n/Remoh84WwOxrg2NFTWCSHN3hM+LatUSQ4ruUsbcr17SkOaWOeP+jLdVR7kRS6
-u/MDJlIBWAvGNy8QJcXL5cgg26N4vArCL/r7sAjyyBAzLs2H/0bZ8G4Cq3XqDlRb
-WN854RpXTt3YeusSg4UyTdvVE9qXr0Stpku1/97s2I4Nf0Zyka+WdLgbJAkTyFXT
-gSPGsoMZJb6nvohYdngcVlTw5UnjFkwAlWW1t+rXcpIrx1RJuS3TwfkszdyKTSCi
-VW971s5KH3XK2QClndCjIyi7JgWwBT/XIQKgsH4VKaR+YfwKvV3vE5uXNk8Zr1kK
-XtR9/jn4voMmyO4xtmk7epXqfjPOhBJg0jdS5d+c+j6mXk2TY1kVRzR4tlbf/lDl
-M4cBAD2pweavveZH6b4h70pxsvo0Wze8ref862miON0d4+w6Hpl6mWBA0AQJw9sj
-dfysxc/0xNPrHGPA6eLzdOVvbkK0q5lQKqceHoy+Hkg9knk+zv/LC22IRwARAQAB
+mQINBGTMkbkBEACgopx3RbV2uoANDjV3jw0utLKc7LpYRue2FutmtsYrpd1d/Tdo
+h2xSGOzt1gsstbXLytamlOy7w2cNWFVGA0bMir/jmNQqjJRB26CLoc8jG8WMyyYN
+3JXoI/DdvDyAB2rRGKlJAm58OFLG2XOM+ua3n09wQULZz4LhCqr7G2NAqldoPBJZ
+NDWv2qL9fxnxNKfxxT1qmtWN4MYHUz8cI2oJXDN7uVKQDiVjCnYICx3dXwKHjAqn
+PpZR5u9iYE9l/v3PrLSh6IbrgPci9k+QtS+C5W4LbT4jKu4+GPl+gNH5hDUnyTU2
+6qeU2KwwW9UQqPcUb0pE6tI4m+3R+OjvOMbzaXZfVyoBZmd1Be/Eb8LOKXhvSN3h
+bcipUd/fvD3EglSLrvoGrfuE/xSU5bQqrG4UfeD0nI04DUy/sEn31/PId7m2n+Uc
+5BPF27KLIB0AYkso5+1VYGdciYxWRF5bkYsVStZwlaxhnF2XHcdqs/nM2rs1KHU6
+S+plYwqecJ9X4rlYJC5ORyDXwGIsQ5ko2zXXbLVWSzDdaIlJ2LvSE9sYveZDhqq5
+LEDNr5RqxEDsqJ57V7VjbhmlFfZVRp6EscYeQryBgexVArNo0FVIttFtuCdHOEEY
+iXz5I6xeNFblKCmuPnCB0iHKEJtAeMHpKzOB3BnqfacEEYBCgI4z1BrdWQARAQAB
 tDNZdW1pbmcgV2FuZyAoQ09ERSBTSUdOSU5HIEtFWSkgPHl1bXdhbmdAYXBhY2hl
-Lm9yZz6JAlEEEwEIADsWIQQ6E3exqLBnNUqypZDJTMhEGxa7bgUCZMhpiAIbAwUL
-CQgHAgIiAgYVCgkICwIEFgIDAQIeBwIXgAAKCRDJTMhEGxa7buQOEACvffrZDNO9
-+6+WB4bRYU+d3FOf9RL7/ZRaaU+jgpQDyWMyslKSG6oHNfnFTJl7QpnCkeYDWODl
-oekfJr+Ntx2X+875kMmSbC+IlYScEOVh2+WIfuCTVmEM+BJc3VNdoO/2l1KVDvsZ
-rBEMLFKnEmpAmGd8AUJVnRb4ZgRCEd1b1e9M2wyAw0toBw7XsDWHoVOpSXY4bduC
-UZuhfIhDmDa/Xfqj92lyE9fnQmGotrP4OhKsGdHKEmTArtB9PCGTf3fOzEg9mygK
-TuYQLkJSpS1Lwyp8192FpW8nCzs/yF0oBGhqeBFKtnRO23VjtklWAgd0SmSx6Juy
-EmDKo3VSuT0F+4SAtXzGu56ZMNXu/DJdEjJu8uKKeqdyJXOfQ0cmeMsWu3A7n1V4
-a0gJ/4XyFl1aGHuNNZGxewKmCsSU6ixWi42JsL00qEb6sTViyzBPxlw3Y5bUyFHa
-LOc2s1u+NCiJFu5Ucy4UmexoQrvI3w7nldson107TSIw/6wjA1uTUtaFx43RGphm
-e54ihMa3p+mBUHsWn37JEcT0X13vzPPMEPPpA9tJW+NjyCzoppZdh3CONVnQTMSQ
-eyv8xpq9lYglHTJME0UlMfK/YGd6Wgc4ZJWpt1g4XyfUyiYj+dek6RjyGUfE8NIK
-91PPrINXqkNiZuiPk28xsHyuBNyxsCDs6rkCDQRkyGmIARAAsKQDzF71zTUiFL6s
-kwBiZc9EeR7tGq6DYFpy5hMaqwLicQCKBcLl5izaOham8hzCHd6bII1OFMDWKFM3
-05L6revZPgypG0InXM9j4irIsYgZAACgUkg2gA5JHbSJ78Q4dUH33qCClz0Z4Dyv
-v/h32Tl9AJZNchzowC9DZPCU+pUQnSCjKmVsDmOvdYJ5PuQCQ+KEneXBC224DfZ5
-o3Q+4B3Vmvpb72DX0az/4G509P5gF4lvdeTRgSCnE+Rwm9mM4+Moks5O5xwbWwKy
-GcEW44uO9KAnK9BaUgE6TL0WG4nZTRA2CZixolxcCCml6wY0Gt3AYRs9ia1lR79K
-g7OrUzWaUQR+e8xuddbtMYKpC50g+8mZQaxo0RLP+u0ScXnvg8XP/zrw+VfPOb9Z
-jMVe4MArGMCEolttshCGd7rIY28YErfEiN3gVIf8YbhzRL4xFZhChULqvINnp+6V
-WTqui9R6SJ/geMQaMBOEz1lCTAt4j56VBsuYjpiPVDK1K/nCswOpMgkPq1BS1g+g
-lWB9po00V7d6+oAQk3Ck3ryKf/E6o8bGBVglzFg7BGPqyPJ+ZlgyZE/5+AFsSE4G
-XdKRvNsxcCTzTKied8DtDq43EIkY3uhX+Zoup9MxefGQKeIBTqN+AZuFXNzT3IF6
-nIk/h8SLl3M3WUL1++7kOITXmqsAEQEAAYkCNgQYAQgAIBYhBDoTd7GosGc1SrKl
-kMlMyEQbFrtuBQJkyGmIAhsMAAoJEMlMyEQbFrtu/FYQAIaIHzU5tzKVn8YtAtSF
-j3Qxe+wjrvGHYXx7hivwItfmqh7lSD1XD4LftJi5sjnBm2mKVI8vXnjqto2hpxL4
-nnRLmW7wITsLEmu8EeyeKhtSZgPTPIWrhEjQBTDZvjVvKox+XWn3H3O+JderjYrX
-zcN15T/paDBZgLXNBrVpxtUu1TQOmv2FPHNP8qNA+WazuqgIwS2nf95lbAxBNqCS
-+b/XFVgJaL0ozZhrlp7Bmd3VHLZ2TaUBgJ+F1uTYjT2mFCG2h2nk2whHLkPaPgYA
-ogZVBw2iI5WPV6f2N1HcEv2T+lB10LFL1Zm98rQyNhFpzpGddGvPMgZiM9PpNwif
-bvFJ+00g0XwN0sxrRFlTlqLTnvbG/wTD/qiYTlSNC0qiJY8sEVQ777zkEtlS3wkh
-OjE+GvE8bYNWJCpK42E8p/VENywU8nKrA0Q4S1pJ8xMg73dFZuk1QdBZg5/vSmiz
-7cmdEjHxnRqFo9NNc8OmVhaWK/JFJkidZJreHiMcsd2jsR1fgK5/ywL0mhgektfw
-1ADkUvIe6aoGCQLBFbTCcjqfKOe08YMcDG9nwGH3tHrsov0qHz73v0uEktcXFrjn
-zrtbJJnBfKuqTzGLfXoVkl2CMUuTUYi27wdBR3c7Pp/wSUq7RA5bLjBR78SUNzh7
-FU8PmKQE113B7a6dd+caANqr
-=LU9z
+Lm9yZz6JAlEEEwEIADsWIQRk/RdUG3LShv+cf1ujUvnBe42r2AUCZMyRuQIbAwUL
+CQgHAgIiAgYVCgkICwIEFgIDAQIeBwIXgAAKCRCjUvnBe42r2NIYEACQ506btjZE
+0fxzr2cYtn1kCs2kIW401Sslbij71jL8e0A+sNfWo8kICxyHyWX8i/55HDLk4lkr
+pu9gqUhRpaJb0keR5/9muZ9F+vG4or/Bv8OCaIf9UCa/ee/wtHsA29JgH6wEaOOc
+7niBgxyvWuVQ6nshn9tswjDQ/2ffGUScvdRTn8OdkCpKDFsnQ9JEoorbyVlAZfMg
+DQt2iX3QtLF5J6pWzjsVnwfyC8qRmrdR4XeJt1Qt11IM0xhJT+IFui6j8lxhbVeA

[spark] branch master updated: [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2c2d6534beb [SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned 
up
2c2d6534beb is described below

commit 2c2d6534bebed3c7bfa0842b84aa27674b721410
Author: Kun Wan 
AuthorDate: Fri Aug 4 14:24:02 2023 +0900

[SPARK-44582][SQL] Skip iterator on SMJ if it was cleaned up

### What changes were proposed in this pull request?

Bugfix for SMJ which may cause JVM crash.

**When will the JVM crash**

```
Query pattern:

TableScan TableScan
   |  |
 Exchange  Exchange
   |  |
  Sort 1 Sort 2
   |  |
 Window 1  Window 2
\  /
  \  /
SMJ
 |
 |
  WriteFileCommand
```

1. WriteFileCommand call hasNext() to check if the input is empty.
2. SMJ call findNextJoinRows() to find all matched rows.
2.1 SMJ tries to get the first row in the left child.
2.1.1 Sort 1 will sort all the input rows in the Offheap memory.
2.1.2 Window 1 will read one group data and the first row in next group 
(named X), return the first row in the first group.
2.2 SMJ tries to get the first row in the right child.
2.2.1 Sort 2 and Window 2 are empty, do nothing.
2.3  Inner SMJ will finish, since there will definitely be no join rows, 
call earlyCleanupResources() to free offHeap memory.
3. WriteFileCommand call hasNext() again to write the input data to the 
files.
4. SMJ call findNextJoinRows() to find all matched rows.
4.1 SMJ tries to get the first row in the left child.
4.2 Window 1 tries to add row X into the group buffer, which will accesse 
unallocated memory, the JVM may or may not crash.

In this PR, if SMJ has already been cleaned up, skip iterator on it.

### Why are the changes needed?

Bugfix for SMJ.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Test in our production environment.
For unsafe API, when read the unallocated memory, the program may get the 
old value, or get a unexpected value, or cause the JVM crash.
I don't think the UIT will be stable.

The JVM crash stack
```
Stack: [0x7f8a0380,0x7f8a0400],  sp=0x7f8a03ffd620,  
free space=8181k
Native frames: (J=compiled Java code, j=interpreted, Vv=VM code, C=native 
code)
v  ~StubRoutines::jint_disjoint_arraycopy
J 36127 C2 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.add(Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;)V
 (188 bytes)  0x7f966187ac9f [0x7f966187a820+0x47f]
J 36146 C2 
org.apache.spark.sql.execution.window.WindowExec$$anon$1.next()Ljava/lang/Object;
 (5 bytes)  0x7f9661a8eefc [0x7f9661a8dd60+0x119c]
J 36153 C2 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext()V
 (381 bytes)  0x7f966180185c [0x7f9661801760+0xfc]
J 36246 C2 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextJoinRows_0$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage7;Lscala/collection/Iterator;Lscala/collection/Iterator;)Z
 (392 bytes)  0x7f96607388f0 [0x7f96607381e0+0x710]
J 36249 C1 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext()V
 (109 bytes)  0x7f965fa8ee64 [0x7f965fa8e560+0x904]
J 35645 C2 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext()Z (31 
bytes)  0x7f965fbc58e4 [0x7f965fbc58a0+0x44]
j  
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(Lscala/collection/Iterator;Lorg/apache/spark/sql/execution/datasources/FileFormatDataWriter;)Lorg/apache/spark/sql/execution/datasources/WriteTaskResult;+1
j  
org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$4398.apply()Ljava/lang/Object;+8
j  
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Lscala/Function0;Lscala/Function0;Lscala/Function0;)Ljava/lang/Object;+4
j  
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(Lorg/apache/spark/sql/execution/datasources/WriteJobDescription;JIIILorg/apache/spark/internal/io/FileCommitProtocol;ILscala/collection/Iterator;)Lorg/apache/spark/sql/execution/datasources/WriteTaskResult;+258
J 30523 C1 

[spark] 01/01: Preparing development version 3.3.4-SNAPSHOT

2023-08-03 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 951cb644fa178a9f2f397ef3f1398144087410fe
Author: Yuming Wang 
AuthorDate: Fri Aug 4 04:33:52 2023 +

Preparing development version 3.3.4-SNAPSHOT
---
 R/pkg/DESCRIPTION  | 2 +-
 assembly/pom.xml   | 2 +-
 common/kvstore/pom.xml | 2 +-
 common/network-common/pom.xml  | 2 +-
 common/network-shuffle/pom.xml | 2 +-
 common/network-yarn/pom.xml| 2 +-
 common/sketch/pom.xml  | 2 +-
 common/tags/pom.xml| 2 +-
 common/unsafe/pom.xml  | 2 +-
 core/pom.xml   | 2 +-
 docs/_config.yml   | 6 +++---
 examples/pom.xml   | 2 +-
 external/avro/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml   | 2 +-
 external/kafka-0-10-sql/pom.xml| 2 +-
 external/kafka-0-10-token-provider/pom.xml | 2 +-
 external/kafka-0-10/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml  | 2 +-
 external/kinesis-asl/pom.xml   | 2 +-
 external/spark-ganglia-lgpl/pom.xml| 2 +-
 graphx/pom.xml | 2 +-
 hadoop-cloud/pom.xml   | 2 +-
 launcher/pom.xml   | 2 +-
 mllib-local/pom.xml| 2 +-
 mllib/pom.xml  | 2 +-
 pom.xml| 2 +-
 python/pyspark/version.py  | 2 +-
 repl/pom.xml   | 2 +-
 resource-managers/kubernetes/core/pom.xml  | 2 +-
 resource-managers/kubernetes/integration-tests/pom.xml | 2 +-
 resource-managers/mesos/pom.xml| 2 +-
 resource-managers/yarn/pom.xml | 2 +-
 sql/catalyst/pom.xml   | 2 +-
 sql/core/pom.xml   | 2 +-
 sql/hive-thriftserver/pom.xml  | 2 +-
 sql/hive/pom.xml   | 2 +-
 streaming/pom.xml  | 2 +-
 tools/pom.xml  | 2 +-
 39 files changed, 41 insertions(+), 41 deletions(-)

diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 2999723471a..ac01697363e 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 3.3.3
+Version: 3.3.4
 Title: R Front End for 'Apache Spark'
 Description: Provides an R Front end for 'Apache Spark' 
.
 Authors@R:
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 7bf2bcebfd0..b3d7ee94bb3 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3
+3.3.4-SNAPSHOT
 ../pom.xml
   
 
diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
index 72090c7f0ac..8a8d44dfd94 100644
--- a/common/kvstore/pom.xml
+++ b/common/kvstore/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3
+3.3.4-SNAPSHOT
 ../../pom.xml
   
 
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 20dfa309ecd..e76b78305aa 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3
+3.3.4-SNAPSHOT
 ../../pom.xml
   
 
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index d4090b1dea4..92324bf1746 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3
+3.3.4-SNAPSHOT
 ../../pom.xml
   
 
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 3800c05a16e..6eaf00a3dca 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3
+3.3.4-SNAPSHOT
 ../../pom.xml
   
 
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index fe6517b2806..3fcff5046da 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3
+3.3.4-SNAPSHOT
 ../../pom.xml
   
 
diff --git a/common/tags/pom.xml b/common/tags/pom.xml
index 

[spark] branch branch-3.3 updated (407bb57ac22 -> 951cb644fa1)

2023-08-03 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


from 407bb57ac22 [SPARK-44653][SQL] Non-trivial DataFrame unions should not 
break caching
 add 8c2b3319c67 Preparing Spark release v3.3.3-rc1
 new 951cb644fa1 Preparing development version 3.3.4-SNAPSHOT

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 R/pkg/DESCRIPTION  | 2 +-
 assembly/pom.xml   | 2 +-
 common/kvstore/pom.xml | 2 +-
 common/network-common/pom.xml  | 2 +-
 common/network-shuffle/pom.xml | 2 +-
 common/network-yarn/pom.xml| 2 +-
 common/sketch/pom.xml  | 2 +-
 common/tags/pom.xml| 2 +-
 common/unsafe/pom.xml  | 2 +-
 core/pom.xml   | 2 +-
 docs/_config.yml   | 6 +++---
 examples/pom.xml   | 2 +-
 external/avro/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml   | 2 +-
 external/kafka-0-10-sql/pom.xml| 2 +-
 external/kafka-0-10-token-provider/pom.xml | 2 +-
 external/kafka-0-10/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml  | 2 +-
 external/kinesis-asl/pom.xml   | 2 +-
 external/spark-ganglia-lgpl/pom.xml| 2 +-
 graphx/pom.xml | 2 +-
 hadoop-cloud/pom.xml   | 2 +-
 launcher/pom.xml   | 2 +-
 mllib-local/pom.xml| 2 +-
 mllib/pom.xml  | 2 +-
 pom.xml| 2 +-
 python/pyspark/version.py  | 2 +-
 repl/pom.xml   | 2 +-
 resource-managers/kubernetes/core/pom.xml  | 2 +-
 resource-managers/kubernetes/integration-tests/pom.xml | 2 +-
 resource-managers/mesos/pom.xml| 2 +-
 resource-managers/yarn/pom.xml | 2 +-
 sql/catalyst/pom.xml   | 2 +-
 sql/core/pom.xml   | 2 +-
 sql/hive-thriftserver/pom.xml  | 2 +-
 sql/hive/pom.xml   | 2 +-
 streaming/pom.xml  | 2 +-
 tools/pom.xml  | 2 +-
 39 files changed, 41 insertions(+), 41 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] tag v3.3.3-rc1 created (now 8c2b3319c67)

2023-08-03 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a change to tag v3.3.3-rc1
in repository https://gitbox.apache.org/repos/asf/spark.git


  at 8c2b3319c67 (commit)
This tag includes the following new commits:

 new 8c2b3319c67 Preparing Spark release v3.3.3-rc1

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] 01/01: Preparing Spark release v3.3.3-rc1

2023-08-03 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to tag v3.3.3-rc1
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 8c2b3319c6734250ff9d72f3d7e5cab56b142195
Author: Yuming Wang 
AuthorDate: Fri Aug 4 04:33:19 2023 +

Preparing Spark release v3.3.3-rc1
---
 assembly/pom.xml   | 2 +-
 common/kvstore/pom.xml | 2 +-
 common/network-common/pom.xml  | 2 +-
 common/network-shuffle/pom.xml | 2 +-
 common/network-yarn/pom.xml| 2 +-
 common/sketch/pom.xml  | 2 +-
 common/tags/pom.xml| 2 +-
 common/unsafe/pom.xml  | 2 +-
 core/pom.xml   | 2 +-
 docs/_config.yml   | 2 +-
 examples/pom.xml   | 2 +-
 external/avro/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml   | 2 +-
 external/kafka-0-10-sql/pom.xml| 2 +-
 external/kafka-0-10-token-provider/pom.xml | 2 +-
 external/kafka-0-10/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml  | 2 +-
 external/kinesis-asl/pom.xml   | 2 +-
 external/spark-ganglia-lgpl/pom.xml| 2 +-
 graphx/pom.xml | 2 +-
 hadoop-cloud/pom.xml   | 2 +-
 launcher/pom.xml   | 2 +-
 mllib-local/pom.xml| 2 +-
 mllib/pom.xml  | 2 +-
 pom.xml| 2 +-
 python/pyspark/version.py  | 2 +-
 repl/pom.xml   | 2 +-
 resource-managers/kubernetes/core/pom.xml  | 2 +-
 resource-managers/kubernetes/integration-tests/pom.xml | 2 +-
 resource-managers/mesos/pom.xml| 2 +-
 resource-managers/yarn/pom.xml | 2 +-
 sql/catalyst/pom.xml   | 2 +-
 sql/core/pom.xml   | 2 +-
 sql/hive-thriftserver/pom.xml  | 2 +-
 sql/hive/pom.xml   | 2 +-
 streaming/pom.xml  | 2 +-
 tools/pom.xml  | 2 +-
 38 files changed, 38 insertions(+), 38 deletions(-)

diff --git a/assembly/pom.xml b/assembly/pom.xml
index 69d13998a80..7bf2bcebfd0 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3-SNAPSHOT
+3.3.3
 ../pom.xml
   
 
diff --git a/common/kvstore/pom.xml b/common/kvstore/pom.xml
index dfab7dc83c8..72090c7f0ac 100644
--- a/common/kvstore/pom.xml
+++ b/common/kvstore/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3-SNAPSHOT
+3.3.3
 ../../pom.xml
   
 
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index e4102646b6f..20dfa309ecd 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3-SNAPSHOT
+3.3.3
 ../../pom.xml
   
 
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 56a52616213..d4090b1dea4 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3-SNAPSHOT
+3.3.3
 ../../pom.xml
   
 
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 77eb4dec215..3800c05a16e 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3-SNAPSHOT
+3.3.3
 ../../pom.xml
   
 
diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml
index cb9bfd6b049..fe6517b2806 100644
--- a/common/sketch/pom.xml
+++ b/common/sketch/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3-SNAPSHOT
+3.3.3
 ../../pom.xml
   
 
diff --git a/common/tags/pom.xml b/common/tags/pom.xml
index 46714bb0476..a2b3f5a3383 100644
--- a/common/tags/pom.xml
+++ b/common/tags/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-3.3.3-SNAPSHOT
+3.3.3
 ../../pom.xml
   
 
diff --git a/common/unsafe/pom.xml b/common/unsafe/pom.xml
index 7b39d096277..63dd3418493 100644
--- a/common/unsafe/pom.xml
+++ b/common/unsafe/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.12
-

[spark] branch branch-3.3 updated: [SPARK-44653][SQL] Non-trivial DataFrame unions should not break caching

2023-08-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 407bb57ac22 [SPARK-44653][SQL] Non-trivial DataFrame unions should not 
break caching
407bb57ac22 is described below

commit 407bb57ac2242471cb68dd279b265bc4c9cbeb5e
Author: Wenchen Fan 
AuthorDate: Fri Aug 4 11:26:36 2023 +0800

[SPARK-44653][SQL] Non-trivial DataFrame unions should not break caching

We have a long-standing tricky optimization in `Dataset.union`, which 
invokes the optimizer rule `CombineUnions` to pre-optimize the analyzed plan. 
This is to avoid too large analyzed plan for a specific dataframe query pattern 
`df1.union(df2).union(df3).union...`.

This tricky optimization is designed to break dataframe caching, but we 
thought it was fine as people usually won't cache the intermediate dataframe in 
a union chain. However, `CombineUnions` gets improved from time to time (e.g. 
https://github.com/apache/spark/pull/35214) and now it can optimize a wide 
range of Union patterns. Now it's possible that people union two dataframe, do 
something with `select`, and cache it. Then the dataframe is unioned again with 
other dataframes and peop [...]

This PR updates `Dataset.union` to only combine adjacent Unions to match 
the original purpose.

Fix perf regression due to breaking df caching

no

new test

Closes #42315 from cloud-fan/union.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit ce1fe57cdd7004a891ef8b97c77ac96b3719efcd)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 56 ++
 .../org/apache/spark/sql/DatasetCacheSuite.scala   | 21 
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9794a310b6d..a2af6db8020 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -159,7 +159,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
 //   since the other rules might make two separate Unions operators 
adjacent.
 Batch("Inline CTE", Once,
   InlineCTE()) ::
-Batch("Union", Once,
+Batch("Union", fixedPoint,
   RemoveNoopOperators,
   CombineUnions,
   RemoveNoopUnion) ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 6ef9bc2a703..233b61926c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -42,12 +42,11 @@ import 
org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
-import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
PartitioningCollection}
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.trees.{TreeNodeTag, TreePattern}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution._
@@ -2044,6 +2043,51 @@ class Dataset[T] private[sql](
 Limit(Literal(n), logicalPlan)
   }
 
+  // This breaks caching, but it's usually ok because it addresses a very 
specific use case:
+  // using union to union many files or partitions.
+  private def combineUnions(plan: LogicalPlan): LogicalPlan = {
+plan.transformDownWithPruning(_.containsPattern(TreePattern.UNION)) {
+  case Distinct(u: Union) =>
+Distinct(flattenUnion(u, isUnionDistinct = true))
+  // Only handle distinct-like 'Deduplicate', where the keys == output
+  case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) 
== u.outputSet =>
+Deduplicate(keys, flattenUnion(u, true))
+  case u: Union =>
+flattenUnion(u, isUnionDistinct = false)
+}
+  }
+
+  private def flattenUnion(u: Union, isUnionDistinct: Boolean): Union = {
+var changed = false
+// We only need to look at the direct children of Union, as the nested 
adjacent 

[spark] branch branch-3.4 updated: [SPARK-44653][SQL] Non-trivial DataFrame unions should not break caching

2023-08-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new b888ea87723 [SPARK-44653][SQL] Non-trivial DataFrame unions should not 
break caching
b888ea87723 is described below

commit b888ea877230976e15b251310e3840e3f73f904f
Author: Wenchen Fan 
AuthorDate: Fri Aug 4 11:26:36 2023 +0800

[SPARK-44653][SQL] Non-trivial DataFrame unions should not break caching

We have a long-standing tricky optimization in `Dataset.union`, which 
invokes the optimizer rule `CombineUnions` to pre-optimize the analyzed plan. 
This is to avoid too large analyzed plan for a specific dataframe query pattern 
`df1.union(df2).union(df3).union...`.

This tricky optimization is designed to break dataframe caching, but we 
thought it was fine as people usually won't cache the intermediate dataframe in 
a union chain. However, `CombineUnions` gets improved from time to time (e.g. 
https://github.com/apache/spark/pull/35214) and now it can optimize a wide 
range of Union patterns. Now it's possible that people union two dataframe, do 
something with `select`, and cache it. Then the dataframe is unioned again with 
other dataframes and peop [...]

This PR updates `Dataset.union` to only combine adjacent Unions to match 
the original purpose.

Fix perf regression due to breaking df caching

no

new test

Closes #42315 from cloud-fan/union.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit ce1fe57cdd7004a891ef8b97c77ac96b3719efcd)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 56 ++
 .../org/apache/spark/sql/DatasetCacheSuite.scala   | 21 
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 1233f2207f5..bc4dbb47c92 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -153,7 +153,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
 //   since the other rules might make two separate Unions operators 
adjacent.
 Batch("Inline CTE", Once,
   InlineCTE()) ::
-Batch("Union", Once,
+Batch("Union", fixedPoint,
   RemoveNoopOperators,
   CombineUnions,
   RemoveNoopUnion) ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 9da2e992a3d..57d63299621 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -41,11 +41,10 @@ import 
org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
-import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.trees.{TreeNodeTag, TreePattern}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.errors.QueryCompilationErrors.toSQLId
@@ -2279,6 +2278,51 @@ class Dataset[T] private[sql](
 Offset(Literal(n), logicalPlan)
   }
 
+  // This breaks caching, but it's usually ok because it addresses a very 
specific use case:
+  // using union to union many files or partitions.
+  private def combineUnions(plan: LogicalPlan): LogicalPlan = {
+plan.transformDownWithPruning(_.containsPattern(TreePattern.UNION)) {
+  case Distinct(u: Union) =>
+Distinct(flattenUnion(u, isUnionDistinct = true))
+  // Only handle distinct-like 'Deduplicate', where the keys == output
+  case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) 
== u.outputSet =>
+Deduplicate(keys, flattenUnion(u, true))
+  case u: Union =>
+flattenUnion(u, isUnionDistinct = false)
+}
+  }
+
+  private def flattenUnion(u: Union, isUnionDistinct: Boolean): Union = {
+var changed = false
+// We only need to look at the direct children of Union, as the nested 
adjacent Unions should
+// have been combined already by previous 

[spark] branch branch-3.5 updated: [SPARK-44653][SQL] Non-trivial DataFrame unions should not break caching

2023-08-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new aeb0cabbea7 [SPARK-44653][SQL] Non-trivial DataFrame unions should not 
break caching
aeb0cabbea7 is described below

commit aeb0cabbea7377b77e7befd937babcfd390816e5
Author: Wenchen Fan 
AuthorDate: Fri Aug 4 11:26:36 2023 +0800

[SPARK-44653][SQL] Non-trivial DataFrame unions should not break caching

### What changes were proposed in this pull request?

We have a long-standing tricky optimization in `Dataset.union`, which 
invokes the optimizer rule `CombineUnions` to pre-optimize the analyzed plan. 
This is to avoid too large analyzed plan for a specific dataframe query pattern 
`df1.union(df2).union(df3).union...`.

This tricky optimization is designed to break dataframe caching, but we 
thought it was fine as people usually won't cache the intermediate dataframe in 
a union chain. However, `CombineUnions` gets improved from time to time (e.g. 
https://github.com/apache/spark/pull/35214) and now it can optimize a wide 
range of Union patterns. Now it's possible that people union two dataframe, do 
something with `select`, and cache it. Then the dataframe is unioned again with 
other dataframes and peop [...]

This PR updates `Dataset.union` to only combine adjacent Unions to match 
the original purpose.

### Why are the changes needed?

Fix perf regression due to breaking df caching

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #42315 from cloud-fan/union.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
(cherry picked from commit ce1fe57cdd7004a891ef8b97c77ac96b3719efcd)
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 56 ++
 .../org/apache/spark/sql/DatasetCacheSuite.scala   | 21 
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 95cf3aee16f..bb2a86556c0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -157,7 +157,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
 //   since the other rules might make two separate Unions operators 
adjacent.
 Batch("Inline CTE", Once,
   InlineCTE()) ::
-Batch("Union", Once,
+Batch("Union", fixedPoint,
   RemoveNoopOperators,
   CombineUnions,
   RemoveNoopUnion) ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7b2259a6d99..61c83829d20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -42,11 +42,10 @@ import 
org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
-import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.trees.{TreeNodeTag, TreePattern}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
@@ -2241,6 +2240,51 @@ class Dataset[T] private[sql](
 Offset(Literal(n), logicalPlan)
   }
 
+  // This breaks caching, but it's usually ok because it addresses a very 
specific use case:
+  // using union to union many files or partitions.
+  private def combineUnions(plan: LogicalPlan): LogicalPlan = {
+plan.transformDownWithPruning(_.containsPattern(TreePattern.UNION)) {
+  case Distinct(u: Union) =>
+Distinct(flattenUnion(u, isUnionDistinct = true))
+  // Only handle distinct-like 'Deduplicate', where the keys == output
+  case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) 
== u.outputSet =>
+Deduplicate(keys, flattenUnion(u, true))
+  case u: Union =>
+flattenUnion(u, isUnionDistinct = false)
+}
+  }
+
+  private def flattenUnion(u: Union, 

[spark] branch master updated: [SPARK-44653][SQL] Non-trivial DataFrame unions should not break caching

2023-08-03 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ce1fe57cdd7 [SPARK-44653][SQL] Non-trivial DataFrame unions should not 
break caching
ce1fe57cdd7 is described below

commit ce1fe57cdd7004a891ef8b97c77ac96b3719efcd
Author: Wenchen Fan 
AuthorDate: Fri Aug 4 11:26:36 2023 +0800

[SPARK-44653][SQL] Non-trivial DataFrame unions should not break caching

### What changes were proposed in this pull request?

We have a long-standing tricky optimization in `Dataset.union`, which 
invokes the optimizer rule `CombineUnions` to pre-optimize the analyzed plan. 
This is to avoid too large analyzed plan for a specific dataframe query pattern 
`df1.union(df2).union(df3).union...`.

This tricky optimization is designed to break dataframe caching, but we 
thought it was fine as people usually won't cache the intermediate dataframe in 
a union chain. However, `CombineUnions` gets improved from time to time (e.g. 
https://github.com/apache/spark/pull/35214) and now it can optimize a wide 
range of Union patterns. Now it's possible that people union two dataframe, do 
something with `select`, and cache it. Then the dataframe is unioned again with 
other dataframes and peop [...]

This PR updates `Dataset.union` to only combine adjacent Unions to match 
the original purpose.

### Why are the changes needed?

Fix perf regression due to breaking df caching

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

Closes #42315 from cloud-fan/union.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 56 ++
 .../org/apache/spark/sql/DatasetCacheSuite.scala   | 21 
 3 files changed, 70 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 9fc664bb1c2..f83cd36f0a8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -157,7 +157,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
 //   since the other rules might make two separate Unions operators 
adjacent.
 Batch("Inline CTE", Once,
   InlineCTE()) ::
-Batch("Union", Once,
+Batch("Union", fixedPoint,
   RemoveNoopOperators,
   CombineUnions,
   RemoveNoopUnion) ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7b2259a6d99..61c83829d20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -42,11 +42,10 @@ import 
org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
-import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.trees.{TreeNodeTag, TreePattern}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, IntervalUtils}
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
@@ -2241,6 +2240,51 @@ class Dataset[T] private[sql](
 Offset(Literal(n), logicalPlan)
   }
 
+  // This breaks caching, but it's usually ok because it addresses a very 
specific use case:
+  // using union to union many files or partitions.
+  private def combineUnions(plan: LogicalPlan): LogicalPlan = {
+plan.transformDownWithPruning(_.containsPattern(TreePattern.UNION)) {
+  case Distinct(u: Union) =>
+Distinct(flattenUnion(u, isUnionDistinct = true))
+  // Only handle distinct-like 'Deduplicate', where the keys == output
+  case Deduplicate(keys: Seq[Attribute], u: Union) if AttributeSet(keys) 
== u.outputSet =>
+Deduplicate(keys, flattenUnion(u, true))
+  case u: Union =>
+flattenUnion(u, isUnionDistinct = false)
+}
+  }
+
+  private def flattenUnion(u: Union, isUnionDistinct: Boolean): Union = {
+var changed = false
+// We only need to look at the direct children of Union, 

[spark] branch branch-3.5 updated: [SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't reach server

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new b2966d762721 [SPARK-44624][CONNECT] Retry ExecutePlan in case initial 
request didn't reach server
b2966d762721 is described below

commit b2966d7627216845d6a1c3854077a02c6d4e84c5
Author: Juliusz Sompolski 
AuthorDate: Fri Aug 4 12:05:19 2023 +0900

[SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't 
reach server

### What changes were proposed in this pull request?

If the ExecutePlan never reached the server, a ReattachExecute will fail 
with INVALID_HANDLE.OPERATION_NOT_FOUND. In that case, we could try to send 
ExecutePlan again.

### Why are the changes needed?

This solves an edge case of reattachable execution where the initial 
execution never reached the server.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Testing these failures is difficult, will require some special testing setup

Closes #42282 from juliuszsompolski/SPARK-44624-fix.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 52437bc73695e392bee60fbb340b6de4324b25d8)
Signed-off-by: Hyukjin Kwon 
---
 .../ExecutePlanResponseReattachableIterator.scala  | 43 +-
 .../sql/connect/client/GrpcRetryHandler.scala  | 10 -
 2 files changed, 43 insertions(+), 10 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index fc07deaa081f..41648c3c1004 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -20,7 +20,8 @@ import java.util.UUID
 
 import scala.util.control.NonFatal
 
-import io.grpc.ManagedChannel
+import io.grpc.{ManagedChannel, StatusRuntimeException}
+import io.grpc.protobuf.StatusProto
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto
@@ -38,15 +39,12 @@ import org.apache.spark.internal.Logging
  * Initial iterator is the result of an ExecutePlan on the request, but it can 
be reattached with
  * ReattachExecute request. ReattachExecute request is provided the responseId 
of last returned
  * ExecutePlanResponse on the iterator to return a new iterator from server 
that continues after
- * that.
+ * that. If the initial ExecutePlan did not even reach the server, and hence 
reattach fails with
+ * INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
  *
  * In reattachable execute the server does buffer some responses in case the 
client needs to
  * backtrack. To let server release this buffer sooner, this iterator 
asynchronously sends
  * ReleaseExecute RPCs that instruct the server to release responses that it 
already processed.
- *
- * Note: If the initial ExecutePlan did not even reach the server and 
execution didn't start, the
- * ReattachExecute can still fail with INVALID_HANDLE.OPERATION_NOT_FOUND, 
failing the whole
- * operation.
  */
 class ExecutePlanResponseReattachableIterator(
 request: proto.ExecutePlanRequest,
@@ -113,7 +111,7 @@ class ExecutePlanResponseReattachableIterator(
   // on retry, the iterator is borked, so we need a new one
   iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
 }
-iterator.next()
+callIter(_.next())
   }
 
   // Record last returned response, to know where to restart in case of 
reattach.
@@ -146,7 +144,7 @@ class ExecutePlanResponseReattachableIterator(
   // on retry, the iterator is borked, so we need a new one
   iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
 }
-var hasNext = iterator.hasNext()
+var hasNext = callIter(_.hasNext())
 // Graceful reattach:
 // If iterator ended, but there was no ResultComplete, it means that 
there is more,
 // and we need to reattach.
@@ -154,7 +152,7 @@ class ExecutePlanResponseReattachableIterator(
   do {
 iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
 assert(!resultComplete) // shouldn't change...
-hasNext = iterator.hasNext()
+hasNext = callIter(_.hasNext())
 // It's possible that the new iterator will be empty, so we need 
to loop to get another.
 // Eventually, there will be a non empty iterator, because 

[spark] branch master updated: [SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't reach server

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 52437bc73695 [SPARK-44624][CONNECT] Retry ExecutePlan in case initial 
request didn't reach server
52437bc73695 is described below

commit 52437bc73695e392bee60fbb340b6de4324b25d8
Author: Juliusz Sompolski 
AuthorDate: Fri Aug 4 12:05:19 2023 +0900

[SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't 
reach server

### What changes were proposed in this pull request?

If the ExecutePlan never reached the server, a ReattachExecute will fail 
with INVALID_HANDLE.OPERATION_NOT_FOUND. In that case, we could try to send 
ExecutePlan again.

### Why are the changes needed?

This solves an edge case of reattachable execution where the initial 
execution never reached the server.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Testing these failures is difficult, will require some special testing setup

Closes #42282 from juliuszsompolski/SPARK-44624-fix.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 
---
 .../ExecutePlanResponseReattachableIterator.scala  | 43 +-
 .../sql/connect/client/GrpcRetryHandler.scala  | 10 -
 2 files changed, 43 insertions(+), 10 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index fc07deaa081f..41648c3c1004 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -20,7 +20,8 @@ import java.util.UUID
 
 import scala.util.control.NonFatal
 
-import io.grpc.ManagedChannel
+import io.grpc.{ManagedChannel, StatusRuntimeException}
+import io.grpc.protobuf.StatusProto
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto
@@ -38,15 +39,12 @@ import org.apache.spark.internal.Logging
  * Initial iterator is the result of an ExecutePlan on the request, but it can 
be reattached with
  * ReattachExecute request. ReattachExecute request is provided the responseId 
of last returned
  * ExecutePlanResponse on the iterator to return a new iterator from server 
that continues after
- * that.
+ * that. If the initial ExecutePlan did not even reach the server, and hence 
reattach fails with
+ * INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
  *
  * In reattachable execute the server does buffer some responses in case the 
client needs to
  * backtrack. To let server release this buffer sooner, this iterator 
asynchronously sends
  * ReleaseExecute RPCs that instruct the server to release responses that it 
already processed.
- *
- * Note: If the initial ExecutePlan did not even reach the server and 
execution didn't start, the
- * ReattachExecute can still fail with INVALID_HANDLE.OPERATION_NOT_FOUND, 
failing the whole
- * operation.
  */
 class ExecutePlanResponseReattachableIterator(
 request: proto.ExecutePlanRequest,
@@ -113,7 +111,7 @@ class ExecutePlanResponseReattachableIterator(
   // on retry, the iterator is borked, so we need a new one
   iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
 }
-iterator.next()
+callIter(_.next())
   }
 
   // Record last returned response, to know where to restart in case of 
reattach.
@@ -146,7 +144,7 @@ class ExecutePlanResponseReattachableIterator(
   // on retry, the iterator is borked, so we need a new one
   iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
 }
-var hasNext = iterator.hasNext()
+var hasNext = callIter(_.hasNext())
 // Graceful reattach:
 // If iterator ended, but there was no ResultComplete, it means that 
there is more,
 // and we need to reattach.
@@ -154,7 +152,7 @@ class ExecutePlanResponseReattachableIterator(
   do {
 iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
 assert(!resultComplete) // shouldn't change...
-hasNext = iterator.hasNext()
+hasNext = callIter(_.hasNext())
 // It's possible that the new iterator will be empty, so we need 
to loop to get another.
 // Eventually, there will be a non empty iterator, because there 
is always a
 // ResultComplete inserted by the server at the end of the stream.
@@ -197,6 

[spark] branch branch-3.5 updated: [SPARK-44664][PYTHON][CONNECT] Release the execute when closing the iterator in Python client

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new cd653cff3cbe [SPARK-44664][PYTHON][CONNECT] Release the execute when 
closing the iterator in Python client
cd653cff3cbe is described below

commit cd653cff3cbe37a1ab06d74a58d29b4264335506
Author: Hyukjin Kwon 
AuthorDate: Fri Aug 4 12:03:54 2023 +0900

[SPARK-44664][PYTHON][CONNECT] Release the execute when closing the 
iterator in Python client

This PR implements the symmetry of 
https://github.com/apache/spark/pull/42331 and 
https://github.com/apache/spark/pull/42304

1. It releases the execute when the error is raised during the iteration
2. When you explicitly close the generator, (e.g., either 
`generator.close()` or explicit `GeneratorExit`), it releases the execution.

For the feature parity, see also https://github.com/apache/spark/pull/42331 
and https://github.com/apache/spark/pull/42304

See also https://github.com/apache/spark/pull/42331 and 
https://github.com/apache/spark/pull/42304

Tests will be added separately.

Closes #42330 from HyukjinKwon/python-error-release.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 492f6fac02a00b9ad545d84fa3f10a021a8e71b9)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/connect/client/reattach.py | 110 +-
 1 file changed, 72 insertions(+), 38 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py 
b/python/pyspark/sql/connect/client/reattach.py
index 4d4cce0ca441..702107d97f54 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -111,9 +111,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
 self._last_returned_response_id = ret.response_id
 if ret.HasField("result_complete"):
 self._result_complete = True
-self._release_execute(None)  # release all
+self._release_all()
 else:
-self._release_execute(self._last_returned_response_id)
+self._release_until(self._last_returned_response_id)
 self._current = None
 return ret
 
@@ -125,61 +125,93 @@ class ExecutePlanResponseReattachableIterator(Generator):
 # After response complete response
 return False
 else:
-for attempt in Retrying(
-can_retry=SparkConnectClient.retry_exception, 
**self._retry_policy
-):
-with attempt:
-# on first try, we use the existing iterator.
-if not attempt.is_first_try():
-# on retry, the iterator is borked, so we need a new 
one
-self._iterator = iter(
-
self._stub.ReattachExecute(self._create_reattach_execute_request())
-)
-
-if self._current is None:
-try:
-self._current = next(self._iterator)
-except StopIteration:
-pass
-
-has_next = self._current is not None
-
-# Graceful reattach:
-# If iterator ended, but there was no ResponseComplete, it 
means that
-# there is more, and we need to reattach. While 
ResponseComplete didn't
-# arrive, we keep reattaching.
-if not self._result_complete and not has_next:
-while not has_next:
+try:
+for attempt in Retrying(
+can_retry=SparkConnectClient.retry_exception, 
**self._retry_policy
+):
+with attempt:
+# on first try, we use the existing iterator.
+if not attempt.is_first_try():
+# on retry, the iterator is borked, so we need a 
new one
 self._iterator = iter(
 
self._stub.ReattachExecute(self._create_reattach_execute_request())
 )
-# shouldn't change
-assert not self._result_complete
+
+if self._current is None:
 try:
 self._current = next(self._iterator)
 except StopIteration:
 pass
-has_next = self._current is not None
-return has_next
+
+has_next = self._current is not None
+
+# Graceful reattach:
+# If 

[spark] branch master updated (4ed59d93263 -> 492f6fac02a)

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 4ed59d93263 [SPARK-44619][INFRA] Free up disk space for container jobs
 add 492f6fac02a [SPARK-44664][PYTHON][CONNECT] Release the execute when 
closing the iterator in Python client

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/connect/client/reattach.py | 110 +-
 1 file changed, 72 insertions(+), 38 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44619][INFRA] Free up disk space for container jobs

2023-08-03 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4ed59d93263 [SPARK-44619][INFRA] Free up disk space for container jobs
4ed59d93263 is described below

commit 4ed59d932631ba819d6b6071ee91408622a312db
Author: Ruifeng Zheng 
AuthorDate: Fri Aug 4 09:51:31 2023 +0800

[SPARK-44619][INFRA] Free up disk space for container jobs

### What changes were proposed in this pull request?
Free up disk space for container jobs

### Why are the changes needed?
increase the available disk space

before this PR

![image](https://github.com/apache/spark/assets/7322292/64230324-607b-4c1d-ac2d-84b9bcaab12a)

after this PR

![image](https://github.com/apache/spark/assets/7322292/aafed2d6-5d26-4f7f-b020-1efe4f551a8f)

### Does this PR introduce _any_ user-facing change?
No, infra-only

### How was this patch tested?
updated CI

Closes #42253 from zhengruifeng/infra_clean_container.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .github/workflows/build_and_test.yml |  6 ++
 dev/free_disk_space_container| 33 +
 2 files changed, 39 insertions(+)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index d9bcdfcbfa4..ea0c8e1d7fd 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -410,6 +410,8 @@ jobs:
 key: pyspark-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
 restore-keys: |
   pyspark-coursier-
+- name: Free up disk space
+  run: ./dev/free_disk_space_container
 - name: Install Java ${{ matrix.java }}
   uses: actions/setup-java@v3
   with:
@@ -508,6 +510,8 @@ jobs:
 key: sparkr-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }}
 restore-keys: |
   sparkr-coursier-
+- name: Free up disk space
+  run: ./dev/free_disk_space_container
 - name: Install Java ${{ inputs.java }}
   uses: actions/setup-java@v3
   with:
@@ -616,6 +620,8 @@ jobs:
 key: docs-maven-${{ hashFiles('**/pom.xml') }}
 restore-keys: |
   docs-maven-
+- name: Free up disk space
+  run: ./dev/free_disk_space_container
 - name: Install Java 8
   uses: actions/setup-java@v3
   with:
diff --git a/dev/free_disk_space_container b/dev/free_disk_space_container
new file mode 100755
index 000..cc3b74643e4
--- /dev/null
+++ b/dev/free_disk_space_container
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+echo "=="
+echo "Free up disk space on CI system"
+echo "=="
+
+echo "Listing 100 largest packages"
+dpkg-query -Wf '${Installed-Size}\t${Package}\n' | sort -n | tail -n 100
+df -h
+
+echo "Removing large packages"
+rm -rf /__t/CodeQL
+rm -rf /__t/go
+rm -rf /__t/node
+
+df -h


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-40770][PYTHON][FOLLOW-UP][3.5] Improved error messages for mapInPandas for schema mismatch

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new d85f3bc5a54 [SPARK-40770][PYTHON][FOLLOW-UP][3.5] Improved error 
messages for mapInPandas for schema mismatch
d85f3bc5a54 is described below

commit d85f3bc5a54302469824b2d8b5e71ebbcfc7c4c4
Author: Enrico Minack 
AuthorDate: Fri Aug 4 10:38:35 2023 +0900

[SPARK-40770][PYTHON][FOLLOW-UP][3.5] Improved error messages for 
mapInPandas for schema mismatch

### What changes were proposed in this pull request?
This merges #39952 into 3.5 branch.

Similar to #38223, improve the error messages when a Python method provided 
to `DataFrame.mapInPandas` returns a Pandas DataFrame that does not match the 
expected schema.

With
```Python
df = spark.range(2).withColumn("v", col("id"))
```

**Mismatching column names:**
```Python
df.mapInPandas(lambda it: it, "id long, val long").show()
# was: KeyError: 'val'
# now: RuntimeError: Column names of the returned pandas.DataFrame do not 
match specified schema.
#  Missing: val  Unexpected: v
```

**Python function not returning iterator:**
```Python
df.mapInPandas(lambda it: 1, "id long").show()
# was: TypeError: 'int' object is not iterable
# now: TypeError: Return type of the user-defined function should be 
iterator of pandas.DataFrame, but is 
```

**Python function not returning iterator of pandas.DataFrame:**
```Python
df.mapInPandas(lambda it: [1], "id long").show()
# was: TypeError: Return type of the user-defined function should be 
Pandas.DataFrame, but is 
# now: TypeError: Return type of the user-defined function should be 
iterator of pandas.DataFrame, but is iterator of 
# sometimes: ValueError: A field of type StructType expects a 
pandas.DataFrame, but got: 
# now: TypeError: Return type of the user-defined function should be 
iterator of pandas.DataFrame, but is iterator of 
```

**Mismatching types (ValueError and TypeError):**
```Python
df.mapInPandas(lambda it: it, "id int, v string").show()
# was: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got 
int64
# now: pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got 
int64
#  The above exception was the direct cause of the following exception:
#  TypeError: Exception thrown when converting pandas.Series (int64) 
with name 'v' to Arrow Array (string).

df.mapInPandas(lambda it: [pdf.assign(v=pdf["v"].apply(str)) for pdf in 
it], "id int, v double").show()
# was: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried 
to convert to double
# now: pyarrow.lib.ArrowInvalid: Could not convert '0' with type str: tried 
to convert to double
#  The above exception was the direct cause of the following exception:
#  ValueError: Exception thrown when converting pandas.Series (object) 
with name 'v' to Arrow Array (double).

with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": 
True}):
  df.mapInPandas(lambda it: [pdf.assign(v=pdf["v"].apply(str)) for pdf in 
it], "id int, v double").show()
# was: ValueError: Exception thrown when converting pandas.Series (object) 
to Arrow Array (double).
#  It can be caused by overflows or other unsafe conversions warned by 
Arrow. Arrow safe type check can be disabled
#  by using SQL config 
`spark.sql.execution.pandas.convertToArrowArraySafely`.
# now: ValueError: Exception thrown when converting pandas.Series (object) 
with name 'v' to Arrow Array (double).
#  It can be caused by overflows or other unsafe conversions warned by 
Arrow. Arrow safe type check can be disabled
#  by using SQL config 
`spark.sql.execution.pandas.convertToArrowArraySafely`.
```

### Why are the changes needed?
Existing errors are generic (`KeyError`) or meaningless (`'int' object is 
not iterable`). The errors should help users in spotting the mismatching 
columns by naming them.

The schema of the returned Pandas DataFrames can only be checked during 
processing the DataFrame, so such errors are very expensive. Therefore, they 
should be expressive.

### Does this PR introduce _any_ user-facing change?
This only changes error messages, not behaviour.

### How was this patch tested?
Tests all cases of schema mismatch for `DataFrame.mapInPandas`.

Closes #42316 from 
EnricoMi/branch-pyspark-map-in-pandas-schema-mismatch-3.5.

Authored-by: Enrico Minack 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/errors/error_classes.py |   5 +
 python/pyspark/pandas/frame.py |   2 +-
 python/pyspark/sql/pandas/serializers.py 

[spark] branch master updated: [SPARK-43562][SPARK-43870][PS] Remove APIs from `DataFrame` and `Series`

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 678f47264e0 [SPARK-43562][SPARK-43870][PS] Remove APIs from 
`DataFrame` and `Series`
678f47264e0 is described below

commit 678f47264e084af766ed339df21513f44d05897f
Author: itholic 
AuthorDate: Fri Aug 4 10:36:04 2023 +0900

[SPARK-43562][SPARK-43870][PS] Remove APIs from `DataFrame` and `Series`

### What changes were proposed in this pull request?

This PR proposes to remove DataFrame/Series APIs that removed from [pandas 
2](https://pandas.pydata.org/docs/dev/whatsnew/v2.0.0.html) and above.

### Why are the changes needed?

To match the behavior to pandas.

### Does this PR introduce _any_ user-facing change?

(DataFrame|Series).(iteritems|mad|append) will be removed.

### How was this patch tested?

Enabling the existing tests.

Closes #42268 from itholic/pandas_remove_df_api.

Authored-by: itholic 
Signed-off-by: Hyukjin Kwon 
---
 .../source/migration_guide/pyspark_upgrade.rst |  11 ++
 .../docs/source/reference/pyspark.pandas/frame.rst |   3 -
 .../source/reference/pyspark.pandas/groupby.rst|   1 -
 .../source/reference/pyspark.pandas/series.rst |   3 -
 python/pyspark/pandas/frame.py | 204 +
 python/pyspark/pandas/groupby.py   |  81 
 python/pyspark/pandas/namespace.py |   1 -
 python/pyspark/pandas/series.py| 112 +--
 .../pandas/tests/computation/test_combine.py   |  71 ++-
 .../pandas/tests/computation/test_compute.py   |  34 
 python/pyspark/pandas/tests/groupby/test_stat.py   |   7 -
 .../pyspark/pandas/tests/indexes/test_indexing.py  |   8 +-
 python/pyspark/pandas/tests/series/test_compute.py |  18 +-
 python/pyspark/pandas/tests/series/test_series.py  |   8 +-
 python/pyspark/pandas/tests/series/test_stat.py|  35 
 15 files changed, 41 insertions(+), 556 deletions(-)

diff --git a/python/docs/source/migration_guide/pyspark_upgrade.rst 
b/python/docs/source/migration_guide/pyspark_upgrade.rst
index 7513d64ef6c..9bd879fb1a1 100644
--- a/python/docs/source/migration_guide/pyspark_upgrade.rst
+++ b/python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -19,6 +19,17 @@
 Upgrading PySpark
 ==
 
+Upgrading from PySpark 3.5 to 4.0
+-
+
+* In Spark 4.0, ``DataFrame.iteritems`` has been removed from pandas API on 
Spark, use ``DataFrame.items`` instead.
+* In Spark 4.0, ``Series.iteritems`` has been removed from pandas API on 
Spark, use ``Series.items`` instead.
+* In Spark 4.0, ``DataFrame.append`` has been removed from pandas API on 
Spark, use ``ps.concat`` instead.
+* In Spark 4.0, ``Series.append`` has been removed from pandas API on Spark, 
use ``ps.concat`` instead.
+* In Spark 4.0, ``DataFrame.mad`` has been removed from pandas API on Spark.
+* In Spark 4.0, ``Series.mad`` has been removed from pandas API on Spark.
+
+
 Upgrading from PySpark 3.3 to 3.4
 -
 
diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst 
b/python/docs/source/reference/pyspark.pandas/frame.rst
index a8d114187b9..5f839a803d7 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -79,7 +79,6 @@ Indexing, iteration
DataFrame.iloc
DataFrame.insert
DataFrame.items
-   DataFrame.iteritems
DataFrame.iterrows
DataFrame.itertuples
DataFrame.keys
@@ -155,7 +154,6 @@ Computations / Descriptive Stats
DataFrame.ewm
DataFrame.kurt
DataFrame.kurtosis
-   DataFrame.mad
DataFrame.max
DataFrame.mean
DataFrame.min
@@ -252,7 +250,6 @@ Combining / joining / merging
 .. autosummary::
:toctree: api/
 
-   DataFrame.append
DataFrame.assign
DataFrame.merge
DataFrame.join
diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst 
b/python/docs/source/reference/pyspark.pandas/groupby.rst
index da1579fd723..e71e81c56dd 100644
--- a/python/docs/source/reference/pyspark.pandas/groupby.rst
+++ b/python/docs/source/reference/pyspark.pandas/groupby.rst
@@ -68,7 +68,6 @@ Computations / Descriptive Stats
GroupBy.filter
GroupBy.first
GroupBy.last
-   GroupBy.mad
GroupBy.max
GroupBy.mean
GroupBy.median
diff --git a/python/docs/source/reference/pyspark.pandas/series.rst 
b/python/docs/source/reference/pyspark.pandas/series.rst
index a0119593f96..552acec096f 100644
--- a/python/docs/source/reference/pyspark.pandas/series.rst
+++ b/python/docs/source/reference/pyspark.pandas/series.rst
@@ -70,7 +70,6 @@ Indexing, iteration
Series.keys
Series.pop
Series.items
-   Series.iteritems
Series.item

[spark] branch master updated: [SPARK-43873][PS] Enabling `FrameDescribeTests`

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 26ed4fbc00d [SPARK-43873][PS] Enabling `FrameDescribeTests`
26ed4fbc00d is described below

commit 26ed4fbc00dd9331807f747dd4e8ed7993c2497f
Author: itholic 
AuthorDate: Fri Aug 4 10:35:06 2023 +0900

[SPARK-43873][PS] Enabling `FrameDescribeTests`

### What changes were proposed in this pull request?

This PR proposes to enable the test `FrameDescribeTests`.

### Why are the changes needed?

To increate test coverage for pandas API on Spark with pandas 2.0.0 and 
above.

### Does this PR introduce _any_ user-facing change?

No, it's test-only.

### How was this patch tested?

Enabling the existing test.

Closes #42319 from itholic/pandas_describe.

Authored-by: itholic 
Signed-off-by: Hyukjin Kwon 
---
 .../pandas/tests/computation/test_describe.py  | 39 +-
 1 file changed, 9 insertions(+), 30 deletions(-)

diff --git a/python/pyspark/pandas/tests/computation/test_describe.py 
b/python/pyspark/pandas/tests/computation/test_describe.py
index af98d2869da..bbee9654eae 100644
--- a/python/pyspark/pandas/tests/computation/test_describe.py
+++ b/python/pyspark/pandas/tests/computation/test_describe.py
@@ -39,10 +39,6 @@ class FrameDescribeMixin:
 psdf = ps.from_pandas(pdf)
 return pdf, psdf
 
-@unittest.skipIf(
-LooseVersion(pd.__version__) >= LooseVersion("2.0.0"),
-"TODO(SPARK-43556): Enable DataFrameSlowTests.test_describe for pandas 
2.0.0.",
-)
 def test_describe(self):
 pdf, psdf = self.df_pair
 
@@ -78,19 +74,10 @@ class FrameDescribeMixin:
 }
 )
 pdf = psdf._to_pandas()
-# NOTE: Set `datetime_is_numeric=True` for pandas:
-# FutureWarning: Treating datetime data as categorical rather than 
numeric in
-# `.describe` is deprecated and will be removed in a future version of 
pandas.
-# Specify `datetime_is_numeric=True` to silence this
-# warning and adopt the future behavior now.
-# NOTE: Compare the result except percentiles, since we use 
approximate percentile
-# so the result is different from pandas.
 if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
 self.assert_eq(
 psdf.describe().loc[["count", "mean", "min", "max"]],
-pdf.describe(datetime_is_numeric=True)
-.astype(str)
-.loc[["count", "mean", "min", "max"]],
+pdf.describe().astype(str).loc[["count", "mean", "min", 
"max"]],
 )
 else:
 self.assert_eq(
@@ -136,17 +123,13 @@ class FrameDescribeMixin:
 if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
 self.assert_eq(
 psdf.describe().loc[["count", "mean", "min", "max"]],
-pdf.describe(datetime_is_numeric=True)
-.astype(str)
-.loc[["count", "mean", "min", "max"]],
+pdf.describe().astype(str).loc[["count", "mean", "min", 
"max"]],
 )
 psdf.A += psdf.A
 pdf.A += pdf.A
 self.assert_eq(
 psdf.describe().loc[["count", "mean", "min", "max"]],
-pdf.describe(datetime_is_numeric=True)
-.astype(str)
-.loc[["count", "mean", "min", "max"]],
+pdf.describe().astype(str).loc[["count", "mean", "min", 
"max"]],
 )
 else:
 expected_result = ps.DataFrame(
@@ -187,7 +170,7 @@ class FrameDescribeMixin:
 )
 pdf = psdf._to_pandas()
 if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
-pandas_result = pdf.describe(datetime_is_numeric=True)
+pandas_result = pdf.describe()
 pandas_result.B = pandas_result.B.astype(str)
 self.assert_eq(
 psdf.describe().loc[["count", "mean", "min", "max"]],
@@ -195,7 +178,7 @@ class FrameDescribeMixin:
 )
 psdf.A += psdf.A
 pdf.A += pdf.A
-pandas_result = pdf.describe(datetime_is_numeric=True)
+pandas_result = pdf.describe()
 pandas_result.B = pandas_result.B.astype(str)
 self.assert_eq(
 psdf.describe().loc[["count", "mean", "min", "max"]],
@@ -252,7 +235,7 @@ class FrameDescribeMixin:
 )
 pdf = psdf._to_pandas()
 if LooseVersion(pd.__version__) >= LooseVersion("1.1.0"):
-pandas_result = pdf.describe(datetime_is_numeric=True)
+pandas_result = pdf.describe()
 pandas_result.b = pandas_result.b.astype(str)
 self.assert_eq(
 

[spark] branch master updated: [SPARK-44640][PYTHON] Improve error messages for Python UDTF returning non Iterable

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 380c0f2033f [SPARK-44640][PYTHON] Improve error messages for Python 
UDTF returning non Iterable
380c0f2033f is described below

commit 380c0f2033fb83b5e4f13693d2576d72c5cc01f2
Author: allisonwang-db 
AuthorDate: Fri Aug 4 10:22:46 2023 +0900

[SPARK-44640][PYTHON] Improve error messages for Python UDTF returning non 
Iterable

### What changes were proposed in this pull request?

This PR improves error messages when the result of a Python UDTF is not an 
Iterable. It also improves the error messages when a UDTF encounters an 
exception when executing `eval`.

### Why are the changes needed?

To make Python UDTFs more user-friendly.

### Does this PR introduce _any_ user-facing change?

Yes. For example this UDTF:
```
udtf(returnType="x: int")
class TestUDTF:
def eval(self, a):
return a
```
Before this PR, it fails with this error for regular UDTFs:
```
return tuple(map(verify_and_convert_result, res))
TypeError: 'int' object is not iterable
```
And this error for arrow-optimized UDTFs:
```
raise ValueError("DataFrame constructor not properly called!")
ValueError: DataFrame constructor not properly called!
```

After this PR, the error message will be:
`pyspark.errors.exceptions.base.PySparkRuntimeError: 
[UDTF_RETURN_NOT_ITERABLE] The return value of the UDTF is invalid. It should 
be an iterable (e.g., generator or list), but got 'int'. Please make sure that 
the UDTF returns one of these types.`

### How was this patch tested?

New UTs.

Closes #42302 from allisonwang-db/spark-44640-udtf-non-iterable.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/errors/error_classes.py |  5 
 python/pyspark/sql/tests/test_udtf.py  | 42 +--
 python/pyspark/sql/udtf.py | 40 -
 python/pyspark/worker.py   | 53 ++
 4 files changed, 105 insertions(+), 35 deletions(-)

diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index d6f093246da..84448f1507d 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -738,6 +738,11 @@ ERROR_CLASSES_JSON = """
   "User defined table function encountered an error in the '' 
method: "
 ]
   },
+  "UDTF_RETURN_NOT_ITERABLE" : {
+"message" : [
+  "The return value of the UDTF is invalid. It should be an iterable 
(e.g., generator or list), but got ''. Please make sure that the UDTF 
returns one of these types."
+]
+  },
   "UDTF_RETURN_SCHEMA_MISMATCH" : {
 "message" : [
   "The number of columns in the result does not match the specified 
schema. Expected column count: , Actual column count: . 
Please make sure the values returned by the function have the same number of 
columns as specified in the output schema."
diff --git a/python/pyspark/sql/tests/test_udtf.py 
b/python/pyspark/sql/tests/test_udtf.py
index 65184549573..26da83980e1 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -180,6 +180,15 @@ class BaseUDTFTestsMixin:
 with self.assertRaisesRegex(PythonException, "Unexpected tuple 1 with 
StructType"):
 func(lit(1)).collect()
 
+def test_udtf_with_invalid_return_value(self):
+@udtf(returnType="x: int")
+class TestUDTF:
+def eval(self, a):
+return a
+
+with self.assertRaisesRegex(PythonException, 
"UDTF_RETURN_NOT_ITERABLE"):
+TestUDTF(lit(1)).collect()
+
 def test_udtf_eval_with_no_return(self):
 @udtf(returnType="a: int")
 class TestUDTF:
@@ -375,6 +384,35 @@ class BaseUDTFTestsMixin:
 ],
 )
 
+def test_init_with_exception(self):
+@udtf(returnType="x: int")
+class TestUDTF:
+def __init__(self):
+raise Exception("error")
+
+def eval(self):
+yield 1,
+
+with self.assertRaisesRegex(
+PythonException,
+r"\[UDTF_EXEC_ERROR\] User defined table function encountered an 
error "
+r"in the '__init__' method: error",
+):
+TestUDTF().show()
+
+def test_eval_with_exception(self):
+@udtf(returnType="x: int")
+class TestUDTF:
+def eval(self):
+raise Exception("error")
+
+with self.assertRaisesRegex(
+PythonException,
+r"\[UDTF_EXEC_ERROR\] User defined table function encountered an 
error "
+r"in the 'eval' method: 

[spark] branch master updated: [SPARK-44433][PYTHON][CONNECT][SS][FOLLOWUP] Terminate listener process with `removeListener` and improvements

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5c36c580477 [SPARK-44433][PYTHON][CONNECT][SS][FOLLOWUP] Terminate 
listener process with `removeListener` and improvements
5c36c580477 is described below

commit 5c36c58047724885864cb781f17038a6b9c94513
Author: Wei Liu 
AuthorDate: Fri Aug 4 09:14:05 2023 +0900

[SPARK-44433][PYTHON][CONNECT][SS][FOLLOWUP] Terminate listener process 
with `removeListener` and improvements

### What changes were proposed in this pull request?

This is a followup to #42116. It addresses the following issues:

1. When `removeListener` is called upon one listener, before the python 
process is left running, now it also get stopped.
2. When multiple `removeListener` is called on the same listener, in 
non-connect mode, subsequent calls will be noop. But before this PR, in connect 
it actually throws an error, which doesn't align with existing behavior, this 
PR addresses it.
3. Set the socket timeout to be None (\infty) for `foreachBatch_worker` and 
`listener_worker`, because there could be a long time between each microbatch. 
If not setting this, the socket will timeout and won't be able to process new 
data.

```
scala> Streaming query listener worker is starting with url 
sc://localhost:15002/;user_id=wei.liu and sessionId 
886191f0-2b64-4c44-b067-de511f04b42d.
Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
  File 
"/home/wei.liu/oss-spark/python/lib/pyspark.zip/pyspark/sql/connect/streaming/worker/listener_worker.py",
 line 95, in 
  File 
"/home/wei.liu/oss-spark/python/lib/pyspark.zip/pyspark/sql/connect/streaming/worker/listener_worker.py",
 line 82, in main
  File 
"/home/wei.liu/oss-spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
557, in loads
  File 
"/home/wei.liu/oss-spark/python/lib/pyspark.zip/pyspark/serializers.py", line 
594, in read_int
  File "/usr/lib/python3.9/socket.py", line 704, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
```

### Why are the changes needed?

Necessary improvements

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manual test + unit test

Closes #42283 from WweiL/SPARK-44433-listener-process-termination.

Authored-by: Wei Liu 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/streaming/StreamingQueryListener.scala | 28 -
 .../sql/connect/planner/SparkConnectPlanner.scala  | 12 +---
 .../planner/StreamingForeachBatchHelper.scala  | 10 +++---
 .../planner/StreamingQueryListenerHelper.scala | 21 +++--
 .../spark/sql/connect/service/SessionHolder.scala  | 19 +++-
 .../spark/api/python/StreamingPythonRunner.scala   | 36 --
 .../streaming/worker/foreachBatch_worker.py|  4 ++-
 .../connect/streaming/worker/listener_worker.py|  4 ++-
 .../connect/streaming/test_parity_listener.py  |  7 +
 9 files changed, 77 insertions(+), 64 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index e2f3be02ad3..404bd1b078b 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -75,34 +75,6 @@ abstract class StreamingQueryListener extends Serializable {
   def onQueryTerminated(event: QueryTerminatedEvent): Unit
 }
 
-/**
- * Py4J allows a pure interface so this proxy is required.
- */
-private[spark] trait PythonStreamingQueryListener {
-  import StreamingQueryListener._
-
-  def onQueryStarted(event: QueryStartedEvent): Unit
-
-  def onQueryProgress(event: QueryProgressEvent): Unit
-
-  def onQueryIdle(event: QueryIdleEvent): Unit
-
-  def onQueryTerminated(event: QueryTerminatedEvent): Unit
-}
-
-private[spark] class PythonStreamingQueryListenerWrapper(listener: 
PythonStreamingQueryListener)
-extends StreamingQueryListener {
-  import StreamingQueryListener._
-
-  def onQueryStarted(event: QueryStartedEvent): Unit = 
listener.onQueryStarted(event)
-
-  def onQueryProgress(event: QueryProgressEvent): Unit = 
listener.onQueryProgress(event)
-
-  override def onQueryIdle(event: QueryIdleEvent): Unit = 
listener.onQueryIdle(event)
-
-  def onQueryTerminated(event: 

[spark] branch branch-3.5 updated: [MINOR][DOC] Fix a typo in ResolveReferencesInUpdate scaladoc

2023-08-03 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 1ad71ffc33d [MINOR][DOC] Fix a typo in ResolveReferencesInUpdate 
scaladoc
1ad71ffc33d is described below

commit 1ad71ffc33ddf0861f62e389a5e8ad438f9afb26
Author: Sergii Druzkin <65374769+sdruz...@users.noreply.github.com>
AuthorDate: Thu Aug 3 18:52:44 2023 -0500

[MINOR][DOC] Fix a typo in ResolveReferencesInUpdate scaladoc

### What changes were proposed in this pull request?
Fixed a typo in the ResolveReferencesInUpdate documentation.

### Why are the changes needed?

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
CI

Closes #42322 from sdruzkin/master.

Authored-by: Sergii Druzkin <65374769+sdruz...@users.noreply.github.com>
Signed-off-by: Sean Owen 
(cherry picked from commit 52a9002fa2383bd9b26c77e62e0c6bcd46f8944b)
Signed-off-by: Sean Owen 
---
 .../apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala
index cebc1e25f92..ead323ce985 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 /**
  * A virtual rule to resolve [[UnresolvedAttribute]] in [[UpdateTable]]. It's 
only used by the real
  * rule `ResolveReferences`. The column resolution order for [[UpdateTable]] 
is:
- * 1. Resolves the column to `AttributeReference`` with the output of the 
child plan. This
+ * 1. Resolves the column to `AttributeReference` with the output of the child 
plan. This
  *includes metadata columns as well.
  * 2. Resolves the column to a literal function which is allowed to be invoked 
without braces, e.g.
  *`SELECT col, current_date FROM t`.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [MINOR][DOC] Fix a typo in ResolveReferencesInUpdate scaladoc

2023-08-03 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 52a9002fa23 [MINOR][DOC] Fix a typo in ResolveReferencesInUpdate 
scaladoc
52a9002fa23 is described below

commit 52a9002fa2383bd9b26c77e62e0c6bcd46f8944b
Author: Sergii Druzkin <65374769+sdruz...@users.noreply.github.com>
AuthorDate: Thu Aug 3 18:52:44 2023 -0500

[MINOR][DOC] Fix a typo in ResolveReferencesInUpdate scaladoc

### What changes were proposed in this pull request?
Fixed a typo in the ResolveReferencesInUpdate documentation.

### Why are the changes needed?

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
CI

Closes #42322 from sdruzkin/master.

Authored-by: Sergii Druzkin <65374769+sdruz...@users.noreply.github.com>
Signed-off-by: Sean Owen 
---
 .../apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala
index cebc1e25f92..ead323ce985 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
 /**
  * A virtual rule to resolve [[UnresolvedAttribute]] in [[UpdateTable]]. It's 
only used by the real
  * rule `ResolveReferences`. The column resolution order for [[UpdateTable]] 
is:
- * 1. Resolves the column to `AttributeReference`` with the output of the 
child plan. This
+ * 1. Resolves the column to `AttributeReference` with the output of the child 
plan. This
  *includes metadata columns as well.
  * 2. Resolves the column to a literal function which is allowed to be invoked 
without braces, e.g.
  *`SELECT col, current_date FROM t`.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44642][CONNECT] ReleaseExecute in ExecutePlanResponseReattachableIterator after it gets error from server

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 43a853660b0 [SPARK-44642][CONNECT] ReleaseExecute in 
ExecutePlanResponseReattachableIterator after it gets error from server
43a853660b0 is described below

commit 43a853660b08aa176bb8eb194ec74043006f219f
Author: Juliusz Sompolski 
AuthorDate: Fri Aug 4 08:45:44 2023 +0900

[SPARK-44642][CONNECT] ReleaseExecute in 
ExecutePlanResponseReattachableIterator after it gets error from server

### What changes were proposed in this pull request?

Client:
When server returns error on the response stream via onError, the 
ExecutePlanResponseReattachableIterator will not see the stream finish with a 
ResultsComplete. Instead, a StatusRuntimeException will be thrown from next() 
or hasNext(). Handle catching that exception, telling the server to 
ReleaseExecute when we receive it, and rethrow it to the user.

Server:
We also have to tweak the behaviour of ReleaseAll to also interrupt the 
query. The previous behaviour that in case of a running query one has to first 
send an interrupt, and then release was done to prevent race conditions of an 
interrupt coming after ResultComplete. Now, this has been resolved with proper 
synchronization at the final moments of execution in ExecuteThreadRunner, and 
as we want the release to be async, having one ReleaseExecutel vs. needing a 
combination of Interrupt+R [...]

### Why are the changes needed?

If ReleaseExecute is not called by the client to acknowledge that the error 
was received, the execution will keep dangling on the server until cleaned up 
by timeout.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Automated tests will come with 
https://issues.apache.org/jira/browse/SPARK-44625.

Closes #42304 from juliuszsompolski/SPARK-44642.

Authored-by: Juliusz Sompolski 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit d2d43b888aebbb5d4099faec26b076ef390890ce)
Signed-off-by: Hyukjin Kwon 
---
 .../ExecutePlanResponseReattachableIterator.scala  | 120 -
 .../src/main/protobuf/spark/connect/base.proto |   4 +-
 .../connect/execution/ExecuteThreadRunner.scala|  31 --
 .../spark/sql/connect/service/ExecuteHolder.scala  |   6 +-
 .../connect/planner/SparkConnectServiceSuite.scala |  25 +++--
 python/pyspark/sql/connect/proto/base_pb2.pyi  |   4 +-
 6 files changed, 116 insertions(+), 74 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 008b3c3dd5c..fc07deaa081 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -102,28 +102,33 @@ class ExecutePlanResponseReattachableIterator(
   throw new java.util.NoSuchElementException()
 }
 
-// Get next response, possibly triggering reattach in case of stream error.
-var firstTry = true
-val ret = retry {
-  if (firstTry) {
-// on first try, we use the existing iterator.
-firstTry = false
-  } else {
-// on retry, the iterator is borked, so we need a new one
-iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+try {
+  // Get next response, possibly triggering reattach in case of stream 
error.
+  var firstTry = true
+  val ret = retry {
+if (firstTry) {
+  // on first try, we use the existing iterator.
+  firstTry = false
+} else {
+  // on retry, the iterator is borked, so we need a new one
+  iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+}
+iterator.next()
   }
-  iterator.next()
-}
 
-// Record last returned response, to know where to restart in case of 
reattach.
-lastReturnedResponseId = Some(ret.getResponseId)
-if (ret.hasResultComplete) {
-  resultComplete = true
-  releaseExecute(None) // release all
-} else {
-  releaseExecute(lastReturnedResponseId) // release until this response
+  // Record last returned response, to know where to restart in case of 
reattach.
+  lastReturnedResponseId = Some(ret.getResponseId)
+  if (ret.hasResultComplete) {
+releaseAll()
+  } else {
+releaseUntil(lastReturnedResponseId.get)
+  }
+  ret
+} catch {
+  case 

[spark] branch master updated (16b031eb144 -> d2d43b888ae)

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 16b031eb144 [SPARK-44652] Raise error when only one df is None
 add d2d43b888ae [SPARK-44642][CONNECT] ReleaseExecute in 
ExecutePlanResponseReattachableIterator after it gets error from server

No new revisions were added by this update.

Summary of changes:
 .../ExecutePlanResponseReattachableIterator.scala  | 120 -
 .../src/main/protobuf/spark/connect/base.proto |   4 +-
 .../connect/execution/ExecuteThreadRunner.scala|  31 --
 .../spark/sql/connect/service/ExecuteHolder.scala  |   6 +-
 .../connect/planner/SparkConnectServiceSuite.scala |  25 +++--
 python/pyspark/sql/connect/proto/base_pb2.pyi  |   4 +-
 6 files changed, 116 insertions(+), 74 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44652] Raise error when only one df is None

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 1d0d8f5fa6c [SPARK-44652] Raise error when only one df is None
1d0d8f5fa6c is described below

commit 1d0d8f5fa6cd1ebf35301443d5119de752ccd18f
Author: Amanda Liu 
AuthorDate: Fri Aug 4 08:42:39 2023 +0900

[SPARK-44652] Raise error when only one df is None

### What changes were proposed in this pull request?
Adds a "raise PySparkAssertionError" for the case when one of `actual` or 
`expected` is None, instead of just returning False.

### Why are the changes needed?
The PR ensures that an error is thrown in the assertion for the edge case 
when one of `actual` or `expected` is None

### Does this PR introduce _any_ user-facing change?
Yes, the PR affects the user-facing API `assertDataFrameEqual`

### How was this patch tested?
Added tests to `python/pyspark/sql/tests/test_utils.py`

Closes #42314 from asl3/raise-none-error.

Authored-by: Amanda Liu 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 16b031eb144f6ba1c1103be5dcf00d6209adaa85)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_utils.py | 82 ++
 python/pyspark/testing/utils.py| 32 ++---
 2 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/tests/test_utils.py 
b/python/pyspark/sql/tests/test_utils.py
index 76d397e3ade..93895465de7 100644
--- a/python/pyspark/sql/tests/test_utils.py
+++ b/python/pyspark/sql/tests/test_utils.py
@@ -41,6 +41,7 @@ from pyspark.sql.types import (
 BooleanType,
 )
 from pyspark.sql.dataframe import DataFrame
+import pyspark.pandas as ps
 
 import difflib
 from typing import List, Union
@@ -672,9 +673,79 @@ class UtilsTestsMixin:
 assertDataFrameEqual(df1, df2, checkRowOrder=False)
 assertDataFrameEqual(df1, df2, checkRowOrder=True)
 
-def test_assert_equal_exact_pandas_df(self):
-import pyspark.pandas as ps
+def test_assert_unequal_null_actual(self):
+df1 = None
+df2 = self.spark.createDataFrame(
+data=[
+("1", 1000),
+("2", 3000),
+],
+schema=["id", "amount"],
+)
+
+with self.assertRaises(PySparkAssertionError) as pe:
+assertDataFrameEqual(df1, df2)
+
+self.check_error(
+exception=pe.exception,
+error_class="INVALID_TYPE_DF_EQUALITY_ARG",
+message_parameters={
+"expected_type": Union[DataFrame, ps.DataFrame, List[Row]],
+"arg_name": "actual",
+"actual_type": None,
+},
+)
+
+with self.assertRaises(PySparkAssertionError) as pe:
+assertDataFrameEqual(df1, df2, checkRowOrder=True)
+
+self.check_error(
+exception=pe.exception,
+error_class="INVALID_TYPE_DF_EQUALITY_ARG",
+message_parameters={
+"expected_type": Union[DataFrame, ps.DataFrame, List[Row]],
+"arg_name": "actual",
+"actual_type": None,
+},
+)
+
+def test_assert_unequal_null_expected(self):
+df1 = self.spark.createDataFrame(
+data=[
+("1", 1000),
+("2", 3000),
+],
+schema=["id", "amount"],
+)
+df2 = None
+
+with self.assertRaises(PySparkAssertionError) as pe:
+assertDataFrameEqual(df1, df2)
 
+self.check_error(
+exception=pe.exception,
+error_class="INVALID_TYPE_DF_EQUALITY_ARG",
+message_parameters={
+"expected_type": Union[DataFrame, ps.DataFrame, List[Row]],
+"arg_name": "expected",
+"actual_type": None,
+},
+)
+
+with self.assertRaises(PySparkAssertionError) as pe:
+assertDataFrameEqual(df1, df2, checkRowOrder=True)
+
+self.check_error(
+exception=pe.exception,
+error_class="INVALID_TYPE_DF_EQUALITY_ARG",
+message_parameters={
+"expected_type": Union[DataFrame, ps.DataFrame, List[Row]],
+"arg_name": "expected",
+"actual_type": None,
+},
+)
+
+def test_assert_equal_exact_pandas_df(self):
 df1 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
 df2 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
 
@@ -682,16 +753,12 @@ class UtilsTestsMixin:
 assertDataFrameEqual(df1, df2, checkRowOrder=True)
 
 def test_assert_equal_exact_pandas_df(self):
-import pyspark.pandas as ps
-
 df1 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
  

[spark] branch master updated: [SPARK-44652] Raise error when only one df is None

2023-08-03 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 16b031eb144 [SPARK-44652] Raise error when only one df is None
16b031eb144 is described below

commit 16b031eb144f6ba1c1103be5dcf00d6209adaa85
Author: Amanda Liu 
AuthorDate: Fri Aug 4 08:42:39 2023 +0900

[SPARK-44652] Raise error when only one df is None

### What changes were proposed in this pull request?
Adds a "raise PySparkAssertionError" for the case when one of `actual` or 
`expected` is None, instead of just returning False.

### Why are the changes needed?
The PR ensures that an error is thrown in the assertion for the edge case 
when one of `actual` or `expected` is None

### Does this PR introduce _any_ user-facing change?
Yes, the PR affects the user-facing API `assertDataFrameEqual`

### How was this patch tested?
Added tests to `python/pyspark/sql/tests/test_utils.py`

Closes #42314 from asl3/raise-none-error.

Authored-by: Amanda Liu 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_utils.py | 82 ++
 python/pyspark/testing/utils.py| 32 ++---
 2 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/python/pyspark/sql/tests/test_utils.py 
b/python/pyspark/sql/tests/test_utils.py
index 76d397e3ade..93895465de7 100644
--- a/python/pyspark/sql/tests/test_utils.py
+++ b/python/pyspark/sql/tests/test_utils.py
@@ -41,6 +41,7 @@ from pyspark.sql.types import (
 BooleanType,
 )
 from pyspark.sql.dataframe import DataFrame
+import pyspark.pandas as ps
 
 import difflib
 from typing import List, Union
@@ -672,9 +673,79 @@ class UtilsTestsMixin:
 assertDataFrameEqual(df1, df2, checkRowOrder=False)
 assertDataFrameEqual(df1, df2, checkRowOrder=True)
 
-def test_assert_equal_exact_pandas_df(self):
-import pyspark.pandas as ps
+def test_assert_unequal_null_actual(self):
+df1 = None
+df2 = self.spark.createDataFrame(
+data=[
+("1", 1000),
+("2", 3000),
+],
+schema=["id", "amount"],
+)
+
+with self.assertRaises(PySparkAssertionError) as pe:
+assertDataFrameEqual(df1, df2)
+
+self.check_error(
+exception=pe.exception,
+error_class="INVALID_TYPE_DF_EQUALITY_ARG",
+message_parameters={
+"expected_type": Union[DataFrame, ps.DataFrame, List[Row]],
+"arg_name": "actual",
+"actual_type": None,
+},
+)
+
+with self.assertRaises(PySparkAssertionError) as pe:
+assertDataFrameEqual(df1, df2, checkRowOrder=True)
+
+self.check_error(
+exception=pe.exception,
+error_class="INVALID_TYPE_DF_EQUALITY_ARG",
+message_parameters={
+"expected_type": Union[DataFrame, ps.DataFrame, List[Row]],
+"arg_name": "actual",
+"actual_type": None,
+},
+)
+
+def test_assert_unequal_null_expected(self):
+df1 = self.spark.createDataFrame(
+data=[
+("1", 1000),
+("2", 3000),
+],
+schema=["id", "amount"],
+)
+df2 = None
+
+with self.assertRaises(PySparkAssertionError) as pe:
+assertDataFrameEqual(df1, df2)
 
+self.check_error(
+exception=pe.exception,
+error_class="INVALID_TYPE_DF_EQUALITY_ARG",
+message_parameters={
+"expected_type": Union[DataFrame, ps.DataFrame, List[Row]],
+"arg_name": "expected",
+"actual_type": None,
+},
+)
+
+with self.assertRaises(PySparkAssertionError) as pe:
+assertDataFrameEqual(df1, df2, checkRowOrder=True)
+
+self.check_error(
+exception=pe.exception,
+error_class="INVALID_TYPE_DF_EQUALITY_ARG",
+message_parameters={
+"expected_type": Union[DataFrame, ps.DataFrame, List[Row]],
+"arg_name": "expected",
+"actual_type": None,
+},
+)
+
+def test_assert_equal_exact_pandas_df(self):
 df1 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
 df2 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
 
@@ -682,16 +753,12 @@ class UtilsTestsMixin:
 assertDataFrameEqual(df1, df2, checkRowOrder=True)
 
 def test_assert_equal_exact_pandas_df(self):
-import pyspark.pandas as ps
-
 df1 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
 df2 = ps.DataFrame(data=[30, 20, 10], columns=["Numbers"])
 
 assertDataFrameEqual(df1, df2)
 

[spark] branch branch-3.4 updated: [SPARK-44661][CORE][TESTS] `getMapOutputLocation` should not throw NPE

2023-08-03 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new f9155dee910 [SPARK-44661][CORE][TESTS] `getMapOutputLocation` should 
not throw NPE
f9155dee910 is described below

commit f9155dee910f018f95e25de94bcdc73c580f1c9e
Author: Dongjoon Hyun 
AuthorDate: Thu Aug 3 14:40:59 2023 -0700

[SPARK-44661][CORE][TESTS] `getMapOutputLocation` should not throw NPE

This PR aims to add a test coverage for Apache Spark 4.0/3.5/3.4.
This PR depends on SPARK-44658 (#42323) but is created separately because 
this aims to land `branch-3.4` too.

To prevent a future regression.

No.

Pass the CIs.

Closes #42326 from dongjoon-hyun/SPARK-44661.

Lead-authored-by: Dongjoon Hyun 
Co-authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 9fbf0b4853c6209675daa0731f8b33a83b2f5cef)
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/MapOutputTrackerSuite.scala  | 17 +
 1 file changed, 17 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index dfad4a924d7..796096bb87d 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1030,4 +1030,21 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
 rpcEnv.shutdown()
 assert(npeCounter.intValue() == 0)
   }
+
+  test("SPARK-44661: getMapOutputLocation should not throw NPE") {
+val rpcEnv = createRpcEnv("test")
+val tracker = newTrackerMaster()
+try {
+  tracker.trackerEndpoint = 
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+  tracker.registerShuffle(0, 1, 1)
+  tracker.registerMapOutput(0, 0, MapStatus(BlockManagerId("exec-1", 
"hostA", 1000),
+Array(2L), 0))
+  tracker.removeOutputsOnHost("hostA")
+  assert(tracker.getMapOutputLocation(0, 0) == None)
+} finally {
+  tracker.stop()
+  rpcEnv.shutdown()
+}
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44661][CORE][TESTS] `getMapOutputLocation` should not throw NPE

2023-08-03 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 4939889a301 [SPARK-44661][CORE][TESTS] `getMapOutputLocation` should 
not throw NPE
4939889a301 is described below

commit 4939889a301209c678a2f4df978a9f7e6b15edbd
Author: Dongjoon Hyun 
AuthorDate: Thu Aug 3 14:40:59 2023 -0700

[SPARK-44661][CORE][TESTS] `getMapOutputLocation` should not throw NPE

### What changes were proposed in this pull request?

This PR aims to add a test coverage for Apache Spark 4.0/3.5/3.4.
This PR depends on SPARK-44658 (#42323) but is created separately because 
this aims to land `branch-3.4` too.

### Why are the changes needed?

To prevent a future regression.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #42326 from dongjoon-hyun/SPARK-44661.

Lead-authored-by: Dongjoon Hyun 
Co-authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 9fbf0b4853c6209675daa0731f8b33a83b2f5cef)
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/MapOutputTrackerSuite.scala  | 17 +
 1 file changed, 17 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 7ee36137e27..450ff01921a 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1092,4 +1092,21 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
 shuffleStatus.removeMapOutput(mapIndex = 1, bmID)
 assert(shuffleStatus.getMapStatus(0).isEmpty)
   }
+
+  test("SPARK-44661: getMapOutputLocation should not throw NPE") {
+val rpcEnv = createRpcEnv("test")
+val tracker = newTrackerMaster()
+try {
+  tracker.trackerEndpoint = 
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+  tracker.registerShuffle(0, 1, 1)
+  tracker.registerMapOutput(0, 0, MapStatus(BlockManagerId("exec-1", 
"hostA", 1000),
+Array(2L), 0))
+  tracker.removeOutputsOnHost("hostA")
+  assert(tracker.getMapOutputLocation(0, 0) == None)
+} finally {
+  tracker.stop()
+  rpcEnv.shutdown()
+}
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44661][CORE][TESTS] `getMapOutputLocation` should not throw NPE

2023-08-03 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9fbf0b4853c [SPARK-44661][CORE][TESTS] `getMapOutputLocation` should 
not throw NPE
9fbf0b4853c is described below

commit 9fbf0b4853c6209675daa0731f8b33a83b2f5cef
Author: Dongjoon Hyun 
AuthorDate: Thu Aug 3 14:40:59 2023 -0700

[SPARK-44661][CORE][TESTS] `getMapOutputLocation` should not throw NPE

### What changes were proposed in this pull request?

This PR aims to add a test coverage for Apache Spark 4.0/3.5/3.4.
This PR depends on SPARK-44658 (#42323) but is created separately because 
this aims to land `branch-3.4` too.

### Why are the changes needed?

To prevent a future regression.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

Closes #42326 from dongjoon-hyun/SPARK-44661.

Lead-authored-by: Dongjoon Hyun 
Co-authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/MapOutputTrackerSuite.scala  | 17 +
 1 file changed, 17 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 7ee36137e27..450ff01921a 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1092,4 +1092,21 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
 shuffleStatus.removeMapOutput(mapIndex = 1, bmID)
 assert(shuffleStatus.getMapStatus(0).isEmpty)
   }
+
+  test("SPARK-44661: getMapOutputLocation should not throw NPE") {
+val rpcEnv = createRpcEnv("test")
+val tracker = newTrackerMaster()
+try {
+  tracker.trackerEndpoint = 
rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+  tracker.registerShuffle(0, 1, 1)
+  tracker.registerMapOutput(0, 0, MapStatus(BlockManagerId("exec-1", 
"hostA", 1000),
+Array(2L), 0))
+  tracker.removeOutputsOnHost("hostA")
+  assert(tracker.getMapOutputLocation(0, 0) == None)
+} finally {
+  tracker.stop()
+  rpcEnv.shutdown()
+}
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.5 updated: [SPARK-44658][CORE] `ShuffleStatus.getMapStatus` should return `None` instead of `Some(null)`

2023-08-03 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 7f9a4737845 [SPARK-44658][CORE] `ShuffleStatus.getMapStatus` should 
return `None` instead of `Some(null)`
7f9a4737845 is described below

commit 7f9a4737845fdbaf5ac3b311a32e5d9c105bc226
Author: Dongjoon Hyun 
AuthorDate: Thu Aug 3 14:18:16 2023 -0700

[SPARK-44658][CORE] `ShuffleStatus.getMapStatus` should return `None` 
instead of `Some(null)`

### What changes were proposed in this pull request?

This PR is for `master` and `branch-3.5` and aims to fix a regression due 
to SPARK-43043 which landed at Apache Spark 3.4.1 and reverted via SPARK-44630. 
This PR makes `ShuffleStatus.getMapStatus` return `None` instead of 
`Some(null)`.

### Why are the changes needed?

`None` is better because `Some(null)` is unsafe because it causes NPE in 
some cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the newly added test case.

Closes #42323 from dongjoon-hyun/SPARK-44658.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit ed036a9d0aab2d75b5c0db5caebfc158ce22ec15)
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala  | 5 -
 core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 9 +
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 47ac3df4cc6..3495536a350 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -171,7 +171,10 @@ private class ShuffleStatus(
* Get the map output that corresponding to a given mapId.
*/
   def getMapStatus(mapId: Long): Option[MapStatus] = withReadLock {
-mapIdToMapIndex.get(mapId).map(mapStatuses(_))
+mapIdToMapIndex.get(mapId).map(mapStatuses(_)) match {
+  case Some(null) => None
+  case m => m
+}
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 7ac3d0092c8..7ee36137e27 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1083,4 +1083,13 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
   rpcEnv.shutdown()
 }
   }
+
+  test("SPARK-44658: ShuffleStatus.getMapStatus should return None") {
+val bmID = BlockManagerId("a", "hostA", 1000)
+val mapStatus = MapStatus(bmID, Array(1000L, 1L), mapTaskId = 0)
+val shuffleStatus = new ShuffleStatus(1000)
+shuffleStatus.addMapOutput(mapIndex = 1, mapStatus)
+shuffleStatus.removeMapOutput(mapIndex = 1, bmID)
+assert(shuffleStatus.getMapStatus(0).isEmpty)
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44658][CORE] `ShuffleStatus.getMapStatus` should return `None` instead of `Some(null)`

2023-08-03 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ed036a9d0aa [SPARK-44658][CORE] `ShuffleStatus.getMapStatus` should 
return `None` instead of `Some(null)`
ed036a9d0aa is described below

commit ed036a9d0aab2d75b5c0db5caebfc158ce22ec15
Author: Dongjoon Hyun 
AuthorDate: Thu Aug 3 14:18:16 2023 -0700

[SPARK-44658][CORE] `ShuffleStatus.getMapStatus` should return `None` 
instead of `Some(null)`

### What changes were proposed in this pull request?

This PR is for `master` and `branch-3.5` and aims to fix a regression due 
to SPARK-43043 which landed at Apache Spark 3.4.1 and reverted via SPARK-44630. 
This PR makes `ShuffleStatus.getMapStatus` return `None` instead of 
`Some(null)`.

### Why are the changes needed?

`None` is better because `Some(null)` is unsafe because it causes NPE in 
some cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the newly added test case.

Closes #42323 from dongjoon-hyun/SPARK-44658.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/MapOutputTracker.scala  | 5 -
 core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala | 9 +
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala 
b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 47ac3df4cc6..3495536a350 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -171,7 +171,10 @@ private class ShuffleStatus(
* Get the map output that corresponding to a given mapId.
*/
   def getMapStatus(mapId: Long): Option[MapStatus] = withReadLock {
-mapIdToMapIndex.get(mapId).map(mapStatuses(_))
+mapIdToMapIndex.get(mapId).map(mapStatuses(_)) match {
+  case Some(null) => None
+  case m => m
+}
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala 
b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 7ac3d0092c8..7ee36137e27 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -1083,4 +1083,13 @@ class MapOutputTrackerSuite extends SparkFunSuite with 
LocalSparkContext {
   rpcEnv.shutdown()
 }
   }
+
+  test("SPARK-44658: ShuffleStatus.getMapStatus should return None") {
+val bmID = BlockManagerId("a", "hostA", 1000)
+val mapStatus = MapStatus(bmID, Array(1000L, 1L), mapTaskId = 0)
+val shuffleStatus = new ShuffleStatus(1000)
+shuffleStatus.addMapOutput(mapIndex = 1, mapStatus)
+shuffleStatus.removeMapOutput(mapIndex = 1, bmID)
+assert(shuffleStatus.getMapStatus(0).isEmpty)
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org