[spark] branch branch-3.4 updated: [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new e706ba1cd4b [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
e706ba1cd4b is described below

commit e706ba1cd4b94bcb215355721914002748590d46
Author: Bo Xiong
AuthorDate: Thu Sep 28 22:53:37 2023 -0500

[SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue in CoarseGrainedExecutorBackend where an executor process randomly gets stuck.

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process runs into such a state, it stops launching tasks from the driver and stops reporting task status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

**_Please feel free to skip reading unless you're interested in details._**

### Symptom
Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at the Spark UI, we saw that an executor process hung for over an hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident. Below is what we observed from the relevant container logs and thread dump.
- A regular task that was sent to the executor and reported back to the driver upon completion:
```
$ zgrep 'task 150' container_1694029806204_12865_01_01/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

$ zgrep ' 923' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

$ zgrep 'task 150' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```
- Another task that was sent to the executor but never launched, since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later):
```
$ zgrep 'task 153' container_1694029806204_12865_01_01/stderr.gz
23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

$ zgrep ' 924' container_1694029806204_12865_01_04/stderr.gz
23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

$ zgrep 'task 153' container_1694029806204_12865_01_04/stderr.gz
>> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```
- The thread dump shows that the dispatcher-Executor thread has the following stack trace.
```
"dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x98e37800 nid=0x1aff runnable [0x73bba000]
   java.lang.Thread.State: RUNNABLE
    at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
    at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
    at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
    at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
    at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
    at
```
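The stack trace above shows `findEntry0` spinning inside Scala's non-thread-safe `mutable.HashMap`: under concurrent mutation, a racy resize can corrupt the internal table so that entry lookups chase a cycle forever. The remedy is a concurrent map. The following is an illustrative Java sketch of that pattern (hypothetical class and field names, not the actual Spark patch, which is in Scala inside `CoarseGrainedExecutorBackend`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentMapSketch {
    // Hammer a shared map from two threads: each thread puts and then removes
    // the same range of keys. ConcurrentHashMap makes every operation
    // thread-safe, so the map always ends up empty and lookups never spin.
    static int hammer() throws InterruptedException {
        Map<Long, String> taskResources = new ConcurrentHashMap<>();
        Runnable writer = () -> {
            for (long taskId = 0; taskId < 10_000; taskId++) {
                taskResources.put(taskId, "resources");
                taskResources.remove(taskId);
            }
        };
        Thread t1 = new Thread(writer);
        Thread t2 = new Thread(writer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        return taskResources.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // A plain java.util.HashMap (or Scala's mutable.HashMap, as in this bug)
        // gives no such guarantee under concurrent writes.
        System.out.println("map size after concurrent put/remove: " + hammer());
    }
}
```

With the unsafe map, the same workload can intermittently corrupt the table and hang exactly as in the thread dump; the concurrent map makes the outcome deterministic.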
[spark] branch branch-3.5 updated: [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 5cdb4ab6f0b [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
5cdb4ab6f0b is described below

commit 5cdb4ab6f0b7aea7d890ee7dff61350671a09e79
Author: Bo Xiong
AuthorDate: Thu Sep 28 22:53:37 2023 -0500

[SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend

(Commit message identical to the branch-3.4 notification above.)
[spark] branch master updated: [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 8e6b1603a66 [SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend
8e6b1603a66 is described below

commit 8e6b1603a66706ee27a0b16d850f5ee56d633354
Author: Bo Xiong
AuthorDate: Thu Sep 28 22:53:37 2023 -0500

[SPARK-45227][CORE] Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend

(Commit message identical to the branch-3.4 notification above.)
[spark] branch master updated: [SPARK-45276][INFRA] Replace Java 8 and Java 11 installed in the Dockerfile with Java 17
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1d9905ab428 [SPARK-45276][INFRA] Replace Java 8 and Java 11 installed in the Dockerfile with Java 17
1d9905ab428 is described below

commit 1d9905ab4289ab4407dd93487dd1f8f036938a59
Author: panbingkun
AuthorDate: Fri Sep 29 09:16:05 2023 +0800

[SPARK-45276][INFRA] Replace Java 8 and Java 11 installed in the Dockerfile with Java 17

### What changes were proposed in this pull request?
This PR replaces Java 8 and Java 11 installed in the Dockerfiles with Java 17, specifically in:
- dev/create-release/spark-rm/Dockerfile
- connector/docker/spark-test/base/Dockerfile

### Why are the changes needed?
After SPARK-44112, the minimum supported Java version for Apache Spark 4.0 is Java 17.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual tests:

1. dev/create-release/spark-rm/Dockerfile
```
cd dev/create-release/spark-rm
docker build -t spark-rm --build-arg UID=$UID .
```
2. connector/docker/spark-test/base/Dockerfile
```
cd connector/docker
sh build
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43076 from panbingkun/SPARK-45293.
Authored-by: panbingkun
Signed-off-by: yangjie01
---
 connector/docker/spark-test/base/Dockerfile | 2 +-
 dev/create-release/do-release-docker.sh     | 2 +-
 dev/create-release/spark-rm/Dockerfile      | 6 +++---
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/connector/docker/spark-test/base/Dockerfile b/connector/docker/spark-test/base/Dockerfile
index d4a30c4681c..0e8593f8af5 100644
--- a/connector/docker/spark-test/base/Dockerfile
+++ b/connector/docker/spark-test/base/Dockerfile
@@ -22,7 +22,7 @@ FROM ubuntu:20.04
 # Remove unneeded /var/lib/apt/lists/* after install to reduce the
 # docker image size (by ~30MB)
 RUN apt-get update && \
-apt-get install -y less openjdk-11-jre-headless iproute2 vim-tiny sudo openssh-server && \
+apt-get install -y less openjdk-17-jre-headless iproute2 vim-tiny sudo openssh-server && \
 rm -rf /var/lib/apt/lists/*
 ENV SPARK_HOME /opt/spark

diff --git a/dev/create-release/do-release-docker.sh b/dev/create-release/do-release-docker.sh
index 88398bc14dd..c44d0193069 100755
--- a/dev/create-release/do-release-docker.sh
+++ b/dev/create-release/do-release-docker.sh
@@ -45,7 +45,7 @@ Options are:
 -n : dry run mode. Performs checks and local builds, but do not upload anything.
 -t [tag]: tag for the spark-rm docker image to use for building (default: "latest").
 -j [path] : path to local JDK installation to use for building. By default the script will
-use openjdk8 installed in the docker image.
+use openjdk17 installed in the docker image.
 -s [step] : runs a single step of the process; valid steps are: tag, build, docs, publish, finalize
 EOF
 }

diff --git a/dev/create-release/spark-rm/Dockerfile b/dev/create-release/spark-rm/Dockerfile
index 85155b67bd5..50562e38fb5 100644
--- a/dev/create-release/spark-rm/Dockerfile
+++ b/dev/create-release/spark-rm/Dockerfile
@@ -61,9 +61,9 @@ RUN apt-get clean && apt-get update && $APT_INSTALL gnupg ca-certificates && \
 apt-get update && \
 $APT_INSTALL software-properties-common && \
 apt-get update && \
-# Install openjdk 8.
-$APT_INSTALL openjdk-8-jdk && \
-update-alternatives --set java $(ls /usr/lib/jvm/java-8-openjdk-*/jre/bin/java) && \
+# Install openjdk 17.
+$APT_INSTALL openjdk-17-jdk && \
+update-alternatives --set java $(ls /usr/lib/jvm/java-17-openjdk-*/bin/java) && \
 # Install build / source control tools
 $APT_INSTALL curl wget git maven ivy subversion make gcc lsof libffi-dev \
 pandoc pandoc-citeproc libssl-dev libcurl4-openssl-dev libxml2-dev && \

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
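Since Spark 4.0 requires Java 17 at minimum (per SPARK-44112), a runtime guard of the kind sketched below can confirm which JDK an image actually resolves to after the `update-alternatives` switch. This is an illustrative sketch, not code from this patch; the class name is hypothetical:

```java
public class JavaVersionCheck {
    // Feature version of the running JDK, e.g. 17 for OpenJDK 17.
    // Runtime.version() exists since Java 9; version().feature() since Java 10.
    static int featureVersion() {
        return Runtime.version().feature();
    }

    public static void main(String[] args) {
        int v = featureVersion();
        System.out.println("Running on Java " + v);
        if (v < 17) {
            System.out.println("Warning: Spark 4.0 requires Java 17 or newer");
        }
    }
}
```

Running this inside the rebuilt image is a quick sanity check that `java` on the PATH is the intended JDK 17, not a leftover Java 8/11 installation.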
[spark] branch master updated: [SPARK-45338][SQL][FOLLOWUP] Remove useless `toSeq`
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5c4aef4d4ca [SPARK-45338][SQL][FOLLOWUP] Remove useless `toSeq`
5c4aef4d4ca is described below

commit 5c4aef4d4caf753ce9c45d07472df67479371738
Author: Jia Fan
AuthorDate: Thu Sep 28 19:10:03 2023 -0500

[SPARK-45338][SQL][FOLLOWUP] Remove useless `toSeq`

### What changes were proposed in this pull request?
This is a follow-up PR for #43126, removing a useless `toSeq` invocation.

### Why are the changes needed?
Removes a useless conversion.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43172 from Hisoka-X/SPARK-45338-followup-remove-toseq.
Authored-by: Jia Fan
Signed-off-by: Sean Owen
---
 .../apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala   | 2 +-
 .../apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index 990a7162ea4..5dd8caf3f22 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -87,7 +87,7 @@ private[hive] class SparkGetColumnsOperation(
 }.toMap
 if (isAuthV2Enabled) {
-  val privObjs = getPrivObjs(db2Tabs).toSeq.asJava
+  val privObjs = getPrivObjs(db2Tabs).asJava
   authorizeMetaGets(HiveOperationType.GET_COLUMNS, privObjs, cmdStr)
 }

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
index 7fa492befa0..53a94a128c0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala
@@ -68,7 +68,7 @@ private[hive] class SparkGetFunctionsOperation(
 if (isAuthV2Enabled) {
   // authorize this call on the schema objects
   val privObjs =
-HivePrivilegeObjectUtils.getHivePrivDbObjects(matchingDbs.toSeq.asJava)
+HivePrivilegeObjectUtils.getHivePrivDbObjects(matchingDbs.asJava)
   authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr)
 }
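The Scala change drops a `toSeq` call on a value that is already a `Seq` before handing it to `asJava`. A rough Java analogue of the same cleanup (illustrative only, not Spark code; names are hypothetical) is removing a defensive copy of a collection that is already of the required type:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class RedundantCopySketch {
    // Before: an extra copy of something that is already a List.
    static List<String> wrapWithCopy(List<String> dbs) {
        return Collections.unmodifiableList(new ArrayList<>(dbs)); // redundant copy
    }

    // After: wrap directly -- same observable result for read-only callers.
    static List<String> wrap(List<String> dbs) {
        return Collections.unmodifiableList(dbs);
    }

    public static void main(String[] args) {
        List<String> dbs = Arrays.asList("default", "tpcds");
        System.out.println(wrap(dbs).equals(wrapWithCopy(dbs)));
    }
}
```

As with `toSeq` on a `Seq`, the removed step produced an equal value at the cost of an unnecessary traversal and allocation.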
[spark] branch branch-3.4 updated: [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 68db395867a [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
68db395867a is described below

commit 68db395867a3292e9261dd8a3dc191754e1645ef
Author: Warren Zhu
AuthorDate: Thu Sep 28 18:51:33 2023 -0500

[SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

### What changes were proposed in this pull request?
Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When `keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the read lock or a potential deadlock.

When two tasks try to compute the same RDD with a replication level of 2 while running on only two executors, a deadlock can occur: the task thread holds the write lock while waiting for replication to the remote executor, and the shuffle-server thread handling the block-upload request waits on `lockForReading` in [BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24). Details are in [SPARK-45057].

### Why are the changes needed?
This avoids an unnecessary read-lock acquisition and the deadlock described above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a UT in BlockInfoManagerSuite.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43067 from warrenzhu25/deadlock.
Authored-by: Warren Zhu
Signed-off-by: Mridul Muralidharan
(cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f)
Signed-off-by: Mridul Muralidharan
---
 .../scala/org/apache/spark/storage/BlockInfoManager.scala  | 11 +++
 .../main/scala/org/apache/spark/storage/BlockManager.scala |  6 +-
 .../org/apache/spark/storage/BlockInfoManagerSuite.scala   | 14 ++
 3 files changed, 22 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 9eb1418fd16..d89e6682adf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -383,13 +383,14 @@ private[storage] class BlockInfoManager extends Logging {
    * then just go ahead and acquire the write lock. Otherwise, if another thread is already
    * writing the block, then we wait for the write to finish before acquiring the read lock.
    *
-   * @return true if the block did not already exist, false otherwise. If this returns false, then
-   *         a read lock on the existing block will be held. If this returns true, a write lock on
-   *         the new block will be held.
+   * @return true if the block did not already exist, false otherwise.
+   *         If this returns true, a write lock on the new block will be held.
+   *         If this returns false then a read lock will be held iff keepReadLock == true.
    */
  def lockNewBlockForWriting(
      blockId: BlockId,
-     newBlockInfo: BlockInfo): Boolean = {
+     newBlockInfo: BlockInfo,
+     keepReadLock: Boolean = true): Boolean = {
    logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
    // Get the lock that will be associated with the to-be written block and lock it for the entire
    // duration of this operation. This way we prevent race conditions when two threads try to write
@@ -405,6 +406,8 @@ private[storage] class BlockInfoManager extends Logging {
      val result = lockForWriting(blockId, blocking = false)
      assert(result.isDefined)
      return true
+   } else if (!keepReadLock) {
+     return false
    } else {
      // Block already exists. This could happen if another thread races with us to compute
      // the same block. In this case we try to acquire a read lock, if the locking succeeds

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 06c2e615fbc..389fbeb90f5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1444,14 +1444,10 @@ private[spark] class BlockManager(
    val putBlockInfo = {
      val newInfo = new BlockInfo(level, classTag, tellMaster)
-     if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) {
+     if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, keepReadLock)) {
        newInfo
      } else {
        logWarning(s"Block $blockId already exists on this machine; not re-adding it")
-
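The decision logic this patch adds can be modeled in a few lines. The sketch below is a hypothetical, single-threaded Java model (the names and lock bookkeeping are illustrative; the real method in `BlockInfoManager.scala` uses actual lock objects and blocking waits): when the block already exists and the caller passes `keepReadLock = false`, the method now returns `false` without touching the read lock at all, so it can never block there:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of lockNewBlockForWriting's control flow.
public class LockSketch {
    private final Set<String> existingBlocks = new HashSet<>();
    private final Map<String, String> heldLocks = new HashMap<>(); // blockId -> "read"/"write"

    // Returns true iff the block is new (caller then holds the write lock).
    public boolean lockNewBlockForWriting(String blockId, boolean keepReadLock) {
        if (!existingBlocks.contains(blockId)) {
            existingBlocks.add(blockId);
            heldLocks.put(blockId, "write");
            return true;
        } else if (!keepReadLock) {
            // Early return added by SPARK-45057: no read lock is acquired,
            // so there is no chance to block behind a writer here.
            return false;
        } else {
            // The real code may wait here for a concurrent writer to finish.
            heldLocks.put(blockId, "read");
            return false;
        }
    }

    public String lockHeld(String blockId) {
        return heldLocks.get(blockId);
    }

    public static void main(String[] args) {
        LockSketch s = new LockSketch();
        System.out.println(s.lockNewBlockForWriting("rdd_0_0", false)); // new block
        System.out.println(s.lockNewBlockForWriting("rdd_0_0", false)); // exists, no read lock
    }
}
```

In the deadlock scenario from the description, the shuffle-server thread takes the new early-return path instead of parking on `lockForReading` while the task thread holds the write lock.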
[spark] branch master updated: [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0d6fda5bbee [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
0d6fda5bbee is described below

commit 0d6fda5bbee99f9d1821952195efc6764816ec2f
Author: Warren Zhu
AuthorDate: Thu Sep 28 18:51:33 2023 -0500

[SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false

Closes #43067 from warrenzhu25/deadlock.
Authored-by: Warren Zhu
Signed-off-by: Mridul Muralidharan

(Commit message identical to the branch-3.4 notification above; this master commit is the original that was cherry-picked to the release branches.)
[spark] branch branch-3.3 updated: [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 48138eb1a35 [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false 48138eb1a35 is described below commit 48138eb1a3561bcdbfb44a94cf94a481a8bcec98 Author: Warren Zhu AuthorDate: Thu Sep 28 18:51:33 2023 -0500 [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false ### What changes were proposed in this pull request? Add `keepReadLock` parameter in `lockNewBlockForWriting()`. When `keepReadLock` is `false`, skip `lockForReading()` to avoid block on read Lock or potential deadlock issue. When 2 tasks try to compute same rdd with replication level of 2 and running on only 2 executors. Deadlock will happen. Details refer [SPARK-45057] Task thread hold write lock and waiting for replication to remote executor while shuffle server thread which handling block upload request waiting on `lockForReading` in [BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24) ### Why are the changes needed? This could save unnecessary read lock acquire and avoid deadlock issue mention above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added UT in BlockInfoManagerSuite ### Was this patch authored or co-authored using generative AI tooling? No Closes #43067 from warrenzhu25/deadlock. 
Authored-by: Warren Zhu Signed-off-by: Mridul Muralidharan gmail.com> (cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f) Signed-off-by: Mridul Muralidharan --- .../scala/org/apache/spark/storage/BlockInfoManager.scala | 11 +++ .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +- .../org/apache/spark/storage/BlockInfoManagerSuite.scala | 14 ++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 9eb1418fd16..d89e6682adf 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -383,13 +383,14 @@ private[storage] class BlockInfoManager extends Logging { * then just go ahead and acquire the write lock. Otherwise, if another thread is already * writing the block, then we wait for the write to finish before acquiring the read lock. * - * @return true if the block did not already exist, false otherwise. If this returns false, then - * a read lock on the existing block will be held. If this returns true, a write lock on - * the new block will be held. + * @return true if the block did not already exist, false otherwise. + * If this returns true, a write lock on the new block will be held. + * If this returns false then a read lock will be held iff keepReadLock == true. */ def lockNewBlockForWriting( blockId: BlockId, - newBlockInfo: BlockInfo): Boolean = { + newBlockInfo: BlockInfo, + keepReadLock: Boolean = true): Boolean = { logTrace(s"Task $currentTaskAttemptId trying to put $blockId") // Get the lock that will be associated with the to-be written block and lock it for the entire // duration of this operation. 
This way we prevent race conditions when two threads try to write @@ -405,6 +406,8 @@ private[storage] class BlockInfoManager extends Logging { val result = lockForWriting(blockId, blocking = false) assert(result.isDefined) return true +} else if (!keepReadLock) { + return false } else { // Block already exists. This could happen if another thread races with us to compute // the same block. In this case we try to acquire a read lock, if the locking succeeds diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f35131ee5b9..cdd91fb4c07 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1432,14 +1432,10 @@ private[spark] class BlockManager( val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) - if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { + if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, keepReadLock)) { newInfo } else { logWarning(s"Block $blockId already exists on this machine; not re-adding it") -
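The `keepReadLock` change above can be sketched with a minimal, self-contained model in plain Java. This is an illustration only — the class and method names below are hypothetical, not Spark's actual `BlockInfoManager` — but it shows why a caller that passes `keepReadLock = false` returns immediately instead of queueing behind another writer:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified model (NOT Spark's BlockInfoManager): one block, one lock.
class BlockLockModel {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean blockExists = false;

    // Returns true if the caller created the block and now holds its write
    // lock; false if the block already existed. With keepReadLock=false the
    // losing caller never touches the read lock, so it cannot block behind
    // the writer -- the scenario that deadlocked with 2 replicating tasks.
    boolean lockNewBlockForWriting(boolean keepReadLock) {
        synchronized (this) {
            if (!blockExists) {
                blockExists = true;
                lock.writeLock().lock(); // uncontended: we just created it
                return true;
            }
        }
        if (!keepReadLock) {
            return false;            // the fix: skip read-lock acquisition
        }
        lock.readLock().lock();      // old behavior: may wait on the writer
        return false;
    }
}
```

With the old behavior (`keepReadLock = true`), the second caller instead parks in `readLock().lock()` until the writer releases — which, in the two-executor replication scenario from the commit message, never happens.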
[spark] branch branch-3.5 updated: [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new aeba488ccd9 [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false aeba488ccd9 is described below commit aeba488ccd9213d28e6401d1bf7eadfdb9d955c3 Author: Warren Zhu AuthorDate: Thu Sep 28 18:51:33 2023 -0500 [SPARK-45057][CORE] Avoid acquire read lock when keepReadLock is false ### What changes were proposed in this pull request? Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When `keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the read lock or a potential deadlock. When two tasks try to compute the same RDD with a replication level of 2 while running on only two executors, a deadlock occurs (see SPARK-45057 for details): the task thread holds the write lock while waiting for replication to the remote executor, while the shuffle server thread handling the block upload request waits on `lockForReading` in [BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24) ### Why are the changes needed? This avoids an unnecessary read-lock acquisition and the deadlock issue described above. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test in BlockInfoManagerSuite ### Was this patch authored or co-authored using generative AI tooling? No Closes #43067 from warrenzhu25/deadlock. 
Authored-by: Warren Zhu Signed-off-by: Mridul Muralidharan gmail.com> (cherry picked from commit 0d6fda5bbee99f9d1821952195efc6764816ec2f) Signed-off-by: Mridul Muralidharan --- .../scala/org/apache/spark/storage/BlockInfoManager.scala | 11 +++ .../main/scala/org/apache/spark/storage/BlockManager.scala | 6 +- .../org/apache/spark/storage/BlockInfoManagerSuite.scala | 14 ++ 3 files changed, 22 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala index 45ebb6eafa6..ab4073fe8c0 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala @@ -415,13 +415,14 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false * then just go ahead and acquire the write lock. Otherwise, if another thread is already * writing the block, then we wait for the write to finish before acquiring the read lock. * - * @return true if the block did not already exist, false otherwise. If this returns false, then - * a read lock on the existing block will be held. If this returns true, a write lock on - * the new block will be held. + * @return true if the block did not already exist, false otherwise. + * If this returns true, a write lock on the new block will be held. + * If this returns false then a read lock will be held iff keepReadLock == true. */ def lockNewBlockForWriting( blockId: BlockId, - newBlockInfo: BlockInfo): Boolean = { + newBlockInfo: BlockInfo, + keepReadLock: Boolean = true): Boolean = { logTrace(s"Task $currentTaskAttemptId trying to put $blockId") // Get the lock that will be associated with the to-be written block and lock it for the entire // duration of this operation. 
This way we prevent race conditions when two threads try to write @@ -449,6 +450,8 @@ private[storage] class BlockInfoManager(trackingCacheVisibility: Boolean = false val result = lockForWriting(blockId, blocking = false) assert(result.isDefined) return true +} else if (!keepReadLock) { + return false } else { // Block already exists. This could happen if another thread races with us to compute // the same block. In this case we try to acquire a read lock, if the locking succeeds diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 05d57c67576..6de6069d2fe 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1510,14 +1510,10 @@ private[spark] class BlockManager( val putBlockInfo = { val newInfo = new BlockInfo(level, classTag, tellMaster) - if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo)) { + if (blockInfoManager.lockNewBlockForWriting(blockId, newInfo, keepReadLock)) { newInfo } else { logWarning(s"Block $blockId
[spark] branch master updated: [SPARK-44937][CORE] Add test keys for SSL functionality
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d52561cfb1f [SPARK-44937][CORE] Add test keys for SSL functionality d52561cfb1f is described below commit d52561cfb1f19bf4e02510f3ac71e6a04533b347 Author: Hasnain Lakhani AuthorDate: Thu Sep 28 18:32:06 2023 -0500 [SPARK-44937][CORE] Add test keys for SSL functionality ### What changes were proposed in this pull request? This PR introduces test keys for SSL functionality. The keys were generated using something like the following: ``` openssl genpkey -algorithm RSA -out key.pem -hexseed deadbeef openssl pkcs8 -topk8 -in key.pem -out key.pem.out openssl req -new -key key.pem -out csr.csr -days 3650 -subj "/CN=test/ST=California/L=San Francisco/OU=Org1/O=Org2/C=US" openssl x509 -req -in csr.csr -signkey key.pem -out certchain.pem -days 3650 rm key.pem csr.csr mv key.pem.enc key.pem ``` And then copied to all the relevant folders. I also copied over the keystore and truststore files (did not regenerate those). ### Why are the changes needed? We need these test files to run tests using PEM keys for SSL connections. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? These test files will be used by follow-up PRs. This was tested as part of https://github.com/apache/spark/pull/42685, which is being split. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43163 from hasnain-db/spark-tls-test-files. 
Authored-by: Hasnain Lakhani Signed-off-by: Mridul Muralidharan gmail.com> --- .../src/test/resources/certchain.pem | 17 common/network-common/src/test/resources/key.pem | 29 + common/network-common/src/test/resources/keystore | Bin 0 -> 2247 bytes .../network-common/src/test/resources/truststore | Bin 0 -> 957 bytes .../src/test/resources/untrusted-keystore | Bin 0 -> 2246 bytes .../src/test/resources/certchain.pem | 17 common/network-shuffle/src/test/resources/key.pem | 29 + common/network-shuffle/src/test/resources/keystore | Bin 0 -> 2247 bytes .../network-shuffle/src/test/resources/truststore | Bin 0 -> 957 bytes .../src/test/resources/untrusted-keystore | Bin 0 -> 2246 bytes core/src/test/resources/certchain.pem | 17 core/src/test/resources/key.pem| 29 + .../yarn/src/test/resources/certchain.pem | 17 resource-managers/yarn/src/test/resources/key.pem | 29 + resource-managers/yarn/src/test/resources/keystore | Bin 0 -> 2247 bytes .../yarn/src/test/resources/truststore | Bin 0 -> 957 bytes .../yarn/src/test/resources/untrusted-keystore | Bin 0 -> 2246 bytes 17 files changed, 184 insertions(+) diff --git a/common/network-common/src/test/resources/certchain.pem b/common/network-common/src/test/resources/certchain.pem new file mode 100644 index 000..1004cacc9bf --- /dev/null +++ b/common/network-common/src/test/resources/certchain.pem @@ -0,0 +1,17 @@ +-BEGIN CERTIFICATE- +MIICsDCCAZgCCQD7yXTHZWZZlDANBgkqhkiG9w0BAQsFADAaMQswCQYDVQQGEwJV +UzELMAkGA1UECAwCQ0EwHhcNMjMwNjIwMTczMjAzWhcNMzMwNjE3MTczMjAzWjAa +MQswCQYDVQQGEwJVUzELMAkGA1UECAwCQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IB +DwAwggEKAoIBAQDH4DO8IP/7xZgpzmYrBaqzsnpamq54cXP8JdQUOXP/dmh8myGg +CUau/nNdpPNr1Od2iUvf1Z9OW+KcHdNAL/zcwe1ehU3d6/M+UinDtfbEb4HSyQ31 +9AIlPSUq+pJAlsAGJYERLGHPBNXEay0r0+TR0cd9CfSN79rXUMag40pZC3zdxXmY +JpSkhNuiYfa+Z9TgXoki5MzNiyH12gAb9tO8tr55BnE5s/QujOp7LMjlf6VkE7Bp +hqj1UbcHmFw7U9jyLDfi98uIvlEDFCwXARdmLxxaYAOqdgZ3TtjBvbugVRpRFQiw +haFzkiok9bh+MclKQBKvF0ArHmMLHkcCd5oPAgMBAAEwDQYJKoZIhvcNAQELBQAD 
+ggEBADYIPLwlnuH6rTbkjeZFYC2UXjNesbUe1TXbsBo9DDHJUSFjNNDDAUpSzhxb +q6nMvex7tnTvTjAgOQR/qwAueAfcXHWe0EKvn4Y6yJERepSsYg5bSYBt+UJxW89R +JRLmzBFxEJy1YhsqGCh+I2wRoRz8ZGokDyqcrAlwlzXYVDfNC4wUo14Cm+s90yc3 +2I/roX/MWec8QbEbr25psAYVnRdUL1mzCeQMc83A8Y0SDPfF5ECFhvFXkVaDTULO +RddXWJoC4K5RuGa6yvpb75I8VTE3fwE2ykSgPuMShNZREDCuszkpPjjFumq9pCOJ +nUO1huCqjxC1ehPe/9/jgmzoVX4= +-END CERTIFICATE- diff --git a/common/network-common/src/test/resources/key.pem b/common/network-common/src/test/resources/key.pem new file mode 100644 index 000..77122755bfd --- /dev/null +++ b/common/network-common/src/test/resources/key.pem @@ -0,0 +1,29 @@ +-BEGIN ENCRYPTED PRIVATE KEY- +MIIE6TAbBgkqhkiG9w0BBQMwDgQIGBIe7ugOgfACAggABIIEyJgkzYc/ixcvwLJC +eTzGOVwk+F1cqM4H63FOxIjroaxisceqoBmty6Rf4PJ1C9nprkSs6G/SkupbNUUB +YiWmsQ91orllbHsczAc+qaa0tmommwgt27ZrfXdXBxDB0mJWQTijkVHWfyTqcXmC
[spark] branch branch-3.3 updated: [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new e965cc835c9 [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close e965cc835c9 is described below commit e965cc835c91c6f7a87ec7ed140e5cf34c445798 Author: Hasnain Lakhani AuthorDate: Thu Sep 28 18:16:49 2023 -0500 [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close ### What changes were proposed in this pull request? This PR avoids a race condition where a connection which is in the process of being closed could be returned by the TransportClientFactory only to be immediately closed and cause errors upon use. This race condition is rare and not easily triggered, but with the upcoming changes to introduce SSL connection support, connection closing can take just a slight bit longer and it's much easier to trigger this issue. Looking at the history of the code I believe this was an oversight in https://github.com/apache/spark/pull/9853. ### Why are the changes needed? Without this change, some of the new tests added in https://github.com/apache/spark/pull/42685 would fail ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests were run in CI. Without this change, some of the new tests added in https://github.com/apache/spark/pull/42685 fail ### Was this patch authored or co-authored using generative AI tooling? No Closes #43162 from hasnain-db/spark-tls-timeout. 
Authored-by: Hasnain Lakhani Signed-off-by: Mridul Muralidharan gmail.com> (cherry picked from commit 2a88feadd4b7cec9e01bc744e589783e3390e5ce) Signed-off-by: Mridul Muralidharan --- .../main/java/org/apache/spark/network/client/TransportClient.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index dd2fdb08ee5..884df899981 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -325,7 +325,10 @@ public class TransportClient implements Closeable { @Override public void close() { -// close is a local operation and should finish with milliseconds; timeout just to be safe +// Mark the connection as timed out, so we do not return a connection that's being closed +// from the TransportClientFactory if closing takes some time (e.g. with SSL) +this.timedOut = true; +// close should not take this long; use a timeout just to be safe channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS); } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
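The race this commit closes can be reduced to a small toy model (illustrative names only, not Spark's actual `TransportClient`/`TransportClientFactory` API — in Spark, `isActive()` also consults the channel state, which can lag behind a slow close). The point is that the "dead" flag must flip *before* the potentially slow channel close begins, so a concurrent factory lookup never hands out a client that is about to die:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of the pooled-client race (hypothetical names, not Spark's API).
class PooledClient {
    private final AtomicBoolean timedOut = new AtomicBoolean(false);

    // The factory only reuses clients that report themselves active.
    boolean isActive() {
        return !timedOut.get();
    }

    void close() {
        // The fix: flip the flag first, so concurrent factory lookups skip
        // this client even while the (possibly slow, e.g. SSL) close runs.
        timedOut.set(true);
        slowChannelClose();
    }

    private void slowChannelClose() {
        // stands in for channel.close().awaitUninterruptibly(10, SECONDS)
    }
}
```

Before the fix, nothing marked the client unusable until the close actually completed, leaving a window in which the factory could return it — rare with plain TCP, much easier to hit once SSL makes closing slower.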
[spark] branch branch-3.4 updated: [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 85bf705a93f [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close 85bf705a93f is described below commit 85bf705a93f17767bf8653b0ce5a144af5a21e95 Author: Hasnain Lakhani AuthorDate: Thu Sep 28 18:16:49 2023 -0500 [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close ### What changes were proposed in this pull request? This PR avoids a race condition where a connection which is in the process of being closed could be returned by the TransportClientFactory only to be immediately closed and cause errors upon use. This race condition is rare and not easily triggered, but with the upcoming changes to introduce SSL connection support, connection closing can take just a slight bit longer and it's much easier to trigger this issue. Looking at the history of the code I believe this was an oversight in https://github.com/apache/spark/pull/9853. ### Why are the changes needed? Without this change, some of the new tests added in https://github.com/apache/spark/pull/42685 would fail ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests were run in CI. Without this change, some of the new tests added in https://github.com/apache/spark/pull/42685 fail ### Was this patch authored or co-authored using generative AI tooling? No Closes #43162 from hasnain-db/spark-tls-timeout. 
Authored-by: Hasnain Lakhani Signed-off-by: Mridul Muralidharan gmail.com> (cherry picked from commit 2a88feadd4b7cec9e01bc744e589783e3390e5ce) Signed-off-by: Mridul Muralidharan --- .../main/java/org/apache/spark/network/client/TransportClient.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 4a0a1566998..40825e06b82 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -325,7 +325,10 @@ public class TransportClient implements Closeable { @Override public void close() { -// close is a local operation and should finish with milliseconds; timeout just to be safe +// Mark the connection as timed out, so we do not return a connection that's being closed +// from the TransportClientFactory if closing takes some time (e.g. with SSL) +this.timedOut = true; +// close should not take this long; use a timeout just to be safe channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS); } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2a88feadd4b [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close 2a88feadd4b is described below commit 2a88feadd4b7cec9e01bc744e589783e3390e5ce Author: Hasnain Lakhani AuthorDate: Thu Sep 28 18:16:49 2023 -0500 [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close ### What changes were proposed in this pull request? This PR avoids a race condition where a connection which is in the process of being closed could be returned by the TransportClientFactory only to be immediately closed and cause errors upon use. This race condition is rare and not easily triggered, but with the upcoming changes to introduce SSL connection support, connection closing can take just a slight bit longer and it's much easier to trigger this issue. Looking at the history of the code I believe this was an oversight in https://github.com/apache/spark/pull/9853. ### Why are the changes needed? Without this change, some of the new tests added in https://github.com/apache/spark/pull/42685 would fail ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests were run in CI. Without this change, some of the new tests added in https://github.com/apache/spark/pull/42685 fail ### Was this patch authored or co-authored using generative AI tooling? No Closes #43162 from hasnain-db/spark-tls-timeout. 
Authored-by: Hasnain Lakhani Signed-off-by: Mridul Muralidharan gmail.com> --- .../main/java/org/apache/spark/network/client/TransportClient.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 4a0a1566998..40825e06b82 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -325,7 +325,10 @@ public class TransportClient implements Closeable { @Override public void close() { -// close is a local operation and should finish with milliseconds; timeout just to be safe +// Mark the connection as timed out, so we do not return a connection that's being closed +// from the TransportClientFactory if closing takes some time (e.g. with SSL) +this.timedOut = true; +// close should not take this long; use a timeout just to be safe channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS); } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 3a723a171b1 [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close 3a723a171b1 is described below commit 3a723a171b1b77a8a8b2ccce2bd489acb8db00a3 Author: Hasnain Lakhani AuthorDate: Thu Sep 28 18:16:49 2023 -0500 [SPARK-44937][CORE] Mark connection as timedOut in TransportClient.close ### What changes were proposed in this pull request? This PR avoids a race condition where a connection which is in the process of being closed could be returned by the TransportClientFactory only to be immediately closed and cause errors upon use. This race condition is rare and not easily triggered, but with the upcoming changes to introduce SSL connection support, connection closing can take just a slight bit longer and it's much easier to trigger this issue. Looking at the history of the code I believe this was an oversight in https://github.com/apache/spark/pull/9853. ### Why are the changes needed? Without this change, some of the new tests added in https://github.com/apache/spark/pull/42685 would fail ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests were run in CI. Without this change, some of the new tests added in https://github.com/apache/spark/pull/42685 fail ### Was this patch authored or co-authored using generative AI tooling? No Closes #43162 from hasnain-db/spark-tls-timeout. 
Authored-by: Hasnain Lakhani Signed-off-by: Mridul Muralidharan gmail.com> (cherry picked from commit 2a88feadd4b7cec9e01bc744e589783e3390e5ce) Signed-off-by: Mridul Muralidharan --- .../main/java/org/apache/spark/network/client/TransportClient.java | 5 - 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java index 4a0a1566998..40825e06b82 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java @@ -325,7 +325,10 @@ public class TransportClient implements Closeable { @Override public void close() { -// close is a local operation and should finish with milliseconds; timeout just to be safe +// Mark the connection as timed out, so we do not return a connection that's being closed +// from the TransportClientFactory if closing takes some time (e.g. with SSL) +this.timedOut = true; +// close should not take this long; use a timeout just to be safe channel.close().awaitUninterruptibly(10, TimeUnit.SECONDS); } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44895][CORE][UI] Add 'daemon', 'priority' for ThreadStackTrace
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6341310711e [SPARK-44895][CORE][UI] Add 'daemon', 'priority' for ThreadStackTrace 6341310711e is described below commit 6341310711ee0e3edbdd42aaeaf806cad4edefb5 Author: Kent Yao AuthorDate: Thu Sep 28 18:04:03 2023 -0500 [SPARK-44895][CORE][UI] Add 'daemon', 'priority' for ThreadStackTrace ### What changes were proposed in this pull request? Since version 9, Java has supported the 'daemon' and 'priority' fields in ThreadInfo. In this PR, we extract them from ThreadInfo to ThreadStackTrace ### Why are the changes needed? more information for thread pages in UI and rest APIs ### Does this PR introduce _any_ user-facing change? yes, ThreadStackTrace changes ### How was this patch tested? new tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #43095 from yaooqinn/SPARK-44895. Authored-by: Kent Yao Signed-off-by: Sean Owen --- .../main/scala/org/apache/spark/status/api/v1/api.scala| 10 ++ core/src/main/scala/org/apache/spark/util/Utils.scala | 4 +++- .../test/scala/org/apache/spark/ui/UISeleniumSuite.scala | 14 ++ 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 3e4e2f17a77..7a0c69e2948 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -540,19 +540,21 @@ case class ThreadStackTrace( lockName: Option[String], lockOwnerName: Option[String], suspended: Boolean, -inNative: Boolean) { +inNative: Boolean, +isDaemon: Boolean, +priority: Int) { /** * Returns a string representation of this thread stack trace * w.r.t java.lang.management.ThreadInfo(JDK 8)'s toString. 
* - * TODO(SPARK-44895): Considering 'daemon', 'priority' from higher JDKs - * * TODO(SPARK-44896): Also considering adding information os_prio, cpu, elapsed, tid, nid, etc., * from the jstack tool */ override def toString: String = { -val sb = new StringBuilder(s""""$threadName" Id=$threadId $threadState""") +val daemon = if (isDaemon) " daemon" else "" +val sb = new StringBuilder( + s""""$threadName"$daemon prio=$priority Id=$threadId $threadState""") lockName.foreach(lock => sb.append(s" on $lock")) lockOwnerName.foreach { owner => sb.append(s"""owned by "$owner) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 48dfbecb7cd..dcffa99dc64 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2196,7 +2196,9 @@ private[spark] object Utils Option(threadInfo.getLockName), Option(threadInfo.getLockOwnerName), threadInfo.isSuspended, - threadInfo.isInNative) + threadInfo.isInNative, + threadInfo.isDaemon, + threadInfo.getPriority) } /** diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index dd9927d7ba1..7e74cc9287f 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -885,6 +885,20 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers { } } + test("SPARK-44895: Add 'daemon', 'priority' for ThreadStackTrace") { +withSpark(newSparkContext()) { sc => + val uiThreads = getJson(sc.ui.get, "executors/driver/threads") +.children +.filter(v => (v \ "threadName").extract[String].matches("SparkUI-\\d+")) + val priority = Thread.currentThread().getPriority + + uiThreads.foreach { v => +assert((v \ "isDaemon").extract[Boolean]) +assert((v \ "priority").extract[Int] === priority) + } +} + } + def goToUi(sc: SparkContext, path: String): 
Unit = { goToUi(sc.ui.get, path) } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
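The two fields this commit surfaces come straight from the JDK's management API: `ThreadInfo.isDaemon()` and `ThreadInfo.getPriority()` are real `java.lang.management` methods, available since Java 9. A minimal standalone check:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Demonstrates the JDK 9+ ThreadInfo accessors that SPARK-44895 exposes
// through ThreadStackTrace (the Spark wiring is in Utils.threadInfoToThreadStackTrace).
public class ThreadInfoDemo {
    public static void main(String[] args) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        ThreadInfo info = bean.getThreadInfo(Thread.currentThread().getId());
        // The main thread is never a daemon; priority is typically NORM_PRIORITY (5).
        System.out.println("daemon=" + info.isDaemon());
        System.out.println("priority=" + info.getPriority());
    }
}
```

On Java 8, where this PR's TODO originated, these accessors do not exist on `ThreadInfo`, which is why the fields only became available once Spark's baseline moved past JDK 9.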
[spark] branch master updated: [SPARK-45362][PYTHON] Project out PARTITION BY expressions before Python UDTF 'eval' method consumes them
This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b3d5bc0c109 [SPARK-45362][PYTHON] Project out PARTITION BY expressions before Python UDTF 'eval' method consumes them b3d5bc0c109 is described below commit b3d5bc0c10908aa66510844eaabc43b6764dd7c0 Author: Daniel Tenedorio AuthorDate: Thu Sep 28 14:02:46 2023 -0700 [SPARK-45362][PYTHON] Project out PARTITION BY expressions before Python UDTF 'eval' method consumes them ### What changes were proposed in this pull request? This PR projects out PARTITION BY expressions before Python UDTF 'eval' method consumes them. Before this PR, if a query included this `PARTITION BY` clause: ``` SELECT * FROM udtf((SELECT a, b FROM TABLE t) PARTITION BY (c, d)) ``` Then the `eval` method received four columns in each row: `a, b, c, d`. After this PR, the `eval` method only receives two columns: `a, b`, as expected. ### Why are the changes needed? This makes the Python UDTF `TABLE` columns consistently match what the `eval` method receives, as expected. ### Does this PR introduce _any_ user-facing change? Yes, see above. ### How was this patch tested? This PR adds new unit tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43156 from dtenedor/project-out-partition-exprs. 
Authored-by: Daniel Tenedorio Signed-off-by: Takuya UESHIN --- python/pyspark/sql/tests/test_udtf.py | 12 python/pyspark/worker.py | 31 +++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests/test_udtf.py b/python/pyspark/sql/tests/test_udtf.py index 97d5190a506..a1d82056c50 100644 --- a/python/pyspark/sql/tests/test_udtf.py +++ b/python/pyspark/sql/tests/test_udtf.py @@ -2009,6 +2009,10 @@ class BaseUDTFTestsMixin: self._partition_col = None def eval(self, row: Row): +# Make sure that the PARTITION BY expressions were projected out. +assert len(row.asDict().items()) == 2 +assert "partition_col" in row +assert "input" in row self._sum += row["input"] if self._partition_col is not None and self._partition_col != row["partition_col"]: # Make sure that all values of the partitioning column are the same @@ -2092,6 +2096,10 @@ class BaseUDTFTestsMixin: self._partition_col = None def eval(self, row: Row, partition_col: str): +# Make sure that the PARTITION BY and ORDER BY expressions were projected out. +assert len(row.asDict().items()) == 2 +assert "partition_col" in row +assert "input" in row # Make sure that all values of the partitioning column are the same # for each row consumed by this method for this instance of the class. if self._partition_col is not None and self._partition_col != row[partition_col]: @@ -2247,6 +2255,10 @@ class BaseUDTFTestsMixin: ) def eval(self, row: Row): +# Make sure that the PARTITION BY and ORDER BY expressions were projected out. +assert len(row.asDict().items()) == 2 +assert "partition_col" in row +assert "input" in row # Make sure that all values of the partitioning column are the same # for each row consumed by this method for this instance of the class. 
if self._partition_col is not None and self._partition_col != row["partition_col"]: diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 77481704979..4cffb02a64a 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -51,7 +51,14 @@ from pyspark.sql.pandas.serializers import ( ApplyInPandasWithStateSerializer, ) from pyspark.sql.pandas.types import to_arrow_type -from pyspark.sql.types import BinaryType, Row, StringType, StructType, _parse_datatype_json_string +from pyspark.sql.types import ( +BinaryType, +Row, +StringType, +StructType, +_create_row, +_parse_datatype_json_string, +) from pyspark.util import fail_on_stopiteration, handle_worker_exception from pyspark import shuffle from pyspark.errors import PySparkRuntimeError, PySparkTypeError @@ -735,7 +742,12 @@ def read_udtf(pickleSer, infile, eval_type): yield row self._udtf = self._create_udtf() if self._udtf.eval is not None: -result = self._udtf.eval(*args, **kwargs) +
[spark] branch master updated: [SPARK-45365][INFRA] Allow `branch-3.4` daily test to use the new test group tags
This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b938ff9f520 [SPARK-45365][INFRA] Allow `branch-3.4` daily test to use the new test group tags b938ff9f520 is described below commit b938ff9f520fd4e4997938284ffa0aba9ea271fc Author: yangjie01 AuthorDate: Thu Sep 28 13:05:57 2023 -0700 [SPARK-45365][INFRA] Allow `branch-3.4` daily test to use the new test group tags ### What changes were proposed in this pull request? This PR allows the branch-3.4 daily test to use the new test group tags. ### Why are the changes needed? SPARK-44034 has already been backported to branch-3.4, so the new test tags can be used to reduce the average run time of the daily tests. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? We should monitor GA (GitHub Actions). ### Was this patch authored or co-authored using generative AI tooling? No Closes #43159 from LuciferYang/SPARK-45365. Authored-by: yangjie01 Signed-off-by: Chao Sun --- .github/workflows/build_and_test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 89da6fcffe4..53994788d8b 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -180,10 +180,10 @@ jobs: java: ${{ inputs.java }} hadoop: ${{ inputs.hadoop }} hive: hive2.3 -# Using a tag that will not appear in sql module for placeholder, branch-3.3 and branch-3.4 will not run any UTs. +# Using a tag that will not appear in sql module for placeholder, branch-3.3 will not run any UTs. 
included-tags: >- ${{ -((inputs.branch == 'branch-3.3' || inputs.branch == 'branch-3.4') && 'org.apache.spark.tags.SlowHiveTest') +(inputs.branch == 'branch-3.3' && 'org.apache.spark.tags.SlowHiveTest') || 'org.apache.spark.tags.SlowSQLTest' }} comment: "- slow tests" @@ -193,7 +193,7 @@ jobs: hive: hive2.3 excluded-tags: >- ${{ -((inputs.branch == 'branch-3.3' || inputs.branch == 'branch-3.4') && 'org.apache.spark.tags.ExtendedSQLTest') +(inputs.branch == 'branch-3.3' && 'org.apache.spark.tags.ExtendedSQLTest') || 'org.apache.spark.tags.ExtendedSQLTest,org.apache.spark.tags.SlowSQLTest' }} comment: "- other tests" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
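The `included-tags`/`excluded-tags` expressions above rely on GitHub Actions' short-circuiting `&&`/`||` operators to pick one tag string per branch. Purely as an illustration (not part of the patch), the same selection can be sketched in Python, whose `and`/`or` idiom behaves identically on truthy strings:

```python
def included_tags(branch: str) -> str:
    # For branch-3.3, return a placeholder tag that matches nothing in the sql
    # module, so that branch runs no UTs; all other branches run the slow SQL
    # test group. Mirrors the GHA expression (cond && 'A') || 'B' after the patch.
    return (
        branch == "branch-3.3" and "org.apache.spark.tags.SlowHiveTest"
    ) or "org.apache.spark.tags.SlowSQLTest"
```

With the patch applied, branch-3.4 no longer satisfies the condition, so it falls through to the `or` arm and picks up the real `SlowSQLTest` group.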
[spark] branch master updated: [MINOR][SQL] Remove duplicate cases of escaping characters in string literals
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ee21b12c395 [MINOR][SQL] Remove duplicate cases of escaping characters in string literals ee21b12c395 is described below

commit ee21b12c395ac184c8ddc2f74b66f6e6285de5fa
Author: Max Gekk
AuthorDate: Thu Sep 28 21:18:40 2023 +0300

[MINOR][SQL] Remove duplicate cases of escaping characters in string literals

### What changes were proposed in this pull request?
In the PR, I propose to remove some cases in `appendEscapedChar()` because they fall through to the default case. The following tests check the cases:
- https://github.com/apache/spark/blob/187e9a851758c0e9cec11edab2bc07d6f4404001/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala#L97-L98
- https://github.com/apache/spark/blob/187e9a851758c0e9cec11edab2bc07d6f4404001/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/ParserUtilsSuite.scala#L104

### Why are the changes needed?
To improve code maintainability.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running the affected test suite:
```
$ build/sbt "test:testOnly *.ParserUtilsSuite"
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43170 from MaxGekk/cleanup-escaping.
Authored-by: Max Gekk
Signed-off-by: Max Gekk
---
 .../scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala
index c318f208255..a4ce5fb1203 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkParserUtils.scala
@@ -38,14 +38,11 @@ trait SparkParserUtils {
     def appendEscapedChar(n: Char): Unit = {
       n match {
         case '0' => sb.append('\u0000')
-        case '\'' => sb.append('\'')
-        case '"' => sb.append('\"')
         case 'b' => sb.append('\b')
         case 'n' => sb.append('\n')
         case 'r' => sb.append('\r')
         case 't' => sb.append('\t')
         case 'Z' => sb.append('\u001A')
-        case '\\' => sb.append('\\')
         // The following 2 lines are exactly what MySQL does TODO: why do we do this?
         case '%' => sb.append("\\%")
         case '_' => sb.append("\\_")
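The removed cases are redundant because `appendEscapedChar` ends with a default case that appends the character unchanged, so `\'`, `\"`, and `\\` already map to themselves. A rough Python sketch of the same mapping (an illustration of the idea, not the Spark implementation):

```python
# Escapes whose result differs from the escaped character itself.
_SPECIAL_ESCAPES = {
    "0": "\u0000", "b": "\b", "n": "\n", "r": "\r", "t": "\t", "Z": "\u001a",
    # MySQL-compatible behavior: keep the backslash for LIKE wildcards.
    "%": "\\%", "_": "\\_",
}

def append_escaped_char(n: str) -> str:
    # Default case: the character maps to itself, which is exactly why the
    # explicit cases for "'", '"' and "\\" could be deleted.
    return _SPECIAL_ESCAPES.get(n, n)
```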
[spark] branch master updated: [SPARK-45266][PYTHON] Refactor ResolveFunctions analyzer rule to delay making lateral join when table arguments are used
This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2cf937f9bac [SPARK-45266][PYTHON] Refactor ResolveFunctions analyzer rule to delay making lateral join when table arguments are used 2cf937f9bac is described below

commit 2cf937f9bac2131f3657660a8d65d07ab4ece490
Author: Takuya UESHIN
AuthorDate: Thu Sep 28 10:37:18 2023 -0700

[SPARK-45266][PYTHON] Refactor ResolveFunctions analyzer rule to delay making lateral join when table arguments are used

### What changes were proposed in this pull request?
Refactors the `ResolveFunctions` analyzer rule to delay building the lateral join when table arguments are used:
- Delay building the lateral join until after all the children are resolved
- Resolve `UnresolvedPolymorphicPythonUDTF` in one place
- Introduce a new error class `UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_TABLE_ARGUMENT` for cases where table arguments are used improperly

### Why are the changes needed?
The `ResolveFunctions` analyzer rule had become complicated.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43042 from ueshin/issues/SPARK-45266/analyzer.
Authored-by: Takuya UESHIN
Signed-off-by: Takuya UESHIN
---
 .../src/main/resources/error/error-classes.json    |   5 +
 ...ted-subquery-expression-category-error-class.md |   4 +
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 155 +
 .../sql/catalyst/analysis/CheckAnalysis.scala      |   5 +
 .../spark/sql/catalyst/expressions/PythonUDF.scala |   6 +-
 .../named-function-arguments.sql.out               |  16 +--
 .../results/named-function-arguments.sql.out       |  16 +--
 .../sql/execution/python/PythonUDTFSuite.scala     |  20 ++-
 8 files changed, 103 insertions(+), 124 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index 0882e387176..58fcedae332 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3484,6 +3484,11 @@
       "message" : [
         "IN/EXISTS predicate subqueries can only be used in filters, joins, aggregations, window functions, projections, and UPDATE/MERGE/DELETE commands."
       ]
+    },
+    "UNSUPPORTED_TABLE_ARGUMENT" : {
+      "message" : [
+        "Table arguments are used in a function where they are not supported."
+      ]
     }
   },
   "sqlState" : "0A000"
diff --git a/docs/sql-error-conditions-unsupported-subquery-expression-category-error-class.md b/docs/sql-error-conditions-unsupported-subquery-expression-category-error-class.md
index f61ea721aa0..45ad386c666 100644
--- a/docs/sql-error-conditions-unsupported-subquery-expression-category-error-class.md
+++ b/docs/sql-error-conditions-unsupported-subquery-expression-category-error-class.md
@@ -73,4 +73,8 @@ Correlated scalar subqueries can only be used in filters, aggregations, projecti
 IN/EXISTS predicate subqueries can only be used in filters, joins, aggregations, window functions, projections, and UPDATE/MERGE/DELETE commands.
+## UNSUPPORTED_TABLE_ARGUMENT
+
+Table arguments are used in a function where they are not supported.
+
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 67a958d73f7..cc0bfd3fc31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2082,7 +2082,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
       withPosition(u) {
         try {
-          val resolvedTvf = resolveBuiltinOrTempTableFunction(u.name, u.functionArgs).getOrElse {
+          val resolvedFunc = resolveBuiltinOrTempTableFunction(u.name, u.functionArgs).getOrElse {
             val CatalogAndIdentifier(catalog, ident) = expandIdentifier(u.name)
             if (CatalogV2Util.isSessionCatalog(catalog)) {
               v1SessionCatalog.resolvePersistentTableFunction(
@@ -2092,93 +2092,19 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
               catalog, "table-valued functions")
             }
           }
-          // Resolve Python UDTF calls if needed.
-          val resolvedFunc =
[spark] branch branch-3.5 updated: [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 917bc8cb927 [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE 917bc8cb927 is described below

commit 917bc8cb92728267fb93891f4ef9da13c06e4589
Author: Yihong He
AuthorDate: Thu Sep 28 12:58:07 2023 -0400

[SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE

### What changes were proposed in this pull request?
- Initialize the Spark session builder configuration from SPARK_REMOTE

### Why are the changes needed?
- `SparkSession.builder().getOrCreate()` should follow the behavior documented [here](https://github.com/apache/spark/blob/2cc1ee4d3a05a641d7a245f015ef824d8f7bae8b/docs/spark-connect-overview.md?plain=1#L241-L244) and support initialization from SPARK_REMOTE

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Closes #43153 from heyihong/SPARK-45360.
Authored-by: Yihong He
Signed-off-by: Herman van Hovell
(cherry picked from commit 183a3d761f36d35572cfb37ab921b6a86f8f28ed)
Signed-off-by: Herman van Hovell
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  5 +-
 .../connect/client/SparkConnectClientSuite.scala   | 61 ++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 7bd8fa59aea..421f37b9e8a 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -783,7 +783,10 @@ object SparkSession extends Logging {
   }

   class Builder() extends Logging {
-    private val builder = SparkConnectClient.builder()
+    // Initialize the connection string of the Spark Connect client builder from SPARK_REMOTE
+    // by default, if it exists. The connection string can be overridden using
+    // the remote() function, as it takes precedence over the SPARK_REMOTE environment variable.
+    private val builder = SparkConnectClient.builder().loadFromEnvironment()
     private var client: SparkConnectClient = _
     private[this] val options = new scala.collection.mutable.HashMap[String, String]
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 80e245ec78b..89acc2c60ac 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -86,6 +86,24 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assert(response.getSessionId === "abc123")
   }

+  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+    val readonlyEnv = System.getenv()
+    val field = readonlyEnv.getClass.getDeclaredField("m")
+    field.setAccessible(true)
+    val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+    try {
+      for ((k, v) <- pairs) {
+        assert(!modifiableEnv.containsKey(k))
+        modifiableEnv.put(k, v)
+      }
+      f
+    } finally {
+      for ((k, _) <- pairs) {
+        modifiableEnv.remove(k)
+      }
+    }
+  }
+
   test("Test connection") {
     testClientConnection() { testPort => SparkConnectClient.builder().port(testPort).build() }
   }
@@ -112,6 +130,49 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }

+  test("SparkSession create with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().create()
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().create()
+      assert(session != session2)
+    }
+  }
+
+  test("SparkSession getOrCreate with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().getOrCreate()
+
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+
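The `withEnvs` test helper above has to reach into the JVM's read-only environment map via reflection to set `SPARK_REMOTE` temporarily. For comparison only (this is a sketch, not code from the patch), the same set-then-restore pattern is much simpler in Python, where `os.environ` is directly writable:

```python
import os
from contextlib import contextmanager

@contextmanager
def with_envs(**pairs):
    """Temporarily set environment variables, restoring prior values on exit."""
    saved = {k: os.environ.get(k) for k in pairs}
    os.environ.update(pairs)
    try:
        yield
    finally:
        for k, old in saved.items():
            if old is None:
                os.environ.pop(k, None)  # variable did not exist before
            else:
                os.environ[k] = old
```

The `finally` block mirrors the Scala helper's cleanup, so a failing test body cannot leak `SPARK_REMOTE` into later tests.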
[spark] branch master updated: [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 183a3d761f3 [SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE 183a3d761f3 is described below

commit 183a3d761f36d35572cfb37ab921b6a86f8f28ed
Author: Yihong He
AuthorDate: Thu Sep 28 12:58:07 2023 -0400

[SPARK-45360][SQL][CONNECT] Initialize spark session builder configuration from SPARK_REMOTE

### What changes were proposed in this pull request?
- Initialize the Spark session builder configuration from SPARK_REMOTE

### Why are the changes needed?
- `SparkSession.builder().getOrCreate()` should follow the behavior documented [here](https://github.com/apache/spark/blob/2cc1ee4d3a05a641d7a245f015ef824d8f7bae8b/docs/spark-connect-overview.md?plain=1#L241-L244) and support initialization from SPARK_REMOTE

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- `build/sbt "connect-client-jvm/testOnly *SparkConnectClientSuite"`

### Was this patch authored or co-authored using generative AI tooling?

Closes #43153 from heyihong/SPARK-45360.
Authored-by: Yihong He
Signed-off-by: Herman van Hovell
---
 .../scala/org/apache/spark/sql/SparkSession.scala  |  5 +-
 .../connect/client/SparkConnectClientSuite.scala   | 61 ++
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index abe4d6a96e6..42052e3f8e6 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -786,7 +786,10 @@ object SparkSession extends Logging {
   }

   class Builder() extends Logging {
-    private val builder = SparkConnectClient.builder()
+    // Initialize the connection string of the Spark Connect client builder from SPARK_REMOTE
+    // by default, if it exists. The connection string can be overridden using
+    // the remote() function, as it takes precedence over the SPARK_REMOTE environment variable.
+    private val builder = SparkConnectClient.builder().loadFromEnvironment()
     private var client: SparkConnectClient = _
     private[this] val options = new scala.collection.mutable.HashMap[String, String]
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 488118d0552..57e0b4016f1 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -86,6 +86,24 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     assert(response.getSessionId === "abc123")
   }

+  private def withEnvs(pairs: (String, String)*)(f: => Unit): Unit = {
+    val readonlyEnv = System.getenv()
+    val field = readonlyEnv.getClass.getDeclaredField("m")
+    field.setAccessible(true)
+    val modifiableEnv = field.get(readonlyEnv).asInstanceOf[java.util.Map[String, String]]
+    try {
+      for ((k, v) <- pairs) {
+        assert(!modifiableEnv.containsKey(k))
+        modifiableEnv.put(k, v)
+      }
+      f
+    } finally {
+      for ((k, _) <- pairs) {
+        modifiableEnv.remove(k)
+      }
+    }
+  }
+
   test("Test connection") {
     testClientConnection() { testPort => SparkConnectClient.builder().port(testPort).build() }
   }
@@ -112,6 +130,49 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     }
   }

+  test("SparkSession create with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().create()
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().create()
+      assert(session != session2)
+    }
+  }
+
+  test("SparkSession getOrCreate with SPARK_REMOTE") {
+    startDummyServer(0)
+
+    withEnvs("SPARK_REMOTE" -> s"sc://localhost:${server.getPort}") {
+      val session = SparkSession.builder().getOrCreate()
+
+      val df = session.range(10)
+      df.analyze // Trigger RPC
+      assert(df.plan === service.getAndClearLatestInputPlan())
+
+      val session2 = SparkSession.builder().getOrCreate()
+
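The precedence described in the new `Builder` comment (an explicit `remote()` value wins; `SPARK_REMOTE` is only the default loaded by `loadFromEnvironment()`) can be sketched as a small resolution function. This is illustrative Python; `resolve_remote` is a hypothetical helper, not part of the Spark Connect API:

```python
from typing import Optional

def resolve_remote(explicit: Optional[str], env: dict) -> Optional[str]:
    # An explicitly supplied connection string takes precedence over the
    # SPARK_REMOTE environment variable, which serves only as a fallback.
    return explicit if explicit is not None else env.get("SPARK_REMOTE")
```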
[spark] branch master updated: [SPARK-45364][INFRA][BUILD] Clean up the unnecessary Scala 2.12 logical in SparkBuild
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 187e9a85175 [SPARK-45364][INFRA][BUILD] Clean up the unnecessary Scala 2.12 logical in SparkBuild 187e9a85175 is described below

commit 187e9a851758c0e9cec11edab2bc07d6f4404001
Author: panbingkun
AuthorDate: Thu Sep 28 08:36:08 2023 -0500

[SPARK-45364][INFRA][BUILD] Clean up the unnecessary Scala 2.12 logical in SparkBuild

### What changes were proposed in this pull request?
The PR aims to clean up the unnecessary Scala 2.12 logic in SparkBuild.

### Why are the changes needed?
Spark 4.0 no longer supports Scala 2.12.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43158 from panbingkun/SPARK-45364.

Authored-by: panbingkun
Signed-off-by: Sean Owen
---
 project/SparkBuild.scala | 5 +
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 85ffda304bc..13c92142d46 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -352,10 +352,7 @@ object SparkBuild extends PomBuild {
         "org.apache.spark.util.collection"
       ).mkString(":"),
       "-doc-title", "Spark " + version.value.replaceAll("-SNAPSHOT", "") + " ScalaDoc"
-    ) ++ {
-      // Do not attempt to scaladoc javadoc comments under 2.12 since it can't handle inner classes
-      if (scalaBinaryVersion.value == "2.12") Seq("-no-java-comments") else Seq.empty
-    },
+    ),
     // disable Mima check for all modules,
     // to be enabled in specific ones that have previous artifacts