[spark] branch branch-3.4 updated: [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new e706ba1cd4b [SPARK-45227][CORE] Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend
e706ba1cd4b is described below

commit e706ba1cd4b94bcb215355721914002748590d46
Author: Bo Xiong 
AuthorDate: Thu Sep 28 22:53:37 2023 -0500

[SPARK-45227][CORE] Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an 
executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite 
loop" (as explained in SPARK-45227). Once an executor process runs into such a 
state, it stops launching tasks sent from the driver and stops reporting task 
status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none 
-DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite 
test
```

### Was this patch authored or co-authored using generative AI tooling?
No


**
**_Please feel free to skip reading unless you're interested in details_**

**

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the 
very last step of writing a data frame to S3 by calling `df.write`. Looking at 
the Spark UI, we saw that an executor process hung for over 1 hour. After we 
manually killed the executor process, the app succeeded. Note that the same EMR 
cluster with two worker nodes was able to run the same app without any issue 
before and after the incident.

Below is what we observed from the relevant container logs and a thread dump.

- A regular task that was sent to the executor, which also reported back to 
the driver upon task completion.

```
$zgrep 'task 150' container_1694029806204_12865_01_01/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 
23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, 
NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 
23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

$zgrep ' 923' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned 
task 923

$zgrep 'task 150' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 
923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 
923). 4495 bytes result sent to driver
```

- Another task that was sent to the executor but never got launched because 
the single-threaded dispatcher was stuck (presumably in an "infinite loop" as 
explained later).

```
$zgrep 'task 153' container_1694029806204_12865_01_01/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 
23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, 
NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

$zgrep ' 924' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned 
task 924

$zgrep 'task 153' container_1694029806204_12865_01_04/stderr.gz
>> note that the above command has no matching result, indicating that 
task 153.0 in stage 23.0 (TID 924) was never launched
```

- The thread dump shows that the dispatcher-Executor thread has the following 
stack trace.

```
"dispatcher-Executor" #40 daemon prio=5 os_prio=0 
tid=0x98e37800 nid=0x1aff runnable [0x73bba000]
java.lang.Thread.State: RUNNABLE
at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
at 
scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
at 

[spark] branch branch-3.5 updated: [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 5cdb4ab6f0b [SPARK-45227][CORE] Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend
5cdb4ab6f0b is described below

commit 5cdb4ab6f0b7aea7d890ee7dff61350671a09e79
Author: Bo Xiong 
AuthorDate: Thu Sep 28 22:53:37 2023 -0500

[SPARK-45227][CORE] Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an 
executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite 
loop" (as explained in SPARK-45227). Once an executor process runs into such a 
state, it stops launching tasks sent from the driver and stops reporting task 
status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none 
-DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite 
test
```

### Was this patch authored or co-authored using generative AI tooling?
No


**
**_Please feel free to skip reading unless you're interested in details_**

**

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the 
very last step of writing a data frame to S3 by calling `df.write`. Looking at 
the Spark UI, we saw that an executor process hung for over 1 hour. After we 
manually killed the executor process, the app succeeded. Note that the same EMR 
cluster with two worker nodes was able to run the same app without any issue 
before and after the incident.

Below is what we observed from the relevant container logs and a thread dump.

- A regular task that was sent to the executor, which also reported back to 
the driver upon task completion.

```
$zgrep 'task 150' container_1694029806204_12865_01_01/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 
23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, 
NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 
23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

$zgrep ' 923' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned 
task 923

$zgrep 'task 150' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 
923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 
923). 4495 bytes result sent to driver
```

- Another task that was sent to the executor but never got launched because 
the single-threaded dispatcher was stuck (presumably in an "infinite loop" as 
explained later).

```
$zgrep 'task 153' container_1694029806204_12865_01_01/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 
23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, 
NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

$zgrep ' 924' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned 
task 924

$zgrep 'task 153' container_1694029806204_12865_01_04/stderr.gz
>> note that the above command has no matching result, indicating that 
task 153.0 in stage 23.0 (TID 924) was never launched
```

- The thread dump shows that the dispatcher-Executor thread has the following 
stack trace.

```
"dispatcher-Executor" #40 daemon prio=5 os_prio=0 
tid=0x98e37800 nid=0x1aff runnable [0x73bba000]
java.lang.Thread.State: RUNNABLE
at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
at 
scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
at 

[spark] branch master updated: [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8e6b1603a66 [SPARK-45227][CORE] Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend
8e6b1603a66 is described below

commit 8e6b1603a66706ee27a0b16d850f5ee56d633354
Author: Bo Xiong 
AuthorDate: Thu Sep 28 22:53:37 2023 -0500

[SPARK-45227][CORE] Fix a subtle thread-safety issue with 
CoarseGrainedExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an 
executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite 
loop" (as explained in SPARK-45227). Once an executor process runs into such a 
state, it stops launching tasks sent from the driver and stops reporting task 
status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none 
-DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite 
test
```

### Was this patch authored or co-authored using generative AI tooling?
No


**
**_Please feel free to skip reading unless you're interested in details_**

**

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the 
very last step of writing a data frame to S3 by calling `df.write`. Looking at 
the Spark UI, we saw that an executor process hung for over 1 hour. After we 
manually killed the executor process, the app succeeded. Note that the same EMR 
cluster with two worker nodes was able to run the same app without any issue 
before and after the incident.

Below is what we observed from the relevant container logs and a thread dump.

- A regular task that was sent to the executor, which also reported back to 
the driver upon task completion.

```
$zgrep 'task 150' container_1694029806204_12865_01_01/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 
23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, 
NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 
23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

$zgrep ' 923' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned 
task 923

$zgrep 'task 150' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 
923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 
923). 4495 bytes result sent to driver
```

- Another task that was sent to the executor but never got launched because 
the single-threaded dispatcher was stuck (presumably in an "infinite loop" as 
explained later).

```
$zgrep 'task 153' container_1694029806204_12865_01_01/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 
23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, 
NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

$zgrep ' 924' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned 
task 924

$zgrep 'task 153' container_1694029806204_12865_01_04/stderr.gz
>> note that the above command has no matching result, indicating that 
task 153.0 in stage 23.0 (TID 924) was never launched
```

- The thread dump shows that the dispatcher-Executor thread has the following 
stack trace.

```
"dispatcher-Executor" #40 daemon prio=5 os_prio=0 
tid=0x98e37800 nid=0x1aff runnable [0x73bba000]
java.lang.Thread.State: RUNNABLE
at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
at 
scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
at 

[spark] branch master updated: [SPARK-45276][INFRA] Replace Java 8 and Java 11 installed in the Dockerfile with Java 17

2023-09-28 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d9905ab428 [SPARK-45276][INFRA] Replace Java 8 and Java 11 installed 
in the Dockerfile with Java 17
1d9905ab428 is described below

commit 1d9905ab4289ab4407dd93487dd1f8f036938a59
Author: panbingkun 
AuthorDate: Fri Sep 29 09:16:05 2023 +0800

[SPARK-45276][INFRA] Replace Java 8 and Java 11 installed in the Dockerfile 
with Java 17

### What changes were proposed in this pull request?
This PR aims to replace Java 8 and Java 11 with Java 17 in the following 
Dockerfiles:
- dev/create-release/spark-rm/Dockerfile
- connector/docker/spark-test/base/Dockerfile

### Why are the changes needed?
After SPARK-44112, the minimum supported Java version for Apache Spark 4.0 
is Java 17.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually tested:

> 1. dev/create-release/spark-rm/Dockerfile

```
cd dev/create-release/spark-rm
docker build -t spark-rm --build-arg UID=$UID .
```

> 2. connector/docker/spark-test/base/Dockerfile
```
cd connector/docker
sh build
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43076 from panbingkun/SPARK-45293.

Authored-by: panbingkun 
Signed-off-by: yangjie01 
---
 connector/docker/spark-test/base/Dockerfile | 2 +-
 dev/create-release/do-release-docker.sh | 2 +-
 dev/create-release/spark-rm/Dockerfile  | 6 +++---
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/connector/docker/spark-test/base/Dockerfile 
b/connector/docker/spark-test/base/Dockerfile
index d4a30c4681c..0e8593f8af5 100644
--- a/connector/docker/spark-test/base/Dockerfile
+++ b/connector/docker/spark-test/base/Dockerfile
@@ -22,7 +22,7 @@ FROM ubuntu:20.04
 # Remove unneeded /var/lib/apt/lists/* after install to reduce the
 # docker image size (by ~30MB)
 RUN apt-get update && \
-apt-get install -y less openjdk-11-jre-headless iproute2 vim-tiny sudo 
openssh-server && \
+apt-get install -y less openjdk-17-jre-headless iproute2 vim-tiny sudo 
openssh-server && \
 rm -rf /var/lib/apt/lists/*
 
 ENV SPARK_HOME /opt/spark
diff --git a/dev/create-release/do-release-docker.sh 
b/dev/create-release/do-release-docker.sh
index 88398bc14dd..c44d0193069 100755
--- a/dev/create-release/do-release-docker.sh
+++ b/dev/create-release/do-release-docker.sh
@@ -45,7 +45,7 @@ Options are:
   -n  : dry run mode. Performs checks and local builds, but do not 
upload anything.
   -t [tag]: tag for the spark-rm docker image to use for building 
(default: "latest").
   -j [path]   : path to local JDK installation to use for building. By default 
the script will
-use openjdk8 installed in the docker image.
+use openjdk17 installed in the docker image.
   -s [step]   : runs a single step of the process; valid steps are: tag, 
build, docs, publish, finalize
 EOF
 }
diff --git a/dev/create-release/spark-rm/Dockerfile 
b/dev/create-release/spark-rm/Dockerfile
index 85155b67bd5..50562e38fb5 100644
--- a/dev/create-release/spark-rm/Dockerfile
+++ b/dev/create-release/spark-rm/Dockerfile
@@ -61,9 +61,9 @@ RUN apt-get clean && apt-get update && $APT_INSTALL gnupg 
ca-certificates && \
   apt-get update && \
   $APT_INSTALL software-properties-common && \
   apt-get update && \
-  # Install openjdk 8.
-  $APT_INSTALL openjdk-8-jdk && \
-  update-alternatives --set java $(ls 
/usr/lib/jvm/java-8-openjdk-*/jre/bin/java) && \
+  # Install openjdk 17.
+  $APT_INSTALL openjdk-17-jdk && \
+  update-alternatives --set java $(ls /usr/lib/jvm/java-17-openjdk-*/bin/java) 
&& \
   # Install build / source control tools
   $APT_INSTALL curl wget git maven ivy subversion make gcc lsof libffi-dev \
 pandoc pandoc-citeproc libssl-dev libcurl4-openssl-dev libxml2-dev && \





[spark] branch master updated: [SPARK-45338][SQL][FOLLOWUP] Remove useless `toSeq`

2023-09-28 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5c4aef4d4ca [SPARK-45338][SQL][FOLLOWUP] Remove useless `toSeq`
5c4aef4d4ca is described below

commit 5c4aef4d4caf753ce9c45d07472df67479371738
Author: Jia Fan 
AuthorDate: Thu Sep 28 19:10:03 2023 -0500

[SPARK-45338][SQL][FOLLOWUP] Remove useless `toSeq`

### What changes were proposed in this pull request?
This is a follow-up PR for #43126 to remove a useless `toSeq` invocation.

### Why are the changes needed?
Remove a useless conversion.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43172 from Hisoka-X/SPARK-45338-followup-remove-toseq.

Authored-by: Jia Fan 
Signed-off-by: Sean Owen 
---
 .../apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala   | 2 +-
 .../apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index 990a7162ea4..5dd8caf3f22 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -87,7 +87,7 @@ private[hive] class SparkGetColumnsOperation(
 }.toMap
 
 if (isAuthV2Enabled) {
-  val privObjs = getPrivObjs(db2Tabs).toSeq.asJava
+  val privObjs = getPrivObjs(db2Tabs).asJava
   authorizeMetaGets(HiveOperationType.GET_COLUMNS, privObjs, cmdStr)
 }
 
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 7fa492befa0..53a94a128c0 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -68,7 +68,7 @@ private[hive] class SparkGetFunctionsOperation(
 if (isAuthV2Enabled) {
   // authorize this call on the schema objects
   val privObjs =
-HivePrivilegeObjectUtils.getHivePrivDbObjects(matchingDbs.toSeq.asJava)
+HivePrivilegeObjectUtils.getHivePrivDbObjects(matchingDbs.asJava)
   authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr)
 }
 





[spark] branch branch-3.4 updated: [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 68db395867a [SPARK-45057][CORE] Avoid acquire read lock when 
keepReadLock is false
68db395867a is described below

commit 68db395867a3292e9261dd8a3dc191754e1645ef
Author: Warren Zhu 
AuthorDate: Thu Sep 28 18:51:33 2023 -0500

[SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

### What changes were proposed in this pull request?
Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When 
`keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the 
read lock or a potential deadlock.

When two tasks try to compute the same RDD with a replication level of 2 
while running on only two executors, a deadlock can happen. See [SPARK-45057] 
for details.

The task thread holds the write lock while waiting for replication to the 
remote executor, while the shuffle server thread handling the block upload 
request waits on `lockForReading` in 
[BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24)
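    As a standalone illustration of that lock interaction (a conceptual sketch 
only, with made-up names; not Spark code), the deadlock has the classic shape 
of one thread holding the write lock while waiting on another thread that is 
itself blocked trying to take the matching read lock:

    ```scala
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.locks.ReentrantReadWriteLock

    // Conceptual sketch of the reported deadlock shape; not Spark code.
    object DeadlockShape {
      def main(args: Array[String]): Unit = {
        val blockLock = new ReentrantReadWriteLock()
        val uploadHandled = new CountDownLatch(1)

        // "Task" thread: computes the block, holds the write lock, then waits
        // for the peer's upload handler to make progress.
        val taskThread = new Thread(() => {
          blockLock.writeLock().lock()
          uploadHandled.await()          // never satisfied, see below
          blockLock.writeLock().unlock()
        })

        // "Shuffle server" thread: handles the upload, but first tries to take
        // the read lock on the same block, which is held as a write lock above.
        val serverThread = new Thread(() => {
          blockLock.readLock().lock()    // blocks forever
          uploadHandled.countDown()
          blockLock.readLock().unlock()
        })

        taskThread.start()
        serverThread.start()
        // Neither thread can make progress; this is the cycle the keepReadLock
        // fast path breaks by not taking the read lock when it is not needed.
      }
    }
    ```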

### Why are the changes needed?
This saves an unnecessary read lock acquisition and avoids the deadlock issue 
mentioned above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT in BlockInfoManagerSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43067 from warrenzhu25/deadlock.

Authored-by: Warren Zhu 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f)
Signed-off-by: Mridul Muralidharan 
---
 .../scala/org/apache/spark/storage/BlockInfoManager.scala  | 11 +++
 .../main/scala/org/apache/spark/storage/BlockManager.scala |  6 +-
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala   | 14 ++
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 9eb1418fd16..d89e6682adf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -383,13 +383,14 @@ private[storage] class BlockInfoManager extends Logging {
* then just go ahead and acquire the write lock. Otherwise, if another 
thread is already
* writing the block, then we wait for the write to finish before acquiring 
the read lock.
*
-   * @return true if the block did not already exist, false otherwise. If this 
returns false, then
-   * a read lock on the existing block will be held. If this returns 
true, a write lock on
-   * the new block will be held.
+   * @return true if the block did not already exist, false otherwise.
+   * If this returns true, a write lock on the new block will be held.
+   * If this returns false then a read lock will be held iff 
keepReadLock == true.
*/
   def lockNewBlockForWriting(
   blockId: BlockId,
-  newBlockInfo: BlockInfo): Boolean = {
+  newBlockInfo: BlockInfo,
+  keepReadLock: Boolean = true): Boolean = {
 logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
 // Get the lock that will be associated with the to-be written block and 
lock it for the entire
 // duration of this operation. This way we prevent race conditions when 
two threads try to write
@@ -405,6 +406,8 @@ private[storage] class BlockInfoManager extends Logging {
   val result = lockForWriting(blockId, blocking = false)
   assert(result.isDefined)
   return true
+} else if (!keepReadLock) {
+  return false
 } else {
   // Block already exists. This could happen if another thread races 
with us to compute
   // the same block. In this case we try to acquire a read lock, if 
the locking succeeds
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 06c2e615fbc..389fbeb90f5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1444,14 +1444,10 @@ private[spark] class BlockManager(
 
 val putBlockInfo = {
   val newInfo = new BlockInfo(level, classTag, tellMaster)
-  if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+  if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, 
keepReadLock)) {
 newInfo
   } else {
 logWarning(s"Block $blockId already exists on this machine; not 
re-adding it")
- 

[spark] branch master updated: [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0d6fda5bbee [SPARK-45057][CORE] Avoid acquire read lock when 
keepReadLock is false
0d6fda5bbee is described below

commit 0d6fda5bbee99f9d1821952195efc6764816ec2f
Author: Warren Zhu 
AuthorDate: Thu Sep 28 18:51:33 2023 -0500

[SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

### What changes were proposed in this pull request?
Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When 
`keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the 
read lock or a potential deadlock.

When two tasks try to compute the same RDD with a replication level of 2 
while running on only two executors, a deadlock can happen. See [SPARK-45057] 
for details.

The task thread holds the write lock while waiting for replication to the 
remote executor, while the shuffle server thread handling the block upload 
request waits on `lockForReading` in 
[BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24)

### Why are the changes needed?
This saves an unnecessary read lock acquisition and avoids the deadlock issue 
mentioned above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT in BlockInfoManagerSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43067 from warrenzhu25/deadlock.

Authored-by: Warren Zhu 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../scala/org/apache/spark/storage/BlockInfoManager.scala  | 11 +++
 .../main/scala/org/apache/spark/storage/BlockManager.scala |  6 +-
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala   | 14 ++
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 8dccfbc5e2d..f80190c96e8 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -415,13 +415,14 @@ private[storage] class 
BlockInfoManager(trackingCacheVisibility: Boolean = false
* then just go ahead and acquire the write lock. Otherwise, if another 
thread is already
* writing the block, then we wait for the write to finish before acquiring 
the read lock.
*
-   * @return true if the block did not already exist, false otherwise. If this 
returns false, then
-   * a read lock on the existing block will be held. If this returns 
true, a write lock on
-   * the new block will be held.
+   * @return true if the block did not already exist, false otherwise.
+   * If this returns true, a write lock on the new block will be held.
+   * If this returns false then a read lock will be held iff 
keepReadLock == true.
*/
   def lockNewBlockForWriting(
   blockId: BlockId,
-  newBlockInfo: BlockInfo): Boolean = {
+  newBlockInfo: BlockInfo,
+  keepReadLock: Boolean = true): Boolean = {
 logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
 // Get the lock that will be associated with the to-be written block and 
lock it for the entire
 // duration of this operation. This way we prevent race conditions when 
two threads try to write
@@ -449,6 +450,8 @@ private[storage] class 
BlockInfoManager(trackingCacheVisibility: Boolean = false
   val result = lockForWriting(blockId, blocking = false)
   assert(result.isDefined)
   return true
+} else if (!keepReadLock) {
+  return false
 } else {
   // Block already exists. This could happen if another thread races 
with us to compute
   // the same block. In this case we try to acquire a read lock, if 
the locking succeeds
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 81933744472..cccee78aee1 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1510,14 +1510,10 @@ private[spark] class BlockManager(
 
 val putBlockInfo = {
   val newInfo = new BlockInfo(level, classTag, tellMaster)
-  if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+  if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, 
keepReadLock)) {
 newInfo
   } else {
 logWarning(s"Block $blockId already exists on this machine; not 
re-adding it")
-if (!keepReadLock) {
-  // lockNewBlockForWriting 

[spark] branch branch-3.3 updated: [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 48138eb1a35 [SPARK-45057][CORE] Avoid acquire read lock when 
keepReadLock is false
48138eb1a35 is described below

commit 48138eb1a3561bcdbfb44a94cf94a481a8bcec98
Author: Warren Zhu 
AuthorDate: Thu Sep 28 18:51:33 2023 -0500

[SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

### What changes were proposed in this pull request?
Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When 
`keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the 
read lock or a potential deadlock.

When two tasks try to compute the same RDD with a replication level of 2 
while running on only two executors, a deadlock can happen. See [SPARK-45057] 
for details.

The task thread holds the write lock while waiting for replication to the 
remote executor, while the shuffle server thread handling the block upload 
request waits on `lockForReading` in 
[BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24)

### Why are the changes needed?
This saves an unnecessary read lock acquisition and avoids the deadlock issue 
mentioned above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT in BlockInfoManagerSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43067 from warrenzhu25/deadlock.

Authored-by: Warren Zhu 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f)
Signed-off-by: Mridul Muralidharan 
---
 .../scala/org/apache/spark/storage/BlockInfoManager.scala  | 11 +++
 .../main/scala/org/apache/spark/storage/BlockManager.scala |  6 +-
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala   | 14 ++
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 9eb1418fd16..d89e6682adf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -383,13 +383,14 @@ private[storage] class BlockInfoManager extends Logging {
* then just go ahead and acquire the write lock. Otherwise, if another 
thread is already
* writing the block, then we wait for the write to finish before acquiring 
the read lock.
*
-   * @return true if the block did not already exist, false otherwise. If this 
returns false, then
-   * a read lock on the existing block will be held. If this returns 
true, a write lock on
-   * the new block will be held.
+   * @return true if the block did not already exist, false otherwise.
+   * If this returns true, a write lock on the new block will be held.
+   * If this returns false then a read lock will be held iff 
keepReadLock == true.
*/
   def lockNewBlockForWriting(
   blockId: BlockId,
-  newBlockInfo: BlockInfo): Boolean = {
+  newBlockInfo: BlockInfo,
+  keepReadLock: Boolean = true): Boolean = {
 logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
 // Get the lock that will be associated with the to-be written block and 
lock it for the entire
 // duration of this operation. This way we prevent race conditions when 
two threads try to write
@@ -405,6 +406,8 @@ private[storage] class BlockInfoManager extends Logging {
   val result = lockForWriting(blockId, blocking = false)
   assert(result.isDefined)
   return true
+} else if (!keepReadLock) {
+  return false
 } else {
   // Block already exists. This could happen if another thread races 
with us to compute
   // the same block. In this case we try to acquire a read lock, if 
the locking succeeds
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f35131ee5b9..cdd91fb4c07 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1432,14 +1432,10 @@ private[spark] class BlockManager(
 
 val putBlockInfo = {
   val newInfo = new BlockInfo(level, classTag, tellMaster)
-  if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+  if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, 
keepReadLock)) {
 newInfo
   } else {
 logWarning(s"Block $blockId already exists on this machine; not 
re-adding it")
- 

[spark] branch branch-3.5 updated: [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new aeba488ccd9 [SPARK-45057][CORE] Avoid acquire read lock when 
keepReadLock is false
aeba488ccd9 is described below

commit aeba488ccd9213d28e6401d1bf7eadfdb9d955c3
Author: Warren Zhu 
AuthorDate: Thu Sep 28 18:51:33 2023 -0500

[SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

### What changes were proposed in this pull request?
Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When 
`keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the 
read lock or a potential deadlock.

When two tasks try to compute the same RDD with a replication level of 2 
while running on only two executors, a deadlock can happen. See [SPARK-45057] 
for details.

The task thread holds the write lock while waiting for replication to the 
remote executor, while the shuffle server thread handling the block upload 
request waits on `lockForReading` in 
[BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24)

### Why are the changes needed?
This saves an unnecessary read lock acquisition and avoids the deadlock issue 
mentioned above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT in BlockInfoManagerSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43067 from warrenzhu25/deadlock.

Authored-by: Warren Zhu 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f)
Signed-off-by: Mridul Muralidharan 
---
 .../scala/org/apache/spark/storage/BlockInfoManager.scala  | 11 +++
 .../main/scala/org/apache/spark/storage/BlockManager.scala |  6 +-
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala   | 14 ++
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 45ebb6eafa6..ab4073fe8c0 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -415,13 +415,14 @@ private[storage] class 
BlockInfoManager(trackingCacheVisibility: Boolean = false
* then just go ahead and acquire the write lock. Otherwise, if another 
thread is already
* writing the block, then we wait for the write to finish before acquiring 
the read lock.
*
-   * @return true if the block did not already exist, false otherwise. If this 
returns false, then
-   * a read lock on the existing block will be held. If this returns 
true, a write lock on
-   * the new block will be held.
+   * @return true if the block did not already exist, false otherwise.
+   * If this returns true, a write lock on the new block will be held.
+   * If this returns false then a read lock will be held iff 
keepReadLock == true.
*/
   def lockNewBlockForWriting(
   blockId: BlockId,
-  newBlockInfo: BlockInfo): Boolean = {
+  newBlockInfo: BlockInfo,
+  keepReadLock: Boolean = true): Boolean = {
 logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
 // Get the lock that will be associated with the to-be written block and 
lock it for the entire
 // duration of this operation. This way we prevent race conditions when 
two threads try to write
@@ -449,6 +450,8 @@ private[storage] class 
BlockInfoManager(trackingCacheVisibility: Boolean = false
   val result = lockForWriting(blockId, blocking = false)
   assert(result.isDefined)
   return true
+} else if (!keepReadLock) {
+  return false
 } else {
   // Block already exists. This could happen if another thread races 
with us to compute
   // the same block. In this case we try to acquire a read lock, if 
the locking succeeds
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 05d57c67576..6de6069d2fe 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1510,14 +1510,10 @@ private[spark] class BlockManager(
 
 val putBlockInfo = {
   val newInfo = new BlockInfo(level, classTag, tellMaster)
-  if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+  if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, 
keepReadLock)) {
 newInfo
   } else {
 logWarning(s"Block $blockId 

[spark] branch master updated: [SPARK-44937][CORE] Add test keys for SSL functionality

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d52561cfb1f [SPARK-44937][CORE] Add test keys for SSL functionality
d52561cfb1f is described below

commit d52561cfb1f19bf4e02510f3ac71e6a04533b347
Author: Hasnain Lakhani 
AuthorDate: Thu Sep 28 18:32:06 2023 -0500

[SPARK-44937][CORE] Add test keys for SSL functionality

### What changes were proposed in this pull request?

This PR introduces test keys for SSL functionality. The keys were 
generated using something like the following:

```
openssl genpkey -algorithm RSA -out key.pem -hexseed deadbeef
openssl pkcs8 -topk8 -in key.pem -out key.pem.out
openssl req -new -key key.pem -out csr.csr -days 3650 -subj 
"/CN=test/ST=California/L=San Francisco/OU=Org1/O=Org2/C=US"
openssl x509 -req -in csr.csr -signkey key.pem -out certchain.pem -days 3650
rm key.pem csr.csr
mv key.pem.enc key.pem
```

These were then copied to all the relevant folders. I also copied over the 
keystore and truststore files (did not regenerate those).

### Why are the changes needed?

We need these test files to run tests using PEM keys for SSL connections.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

These test files will be used by follow-up PRs. This was tested as part of 
https://github.com/apache/spark/pull/42685, which is being split.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43163 from hasnain-db/spark-tls-test-files.

Authored-by: Hasnain Lakhani 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../src/test/resources/certchain.pem   |  17 
 common/network-common/src/test/resources/key.pem   |  29 +
 common/network-common/src/test/resources/keystore  | Bin 0 -> 2247 bytes
 .../network-common/src/test/resources/truststore   | Bin 0 -> 957 bytes
 .../src/test/resources/untrusted-keystore  | Bin 0 -> 2246 bytes
 .../src/test/resources/certchain.pem   |  17 
 common/network-shuffle/src/test/resources/key.pem  |  29 +
 common/network-shuffle/src/test/resources/keystore | Bin 0 -> 2247 bytes
 .../network-shuffle/src/test/resources/truststore  | Bin 0 -> 957 bytes
 .../src/test/resources/untrusted-keystore  | Bin 0 -> 2246 bytes
 core/src/test/resources/certchain.pem  |  17 
 core/src/test/resources/key.pem|  29 +
 .../yarn/src/test/resources/certchain.pem  |  17 
 resource-managers/yarn/src/test/resources/key.pem  |  29 +
 resource-managers/yarn/src/test/resources/keystore | Bin 0 -> 2247 bytes
 .../yarn/src/test/resources/truststore | Bin 0 -> 957 bytes
 .../yarn/src/test/resources/untrusted-keystore | Bin 0 -> 2246 bytes
 17 files changed, 184 insertions(+)

diff --git a/common/network-common/src/test/resources/certchain.pem 
b/common/network-common/src/test/resources/certchain.pem
new file mode 100644
index 000..1004cacc9bf
--- /dev/null
+++ b/common/network-common/src/test/resources/certchain.pem
@@ -0,0 +1,17 @@
+-BEGIN CERTIFICATE-
+MIICsDCCAZgCCQD7yXTHZWZZlDANBgkqhkiG9w0BAQsFADAaMQswCQYDVQQGEwJV
+UzELMAkGA1UECAwCQ0EwHhcNMjMwNjIwMTczMjAzWhcNMzMwNjE3MTczMjAzWjAa
+MQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IB
+DwAwggEKAoIBAQDH4DO8IP/7xZgpzmYrBaqzsnpamq54cXP8JdQUOXP/dmh8myGg
+CUau/nNdpPNr1Od2iUvf1Z9OW+KcHdNAL/zcwe1ehU3d6/M+UinDtfbEb4HSyQ31
+9AIlPSUq+pJAlsAGJYERLGHPBNXEay0r0+TR0cd9CfSN79rXUMag40pZC3zdxXmY
+JpSkhNuiYfa+Z9TgXoki5MzNiyH12gAb9tO8tr55BnE5s/QujOp7LMjlf6VkE7Bp
+hqj1UbcHmFw7U9jyLDfi98uIvlEDFCwXARdmLxxaYAOqdgZ3TtjBvbugVRpRFQiw
+haFzkiok9bh+MclKQBKvF0ArHmMLHkcCd5oPAgMBAAEwDQYJKoZIhvcNAQELBQAD
+ggEBADYIPLwlnuH6rTbkjeZFYC2UXjNesbUe1TXbsBo9DDHJUSFjNNDDAUpSzhxb
+q6nMvex7tnTvTjAgOQR/qwAueAfcXHWe0EKvn4Y6yJERepSsYg5bSYBt+UJxW89R
+JRLmzBFxEJy1YhsqGCh+I2wRoRz8ZGokDyqcrAlwlzXYVDfNC4wUo14Cm+s90yc3
+2I/roX/MWec8QbEbr25psAYVnRdUL1mzCeQMc83A8Y0SDPfF5ECFhvFXkVaDTULO
+RddXWJoC4K5RuGa6yvpb75I8VTE3fwE2ykSgPuMShNZREDCuszkpPjjFumq9pCOJ
+nUO1huCqjxC1ehPe/9/jgmzoVX4=
+-END CERTIFICATE-
diff --git a/common/network-common/src/test/resources/key.pem 
b/common/network-common/src/test/resources/key.pem
new file mode 100644
index 000..77122755bfd
--- /dev/null
+++ b/common/network-common/src/test/resources/key.pem
@@ -0,0 +1,29 @@
+-BEGIN ENCRYPTED PRIVATE KEY-
+MIIE6TAbBgkqhkiG9w0BBQMwDgQIGBIe7ugOgfACAggABIIEyJgkzYc/ixcvwLJC
+eTzGOVwk+F1cqM4H63FOxIjroaxisceqoBmty6Rf4PJ1C9nprkSs6G/SkupbNUUB
+YiWmsQ91orllbHsczAc+qaa0tmommwgt27ZrfXdXBxDB0mJWQTijkVHWfyTqcXmC

[spark] branch branch-3.3 updated: [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new e965cc835c9 [SPARK-44937][CORE] Mark connection as timedOut in 
TransportClient.close
e965cc835c9 is described below

commit e965cc835c91c6f7a87ec7ed140e5cf34c445798
Author: Hasnain Lakhani 
AuthorDate: Thu Sep 28 18:16:49 2023 -0500

[SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close

### What changes were proposed in this pull request?

This PR avoids a race condition where a connection which is in the process 
of being closed could be returned by the TransportClientFactory only to be 
immediately closed and cause errors upon use.

This race condition is rare and not easily triggered, but with the upcoming 
changes to introduce SSL connection support, connection closing can take 
slightly longer, which makes it much easier to trigger this issue.

Looking at the history of the code I believe this was an oversight in 
https://github.com/apache/spark/pull/9853.
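    The underlying pattern, shown below as a hedged sketch with made-up names 
(not the actual TransportClient/TransportClientFactory code), is to flip the 
validity flag before starting a potentially slow close, so a pooling factory 
that checks the flag never hands out a connection that is already being torn 
down:

    ```scala
    import java.util.concurrent.atomic.AtomicBoolean

    // Sketch of the "mark invalid before a slow close" pattern; names are made up.
    class PooledConnection {
      private val invalid = new AtomicBoolean(false)

      // A pooling factory would check this before handing the connection out.
      def isActive: Boolean = !invalid.get()

      def close(): Unit = {
        // Flip the flag first so concurrent borrowers skip this instance even if
        // the underlying channel shutdown below takes a while (e.g. with SSL).
        invalid.set(true)
        // ... slow channel shutdown would happen here ...
      }
    }
    ```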

### Why are the changes needed?

Without this change, some of the new tests added in 
https://github.com/apache/spark/pull/42685 would fail

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests were run in CI.
Without this change, some of the new tests added in 
https://github.com/apache/spark/pull/42685 fail

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43162 from hasnain-db/spark-tls-timeout.

Authored-by: Hasnain Lakhani 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 2a88feadd4b7cec9e01bc744e589783e3390e5ce)
Signed-off-by: Mridul Muralidharan 
---
 .../main/java/org/apache/spark/network/client/TransportClient.java   | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index dd2fdb08ee5..884df899981 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -325,7 +325,10 @@ public class TransportClient implements Closeable {
 
   @Override
   public void close() {
-// close is a local operation and should finish with milliseconds; timeout 
just to be safe
+// Mark the connection as timed out, so we do not return a connection 
that's being closed
+// from the TransportClientFactory if closing takes some time (e.g. with 
SSL)
+this.timedOut = true;
+// close should not take this long; use a timeout just to be safe
 channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
   }
 





[spark] branch branch-3.4 updated: [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 85bf705a93f [SPARK-44937][CORE] Mark connection as timedOut in 
TransportClient.close
85bf705a93f is described below

commit 85bf705a93f17767bf8653b0ce5a144af5a21e95
Author: Hasnain Lakhani 
AuthorDate: Thu Sep 28 18:16:49 2023 -0500

[SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close

### What changes were proposed in this pull request?

This PR avoids a race condition where a connection which is in the process 
of being closed could be returned by the TransportClientFactory only to be 
immediately closed and cause errors upon use.

This race condition is rare and not easily triggered, but with the upcoming 
changes to introduce SSL connection support, connection closing can take 
slightly longer, which makes it much easier to trigger this issue.

Looking at the history of the code I believe this was an oversight in 
https://github.com/apache/spark/pull/9853.

### Why are the changes needed?

Without this change, some of the new tests added in 
https://github.com/apache/spark/pull/42685 would fail

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests were run in CI.
Without this change, some of the new tests added in 
https://github.com/apache/spark/pull/42685 fail

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43162 from hasnain-db/spark-tls-timeout.

Authored-by: Hasnain Lakhani 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 2a88feadd4b7cec9e01bc744e589783e3390e5ce)
Signed-off-by: Mridul Muralidharan 
---
 .../main/java/org/apache/spark/network/client/TransportClient.java   | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 4a0a1566998..40825e06b82 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -325,7 +325,10 @@ public class TransportClient implements Closeable {
 
   @Override
   public void close() {
-// close is a local operation and should finish with milliseconds; timeout 
just to be safe
+// Mark the connection as timed out, so we do not return a connection 
that's being closed
+// from the TransportClientFactory if closing takes some time (e.g. with 
SSL)
+this.timedOut = true;
+// close should not take this long; use a timeout just to be safe
 channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
   }
 





[spark] branch master updated: [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2a88feadd4b [SPARK-44937][CORE] Mark connection as timedOut in 
TransportClient.close
2a88feadd4b is described below

commit 2a88feadd4b7cec9e01bc744e589783e3390e5ce
Author: Hasnain Lakhani 
AuthorDate: Thu Sep 28 18:16:49 2023 -0500

[SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close

### What changes were proposed in this pull request?

This PR avoids a race condition where a connection which is in the process 
of being closed could be returned by the TransportClientFactory only to be 
immediately closed and cause errors upon use.

This race condition is rare and not easily triggered, but with the upcoming 
changes to introduce SSL connection support, connection closing can take 
slightly longer, which makes it much easier to trigger this issue.

Looking at the history of the code I believe this was an oversight in 
https://github.com/apache/spark/pull/9853.

### Why are the changes needed?

Without this change, some of the new tests added in 
https://github.com/apache/spark/pull/42685 would fail

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests were run in CI.
Without this change, some of the new tests added in 
https://github.com/apache/spark/pull/42685 fail

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43162 from hasnain-db/spark-tls-timeout.

Authored-by: Hasnain Lakhani 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../main/java/org/apache/spark/network/client/TransportClient.java   | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 4a0a1566998..40825e06b82 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -325,7 +325,10 @@ public class TransportClient implements Closeable {
 
   @Override
   public void close() {
-// close is a local operation and should finish with milliseconds; timeout 
just to be safe
+// Mark the connection as timed out, so we do not return a connection 
that's being closed
+// from the TransportClientFactory if closing takes some time (e.g. with 
SSL)
+this.timedOut = true;
+// close should not take this long; use a timeout just to be safe
 channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
   }
 





[spark] branch branch-3.5 updated: [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close

2023-09-28 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 3a723a171b1 [SPARK-44937][CORE] Mark connection as timedOut in 
TransportClient.close
3a723a171b1 is described below

commit 3a723a171b1b77a8a8b2ccce2bd489acb8db00a3
Author: Hasnain Lakhani 
AuthorDate: Thu Sep 28 18:16:49 2023 -0500

[SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close

### What changes were proposed in this pull request?

This PR avoids a race condition where a connection which is in the process 
of being closed could be returned by the TransportClientFactory only to be 
immediately closed and cause errors upon use.

This race condition is rare and not easily triggered, but with the upcoming 
changes to introduce SSL connection support, connection closing can take 
slightly longer, which makes it much easier to trigger this issue.

Looking at the history of the code I believe this was an oversight in 
https://github.com/apache/spark/pull/9853.

### Why are the changes needed?

Without this change, some of the new tests added in 
https://github.com/apache/spark/pull/42685 would fail

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests were run in CI.
Without this change, some of the new tests added in 
https://github.com/apache/spark/pull/42685 fail

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43162 from hasnain-db/spark-tls-timeout.

Authored-by: Hasnain Lakhani 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 2a88feadd4b7cec9e01bc744e589783e3390e5ce)
Signed-off-by: Mridul Muralidharan 
---
 .../main/java/org/apache/spark/network/client/TransportClient.java   | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
index 4a0a1566998..40825e06b82 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
@@ -325,7 +325,10 @@ public class TransportClient implements Closeable {
 
   @Override
   public void close() {
-// close is a local operation and should finish with milliseconds; timeout just to be safe
+// Mark the connection as timed out, so we do not return a connection that's being closed
+// from the TransportClientFactory if closing takes some time (e.g. with SSL)
+this.timedOut = true;
+// close should not take this long; use a timeout just to be safe
 channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
   }
 





[spark] branch master updated: [SPARK-44895][CORE][UI] Add 'daemon', 'priority' for ThreadStackTrace

2023-09-28 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6341310711e [SPARK-44895][CORE][UI] Add 'daemon', 'priority' for 
ThreadStackTrace
6341310711e is described below

commit 6341310711ee0e3edbdd42aaeaf806cad4edefb5
Author: Kent Yao 
AuthorDate: Thu Sep 28 18:04:03 2023 -0500

[SPARK-44895][CORE][UI] Add 'daemon', 'priority' for ThreadStackTrace

### What changes were proposed in this pull request?

Since version 9, Java has supported the 'daemon' and 'priority' fields in 
ThreadInfo. In this PR, we extract them from ThreadInfo into ThreadStackTrace.
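
For reference, the two fields come straight from the JDK; a hedged snippet of plain `java.lang.management` usage (not Spark code; `ThreadInfo.isDaemon` and `ThreadInfo.getPriority` require Java 9+):

```scala
import java.lang.management.ManagementFactory

// Dump a few threads and print the two fields this PR surfaces in ThreadStackTrace.
val threadMXBean = ManagementFactory.getThreadMXBean
threadMXBean.dumpAllThreads(false, false).take(3).foreach { info =>
  println(s"${info.getThreadName}: daemon=${info.isDaemon}, priority=${info.getPriority}")
}
```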

### Why are the changes needed?

More information for the thread pages in the UI and REST APIs.

### Does this PR introduce _any_ user-facing change?

yes, ThreadStackTrace changes

### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #43095 from yaooqinn/SPARK-44895.

Authored-by: Kent Yao 
Signed-off-by: Sean Owen 
---
 .../main/scala/org/apache/spark/status/api/v1/api.scala| 10 ++
 core/src/main/scala/org/apache/spark/util/Utils.scala  |  4 +++-
 .../test/scala/org/apache/spark/ui/UISeleniumSuite.scala   | 14 ++
 3 files changed, 23 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala 
b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 3e4e2f17a77..7a0c69e2948 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -540,19 +540,21 @@ case class ThreadStackTrace(
 lockName: Option[String],
 lockOwnerName: Option[String],
 suspended: Boolean,
-inNative: Boolean) {
+inNative: Boolean,
+isDaemon: Boolean,
+priority: Int) {
 
   /**
* Returns a string representation of this thread stack trace
* w.r.t java.lang.management.ThreadInfo(JDK 8)'s toString.
*
-   * TODO(SPARK-44895): Considering 'daemon', 'priority' from higher JDKs
-   *
* TODO(SPARK-44896): Also considering adding information os_prio, cpu, elapsed, tid, nid, etc.,
*   from the jstack tool
*/
   override def toString: String = {
-val sb = new StringBuilder(s""""$threadName" Id=$threadId $threadState""")
+val daemon = if (isDaemon) " daemon" else ""
+val sb = new StringBuilder(
+  s""""$threadName"$daemon prio=$priority Id=$threadId $threadState""")
 lockName.foreach(lock => sb.append(s" on $lock"))
 lockOwnerName.foreach {
   owner => sb.append(s""" owned by "$owner"""")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 48dfbecb7cd..dcffa99dc64 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2196,7 +2196,9 @@ private[spark] object Utils
   Option(threadInfo.getLockName),
   Option(threadInfo.getLockOwnerName),
   threadInfo.isSuspended,
-  threadInfo.isInNative)
+  threadInfo.isInNative,
+  threadInfo.isDaemon,
+  threadInfo.getPriority)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index dd9927d7ba1..7e74cc9287f 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -885,6 +885,20 @@ class UISeleniumSuite extends SparkFunSuite with 
WebBrowser with Matchers {
 }
   }
 
+  test("SPARK-44895: Add 'daemon', 'priority' for ThreadStackTrace") {
+withSpark(newSparkContext()) { sc =>
+  val uiThreads = getJson(sc.ui.get, "executors/driver/threads")
+.children
+.filter(v => (v \ "threadName").extract[String].matches("SparkUI-\\d+"))
+  val priority = Thread.currentThread().getPriority
+
+  uiThreads.foreach { v =>
+assert((v \ "isDaemon").extract[Boolean])
+assert((v \ "priority").extract[Int] === priority)
+  }
+}
+  }
+
   def goToUi(sc: SparkContext, path: String): Unit = {
 goToUi(sc.ui.get, path)
   }





[spark] branch master updated: [SPARK-45362][PYTHON] Project out PARTITION BY expressions before Python UDTF 'eval' method consumes them

2023-09-28 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b3d5bc0c109 [SPARK-45362][PYTHON] Project out PARTITION BY expressions 
before Python UDTF 'eval' method consumes them
b3d5bc0c109 is described below

commit b3d5bc0c10908aa66510844eaabc43b6764dd7c0
Author: Daniel Tenedorio 
AuthorDate: Thu Sep 28 14:02:46 2023 -0700

[SPARK-45362][PYTHON] Project out PARTITION BY expressions before Python 
UDTF 'eval' method consumes them

### What changes were proposed in this pull request?

This PR projects out PARTITION BY expressions before Python UDTF 'eval' 
method consumes them.

Before this PR, if a query included this `PARTITION BY` clause:

```
SELECT * FROM udtf((SELECT a, b FROM TABLE t) PARTITION BY (c, d))
```

Then the `eval` method received four columns in each row: `a, b, c, d`.

After this PR, the `eval` method only receives two columns: `a, b`, as 
expected.

### Why are the changes needed?

This makes the Python UDTF `TABLE` columns consistently match what the 
`eval` method receives, as expected.

### Does this PR introduce _any_ user-facing change?

Yes, see above.

### How was this patch tested?

This PR adds new unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43156 from dtenedor/project-out-partition-exprs.

Authored-by: Daniel Tenedorio 
Signed-off-by: Takuya UESHIN 
---
 python/pyspark/sql/tests/test_udtf.py | 12 
 python/pyspark/worker.py  | 31 +++
 2 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/tests/test_udtf.py 
b/python/pyspark/sql/tests/test_udtf.py
index 97d5190a506..a1d82056c50 100644
--- a/python/pyspark/sql/tests/test_udtf.py
+++ b/python/pyspark/sql/tests/test_udtf.py
@@ -2009,6 +2009,10 @@ class BaseUDTFTestsMixin:
 self._partition_col = None
 
 def eval(self, row: Row):
+# Make sure that the PARTITION BY expressions were projected 
out.
+assert len(row.asDict().items()) == 2
+assert "partition_col" in row
+assert "input" in row
 self._sum += row["input"]
 if self._partition_col is not None and self._partition_col != 
row["partition_col"]:
 # Make sure that all values of the partitioning column are 
the same
@@ -2092,6 +2096,10 @@ class BaseUDTFTestsMixin:
 self._partition_col = None
 
 def eval(self, row: Row, partition_col: str):
+# Make sure that the PARTITION BY and ORDER BY expressions 
were projected out.
+assert len(row.asDict().items()) == 2
+assert "partition_col" in row
+assert "input" in row
 # Make sure that all values of the partitioning column are the 
same
 # for each row consumed by this method for this instance of 
the class.
 if self._partition_col is not None and self._partition_col != 
row[partition_col]:
@@ -2247,6 +2255,10 @@ class BaseUDTFTestsMixin:
 )
 
 def eval(self, row: Row):
+# Make sure that the PARTITION BY and ORDER BY expressions 
were projected out.
+assert len(row.asDict().items()) == 2
+assert "partition_col" in row
+assert "input" in row
 # Make sure that all values of the partitioning column are the 
same
 # for each row consumed by this method for this instance of 
the class.
 if self._partition_col is not None and self._partition_col != 
row["partition_col"]:
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 77481704979..4cffb02a64a 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -51,7 +51,14 @@ from pyspark.sql.pandas.serializers import (
 ApplyInPandasWithStateSerializer,
 )
 from pyspark.sql.pandas.types import to_arrow_type
-from pyspark.sql.types import BinaryType, Row, StringType, StructType, 
_parse_datatype_json_string
+from pyspark.sql.types import (
+BinaryType,
+Row,
+StringType,
+StructType,
+_create_row,
+_parse_datatype_json_string,
+)
 from pyspark.util import fail_on_stopiteration, handle_worker_exception
 from pyspark import shuffle
 from pyspark.errors import PySparkRuntimeError, PySparkTypeError
@@ -735,7 +742,12 @@ def read_udtf(pickleSer, infile, eval_type):
 yield row
 self._udtf = self._create_udtf()
 if self._udtf.eval is not None:
-result = self._udtf.eval(*args, **kwargs)
+

[spark] branch master updated: [SPARK-45365][INFRA] Allow `branch-3.4` daily test to use the new test group tags

2023-09-28 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b938ff9f520 [SPARK-45365][INFRA] Allow `branch-3.4` daily test to use 
the new test group tags
b938ff9f520 is described below

commit b938ff9f520fd4e4997938284ffa0aba9ea271fc
Author: yangjie01 
AuthorDate: Thu Sep 28 13:05:57 2023 -0700

[SPARK-45365][INFRA] Allow `branch-3.4` daily test to use the new test 
group tags

### What changes were proposed in this pull request?
This PR aims to allow the branch-3.4 daily tests to use the new test group tags.

### Why are the changes needed?
SPARK-44034 has already been backported to branch-3.4, so the new test group tags can be 
used to reduce the average runtime of the daily tests.
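
For context, a hedged sketch of how such group tags are attached to a suite (the tag name comes from the diff below; the suite and test body are made up, assuming Spark's usual pattern of class-level tag annotations):

```scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.tags.SlowSQLTest

// CI can then include or exclude the whole suite via the tag, which is what the
// included-tags / excluded-tags values in build_and_test.yml select on.
@SlowSQLTest
class ExampleSlowSQLSuite extends SparkFunSuite {
  test("placeholder") {
    assert(1 + 1 == 2)
  }
}
```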

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Should be verified by monitoring GA (GitHub Actions).

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43159 from LuciferYang/SPARK-45365.

Authored-by: yangjie01 
Signed-off-by: Chao Sun 
---
 .github/workflows/build_and_test.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 89da6fcffe4..53994788d8b 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -180,10 +180,10 @@ jobs:
 java: ${{ inputs.java }}
 hadoop: ${{ inputs.hadoop }}
 hive: hive2.3
-# Using a tag that will not appear in sql module for placeholder, 
branch-3.3 and branch-3.4 will not run any UTs.
+# Using a tag that will not appear in sql module for placeholder, 
branch-3.3 will not run any UTs.
 included-tags: >-
   ${{
-((inputs.branch == 'branch-3.3' || inputs.branch == 
'branch-3.4') && 'org.apache.spark.tags.SlowHiveTest')
+(inputs.branch == 'branch-3.3' && 
'org.apache.spark.tags.SlowHiveTest')
 || 'org.apache.spark.tags.SlowSQLTest'
   }}
 comment: "- slow tests"
@@ -193,7 +193,7 @@ jobs:
 hive: hive2.3
 excluded-tags: >-
   ${{
-((inputs.branch == 'branch-3.3' || inputs.branch == 
'branch-3.4') && 'org.apache.spark.tags.ExtendedSQLTest')
+(inputs.branch == 'branch-3.3' && 
'org.apache.spark.tags.ExtendedSQLTest')
 || 
'org.apache.spark.tags.ExtendedSQLTest,org.apache.spark.tags.SlowSQLTest'
   }}
 comment: "- other tests"





[spark] branch master updated: [MINOR][SQL] Remove duplicate cases of escaping characters in string literals

2023-09-28 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ee21b12c395 [MINOR][SQL] Remove duplicate cases of escaping characters 
in string literals
ee21b12c395 is described below

commit ee21b12c395ac184c8ddc2f74b66f6e6285de5fa
Author: Max Gekk 
AuthorDate: Thu Sep 28 21:18:40 2023 +0300

[MINOR][SQL] Remove duplicate cases of escaping characters in string 
literals

### What changes were proposed in this pull request?
In the PR, I propose to remove some cases in `appendEscapedChar()` because 
they fall through to the default case.
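
A minimal standalone sketch of that fall-through (not the Spark source; it just mirrors the shape of `appendEscapedChar()` to show why a case that appends the character unchanged is redundant next to the default case):

```scala
// With a default branch that appends the character as-is, explicit cases for
// '\'' , '"' and '\\' add nothing: each maps the character to itself.
def escape(n: Char): String = n match {
  case 'n' => "\n"
  case 't' => "\t"
  case _   => n.toString // '\'' , '"' and '\\' all land here unchanged
}

assert(escape('\'') == "'")
assert(escape('"') == "\"")
assert(escape('\\') == "\\")
```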

The following tests check the cases:
- 
https://github.com/apache/spark/blob/187e9a851758c0e9cec11edab2bc07d6f4404001/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala#L97-L98
- 
https://github.com/apache/spark/blob/187e9a851758c0e9cec11edab2bc07d6f4404001/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala#L104

### Why are the changes needed?
To improve code maintainability.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the affected test suite:
```
$ build/sbt "test:testOnly *.ParserUtilsSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43170 from MaxGekk/cleanup-escaping.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 .../scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala| 3 ---
 1 file changed, 3 deletions(-)

diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala
index c318f208255..a4ce5fb1203 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala
@@ -38,14 +38,11 @@ trait SparkParserUtils {
 def appendEscapedChar(n: Char): Unit = {
   n match {
 case '0' => sb.append('\u0000')
-case '\'' => sb.append('\'')
-case '"' => sb.append('\"')
 case 'b' => sb.append('\b')
 case 'n' => sb.append('\n')
 case 'r' => sb.append('\r')
 case 't' => sb.append('\t')
 case 'Z' => sb.append('\u001A')
-case '\\' => sb.append('\\')
 // The following 2 lines are exactly what MySQL does TODO: why do we 
do this?
 case '%' => sb.append("\\%")
 case '_' => sb.append("\\_")





[spark] branch master updated: [SPARK-45266][PYTHON] Refactor ResolveFunctions analyzer rule to delay making lateral join when table arguments are used

2023-09-28 Thread ueshin
This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2cf937f9bac [SPARK-45266][PYTHON] Refactor ResolveFunctions analyzer 
rule to delay making lateral join when table arguments are used
2cf937f9bac is described below

commit 2cf937f9bac2131f3657660a8d65d07ab4ece490
Author: Takuya UESHIN 
AuthorDate: Thu Sep 28 10:37:18 2023 -0700

[SPARK-45266][PYTHON] Refactor ResolveFunctions analyzer rule to delay 
making lateral join when table arguments are used

### What changes were proposed in this pull request?

Refactors `ResolveFunctions` analyzer rule to delay making lateral join 
when table arguments are used.

- Delay making the lateral join, when table arguments are used, until after all the children are resolved
- Resolve `UnresolvedPolymorphicPythonUDTF` in one place
- Introduce a new error class `UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_TABLE_ARGUMENT` for cases where table arguments are used improperly

### Why are the changes needed?

The analyzer rule `ResolveFunctions` became complicated.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43042 from ueshin/issues/SPARK-45266/analyzer.

Authored-by: Takuya UESHIN 
Signed-off-by: Takuya UESHIN 
---
 .../src/main/resources/error/error-classes.json|   5 +
 ...ted-subquery-expression-category-error-class.md |   4 +
 .../spark/sql/catalyst/analysis/Analyzer.scala | 155 +
 .../sql/catalyst/analysis/CheckAnalysis.scala  |   5 +
 .../spark/sql/catalyst/expressions/PythonUDF.scala |   6 +-
 .../named-function-arguments.sql.out   |  16 +--
 .../results/named-function-arguments.sql.out   |  16 +--
 .../sql/execution/python/PythonUDTFSuite.scala |  20 ++-
 8 files changed, 103 insertions(+), 124 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 0882e387176..58fcedae332 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3484,6 +3484,11 @@
 "message" : [
   "IN/EXISTS predicate subqueries can only be used in filters, joins, 
aggregations, window functions, projections, and UPDATE/MERGE/DELETE 
commands."
 ]
+  },
+  "UNSUPPORTED_TABLE_ARGUMENT" : {
+"message" : [
+  "Table arguments are used in a function where they are not 
supported."
+]
   }
 },
 "sqlState" : "0A000"
diff --git 
a/docs/sql-error-conditions-unsupported-subquery-expression-category-error-class.md
 
b/docs/sql-error-conditions-unsupported-subquery-expression-category-error-class.md
index f61ea721aa0..45ad386c666 100644
--- 
a/docs/sql-error-conditions-unsupported-subquery-expression-category-error-class.md
+++ 
b/docs/sql-error-conditions-unsupported-subquery-expression-category-error-class.md
@@ -73,4 +73,8 @@ Correlated scalar subqueries can only be used in filters, 
aggregations, projecti
 
 IN/EXISTS predicate subqueries can only be used in filters, joins, 
aggregations, window functions, projections, and UPDATE/MERGE/DELETE 
commands``.
 
+## UNSUPPORTED_TABLE_ARGUMENT
+
+Table arguments are used in a function where they are not 
supported``.
+
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 67a958d73f7..cc0bfd3fc31 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2082,7 +2082,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   case u: UnresolvedTableValuedFunction if 
u.functionArgs.forall(_.resolved) =>
 withPosition(u) {
   try {
-val resolvedTvf = resolveBuiltinOrTempTableFunction(u.name, 
u.functionArgs).getOrElse {
+val resolvedFunc = resolveBuiltinOrTempTableFunction(u.name, 
u.functionArgs).getOrElse {
   val CatalogAndIdentifier(catalog, ident) = 
expandIdentifier(u.name)
   if (CatalogV2Util.isSessionCatalog(catalog)) {
 v1SessionCatalog.resolvePersistentTableFunction(
@@ -2092,93 +2092,19 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   catalog, "table-valued functions")
   }
 }
-// Resolve Python UDTF calls if needed.
-val resolvedFunc = 

[spark] branch branch-3.5 updated: [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE

2023-09-28 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 917bc8cb927 [SPARK-45360][SQL][CONNECT] Initialize spark session 
builder configuration from SPARK_REMOTE
917bc8cb927 is described below

commit 917bc8cb92728267fb93891f4ef9da13c06e4589
Author: Yihong He 
AuthorDate: Thu Sep 28 12:58:07 2023 -0400

[SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration 
from SPARK_REMOTE

### What changes were proposed in this pull request?

- Initialize spark session builder configuration from SPARK_REMOTE

### Why are the changes needed?

- `SparkSession.builder().getOrCreate()` should follow the behavior documented 
[here](https://github.com/apache/spark/blob/2cc1ee4d3a05a641d7a245f015ef824d8f7bae8b/docs/spark-connect-overview.md?plain=1#L241-L244)
 and support initialization from SPARK_REMOTE.
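
A short usage sketch of the intended behavior (the connection strings are placeholders and assume a reachable Spark Connect endpoint):

```scala
import org.apache.spark.sql.SparkSession

// With SPARK_REMOTE exported, e.g. SPARK_REMOTE=sc://localhost:15002,
// the builder now picks up the connection string automatically.
val fromEnv = SparkSession.builder().getOrCreate()

// An explicit remote() still takes precedence over SPARK_REMOTE.
val explicit = SparkSession.builder()
  .remote("sc://example-host:15002")
  .getOrCreate()
```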

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Closes #43153 from heyihong/SPARK-45360.

Authored-by: Yihong He 
Signed-off-by: Herman van Hovell 
(cherry picked from commit 183a3d761f36d35572cfb37ab921b6a86f8f28ed)
Signed-off-by: Herman van Hovell 
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  5 +-
 .../connect/client/SparkConnectClientSuite.scala   | 61 ++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7bd8fa59aea..421f37b9e8a 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -783,7 +783,10 @@ object SparkSession extends Logging {
   }
 
   class Builder() extends Logging {
-private val builder = SparkConnectClient.builder()
+// Initialize the connection string of the Spark Connect client builder 
from SPARK_REMOTE
+// by default, if it exists. The connection string can be overridden using
+// the remote() function, as it takes precedence over the SPARK_REMOTE 
environment variable.
+private val builder = SparkConnectClient.builder().loadFromEnvironment()
 private var client: SparkConnectClient = _
 private[this] val options = new scala.collection.mutable.HashMap[String, 
String]
 
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 80e245ec78b..89acc2c60ac 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -86,6 +86,24 @@ class SparkConnectClientSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
 assert(response.getSessionId === "abc123")
   }
 
+  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+val readonlyEnv = System.getenv()
+val field = readonlyEnv.getClass.getDeclaredField("m")
+field.setAccessible(true)
+val modifiableEnv = 
field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+try {
+  for ((k, v) <- pairs) {
+assert(!modifiableEnv.containsKey(k))
+modifiableEnv.put(k, v)
+  }
+  f
+} finally {
+  for ((k, _) <- pairs) {
+modifiableEnv.remove(k)
+  }
+}
+  }
+
   test("Test connection") {
 testClientConnection() { testPort => 
SparkConnectClient.builder().port(testPort).build() }
   }
@@ -112,6 +130,49 @@ class SparkConnectClientSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
 }
   }
 
+  test("SparkSession create with SPARK_REMOTE") {
+startDummyServer(0)
+
+withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+  val session = SparkSession.builder().create()
+  val df = session.range(10)
+  df.analyze // Trigger RPC
+  assert(df.plan === service.getAndClearLatestInputPlan())
+
+  val session2 = SparkSession.builder().create()
+  assert(session != session2)
+}
+  }
+
+  test("SparkSession getOrCreate with SPARK_REMOTE") {
+startDummyServer(0)
+
+withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+  val session = SparkSession.builder().getOrCreate()
+
+  val df = session.range(10)
+  df.analyze // Trigger RPC
+  

[spark] branch master updated: [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE

2023-09-28 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 183a3d761f3 [SPARK-45360][SQL][CONNECT] Initialize spark session 
builder configuration from SPARK_REMOTE
183a3d761f3 is described below

commit 183a3d761f36d35572cfb37ab921b6a86f8f28ed
Author: Yihong He 
AuthorDate: Thu Sep 28 12:58:07 2023 -0400

[SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration 
from SPARK_REMOTE

### What changes were proposed in this pull request?

- Initialize spark session builder configuration from SPARK_REMOTE

### Why are the changes needed?

- `SparkSession.builder().getOrCreate()` should follow the behavior documented 
[here](https://github.com/apache/spark/blob/2cc1ee4d3a05a641d7a245f015ef824d8f7bae8b/docs/spark-connect-overview.md?plain=1#L241-L244)
 and support initialization from SPARK_REMOTE.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Closes #43153 from heyihong/SPARK-45360.

Authored-by: Yihong He 
Signed-off-by: Herman van Hovell 
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  5 +-
 .../connect/client/SparkConnectClientSuite.scala   | 61 ++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index abe4d6a96e6..42052e3f8e6 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -786,7 +786,10 @@ object SparkSession extends Logging {
   }
 
   class Builder() extends Logging {
-private val builder = SparkConnectClient.builder()
+// Initialize the connection string of the Spark Connect client builder 
from SPARK_REMOTE
+// by default, if it exists. The connection string can be overridden using
+// the remote() function, as it takes precedence over the SPARK_REMOTE 
environment variable.
+private val builder = SparkConnectClient.builder().loadFromEnvironment()
 private var client: SparkConnectClient = _
 private[this] val options = new scala.collection.mutable.HashMap[String, 
String]
 
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 488118d0552..57e0b4016f1 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -86,6 +86,24 @@ class SparkConnectClientSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
 assert(response.getSessionId === "abc123")
   }
 
+  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+val readonlyEnv = System.getenv()
+val field = readonlyEnv.getClass.getDeclaredField("m")
+field.setAccessible(true)
+val modifiableEnv = 
field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+try {
+  for ((k, v) <- pairs) {
+assert(!modifiableEnv.containsKey(k))
+modifiableEnv.put(k, v)
+  }
+  f
+} finally {
+  for ((k, _) <- pairs) {
+modifiableEnv.remove(k)
+  }
+}
+  }
+
   test("Test connection") {
 testClientConnection() { testPort => 
SparkConnectClient.builder().port(testPort).build() }
   }
@@ -112,6 +130,49 @@ class SparkConnectClientSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
 }
   }
 
+  test("SparkSession create with SPARK_REMOTE") {
+startDummyServer(0)
+
+withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+  val session = SparkSession.builder().create()
+  val df = session.range(10)
+  df.analyze // Trigger RPC
+  assert(df.plan === service.getAndClearLatestInputPlan())
+
+  val session2 = SparkSession.builder().create()
+  assert(session != session2)
+}
+  }
+
+  test("SparkSession getOrCreate with SPARK_REMOTE") {
+startDummyServer(0)
+
+withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+  val session = SparkSession.builder().getOrCreate()
+
+  val df = session.range(10)
+  df.analyze // Trigger RPC
+  assert(df.plan === service.getAndClearLatestInputPlan())
+
+  val session2 = SparkSession.builder().getOrCreate()
+  

[spark] branch master updated: [SPARK-45364][INFRA][BUILD] Clean up the unnecessary Scala 2.12 logical in SparkBuild

2023-09-28 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 187e9a85175 [SPARK-45364][INFRA][BUILD] Clean up the unnecessary Scala 
2.12 logical in SparkBuild
187e9a85175 is described below

commit 187e9a851758c0e9cec11edab2bc07d6f4404001
Author: panbingkun 
AuthorDate: Thu Sep 28 08:36:08 2023 -0500

[SPARK-45364][INFRA][BUILD] Clean up the unnecessary Scala 2.12 logical in 
SparkBuild

### What changes were proposed in this pull request?
This PR aims to clean up the unnecessary Scala 2.12 logic in SparkBuild.

### Why are the changes needed?
Spark 4.0 no longer supports Scala 2.12.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43158 from panbingkun/SPARK-45364.

Authored-by: panbingkun 
Signed-off-by: Sean Owen 
---
 project/SparkBuild.scala | 5 +
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 85ffda304bc..13c92142d46 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -352,10 +352,7 @@ object SparkBuild extends PomBuild {
 "org.apache.spark.util.collection"
   ).mkString(":"),
   "-doc-title", "Spark " + version.value.replaceAll("-SNAPSHOT", "") + " 
ScalaDoc"
-) ++ {
-  // Do not attempt to scaladoc javadoc comments under 2.12 since it can't 
handle inner classes
-  if (scalaBinaryVersion.value == "2.12") Seq("-no-java-comments") else 
Seq.empty
-},
+),
 
 // disable Mima check for all modules,
 // to be enabled in specific ones that have previous artifacts

