[spark] branch master updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 82a41d8ca27 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes 82a41d8ca27 is described below commit 82a41d8ca273e7a9268324c6958f8bb14d9e Author: Weichen Xu AuthorDate: Sat Nov 19 17:23:20 2022 +0800 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes. ### Why are the changes needed? This is for limiting the thread number for OpenBLAS routine to the number of cores assigned to this executor because some spark ML algorithms call OpenBLAS via netlib-java, e.g.: Spark ALS estimator training calls LAPACK API `dppsv` (internally it will call BLAS lib), if it calls OpenBLAS lib, by default OpenBLAS will try to use all CPU cores. But spark will launch multiple spark tasks on a spark worker, and each spark task might call `dppsv` API at the same time, and each call internally it will create multiple threads (threads number equals to CPU cores), this causes CPU oversubscription. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes #38699 from WeichenXu123/SPARK-41188. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu --- core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++ .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 7 --- .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index d2c7067e596..5cbf2e83371 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -541,6 +541,16 @@ class SparkContext(config: SparkConf) extends Logging { executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser +if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { + // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus" + // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned + // to this executor because some spark ML algorithms calls OpenBlAS via netlib-java + // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor + // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool + // see https://github.com/numpy/numpy/issues/10455 + executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1")) +} + _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver() _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) => _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 14d5df14ed8..cdb2c620656 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -135,13 +135,6 
@@ private[spark] abstract class BasePythonRunner[IN, OUT]( val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") -// if OMP_NUM_THREADS is not explicitly set, override it with the number of cores -if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { - // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor - // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool - // see https://github.com/numpy/numpy/issues/10455 - execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _)) -} envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/or
[spark] branch branch-3.3 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new f431cdf0944 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes f431cdf0944 is described below commit f431cdf09442b86133d17fa6e56cb8b77ca4e486 Author: Weichen Xu AuthorDate: Sat Nov 19 17:23:20 2022 +0800 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes. ### Why are the changes needed? This is for limiting the thread number for OpenBLAS routine to the number of cores assigned to this executor because some spark ML algorithms call OpenBLAS via netlib-java, e.g.: Spark ALS estimator training calls LAPACK API `dppsv` (internally it will call BLAS lib), if it calls OpenBLAS lib, by default OpenBLAS will try to use all CPU cores. But spark will launch multiple spark tasks on a spark worker, and each spark task might call `dppsv` API at the same time, and each call internally it will create multiple threads (threads number equals to CPU cores), this causes CPU oversubscription. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes #38699 from WeichenXu123/SPARK-41188. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 82a41d8ca273e7a9268324c6958f8bb14d9e) Signed-off-by: Weichen Xu --- core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++ .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 7 --- .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 02c58d2a9b4..0d0d4fe83a4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -546,6 +546,16 @@ class SparkContext(config: SparkConf) extends Logging { executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser +if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { + // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus" + // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned + // to this executor because some spark ML algorithms calls OpenBlAS via netlib-java + // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor + // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool + // see https://github.com/numpy/numpy/issues/10455 + executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1")) +} + _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver() _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) => _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index f32c80f3ef5..bf5b862438a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -133,13 +133,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") -// if OMP_NUM_THREADS is not explicitly set, override it with the number of cores -if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { - // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor - // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool - // see https://github.com/numpy/numpy/issues/10455 - execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _)) -} envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") diff --git a/resource-managers/mesos/s
[spark] branch branch-3.2 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.2 by this push: new 3b9cca7aa32 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes 3b9cca7aa32 is described below commit 3b9cca7aa32f659ea7413abeb373fe7ed069e6f7 Author: Weichen Xu AuthorDate: Sat Nov 19 17:23:20 2022 +0800 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes Signed-off-by: Weichen Xu ### What changes were proposed in this pull request? Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes. ### Why are the changes needed? This is for limiting the thread number for OpenBLAS routine to the number of cores assigned to this executor because some spark ML algorithms call OpenBLAS via netlib-java, e.g.: Spark ALS estimator training calls LAPACK API `dppsv` (internally it will call BLAS lib), if it calls OpenBLAS lib, by default OpenBLAS will try to use all CPU cores. But spark will launch multiple spark tasks on a spark worker, and each spark task might call `dppsv` API at the same time, and each call internally it will create multiple threads (threads number equals to CPU cores), this causes CPU oversubscription. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes #38699 from WeichenXu123/SPARK-41188. 
Authored-by: Weichen Xu Signed-off-by: Weichen Xu (cherry picked from commit 82a41d8ca273e7a9268324c6958f8bb14d9e) Signed-off-by: Weichen Xu --- core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++ .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 7 --- .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f7d8c799029..f991d2ea09c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -542,6 +542,16 @@ class SparkContext(config: SparkConf) extends Logging { executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser +if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { + // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus" + // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned + // to this executor because some spark ML algorithms calls OpenBlAS via netlib-java + // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor + // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool + // see https://github.com/numpy/numpy/issues/10455 + executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1")) +} + _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver() _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) => _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 3a3e7e04e7f..d854874c0e8 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -131,13 +131,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") -// if OMP_NUM_THREADS is not explicitly set, override it with the number of cores -if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { - // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor - // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool - // see https://github.com/numpy/numpy/issues/10455 - execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _)) -} envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") diff --git a/resource-managers/mesos/s
[spark] branch branch-3.3 updated: [SPARK-41202][BUILD][3.3] Update ORC to 1.7.7
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.3 by this push: new 0152c2e91e7 [SPARK-41202][BUILD][3.3] Update ORC to 1.7.7 0152c2e91e7 is described below commit 0152c2e91e7d2a3197886e41b493613c7e2c258d Author: William Hyun AuthorDate: Sat Nov 19 04:17:20 2022 -0800 [SPARK-41202][BUILD][3.3] Update ORC to 1.7.7 ### What changes were proposed in this pull request? This PR aims to update ORC to 1.7.7. ### Why are the changes needed? This will bring the latest bug fixes. - https://orc.apache.org/news/2022/11/17/ORC-1.7.7/ ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. Closes #38724 from williamhyun/SPARK-41202. Authored-by: William Hyun Signed-off-by: Dongjoon Hyun --- dev/deps/spark-deps-hadoop-2-hive-2.3 | 6 +++--- dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++--- pom.xml | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index 6bcd447dc64..6a16cb2ec03 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -219,9 +219,9 @@ objenesis/3.2//objenesis-3.2.jar okhttp/3.12.12//okhttp-3.12.12.jar okio/1.14.0//okio-1.14.0.jar opencsv/2.3//opencsv-2.3.jar -orc-core/1.7.6//orc-core-1.7.6.jar -orc-mapreduce/1.7.6//orc-mapreduce-1.7.6.jar -orc-shims/1.7.6//orc-shims-1.7.6.jar +orc-core/1.7.7//orc-core-1.7.7.jar +orc-mapreduce/1.7.7//orc-mapreduce-1.7.7.jar +orc-shims/1.7.7//orc-shims-1.7.7.jar oro/2.0.8//oro-2.0.8.jar osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar paranamer/2.8//paranamer-2.8.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 7429ecab6b9..9738cabd217 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ 
b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -208,9 +208,9 @@ opencsv/2.3//opencsv-2.3.jar opentracing-api/0.33.0//opentracing-api-0.33.0.jar opentracing-noop/0.33.0//opentracing-noop-0.33.0.jar opentracing-util/0.33.0//opentracing-util-0.33.0.jar -orc-core/1.7.6//orc-core-1.7.6.jar -orc-mapreduce/1.7.6//orc-mapreduce-1.7.6.jar -orc-shims/1.7.6//orc-shims-1.7.6.jar +orc-core/1.7.7//orc-core-1.7.7.jar +orc-mapreduce/1.7.7//orc-mapreduce-1.7.7.jar +orc-shims/1.7.7//orc-shims-1.7.7.jar oro/2.0.8//oro-2.0.8.jar osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar paranamer/2.8//paranamer-2.8.jar diff --git a/pom.xml b/pom.xml index 34043d43758..6eae980212b 100644 --- a/pom.xml +++ b/pom.xml @@ -132,7 +132,7 @@ 10.14.2.0 1.12.2 -1.7.6 +1.7.7 9.4.48.v20220622 4.0.3 0.10.0 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 25849684b78 [SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used 25849684b78 is described below commit 25849684b78cca6651e25d6efc9644a576e7e20f Author: Jungtaek Lim AuthorDate: Sat Nov 19 22:42:26 2022 +0900 [SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used ### What changes were proposed in this pull request? This PR proposes to fix the metrics issue for streaming query when DSv1 streaming source and DSv2 streaming source are co-used. If the streaming query has both DSv1 streaming source and DSv2 streaming source, only DSv1 streaming source produced correct metrics. There is a bug in ProgressReporter that it tries to match logical node for DSv2 streaming source with OffsetHolder (association map has OffsetHolder instances for DSv2 streaming sources), which will be never matched. Given that physical node for DSv2 streaming source contains both source information and metrics, we can simply deduce all the necessary information from the physical node rather than trying to find the source from association map. ### Why are the changes needed? The logic of collecting metrics does not collect metrics for DSv2 streaming sources properly. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test case. Closes #38719 from HeartSaVioR/SPARK-41999. 
Authored-by: Jungtaek Lim Signed-off-by: Jungtaek Lim --- .../sql/execution/streaming/ProgressReporter.scala | 9 +++- .../spark/sql/streaming/StreamingQuerySuite.scala | 56 +- 2 files changed, 63 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index 8a89ca7b85d..a4c975861c5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -345,7 +345,14 @@ trait ProgressReporter extends Logging { val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves() if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) { val execLeafToSource = allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap { - case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep -> source } + case (_, ep: MicroBatchScanExec) => +// SPARK-41199: `logicalPlanLeafToSource` contains OffsetHolder instance for DSv2 +// streaming source, hence we cannot lookup the actual source from the map. +// The physical node for DSv2 streaming source contains the information of the source +// by itself, so leverage it. 
+Some(ep -> ep.stream) + case (lp, ep) => +logicalPlanLeafToSource.get(lp).map { source => ep -> source } } val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, source) => val numRows = execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 090a2081219..71eb4c15701 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row} +import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, Row, SaveMode} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, Shuffle, Uuid} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation @@ -657,6 +657,60 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi ) } + test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 streaming sources") { +withTable("parquet_streaming_tbl") { + val streamInput = MemoryStream[Int] + val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS value_stream") + + spark.sql( +""" + |CREATE TABLE parquet_streaming_tbl + |( + | key integer, + | value
[spark] branch master updated: [SPARK-41172][SQL] Migrate the ambiguous ref error to an error class
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 62f8ce40ddb [SPARK-41172][SQL] Migrate the ambiguous ref error to an error class 62f8ce40ddb is described below commit 62f8ce40ddbf76ce86fd5e51cc73c67d66e12f48 Author: panbingkun AuthorDate: Sat Nov 19 20:31:38 2022 +0300 [SPARK-41172][SQL] Migrate the ambiguous ref error to an error class ### What changes were proposed in this pull request? The pr aims to migrate the ambiguous ref error to an error class. ### Why are the changes needed? The changes improve the error framework. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #38721 from panbingkun/SPARK-41172. Authored-by: panbingkun Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 5 + .../spark/sql/catalyst/expressions/package.scala | 5 +- .../spark/sql/errors/QueryCompilationErrors.scala | 9 ++ .../sql/catalyst/analysis/AnalysisSuite.scala | 5 +- .../catalyst/analysis/ResolveSubquerySuite.scala | 4 +- .../expressions/AttributeResolutionSuite.scala | 30 +++-- .../results/columnresolution-negative.sql.out | 135 +++-- .../sql-tests/results/postgreSQL/join.sql.out | 30 - .../results/postgreSQL/select_implicit.sql.out | 45 ++- .../results/udf/postgreSQL/udf-join.sql.out| 30 - .../udf/postgreSQL/udf-select_implicit.sql.out | 45 ++- .../spark/sql/DataFrameNaFunctionsSuite.scala | 42 +-- .../org/apache/spark/sql/DataFrameStatSuite.scala | 52 ++-- .../execution/command/PlanResolutionSuite.scala| 22 ++-- .../execution/datasources/orc/OrcFilterSuite.scala | 20 ++- 15 files changed, 406 insertions(+), 73 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index fe340c517a2..4da9d2f9fbc 100644 --- 
a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -5,6 +5,11 @@ ], "sqlState" : "42000" }, + "AMBIGUOUS_REFERENCE" : { +"message" : [ + "Reference is ambiguous, could be: ." +] + }, "ARITHMETIC_OVERFLOW" : { "message" : [ ". If necessary set to \"false\" to bypass this error." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index 7913f396120..ededac3d917 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -21,9 +21,9 @@ import java.util.Locale import com.google.common.collect.Maps -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute} import org.apache.spark.sql.catalyst.util.MetadataColumnHelper +import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.types.{StructField, StructType} /** @@ -368,8 +368,7 @@ package object expressions { case ambiguousReferences => // More than one match. 
- val referenceNames = ambiguousReferences.map(_.qualifiedName).mkString(", ") - throw new AnalysisException(s"Reference '$name' is ambiguous, could be: $referenceNames.") + throw QueryCompilationErrors.ambiguousReferenceError(name, ambiguousReferences) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 22b4cfdb3c6..cbdbb6adc11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -1834,6 +1834,15 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { "n" -> numMatches.toString)) } + def ambiguousReferenceError(name: String, ambiguousReferences: Seq[Attribute]): Throwable = { +new AnalysisException( + errorClass = "AMBIGUOUS_REFERENCE", + messageParameters = Map( +"name" -> toSQLId(name), +"referenceNames" -> + ambiguousReferences.map(ar => toSQLId(ar.qualifiedName)).sorted.mkString("[", ", ", "]"))) + } + def cannotUseIntervalTypeInTableSchemaError(): Throwable = { new AnalysisException( errorClass = "_LEGACY_ERROR_TEMP_1183", diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scal