[spark] branch master updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

2022-11-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 82a41d8ca27 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to 
be spark.task.cpus by default for spark executor JVM processes
82a41d8ca27 is described below

commit 82a41d8ca273e7a9268324c6958f8bb14d9e
Author: Weichen Xu 
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

[SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be 
spark.task.cpus by default for spark executor JVM processes

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?

Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark 
executor JVM processes.

### Why are the changes needed?

This limits the number of threads used by OpenBLAS routines to the number of
cores assigned to this executor, because some Spark ML algorithms call
OpenBLAS via netlib-java.
For example:
Spark ALS estimator training calls the LAPACK routine `dppsv`, which
internally calls a BLAS library. If that library is OpenBLAS, it will by
default try to use all CPU cores. However, Spark launches multiple tasks on a
worker, and each task may call `dppsv` at the same time; each such call
internally creates as many threads as there are CPU cores, which causes CPU
oversubscription.
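
A minimal sketch (not part of this commit) of how the new default interacts
with the configs involved; the application name and core counts below are
illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

// With this change, executor JVM processes inherit OMP_NUM_THREADS from
// spark.task.cpus unless spark.executorEnv.OMP_NUM_THREADS is set explicitly.
val spark = SparkSession.builder()
  .appName("omp-num-threads-demo")   // illustrative name
  .config("spark.task.cpus", "2")    // each task is allotted 2 CPU cores
  // An explicit value always wins over the new default:
  // .config("spark.executorEnv.OMP_NUM_THREADS", "4")
  .getOrCreate()

// Executors now start with OMP_NUM_THREADS=2 in their environment, so OpenBLAS
// or OpenMP code called from a task stays within its CPU allotment.
```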

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

Closes #38699 from WeichenXu123/SPARK-41188.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
---
 core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 ---
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d2c7067e596..5cbf2e83371 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -541,6 +541,16 @@ class SparkContext(config: SparkConf) extends Logging {
 executorEnvs ++= _conf.getExecutorEnv
 executorEnvs("SPARK_USER") = sparkUser
 
+if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+  // if OMP_NUM_THREADS is not explicitly set, override it with the value 
of "spark.task.cpus"
+  // SPARK-41188: limit the thread number for OpenBLAS routine to the 
number of cores assigned
+  // to this executor because some spark ML algorithms calls OpenBlAS via 
netlib-java
+  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
+  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
+  // see https://github.com/numpy/numpy/issues/10455
+  executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+}
+
 _shuffleDriverComponents = 
ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
 _shuffleDriverComponents.initializeApplication().asScala.foreach { case 
(k, v) =>
   _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 14d5df14ed8..cdb2c620656 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -135,13 +135,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 val execCoresProp = 
Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
 val memoryMb = 
Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
 val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
-// if OMP_NUM_THREADS is not explicitly set, override it with the number 
of cores
-if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
-  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
-  // see https://github.com/numpy/numpy/issues/10455
-  execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-}
 envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor 
thread
 if (reuseWorker) {
   envVars.put("SPARK_REUSE_WORKER", "1")
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
 
b/resource-managers/mesos/src/test/scala/or

[spark] branch branch-3.3 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

2022-11-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new f431cdf0944 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to 
be spark.task.cpus by default for spark executor JVM processes
f431cdf0944 is described below

commit f431cdf09442b86133d17fa6e56cb8b77ca4e486
Author: Weichen Xu 
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

[SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be 
spark.task.cpus by default for spark executor JVM processes

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?

Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark 
executor JVM processes.

### Why are the changes needed?

This limits the number of threads used by OpenBLAS routines to the number of
cores assigned to this executor, because some Spark ML algorithms call
OpenBLAS via netlib-java.
For example:
Spark ALS estimator training calls the LAPACK routine `dppsv`, which
internally calls a BLAS library. If that library is OpenBLAS, it will by
default try to use all CPU cores. However, Spark launches multiple tasks on a
worker, and each task may call `dppsv` at the same time; each such call
internally creates as many threads as there are CPU cores, which causes CPU
oversubscription.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

Closes #38699 from WeichenXu123/SPARK-41188.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 82a41d8ca273e7a9268324c6958f8bb14d9e)
Signed-off-by: Weichen Xu 
---
 core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 ---
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 02c58d2a9b4..0d0d4fe83a4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -546,6 +546,16 @@ class SparkContext(config: SparkConf) extends Logging {
 executorEnvs ++= _conf.getExecutorEnv
 executorEnvs("SPARK_USER") = sparkUser
 
+if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+  // if OMP_NUM_THREADS is not explicitly set, override it with the value 
of "spark.task.cpus"
+  // SPARK-41188: limit the thread number for OpenBLAS routine to the 
number of cores assigned
+  // to this executor because some spark ML algorithms calls OpenBlAS via 
netlib-java
+  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
+  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
+  // see https://github.com/numpy/numpy/issues/10455
+  executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+}
+
 _shuffleDriverComponents = 
ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
 _shuffleDriverComponents.initializeApplication().asScala.foreach { case 
(k, v) =>
   _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f32c80f3ef5..bf5b862438a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -133,13 +133,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 val execCoresProp = 
Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
 val memoryMb = 
Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
 val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
-// if OMP_NUM_THREADS is not explicitly set, override it with the number 
of cores
-if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
-  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
-  // see https://github.com/numpy/numpy/issues/10455
-  execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-}
 envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor 
thread
 if (reuseWorker) {
   envVars.put("SPARK_REUSE_WORKER", "1")
diff --git 
a/resource-managers/mesos/s

[spark] branch branch-3.2 updated: [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

2022-11-19 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
 new 3b9cca7aa32 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to 
be spark.task.cpus by default for spark executor JVM processes
3b9cca7aa32 is described below

commit 3b9cca7aa32f659ea7413abeb373fe7ed069e6f7
Author: Weichen Xu 
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

[SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be 
spark.task.cpus by default for spark executor JVM processes

Signed-off-by: Weichen Xu 

### What changes were proposed in this pull request?

Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark 
executor JVM processes.

### Why are the changes needed?

This limits the number of threads used by OpenBLAS routines to the number of
cores assigned to this executor, because some Spark ML algorithms call
OpenBLAS via netlib-java.
For example:
Spark ALS estimator training calls the LAPACK routine `dppsv`, which
internally calls a BLAS library. If that library is OpenBLAS, it will by
default try to use all CPU cores. However, Spark launches multiple tasks on a
worker, and each task may call `dppsv` at the same time; each such call
internally creates as many threads as there are CPU cores, which causes CPU
oversubscription.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

Closes #38699 from WeichenXu123/SPARK-41188.

Authored-by: Weichen Xu 
Signed-off-by: Weichen Xu 
(cherry picked from commit 82a41d8ca273e7a9268324c6958f8bb14d9e)
Signed-off-by: Weichen Xu 
---
 core/src/main/scala/org/apache/spark/SparkContext.scala| 10 ++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 ---
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f7d8c799029..f991d2ea09c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -542,6 +542,16 @@ class SparkContext(config: SparkConf) extends Logging {
 executorEnvs ++= _conf.getExecutorEnv
 executorEnvs("SPARK_USER") = sparkUser
 
+if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+  // if OMP_NUM_THREADS is not explicitly set, override it with the value 
of "spark.task.cpus"
+  // SPARK-41188: limit the thread number for OpenBLAS routine to the 
number of cores assigned
+  // to this executor because some spark ML algorithms calls OpenBlAS via 
netlib-java
+  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
+  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
+  // see https://github.com/numpy/numpy/issues/10455
+  executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+}
+
 _shuffleDriverComponents = 
ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
 _shuffleDriverComponents.initializeApplication().asScala.foreach { case 
(k, v) =>
   _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 3a3e7e04e7f..d854874c0e8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -131,13 +131,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
 val execCoresProp = 
Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
 val memoryMb = 
Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
 val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
-// if OMP_NUM_THREADS is not explicitly set, override it with the number 
of cores
-if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-  // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
-  // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
-  // see https://github.com/numpy/numpy/issues/10455
-  execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-}
 envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor 
thread
 if (reuseWorker) {
   envVars.put("SPARK_REUSE_WORKER", "1")
diff --git 
a/resource-managers/mesos/s

[spark] branch branch-3.3 updated: [SPARK-41202][BUILD][3.3] Update ORC to 1.7.7

2022-11-19 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 0152c2e91e7 [SPARK-41202][BUILD][3.3] Update ORC to 1.7.7
0152c2e91e7 is described below

commit 0152c2e91e7d2a3197886e41b493613c7e2c258d
Author: William Hyun 
AuthorDate: Sat Nov 19 04:17:20 2022 -0800

[SPARK-41202][BUILD][3.3] Update ORC to 1.7.7

### What changes were proposed in this pull request?
This PR aims to update ORC to 1.7.7.

### Why are the changes needed?
This will bring the latest bug fixes.
- https://orc.apache.org/news/2022/11/17/ORC-1.7.7/

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass the CIs.

Closes #38724 from williamhyun/SPARK-41202.

Authored-by: William Hyun 
Signed-off-by: Dongjoon Hyun 
---
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 6 +++---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +++---
 pom.xml   | 2 +-
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 
b/dev/deps/spark-deps-hadoop-2-hive-2.3
index 6bcd447dc64..6a16cb2ec03 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -219,9 +219,9 @@ objenesis/3.2//objenesis-3.2.jar
 okhttp/3.12.12//okhttp-3.12.12.jar
 okio/1.14.0//okio-1.14.0.jar
 opencsv/2.3//opencsv-2.3.jar
-orc-core/1.7.6//orc-core-1.7.6.jar
-orc-mapreduce/1.7.6//orc-mapreduce-1.7.6.jar
-orc-shims/1.7.6//orc-shims-1.7.6.jar
+orc-core/1.7.7//orc-core-1.7.7.jar
+orc-mapreduce/1.7.7//orc-mapreduce-1.7.7.jar
+orc-shims/1.7.7//orc-shims-1.7.7.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 
b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 7429ecab6b9..9738cabd217 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -208,9 +208,9 @@ opencsv/2.3//opencsv-2.3.jar
 opentracing-api/0.33.0//opentracing-api-0.33.0.jar
 opentracing-noop/0.33.0//opentracing-noop-0.33.0.jar
 opentracing-util/0.33.0//opentracing-util-0.33.0.jar
-orc-core/1.7.6//orc-core-1.7.6.jar
-orc-mapreduce/1.7.6//orc-mapreduce-1.7.6.jar
-orc-shims/1.7.6//orc-shims-1.7.6.jar
+orc-core/1.7.7//orc-core-1.7.7.jar
+orc-mapreduce/1.7.7//orc-mapreduce-1.7.7.jar
+orc-shims/1.7.7//orc-shims-1.7.7.jar
 oro/2.0.8//oro-2.0.8.jar
 osgi-resource-locator/1.0.3//osgi-resource-locator-1.0.3.jar
 paranamer/2.8//paranamer-2.8.jar
diff --git a/pom.xml b/pom.xml
index 34043d43758..6eae980212b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,7 +132,7 @@
 
 10.14.2.0
 1.12.2
-1.7.6
+1.7.7
 9.4.48.v20220622
 4.0.3
 0.10.0


[spark] branch master updated: [SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 streaming source are co-used

2022-11-19 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 25849684b78 [SPARK-41199][SS] Fix metrics issue when DSv1 streaming 
source and DSv2 streaming source are co-used
25849684b78 is described below

commit 25849684b78cca6651e25d6efc9644a576e7e20f
Author: Jungtaek Lim 
AuthorDate: Sat Nov 19 22:42:26 2022 +0900

[SPARK-41199][SS] Fix metrics issue when DSv1 streaming source and DSv2 
streaming source are co-used

### What changes were proposed in this pull request?

This PR proposes to fix the metrics issue for streaming queries when DSv1 and
DSv2 streaming sources are co-used. If a streaming query has both DSv1 and
DSv2 streaming sources, only the DSv1 streaming sources produce correct
metrics.

There is a bug in ProgressReporter: it tries to match the logical node for a
DSv2 streaming source against the association map, but that map holds
OffsetHolder instances for DSv2 streaming sources, so the lookup never
matches. Given that the physical node for a DSv2 streaming source contains
both the source information and its metrics, we can simply derive all the
necessary information from the physical node rather than trying to find the
source in the association map.
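
Below is a hedged sketch (not taken from this commit) of the scenario the fix
targets: a query that unions a DSv1 file source with a DSv2 rate source. The
input path and trigger interval are illustrative assumptions:

```scala
import org.apache.spark.sql.streaming.Trigger

// DSv1 streaming source: a file-based parquet stream (hypothetical path).
val fileStream = spark.readStream
  .schema("value LONG")
  .parquet("/tmp/parquet_stream_input")

// DSv2 streaming source: the built-in rate source, trimmed to the same schema.
val rateStream = spark.readStream.format("rate").load().select("value")

val query = fileStream.union(rateStream).writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

// Before this fix, the DSv2 source's entry in lastProgress.sources reported
// zero input rows; with the fix, both sources report their actual numInputRows.
Option(query.lastProgress).foreach { p =>
  p.sources.foreach(s => println(s"${s.description} -> ${s.numInputRows}"))
}
```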

### Why are the changes needed?

The metrics collection logic does not properly collect metrics for DSv2
streaming sources.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test case.

Closes #38719 from HeartSaVioR/SPARK-41999.

Authored-by: Jungtaek Lim 
Signed-off-by: Jungtaek Lim 
---
 .../sql/execution/streaming/ProgressReporter.scala |  9 +++-
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 56 +-
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 8a89ca7b85d..a4c975861c5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -345,7 +345,14 @@ trait ProgressReporter extends Logging {
   val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
   if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
 val execLeafToSource = 
allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
-  case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep 
-> source }
+  case (_, ep: MicroBatchScanExec) =>
+// SPARK-41199: `logicalPlanLeafToSource` contains OffsetHolder 
instance for DSv2
+// streaming source, hence we cannot lookup the actual source from 
the map.
+// The physical node for DSv2 streaming source contains the 
information of the source
+// by itself, so leverage it.
+Some(ep -> ep.stream)
+  case (lp, ep) =>
+logicalPlanLeafToSource.get(lp).map { source => ep -> source }
 }
 val sourceToInputRowsTuples = execLeafToSource.map { case (execLeaf, 
source) =>
   val numRows = 
execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 090a2081219..71eb4c15701 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -36,7 +36,7 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, 
Row}
+import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Dataset, 
Row, SaveMode}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Literal, Rand, Randn, 
Shuffle, Uuid}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -657,6 +657,60 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
 )
   }
 
+  test("SPARK-41199: input row calculation with mixed-up of DSv1 and DSv2 
streaming sources") {
+withTable("parquet_streaming_tbl") {
+  val streamInput = MemoryStream[Int]
+  val streamDf = streamInput.toDF().selectExpr("value AS key", "value AS 
value_stream")
+
+  spark.sql(
+"""
+  |CREATE TABLE parquet_streaming_tbl
+  |(
+  |  key integer,
+  |  value

[spark] branch master updated: [SPARK-41172][SQL] Migrate the ambiguous ref error to an error class

2022-11-19 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 62f8ce40ddb [SPARK-41172][SQL] Migrate the ambiguous ref error to an 
error class
62f8ce40ddb is described below

commit 62f8ce40ddbf76ce86fd5e51cc73c67d66e12f48
Author: panbingkun 
AuthorDate: Sat Nov 19 20:31:38 2022 +0300

[SPARK-41172][SQL] Migrate the ambiguous ref error to an error class

### What changes were proposed in this pull request?
This PR aims to migrate the ambiguous reference error to an error class.

### Why are the changes needed?
The changes improve the error framework.
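
As a hedged illustration (not part of this patch) of a query that now fails
with the new error class, assuming two DataFrames that both expose a column
named `id`:

```scala
import spark.implicits._

val left  = Seq((1, "a")).toDF("id", "x")
val right = Seq((1, "b")).toDF("id", "y")

// Selecting the unqualified "id" after the join is ambiguous; the resulting
// AnalysisException now carries error class AMBIGUOUS_REFERENCE with a message
// along the lines of: Reference `id` is ambiguous, could be: [`id`, `id`].
left.join(right, left("id") === right("id")).select("id").show()
```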

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #38721 from panbingkun/SPARK-41172.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   |   5 +
 .../spark/sql/catalyst/expressions/package.scala   |   5 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |   9 ++
 .../sql/catalyst/analysis/AnalysisSuite.scala  |   5 +-
 .../catalyst/analysis/ResolveSubquerySuite.scala   |   4 +-
 .../expressions/AttributeResolutionSuite.scala |  30 +++--
 .../results/columnresolution-negative.sql.out  | 135 +++--
 .../sql-tests/results/postgreSQL/join.sql.out  |  30 -
 .../results/postgreSQL/select_implicit.sql.out |  45 ++-
 .../results/udf/postgreSQL/udf-join.sql.out|  30 -
 .../udf/postgreSQL/udf-select_implicit.sql.out |  45 ++-
 .../spark/sql/DataFrameNaFunctionsSuite.scala  |  42 +--
 .../org/apache/spark/sql/DataFrameStatSuite.scala  |  52 ++--
 .../execution/command/PlanResolutionSuite.scala|  22 ++--
 .../execution/datasources/orc/OrcFilterSuite.scala |  20 ++-
 15 files changed, 406 insertions(+), 73 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index fe340c517a2..4da9d2f9fbc 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -5,6 +5,11 @@
 ],
 "sqlState" : "42000"
   },
+  "AMBIGUOUS_REFERENCE" : {
+"message" : [
+  "Reference  is ambiguous, could be: ."
+]
+  },
   "ARITHMETIC_OVERFLOW" : {
 "message" : [
   ". If necessary set  to \"false\" to 
bypass this error."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 7913f396120..ededac3d917 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -21,9 +21,9 @@ import java.util.Locale
 
 import com.google.common.collect.Maps
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.{StructField, StructType}
 
 /**
@@ -368,8 +368,7 @@ package object expressions  {
 
 case ambiguousReferences =>
   // More than one match.
-  val referenceNames = 
ambiguousReferences.map(_.qualifiedName).mkString(", ")
-  throw new AnalysisException(s"Reference '$name' is ambiguous, could 
be: $referenceNames.")
+  throw QueryCompilationErrors.ambiguousReferenceError(name, 
ambiguousReferences)
   }
 }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 22b4cfdb3c6..cbdbb6adc11 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -1834,6 +1834,15 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
 "n" -> numMatches.toString))
   }
 
+  def ambiguousReferenceError(name: String, ambiguousReferences: 
Seq[Attribute]): Throwable = {
+new AnalysisException(
+  errorClass = "AMBIGUOUS_REFERENCE",
+  messageParameters = Map(
+"name" -> toSQLId(name),
+"referenceNames" ->
+  ambiguousReferences.map(ar => 
toSQLId(ar.qualifiedName)).sorted.mkString("[", ", ", "]")))
+  }
+
   def cannotUseIntervalTypeInTableSchemaError(): Throwable = {
 new AnalysisException(
   errorClass = "_LEGACY_ERROR_TEMP_1183",
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scal