[spark] branch branch-3.3 updated: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

2022-05-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new e8e330fbbca [SPARK-39218][SS][PYTHON] Make foreachBatch streaming 
query stop gracefully
e8e330fbbca is described below

commit e8e330fbbca5452e9af0a78e5f2cfae0cc6be134
Author: Hyukjin Kwon 
AuthorDate: Fri May 20 13:02:17 2022 +0900

[SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

### What changes were proposed in this pull request?

This PR proposes to make the `foreachBatch` streaming query stop gracefully
by handling the interrupted exceptions in
`StreamExecution.isInterruptionException`.

Because there is no straightforward way to access the original JVM
exception, we rely on string pattern matching for now (see also "Why are the
changes needed?" below). The message originates from only one place in Py4J
(https://github.com/py4j/py4j/blob/master/py4j-python/src/py4j/protocol.py#L326-L328),
so the approach should work reliably enough for that case.
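
For illustration, here is a minimal Python sketch (not the actual Scala check
in `StreamExecution.isInterruptionException`) of the kind of string pattern
being matched; the prefix corresponds to the `PROXY_ERROR` constant added to
`StreamExecution` in this change, and the helper name here is hypothetical:

```python
# Hypothetical helper illustrating the string-pattern idea used by this change.
PROXY_ERROR_PREFIX = "py4j.protocol.Py4JJavaError: An error occurred while calling"

def is_interrupted_proxy_error(message: str) -> bool:
    # The py4j.Py4JException raised back in the JVM carries the Python-side
    # Py4JJavaError text, which in turn embeds the original interruption error
    # (e.g. java.lang.InterruptedException), so a substring check is enough here.
    return (PROXY_ERROR_PREFIX in message
            and "java.lang.InterruptedException" in message)
```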

### Why are the changes needed?

In `foreachBatch`, the Python user-defined function for the microbatch runs
to completion even when `StreamingQuery.stop` is invoked. However, when any
Py4J access is attempted within the user-defined function:

- With the pinned thread mode disabled, the interrupt exception is not
blocked, and the Python function still runs to completion in a different thread.
- With the pinned thread mode enabled, the interrupt exception is raised in
the same thread, and the Python side raises it back as a Py4J exception in
that same thread.

The latter case is a problem because the interrupt exception is first
thrown from the JVM side (`java.lang.InterruptedException`), surfaces in the
Python callback server as `py4j.protocol.Py4JJavaError`, and comes back to the
JVM as `py4j.Py4JException`, which is not listed in
`StreamExecution.isInterruptionException`, so the query does not stop
gracefully.

Therefore, we should handle this exception at 
`StreamExecution.isInterruptionException`.

### Does this PR introduce _any_ user-facing change?

Yes, it makes the query stop gracefully.

### How was this patch tested?

Manually tested with:

```python
import time

def func(batch_df, batch_id):
    time.sleep(10)
    print(batch_df.count())

q = spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
time.sleep(5)
q.stop()
```

Closes #36589 from HyukjinKwon/SPARK-39218.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 499de87b77944157828a6d905d9b9df37b7c9a67)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_streaming.py | 10 ++
 .../spark/sql/execution/streaming/StreamExecution.scala| 11 +++
 .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 14 --
 3 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/tests/test_streaming.py 
b/python/pyspark/sql/tests/test_streaming.py
index 4920423be22..809294d34c3 100644
--- a/python/pyspark/sql/tests/test_streaming.py
+++ b/python/pyspark/sql/tests/test_streaming.py
@@ -592,6 +592,16 @@ class StreamingTests(ReusedSQLTestCase):
             if q:
                 q.stop()
 
+    def test_streaming_foreachBatch_graceful_stop(self):
+        # SPARK-39218: Make foreachBatch streaming query stop gracefully
+        def func(batch_df, _):
+            batch_df.sparkSession._jvm.java.lang.Thread.sleep(1)
+
+        q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
+        time.sleep(3)  # 'rowsPerSecond' defaults to 1. Waits 3 secs out for the input.
+        q.stop()
+        self.assertIsNone(q.exception(), "No exception has to be propagated.")
+
     def test_streaming_read_from_table(self):
         with self.table("input_table", "this_query"):
             self.spark.sql("CREATE TABLE input_table (value string) USING parquet")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index f9ae65cdc47..c7ce9f52e06 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -618,6 +618,13 @@ abstract class StreamExecution(
 object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
   val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
+  val IO_EXCEPTION_NAMES = Seq(
+classOf[InterruptedException].getName,
+classOf[InterruptedIOException].getName,
+classOf[ClosedByInterruptException].getName)
+  

[spark] branch master updated: [SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

2022-05-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 499de87b779 [SPARK-39218][SS][PYTHON] Make foreachBatch streaming 
query stop gracefully
499de87b779 is described below

commit 499de87b77944157828a6d905d9b9df37b7c9a67
Author: Hyukjin Kwon 
AuthorDate: Fri May 20 13:02:17 2022 +0900

[SPARK-39218][SS][PYTHON] Make foreachBatch streaming query stop gracefully

### What changes were proposed in this pull request?

This PR proposes to make the `foreachBatch` streaming query stop gracefully
by handling the interrupted exceptions in
`StreamExecution.isInterruptionException`.

Because there is no straightforward way to access the original JVM
exception, we rely on string pattern matching for now (see also "Why are the
changes needed?" below). The message originates from only one place in Py4J
(https://github.com/py4j/py4j/blob/master/py4j-python/src/py4j/protocol.py#L326-L328),
so the approach should work reliably enough for that case.

### Why are the changes needed?

In `foreachBatch`, the Python user-defined function for the microbatch runs
to completion even when `StreamingQuery.stop` is invoked. However, when any
Py4J access is attempted within the user-defined function:

- With the pinned thread mode disabled, the interrupt exception is not
blocked, and the Python function still runs to completion in a different thread.
- With the pinned thread mode enabled, the interrupt exception is raised in
the same thread, and the Python side raises it back as a Py4J exception in
that same thread.

The latter case is a problem because the interrupt exception is first
thrown from the JVM side (`java.lang.InterruptedException`), surfaces in the
Python callback server as `py4j.protocol.Py4JJavaError`, and comes back to the
JVM as `py4j.Py4JException`, which is not listed in
`StreamExecution.isInterruptionException`, so the query does not stop
gracefully.

Therefore, we should handle this exception at 
`StreamExecution.isInterruptionException`.

### Does this PR introduce _any_ user-facing change?

Yes, it makes the query stop gracefully.

### How was this patch tested?

Manually tested with:

```python
import time

def func(batch_df, batch_id):
    time.sleep(10)
    print(batch_df.count())

q = spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
time.sleep(5)
q.stop()
```

Closes #36589 from HyukjinKwon/SPARK-39218.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_streaming.py | 10 ++
 .../spark/sql/execution/streaming/StreamExecution.scala| 11 +++
 .../scala/org/apache/spark/sql/streaming/StreamSuite.scala | 14 --
 3 files changed, 33 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/tests/test_streaming.py 
b/python/pyspark/sql/tests/test_streaming.py
index 4920423be22..809294d34c3 100644
--- a/python/pyspark/sql/tests/test_streaming.py
+++ b/python/pyspark/sql/tests/test_streaming.py
@@ -592,6 +592,16 @@ class StreamingTests(ReusedSQLTestCase):
             if q:
                 q.stop()
 
+    def test_streaming_foreachBatch_graceful_stop(self):
+        # SPARK-39218: Make foreachBatch streaming query stop gracefully
+        def func(batch_df, _):
+            batch_df.sparkSession._jvm.java.lang.Thread.sleep(1)
+
+        q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
+        time.sleep(3)  # 'rowsPerSecond' defaults to 1. Waits 3 secs out for the input.
+        q.stop()
+        self.assertIsNone(q.exception(), "No exception has to be propagated.")
+
     def test_streaming_read_from_table(self):
         with self.table("input_table", "this_query"):
             self.spark.sql("CREATE TABLE input_table (value string) USING parquet")
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 324a833f178..5d06afbbf61 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -618,6 +618,13 @@ abstract class StreamExecution(
 object StreamExecution {
   val QUERY_ID_KEY = "sql.streaming.queryId"
   val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
+  val IO_EXCEPTION_NAMES = Seq(
+classOf[InterruptedException].getName,
+classOf[InterruptedIOException].getName,
+classOf[ClosedByInterruptException].getName)
+  val PROXY_ERROR = (
+"py4j.protocol.Py4JJavaError: An error occurred while calling" +
+

[spark] branch master updated: [SPARK-39221][SQL] Make sensitive information be redacted correctly for thrift server job/stage tab

2022-05-19 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5ee6f727441 [SPARK-39221][SQL] Make sensitive information be redacted 
correctly for thrift server job/stage tab
5ee6f727441 is described below

commit 5ee6f72744143cc5e19aa058df209f7156e51cee
Author: Kent Yao 
AuthorDate: Fri May 20 10:10:07 2022 +0800

[SPARK-39221][SQL] Make sensitive information be redacted correctly for 
thrift server job/stage tab

### What changes were proposed in this pull request?

Set the redacted statement in the job description correctly.

http://localhost:4040/sqlserver/
(screenshot: https://user-images.githubusercontent.com/8326978/168989638-d88ec124-fda7-4642-9c11-bad11c52fcea.png)

http://localhost:4040/jobs/
(screenshot: https://user-images.githubusercontent.com/8326978/168989800-4b67c3cf-c4f7-482a-9479-47f8705d6e9f.png)
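
As a rough illustration of what the redaction does (Python for illustration
only; the actual change is Scala and uses
`SparkUtils.redact(sqlContext.conf.stringRedactionPattern, ...)`), any
substring of the substituted statement that matches the configured redaction
pattern is replaced before the statement becomes the job description. The
regex and replacement token below are assumptions, not the exact Spark values:

```python
import re

def redact(pattern: str, statement: str) -> str:
    # Replace anything matching the configured redaction regex, mirroring the
    # idea of SparkUtils.redact applied to the substituted SQL statement.
    return re.sub(pattern, "*********(redacted)", statement)

# e.g. hide credentials embedded in a statement before it reaches the UI
print(redact(r"(?i)password\s*=\s*'[^']*'",
             "CREATE TABLE t USING jdbc OPTIONS (url 'jdbc:...', password='secret')"))
```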

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

verified locally

Closes #36592 from yaooqinn/SPARK-39221.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
---
 .../hive/thriftserver/SparkExecuteStatementOperation.scala | 14 --
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 2c77e00c46c..090d741d9ee 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -62,8 +62,11 @@ private[hive] class SparkExecuteStatementOperation(
 
   private val forceCancel = 
sqlContext.conf.getConf(SQLConf.THRIFTSERVER_FORCE_CANCEL)
 
-  private val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) 
{
-new VariableSubstitution().substitute(statement)
+  private val redactedStatement = {
+val substitutorStatement = SQLConf.withExistingConf(sqlContext.conf) {
+  new VariableSubstitution().substitute(statement)
+}
+SparkUtils.redact(sqlContext.conf.stringRedactionPattern, 
substitutorStatement)
   }
 
   private var result: DataFrame = _
@@ -76,14 +79,14 @@ private[hive] class SparkExecuteStatementOperation(
   val sparkType = new StructType().add("Result", "string")
   SparkExecuteStatementOperation.toTTableSchema(sparkType)
 } else {
-  logInfo(s"Result Schema: ${result.schema}")
+  logInfo(s"Result Schema: ${result.schema.sql}")
   SparkExecuteStatementOperation.toTTableSchema(result.schema)
 }
   }
 
   def getNextRowSet(order: FetchOrientation, maxRowsL: Long): TRowSet = 
withLocalProperties {
 try {
-  sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, 
forceCancel)
+  sqlContext.sparkContext.setJobGroup(statementId, redactedStatement, 
forceCancel)
   getNextRowSetInternal(order, maxRowsL)
 } finally {
   sqlContext.sparkContext.clearJobGroup()
@@ -118,7 +121,6 @@ private[hive] class SparkExecuteStatementOperation(
 
   override def runInternal(): Unit = {
 setState(OperationState.PENDING)
-val redactedStatement = 
SparkUtils.redact(sqlContext.conf.stringRedactionPattern, statement)
 logInfo(s"Submitting query '$redactedStatement' with $statementId")
 HiveThriftServer2.eventManager.onStatementStart(
   statementId,
@@ -220,7 +222,7 @@ private[hive] class SparkExecuteStatementOperation(
 
parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
   }
 
-  sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, 
forceCancel)
+  sqlContext.sparkContext.setJobGroup(statementId, redactedStatement, 
forceCancel)
   result = sqlContext.sql(statement)
   logDebug(result.queryExecution.toString())
   HiveThriftServer2.eventManager.onStatementParsed(statementId,





[spark] branch master updated: [SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`

2022-05-19 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 508b1d81fab [SPARK-39225][CORE] Support 
`spark.history.fs.update.batchSize`
508b1d81fab is described below

commit 508b1d81fab3920efed582db4c9b634490b2f26b
Author: Hai Tao 
AuthorDate: Thu May 19 13:09:17 2022 -0700

[SPARK-39225][CORE] Support `spark.history.fs.update.batchSize`

### What changes were proposed in this pull request?
- Add a spark property, `spark.history.fs.update.batchSize` (default 1000),
which specifies the batch size for updating new eventlog files.
- For each eventlog scan/parse process, at most
`spark.history.fs.update.batchSize` jobs are scanned/parsed. This allows the
scan process to end within a reasonable time, so new eventlog files (which
appeared after the scan process started) can be scanned sooner.

### Why are the changes needed?

The current Spark History Server suffers when there are a large number of
eventlog files under eventLog.dir: when the SHS starts, the initial scan may
take a long time, and new eventlog files are not scanned/parsed until the
initial scan completes.

For example, if the initial scan takes 1-2 days (not uncommon in large
environments), newly finished Spark jobs do not show up in the SHS because
their eventlog files are not scanned/parsed until the initial scan finishes.
This leaves the SHS effectively stale for 1-2 days, even though the newly
finished jobs are the ones most likely to be queried by users.

This PR limits each scan process to `spark.history.fs.update.batchSize`
files, so that each scan can finish within a reasonable time (1000 jobs can
usually be scanned in ~10-15 mins by default, though this varies with the
sizes of the eventlog files). This prevents a long initial scan from blocking
newly appeared eventlog files from being scanned.
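
The batching logic itself lives in `FsHistoryProvider.checkForLogs()` (see the
diff below); the following is only a simplified Python sketch of the idea, not
the Scala implementation, and the parser here is a placeholder:

```python
import os

def parse_event_log(path):
    # Placeholder for the real per-file parsing work done by the history server.
    print(f"parsing {path}")

def scan_once(log_dir, already_processed, batch_size=1000):
    """One scan cycle: parse at most `batch_size` of the newest event logs."""
    entries = [os.path.join(log_dir, name) for name in os.listdir(log_dir)]
    # Newest first, mirroring the sort on modificationTime in checkForLogs().
    entries.sort(key=os.path.getmtime, reverse=True)
    batch = [e for e in entries if e not in already_processed][:batch_size]
    for path in batch:
        parse_event_log(path)
        already_processed.add(path)
    return batch
```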

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Tested locally:

- Tested with unit tests in the core module and all passed.
- Placed 10 eventlog files under `eventLog.dir`, and set
`spark.history.fs.update.batchSize` to 2. When the SHS started, observed that
the scan process parsed 2 files at a time (starting from the files with the
latest modification time).
- Added a new eventlog file after the first scan started, and observed that 
the 2nd scan process picked up and parsed the new eventlog file before parsing 
other remaining (older) eventlog files.

Closes #36597 from hai-tao-1/updateBatchSize.

Authored-by: Hai Tao 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 33 +++---
 .../org/apache/spark/internal/config/History.scala | 10 +
 .../deploy/history/FsHistoryProviderSuite.scala| 50 ++
 3 files changed, 86 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 01b9e7952b1..a2b162468de 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -472,8 +472,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
* Builds the application list based on the current contents of the log 
directory.
* Tries to reuse as much of the data already in memory as possible, by not 
reading
* applications that haven't been updated since last time the logs were 
checked.
+   * Only a max of UPDATE_BATCHSIZE jobs are processed in each cycle, to 
prevent the process
+   * from running for too long which blocks updating newly appeared eventlog 
files.
*/
   private[history] def checkForLogs(): Unit = {
+var count: Int = 0
 try {
   val newLastScanTime = clock.getTimeMillis()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
@@ -494,6 +497,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   }
 }
 .flatMap { entry => EventLogFileReader(fs, entry) }
+.filter { reader =>
+  try {
+reader.modificationTime
+true
+  } catch {
+case e: IllegalArgumentException =>
+  logInfo("Exception in getting modificationTime of "
++ reader.rootPath.getName + ". " + e.toString)
+  false
+  }
+}
+.sortWith { case (entry1, entry2) =>
+  entry1.modificationTime > entry2.modificationTime
+}
 .filter { reader =>
   try {
 val info = listing.read(classOf[LogInfo], 

[spark] branch master updated: [SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket file

2022-05-19 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c6dccc7dd41 [SPARK-39163][SQL] Throw an exception w/ error class for 
an invalid bucket file
c6dccc7dd41 is described below

commit c6dccc7dd412a95007f5bb2584d69b85ff9ebf8e
Author: panbingkun 
AuthorDate: Thu May 19 20:39:35 2022 +0300

[SPARK-39163][SQL] Throw an exception w/ error class for an invalid bucket 
file

### What changes were proposed in this pull request?
In this PR, I propose to use the INVALID_BUCKET_FILE error class for an
invalid bucket file.

### Why are the changes needed?
Porting the execution error for an invalid bucket file to the new error
framework should improve the user experience with Spark SQL.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
UT

Closes #36603 from panbingkun/SPARK-39163.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   |  3 +++
 .../spark/sql/errors/QueryExecutionErrors.scala|  5 
 .../spark/sql/execution/DataSourceScanExec.scala   |  4 ++--
 .../sql/errors/QueryExecutionErrorsSuite.scala | 28 --
 .../adaptive/AdaptiveQueryExecSuite.scala  |  6 ++---
 .../spark/sql/sources/BucketedReadSuite.scala  | 23 --
 6 files changed, 38 insertions(+), 31 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index e4ee09ea8a7..1a139c018e8 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -115,6 +115,9 @@
   "INVALID_ARRAY_INDEX_IN_ELEMENT_AT" : {
 "message" : [ "The index  is out of bounds. The array has 
 elements. To return NULL instead, use `try_element_at`. If 
necessary set  to \"false\" to bypass this error." ]
   },
+  "INVALID_BUCKET_FILE" : {
+"message" : [ "Invalid bucket file: " ]
+  },
   "INVALID_FIELD_NAME" : {
 "message" : [ "Field name  is invalid:  is not a struct." 
],
 "sqlState" : "42000"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index a155b0694b5..1e664100545 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -2000,4 +2000,9 @@ object QueryExecutionErrors extends QueryErrorsBase {
 s"add ${toSQLValue(amount, IntegerType)} $unit to " +
 s"${toSQLValue(DateTimeUtils.microsToInstant(micros), 
TimestampType)}"))
   }
+
+  def invalidBucketFile(path: String): Throwable = {
+new SparkException(errorClass = "INVALID_BUCKET_FILE", messageParameters = 
Array(path),
+  cause = null)
+  }
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f7b627cef08..f5d349d975f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat 
=> ParquetSource}
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
@@ -618,8 +619,7 @@ case class FileSourceScanExec(
   }.groupBy { f =>
 BucketingUtils
   .getBucketId(new Path(f.filePath).getName)
-  // TODO(SPARK-39163): Throw an exception w/ error class for an 
invalid bucket file
-  .getOrElse(throw new IllegalStateException(s"Invalid bucket file 
${f.filePath}"))
+  .getOrElse(throw QueryExecutionErrors.invalidBucketFile(f.filePath))
   }
 
 val prunedFilesGroupedToBuckets = if (optionalBucketSet.isDefined) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
index bdc0772c1de..bbf6c0dda79 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryExecutionErrorsSuite.scala
+++ 

[spark] branch master updated: [SPARK-39234][SQL] Code clean up in SparkThrowableHelper.getMessage

2022-05-19 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ebd916b0054 [SPARK-39234][SQL] Code clean up in 
SparkThrowableHelper.getMessage
ebd916b0054 is described below

commit ebd916b005499c724bbec54b3df85cd28a864e03
Author: Gengliang Wang 
AuthorDate: Thu May 19 19:13:15 2022 +0300

[SPARK-39234][SQL] Code clean up in SparkThrowableHelper.getMessage

### What changes were proposed in this pull request?

1. Remove the starting "\n" in `Origin.context`. The "\n" is appended in the
method `SparkThrowableHelper.getMessage` instead.
2. Clean up the method `SparkThrowableHelper.getMessage` to eliminate
redundant code (see the sketch after this list).
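
For reference, a rough Python rendering of the formatting that `getMessage`
performs (the real code is the Scala shown in the diff below; this function is
an illustration only):

```python
import re

def get_message(error_class, message_format, parameters, query_context=""):
    # Placeholders such as <config> or <value> are replaced positionally,
    # mirroring replaceAll("<[a-zA-Z0-9_-]+>", "%s") plus String.format in Scala.
    fmt = re.sub(r"<[a-zA-Z0-9_-]+>", "%s", message_format)
    message = fmt % tuple(parameters)
    suffix = "\n" + query_context if query_context else ""
    return "[%s] %s%s" % (error_class, message, suffix)

print(get_message(
    "DIVIDE_BY_ZERO",
    'Division by zero. To return NULL instead, use `try_divide`. '
    'If necessary set <config> to "false" to bypass this error.',
    ["spark.sql.ansi.enabled"]))
```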

### Why are the changes needed?

Code clean up to eliminate redundant code.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT

Closes #36612 from gengliangwang/moveNewLine.

Authored-by: Gengliang Wang 
Signed-off-by: Max Gekk 
---
 .../src/main/scala/org/apache/spark/ErrorInfo.scala | 21 +
 .../apache/spark/sql/catalyst/trees/TreeNode.scala  |  2 +-
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala|  3 +--
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ErrorInfo.scala 
b/core/src/main/scala/org/apache/spark/ErrorInfo.scala
index e11e6485851..4639e56aa50 100644
--- a/core/src/main/scala/org/apache/spark/ErrorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/ErrorInfo.scala
@@ -77,20 +77,25 @@ private[spark] object SparkThrowableHelper {
   queryContext: String = ""): String = {
 val errorInfo = errorClassToInfoMap.getOrElse(errorClass,
   throw new IllegalArgumentException(s"Cannot find error class 
'$errorClass'"))
-if (errorInfo.subClass.isDefined) {
+val (displayClass, displayMessageParameters, displayFormat) = if 
(errorInfo.subClass.isEmpty) {
+  (errorClass, messageParameters, errorInfo.messageFormat)
+} else {
   val subClass = errorInfo.subClass.get
   val subErrorClass = messageParameters.head
   val errorSubInfo = subClass.getOrElse(subErrorClass,
 throw new IllegalArgumentException(s"Cannot find sub error class 
'$subErrorClass'"))
-  val subMessageParameters = messageParameters.tail
-  "[" + errorClass + "." + subErrorClass + "] " + 
String.format((errorInfo.messageFormat +
-errorSubInfo.messageFormat).replaceAll("<[a-zA-Z0-9_-]+>", "%s"),
-subMessageParameters: _*) + queryContext
+  (errorClass + "." + subErrorClass, messageParameters.tail,
+errorInfo.messageFormat + errorSubInfo.messageFormat)
+}
+val displayMessage = String.format(
+  displayFormat.replaceAll("<[a-zA-Z0-9_-]+>", "%s"),
+  displayMessageParameters : _*)
+val displayQueryContext = if (queryContext.isEmpty) {
+  ""
 } else {
-  "[" + errorClass + "] " + String.format(
-errorInfo.messageFormat.replaceAll("<[a-zA-Z0-9_-]+>", "%s"),
-messageParameters: _*) + queryContext
+  s"\n$queryContext"
 }
+s"[$displayClass] $displayMessage$displayQueryContext"
   }
 
   def getSqlState(errorClass: String): String = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 0714898e19d..54c64515ee4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -89,7 +89,7 @@ case class Origin(
 ""
   }
   val builder = new StringBuilder
-  builder ++= s"\n== SQL$objectContext$positionContext ==\n"
+  builder ++= s"== SQL$objectContext$positionContext ==\n"
 
   val text = sqlText.get
   val start = math.max(startIndex.get, 0)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index ffbc5d89bdb..899a740bdae 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -876,8 +876,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
   objectType = Some("VIEW"),
   objectName = Some("some_view"))
 val expected =
-  """
-|== SQL of VIEW some_view(line 3, position 38) ==
+  """== SQL of VIEW some_view(line 3, position 38) ==
 |...7890 + 1234567890 + 1234567890, cast('a'
 |   
 |as /* comment */



[spark] branch branch-3.3 updated (88c076de9d0 -> 24a3fa95a38)

2022-05-19 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


from 88c076de9d0 [SPARK-39229][SQL][3.3] Separate query contexts from 
error-classes.json
 add 24a3fa95a38 [SPARK-39233][SQL] Remove the check for TimestampNTZ 
output in Analyzer

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala  | 6 --
 1 file changed, 6 deletions(-)





[spark] branch master updated: Revert "[SPARK-39043][SQL] Spark SQL Hive client should not gather statistic by default"

2022-05-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f2d6b7bb5dd Revert "[SPARK-39043][SQL] Spark SQL Hive client should 
not gather statistic by default"
f2d6b7bb5dd is described below

commit f2d6b7bb5ddc121249fcd0cd6d01f1c47e6e4c08
Author: Wenchen Fan 
AuthorDate: Thu May 19 20:06:11 2022 +0800

Revert "[SPARK-39043][SQL] Spark SQL Hive client should not gather 
statistic by default"

This reverts commit fba30cd491b6163f4a469296bb5af293712ca8d4.
---
 docs/sql-migration-guide.md   | 4 
 .../main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala  | 3 ---
 .../src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala| 3 ++-
 .../test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala | 1 -
 4 files changed, 2 insertions(+), 9 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 555b3125a6b..59b8d47d306 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -22,10 +22,6 @@ license: |
 * Table of contents
 {:toc}
 
-## Upgrading from Spark SQL 3.3 to 3.4
-  
-  - Since Spark 3.4, Spark disables `hive.stats.autogather` by default, which 
means Hive tables won't automatically update statistics that can be consumed by 
Hive (not Spark). To restore the behavior before Spark 3.4, you can set 
`spark.hadoop.hive.stats.autogather` to `true`.
-
 ## Upgrading from Spark SQL 3.2 to 3.3
 
   - Since Spark 3.3, the `histogram_numeric` function in Spark SQL returns an 
output type of an array of structs (x, y), where the type of the 'x' field in 
the return value is propagated from the input values consumed in the aggregate 
function. In Spark 3.2 or earlier, 'x' always had double type. Optionally, use 
the configuration `spark.sql.legacy.histogramNumericPropagateInputType` since 
Spark 3.3 to revert back to the previous behavior. 
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 12cd2740d54..d70ac781c03 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -1270,9 +1270,6 @@ private[hive] object HiveClientImpl extends Logging {
 }
 // Disable CBO because we removed the Calcite dependency.
 hiveConf.setBoolean("hive.cbo.enable", false)
-// Disable auto gather statistic by default.
-hiveConf.setBoolean("hive.stats.autogather", 
confMap.contains("hive.stats.autogather") &&
-  confMap("hive.stats.autogather").equalsIgnoreCase("true"))
 // If this is true, SessionState.start will create a file to log hive job 
which will not be
 // deleted on exit and is useless for spark
 if (hiveConf.getBoolean("hive.session.history.enabled", false)) {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index fb79c025fc3..c689682a46b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -101,7 +101,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
 .asInstanceOf[HiveTableRelation]
 
   val properties = relation.tableMeta.ignoredProperties
-  assert(properties.get("totalSize").isEmpty)
+  // Since HIVE-6727, Hive fixes table-level stats for external tables 
are incorrect.
+  assert(properties("totalSize").toLong == 6)
   assert(properties.get("rawDataSize").isEmpty)
 
   val sizeInBytes = relation.stats.sizeInBytes
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index 5949ada158d..ad0f9a56a82 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -62,7 +62,6 @@ class HiveClientSuite(version: String, allVersions: 
Seq[String])
 System.gc() // Hack to avoid SEGV on some JVM versions.
 val hadoopConf = new Configuration()
 hadoopConf.set("test", "success")
-hadoopConf.set("hive.stats.autogather", "true")
 client = buildClient(hadoopConf)
 if (versionSpark != null) versionSpark.reset()
 versionSpark = TestHiveVersion(client)





[spark] branch branch-3.3 updated (e088c820e1e -> 88c076de9d0)

2022-05-19 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


from e088c820e1e [SPARK-39212][SQL][3.3] Use double quotes for values of 
SQL configs/DS options in error messages
 add 88c076de9d0 [SPARK-39229][SQL][3.3] Separate query contexts from 
error-classes.json

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/memory/SparkOutOfMemoryError.java |  2 +-
 core/src/main/resources/error/error-classes.json   | 10 +++
 .../main/scala/org/apache/spark/ErrorInfo.scala|  7 +++--
 .../scala/org/apache/spark/SparkException.scala| 34 +++---
 .../org/apache/spark/SparkThrowableSuite.scala |  2 +-
 .../spark/sql/errors/QueryExecutionErrors.scala| 29 +-
 6 files changed, 52 insertions(+), 32 deletions(-)





[spark] branch master updated: [SPARK-39201][PYTHON][PS] Implement `ignore_index` of `DataFrame.explode` and `DataFrame.drop_duplicates`

2022-05-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d282f8933bc [SPARK-39201][PYTHON][PS] Implement `ignore_index` of 
`DataFrame.explode` and `DataFrame.drop_duplicates`
d282f8933bc is described below

commit d282f8933bc80124bd534e1c03c5162ba0803255
Author: Xinrong Meng 
AuthorDate: Thu May 19 19:18:43 2022 +0900

[SPARK-39201][PYTHON][PS] Implement `ignore_index` of `DataFrame.explode` 
and `DataFrame.drop_duplicates`

### What changes were proposed in this pull request?
Implement `ignore_index` of `DataFrame.explode` and 
`DataFrame.drop_duplicates`.

### Why are the changes needed?
Increase pandas API coverage.

### Does this PR introduce _any_ user-facing change?
Yes. `ignore_index` of `DataFrame.explode` and `DataFrame.drop_duplicates` 
is supported as below.
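
For example (adapted from the doctests added to
`python/pyspark/pandas/frame.py` in this change; the exact output depends on
the data):

```python
import pyspark.pandas as ps

# drop_duplicates with ignore_index=True relabels the result 0, 1, ..., n - 1
psdf = ps.DataFrame({"a": [1, 2, 2, 2, 3], "b": ["a", "a", "a", "c", "d"]})
print(psdf.drop_duplicates(ignore_index=True).sort_index())

# explode with ignore_index=True does the same for the exploded rows
psdf2 = ps.DataFrame({"A": [[1, 2, 3], [], [3, 4]], "B": [1, 1, 1]})
print(psdf2.explode("A", ignore_index=True))
```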

### How was this patch tested?
Unit tests.

Closes #36569 from xinrong-databricks/explode.ignore_index.

Authored-by: Xinrong Meng 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/frame.py| 34 --
 python/pyspark/pandas/tests/test_dataframe.py | 65 ++-
 2 files changed, 73 insertions(+), 26 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index f3ef0b15879..6049249d827 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -9367,6 +9367,7 @@ defaultdict(, {'col..., 'col...})]
 subset: Optional[Union[Name, List[Name]]] = None,
 keep: Union[bool, str] = "first",
 inplace: bool = False,
+ignore_index: bool = False,
 ) -> Optional["DataFrame"]:
 """
 Return DataFrame with duplicate rows removed, optionally only
@@ -9384,6 +9385,8 @@ defaultdict(, {'col..., 'col...})]
 - False : Drop all duplicates.
 inplace : boolean, default False
 Whether to drop duplicates in place or to return a copy.
+ignore_index : boolean, default False
+If True, the resulting axis will be labeled 0, 1, …, n - 1.
 
 Returns
 ---
@@ -9407,6 +9410,13 @@ defaultdict(, {'col..., 'col...})]
 3  2  c
 4  3  d
 
+>>> df.drop_duplicates(ignore_index=True).sort_index()
+   a  b
+0  1  a
+1  2  a
+2  2  c
+3  3  d
+
 >>> df.drop_duplicates('a').sort_index()
a  b
 0  1  a
@@ -9439,11 +9449,15 @@ defaultdict(, {'col..., 'col...})]
 
 sdf = sdf.where(~scol_for(sdf, column)).drop(column)
 internal = self._internal.with_new_sdf(sdf)
+psdf: DataFrame = DataFrame(internal)
+
 if inplace:
-self._update_internal_frame(internal)
+if ignore_index:
+psdf.reset_index(drop=True, inplace=inplace)
+self._update_internal_frame(psdf._internal)
 return None
 else:
-return DataFrame(internal)
+return psdf.reset_index(drop=True) if ignore_index else psdf
 
 def reindex(
 self,
@@ -12146,7 +12160,7 @@ defaultdict(, {'col..., 'col...})]
 # Returns a frame
 return result
 
-def explode(self, column: Name) -> "DataFrame":
+def explode(self, column: Name, ignore_index: bool = False) -> "DataFrame":
 """
 Transform each element of a list-like to a row, replicating index 
values.
 
@@ -12154,6 +12168,8 @@ defaultdict(, {'col..., 'col...})]
 --
 column : str or tuple
 Column to explode.
+ignore_index : bool, default False
+If True, the resulting index will be labeled 0, 1, …, n - 1.
 
 Returns
 ---
@@ -12184,6 +12200,15 @@ defaultdict(, {'col..., 'col...})]
 1  NaN  1
 2  3.0  1
 2  4.0  1
+
+>>> df.explode('A', ignore_index=True)
+ A  B
+0  1.0  1
+1  2.0  1
+2  3.0  1
+3  NaN  1
+4  3.0  1
+5  4.0  1
 """
 from pyspark.pandas.series import Series
 
@@ -12212,7 +12237,8 @@ defaultdict(, {'col..., 'col...})]
 data_fields[idx] = field.copy(dtype=dtype, spark_type=spark_type, 
nullable=True)
 
 internal = psdf._internal.with_new_sdf(sdf, data_fields=data_fields)
-return DataFrame(internal)
+result_df: DataFrame = DataFrame(internal)
+return result_df.reset_index(drop=True) if ignore_index else result_df
 
 def mad(self, axis: Axis = 0) -> "Series":
 """
diff --git a/python/pyspark/pandas/tests/test_dataframe.py 
b/python/pyspark/pandas/tests/test_dataframe.py
index 8915ec1ca64..2a159423a2d 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ 

[spark] branch master updated: [SPARK-38947][PYTHON][PS] Supports groupby positional indexing

2022-05-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3bbad5c417a [SPARK-38947][PYTHON][PS] Supports groupby positional 
indexing
3bbad5c417a is described below

commit 3bbad5c417a8f819fa6e4838543ed8e63b4a6e75
Author: Yikun Jiang 
AuthorDate: Thu May 19 19:02:43 2022 +0900

[SPARK-38947][PYTHON][PS] Supports groupby positional indexing

### What changes were proposed in this pull request?
Add groupby positional indexing support for Pandas on Spark.

### Why are the changes needed?

Pandas supports Groupby positional indexing since v1.4.0

https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing

Before 1.4.0, `pdf.groupby("a").head(-2) == pdf.groupby("a").head(0)` returns
an empty dataframe; since 1.4.0, it is allowed to specify positional ranges
relative to the ends of each group.

```python
df = pd.DataFrame([["g", "g0"], ["g", "g1"], ["g", "g2"], ["g", "g3"],
                   ["h", "h0"], ["h", "h1"]], columns=["A", "B"])
df.groupby("A").head(-1)

   A   B
0  g  g0
1  g  g1
2  g  g2
4  h  h0
```

### Does this PR introduce _any_ user-facing change?
Yes, it follows pandas 1.4+ behavior, and keeps the previous behavior when
using pandas < 1.4.

### How was this patch tested?
- Added some more positional indexing tests (-1, -10).
- `test_tail` and `test_head` pass with pandas 1.3.x and 1.4.x

Closes #36464 from Yikun/SPARK-38947.

Authored-by: Yikun Jiang 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/groupby.py|  90 +++--
 python/pyspark/pandas/tests/test_groupby.py | 188 
 2 files changed, 133 insertions(+), 145 deletions(-)

diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 9a931f20fd4..2fc237031cd 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -2122,22 +2122,60 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
 groupkey_scols = [psdf._internal.spark_column_for(label) for label in 
groupkey_labels]
 
 sdf = psdf._internal.spark_frame
-tmp_col = verify_temp_column_name(sdf, "__row_number__")
 
+window = Window.partitionBy(*groupkey_scols)
 # This part is handled differently depending on whether it is a tail 
or a head.
-window = (
-
Window.partitionBy(*groupkey_scols).orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())
+ordered_window = (
+window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).asc())
 if asc
-else Window.partitionBy(*groupkey_scols).orderBy(
-F.col(NATURAL_ORDER_COLUMN_NAME).desc()
-)
+else window.orderBy(F.col(NATURAL_ORDER_COLUMN_NAME).desc())
 )
 
-sdf = (
-sdf.withColumn(tmp_col, F.row_number().over(window))
-.filter(F.col(tmp_col) <= n)
-.drop(tmp_col)
-)
+if n >= 0 or LooseVersion(pd.__version__) < LooseVersion("1.4.0"):
+tmp_row_num_col = verify_temp_column_name(sdf, "__row_number__")
+sdf = (
+sdf.withColumn(tmp_row_num_col, 
F.row_number().over(ordered_window))
+.filter(F.col(tmp_row_num_col) <= n)
+.drop(tmp_row_num_col)
+)
+else:
+# Pandas supports Groupby positional indexing since v1.4.0
+# 
https://pandas.pydata.org/docs/whatsnew/v1.4.0.html#groupby-positional-indexing
+#
+# To support groupby positional indexing, we need add a 
`__tmp_lag__` column to help
+# us filtering rows before the specified offset row.
+#
+# For example for the dataframe:
+# >>> df = ps.DataFrame([["g", "g0"],
+# ...   ["g", "g1"],
+# ...   ["g", "g2"],
+# ...   ["g", "g3"],
+# ...   ["h", "h0"],
+# ...   ["h", "h1"]], columns=["A", "B"])
+# >>> df.groupby("A").head(-1)
+#
+# Below is a result to show the `__tmp_lag__` column for above df, 
the limit n is
+# `-1`, the `__tmp_lag__` will be set to `0` in rows[:-1], and 
left will be set to
+# `null`:
+#
+# >>> sdf.withColumn(tmp_lag_col, F.lag(F.lit(0), 
-1).over(ordered_window))
+# 
+-+--+---+---+-+---+
+# |__index_level_0__|__groupkey_0__|  A|  
B|__natural_order__|__tmp_lag__|
+# 
+-+--+---+---+-+---+
+# |0| 

[spark] branch branch-3.3 updated: [SPARK-39212][SQL][3.3] Use double quotes for values of SQL configs/DS options in error messages

2022-05-19 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new e088c820e1e [SPARK-39212][SQL][3.3] Use double quotes for values of 
SQL configs/DS options in error messages
e088c820e1e is described below

commit e088c820e1ee5736e130f5d7d1030990b0059141
Author: Max Gekk 
AuthorDate: Thu May 19 16:51:20 2022 +0800

[SPARK-39212][SQL][3.3] Use double quotes for values of SQL configs/DS 
options in error messages

### What changes were proposed in this pull request?
Wrap values of SQL configs and datasource options in error messages in double
quotes. Added the `toDSOption()` method to `QueryErrorsBase` to quote DS
options.

This is a backport of https://github.com/apache/spark/pull/36579.

### Why are the changes needed?
1. To highlight SQL config/DS option values and make them more visible to users.
2. To be able to easily parse values from error text.
3. To be consistent with other outputs of identifiers, SQL statements, etc.,
where Spark uses quotes or ticks.

### Does this PR introduce _any_ user-facing change?
Yes, it changes user-facing error messages.

### How was this patch tested?
By running the modified test suites:
```
$ build/sbt "testOnly *QueryCompilationErrorsSuite"
$ build/sbt "testOnly *QueryExecutionAnsiErrorsSuite"
$ build/sbt "testOnly *QueryExecutionErrorsSuite"
```

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
(cherry picked from commit 96f4b7dbc1facd1a38be296263606aa312861c95)
Signed-off-by: Max Gekk 

Closes #36600 from MaxGekk/move-ise-from-query-errors-3.3-2.

Authored-by: Max Gekk 
Signed-off-by: Gengliang Wang 
---
 core/src/main/resources/error/error-classes.json   | 18 +++---
 .../org/apache/spark/SparkThrowableSuite.scala |  4 +-
 .../apache/spark/sql/errors/QueryErrorsBase.scala  |  4 ++
 .../spark/sql/errors/QueryExecutionErrors.scala| 20 ---
 .../resources/sql-tests/results/ansi/array.sql.out | 24 
 .../resources/sql-tests/results/ansi/cast.sql.out  | 70 +++---
 .../resources/sql-tests/results/ansi/date.sql.out  |  6 +-
 .../results/ansi/datetime-parsing-invalid.sql.out  |  4 +-
 .../ansi/decimalArithmeticOperations.sql.out   |  8 +--
 .../sql-tests/results/ansi/interval.sql.out| 40 ++---
 .../resources/sql-tests/results/ansi/map.sql.out   |  8 +--
 .../results/ansi/string-functions.sql.out  |  8 +--
 .../sql-tests/results/ansi/timestamp.sql.out   |  2 +-
 .../resources/sql-tests/results/interval.sql.out   | 18 +++---
 .../sql-tests/results/postgreSQL/boolean.sql.out   | 32 +-
 .../sql-tests/results/postgreSQL/float4.sql.out| 14 ++---
 .../sql-tests/results/postgreSQL/float8.sql.out| 10 ++--
 .../sql-tests/results/postgreSQL/int4.sql.out  | 12 ++--
 .../sql-tests/results/postgreSQL/int8.sql.out  | 22 +++
 .../results/postgreSQL/select_having.sql.out   |  2 +-
 .../sql-tests/results/postgreSQL/text.sql.out  |  4 +-
 .../results/postgreSQL/window_part2.sql.out|  6 +-
 .../results/postgreSQL/window_part3.sql.out|  2 +-
 .../results/postgreSQL/window_part4.sql.out|  2 +-
 .../results/timestampNTZ/timestamp-ansi.sql.out|  4 +-
 .../udf/postgreSQL/udf-select_having.sql.out   |  2 +-
 .../sql/errors/QueryExecutionErrorsSuite.scala | 12 ++--
 27 files changed, 184 insertions(+), 174 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 7fef9e563c2..e4ab3a7a353 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -4,7 +4,7 @@
 "sqlState" : "42000"
   },
   "ARITHMETIC_OVERFLOW" : {
-"message" : [ ". If necessary set  to false 
(except for ANSI interval type) to bypass this error." ],
+"message" : [ ". If necessary set  to 
\"false\" (except for ANSI interval type) to bypass this error." ],
 "sqlState" : "22003"
   },
   "CANNOT_CAST_DATATYPE" : {
@@ -12,7 +12,7 @@
 "sqlState" : "22005"
   },
   "CANNOT_CHANGE_DECIMAL_PRECISION" : {
-"message" : [ " cannot be represented as Decimal(, 
). If necessary set  to false to bypass this error." ],
+"message" : [ " cannot be represented as Decimal(, 
). If necessary set  to \"false\" to bypass this 
error." ],
 "sqlState" : "22005"
   },
   "CANNOT_PARSE_DECIMAL" : {
@@ -26,11 +26,11 @@
 "message" : [ "Cannot use a mixture of aggregate function and group 
aggregate pandas UDF" ]
   },
   "CAST_INVALID_INPUT" : {
-"message" : [ "The value  of the type  cannot be cast 
to  because it is malformed. To return NULL instead, use 
`try_cast`. If necessary set  to false to bypass this error." 
],
+

[spark] branch master updated: [SPARK-37939][SQL] Use error classes in the parsing errors of properties

2022-05-19 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7309e76d8b9 [SPARK-37939][SQL] Use error classes in the parsing errors 
of properties
7309e76d8b9 is described below

commit 7309e76d8b95e306d6f3d2f611316b748949e9cf
Author: panbingkun 
AuthorDate: Thu May 19 11:29:37 2022 +0300

[SPARK-37939][SQL] Use error classes in the parsing errors of properties

### What changes were proposed in this pull request?
Migrate the following errors in QueryParsingErrors onto use error classes:

- cannotCleanReservedNamespacePropertyError =>
UNSUPPORTED_FEATURE.SET_NAMESPACE_PROPERTY
- cannotCleanReservedTablePropertyError => 
UNSUPPORTED_FEATURE.SET_TABLE_PROPERTY
- invalidPropertyKeyForSetQuotedConfigurationError => INVALID_PROPERTY_KEY
- invalidPropertyValueForSetQuotedConfigurationError => 
INVALID_PROPERTY_VALUE
- propertiesAndDbPropertiesBothSpecifiedError => 
UNSUPPORTED_FEATURE.SET_PROPERTIES_AND_DBPROPERTIES

### Why are the changes needed?
Porting the parsing errors of properties to the new error framework improves
test coverage and documents the expected error messages in tests.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
By running new test:
```
$ build/sbt "sql/testOnly *QueryParsingErrorsSuite*"
```

Closes #36561 from panbingkun/SPARK-37939.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 15 
 .../spark/sql/errors/QueryParsingErrors.scala  | 28 +--
 .../spark/sql/errors/QueryParsingErrorsSuite.scala | 88 ++
 .../spark/sql/execution/SparkSqlParserSuite.scala  |  6 +-
 .../command/CreateNamespaceParserSuite.scala   |  3 +-
 5 files changed, 129 insertions(+), 11 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 21fde82adbb..e4ee09ea8a7 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -133,6 +133,12 @@
 "message" : [ "The value of parameter(s) '' in  
is invalid: " ],
 "sqlState" : "22023"
   },
+  "INVALID_PROPERTY_KEY" : {
+"message" : [ " is an invalid property key, please use quotes, e.g. 
SET =" ]
+  },
+  "INVALID_PROPERTY_VALUE" : {
+"message" : [ " is an invalid property value, please use quotes, 
e.g. SET =" ]
+  },
   "INVALID_SQL_SYNTAX" : {
 "message" : [ "Invalid SQL syntax: " ],
 "sqlState" : "42000"
@@ -262,6 +268,15 @@
   "REPEATED_PIVOT" : {
 "message" : [ "Repeated PIVOT operation." ]
   },
+  "SET_NAMESPACE_PROPERTY" : {
+"message" : [ " is a reserved namespace property, ." ]
+  },
+  "SET_PROPERTIES_AND_DBPROPERTIES" : {
+"message" : [ "set PROPERTIES and DBPROPERTIES at the same time." ]
+  },
+  "SET_TABLE_PROPERTY" : {
+"message" : [ " is a reserved table property, ." ]
+  },
   "TOO_MANY_TYPE_ARGUMENTS_FOR_UDF_CLASS" : {
 "message" : [ "UDF class with  type arguments." ]
   },
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index debfe1b0891..8fa28c0d347 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -267,16 +267,26 @@ object QueryParsingErrors extends QueryErrorsBase {
 
   def cannotCleanReservedNamespacePropertyError(
   property: String, ctx: ParserRuleContext, msg: String): Throwable = {
-new ParseException(s"$property is a reserved namespace property, $msg.", 
ctx)
+new ParseException(
+  errorClass = "UNSUPPORTED_FEATURE",
+  messageParameters = Array("SET_NAMESPACE_PROPERTY", property, msg),
+  ctx)
   }
 
   def propertiesAndDbPropertiesBothSpecifiedError(ctx: 
CreateNamespaceContext): Throwable = {
-new ParseException("Either PROPERTIES or DBPROPERTIES is allowed.", ctx)
+new ParseException(
+  errorClass = "UNSUPPORTED_FEATURE",
+  messageParameters = Array("SET_PROPERTIES_AND_DBPROPERTIES"),
+  ctx
+)
   }
 
   def cannotCleanReservedTablePropertyError(
   property: String, ctx: ParserRuleContext, msg: String): Throwable = {
-new ParseException(s"$property is a reserved table property, $msg.", ctx)
+new ParseException(
+  errorClass = "UNSUPPORTED_FEATURE",
+  messageParameters = Array("SET_TABLE_PROPERTY", property, msg),
+  ctx)
   }
 
   def duplicatedTablePathsFoundError(
@@ -378,14 +388,18 @@ object QueryParsingErrors extends QueryErrorsBase {
 
   

[spark] branch master updated: [SPARK-39229][SQL] Separate query contexts from error-classes.json

2022-05-19 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3c74aed2cbd [SPARK-39229][SQL] Separate query contexts from 
error-classes.json
3c74aed2cbd is described below

commit 3c74aed2cbde2968fab93b2799a56d075420e7d3
Author: Gengliang Wang 
AuthorDate: Thu May 19 11:00:16 2022 +0300

[SPARK-39229][SQL] Separate query contexts from error-classes.json

### What changes were proposed in this pull request?

Separate query contexts for runtime errors from error-classes.json.

### Why are the changes needed?

The messages in JSON should only contain parameters that are explicitly
thrown. It is more elegant to separate query contexts from error-classes.json.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing UT

Closes #36604 from gengliangwang/refactorErrorClass.

Authored-by: Gengliang Wang 
Signed-off-by: Max Gekk 
---
 .../apache/spark/memory/SparkOutOfMemoryError.java |  2 +-
 core/src/main/resources/error/error-classes.json   | 10 +++
 .../main/scala/org/apache/spark/ErrorInfo.scala|  9 --
 .../scala/org/apache/spark/SparkException.scala| 34 +++---
 .../org/apache/spark/SparkThrowableSuite.scala |  2 +-
 .../spark/sql/errors/QueryExecutionErrors.scala| 29 +-
 6 files changed, 53 insertions(+), 33 deletions(-)

diff --git 
a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java 
b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
index 22dfe4d4dbe..c5f19a0c201 100644
--- a/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
+++ b/core/src/main/java/org/apache/spark/memory/SparkOutOfMemoryError.java
@@ -39,7 +39,7 @@ public final class SparkOutOfMemoryError extends 
OutOfMemoryError implements Spa
 }
 
 public SparkOutOfMemoryError(String errorClass, String[] 
messageParameters) {
-super(SparkThrowableHelper.getMessage(errorClass, messageParameters));
+super(SparkThrowableHelper.getMessage(errorClass, messageParameters, 
""));
 this.errorClass = errorClass;
 this.messageParameters = messageParameters;
 }
diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index f4eadd4a368..21fde82adbb 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -4,7 +4,7 @@
 "sqlState" : "42000"
   },
   "ARITHMETIC_OVERFLOW" : {
-"message" : [ ". If necessary set  to 
\"false\" (except for ANSI interval type) to bypass this error." ],
+"message" : [ ". If necessary set  to 
\"false\" (except for ANSI interval type) to bypass this error." ],
 "sqlState" : "22003"
   },
   "CANNOT_CAST_DATATYPE" : {
@@ -12,7 +12,7 @@
 "sqlState" : "22005"
   },
   "CANNOT_CHANGE_DECIMAL_PRECISION" : {
-"message" : [ " cannot be represented as Decimal(, 
). If necessary set  to \"false\" to bypass this 
error." ],
+"message" : [ " cannot be represented as Decimal(, 
). If necessary set  to \"false\" to bypass this error." ],
 "sqlState" : "22005"
   },
   "CANNOT_PARSE_DECIMAL" : {
@@ -23,7 +23,7 @@
 "message" : [ "Cannot up cast  from  to 
.\n" ]
   },
   "CAST_INVALID_INPUT" : {
-"message" : [ "The value  of the type  cannot be cast 
to  because it is malformed. To return NULL instead, use 
`try_cast`. If necessary set  to \"false\" to bypass this 
error." ],
+"message" : [ "The value  of the type  cannot be cast 
to  because it is malformed. To return NULL instead, use 
`try_cast`. If necessary set  to \"false\" to bypass this error." ],
 "sqlState" : "42000"
   },
   "CAST_OVERFLOW" : {
@@ -38,7 +38,7 @@
 "sqlState" : "22008"
   },
   "DIVIDE_BY_ZERO" : {
-"message" : [ "Division by zero. To return NULL instead, use `try_divide`. 
If necessary set  to \"false\" (except for ANSI interval type) to 
bypass this error." ],
+"message" : [ "Division by zero. To return NULL instead, use `try_divide`. 
If necessary set  to \"false\" (except for ANSI interval type) to 
bypass this error." ],
 "sqlState" : "22012"
   },
   "DUPLICATE_KEY" : {
@@ -138,7 +138,7 @@
 "sqlState" : "42000"
   },
   "MAP_KEY_DOES_NOT_EXIST" : {
-"message" : [ "Key  does not exist. To return NULL instead, use 
`try_element_at`. If necessary set  to \"false\" to bypass this 
error." ]
+"message" : [ "Key  does not exist. To return NULL instead, use 
`try_element_at`. If necessary set  to \"false\" to bypass this error." 
]
   },
   "MISSING_COLUMN" : {
 "message" : [ "Column '' does not exist. Did you mean one of 
the following? []" ],
diff --git a/core/src/main/scala/org/apache/spark/ErrorInfo.scala 

[spark] branch master updated (85bb7bf008d -> 0a99060157f)

2022-05-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 85bb7bf008d [SPARK-39216][SQL] Do not collapse projects in 
CombineUnions if it hasCorrelatedSubquery
 add 0a99060157f [SPARK-39223][PS] Implement skew and kurt in 
Rolling/RollingGroupby/Expanding/ExpandingGroupby

No new revisions were added by this update.

Summary of changes:
 python/pyspark/pandas/missing/window.py   |   8 -
 python/pyspark/pandas/tests/test_expanding.py |  22 +-
 python/pyspark/pandas/tests/test_rolling.py   |  14 +-
 python/pyspark/pandas/window.py   | 305 ++
 4 files changed, 337 insertions(+), 12 deletions(-)





[spark] branch branch-3.3 updated: [SPARK-39216][SQL] Do not collapse projects in CombineUnions if it hasCorrelatedSubquery

2022-05-19 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 669fc1b2c1c [SPARK-39216][SQL] Do not collapse projects in 
CombineUnions if it hasCorrelatedSubquery
669fc1b2c1c is described below

commit 669fc1b2c1cce7049a9f10e386ed1af050de3909
Author: Yuming Wang 
AuthorDate: Wed May 18 23:37:25 2022 -0700

[SPARK-39216][SQL] Do not collapse projects in CombineUnions if it 
hasCorrelatedSubquery

### What changes were proposed in this pull request?

Makes `CombineUnions` not collapse projects whose project list contains a correlated subquery 
(`SubqueryExpression.hasCorrelatedSubquery`). For example:
```sql
SELECT (SELECT IF(x, 1, 0)) AS a
FROM (SELECT true) t(x)
UNION
SELECT 1 AS a
```

It will throw an exception:
```
java.lang.IllegalStateException: Couldn't find x#4 in []
```
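
For reference, a self-contained Scala reproduction of the same query is sketched below; the local SparkSession setup and object name are assumptions, not part of this change. Per the new unit test, the union deduplicates to a single `Row(1)` once the guard is in place, instead of failing during optimization.

```scala
import org.apache.spark.sql.SparkSession

object Spark39216Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-39216")
      .getOrCreate()
    // Before this fix the optimizer collapsed the two projects and failed with
    // "java.lang.IllegalStateException: Couldn't find x#4 in []"; with the
    // correlated-subquery guard the UNION returns a single row containing 1.
    spark.sql(
      """SELECT (SELECT IF(x, 1, 0)) AS a
        |FROM (SELECT true) t(x)
        |UNION
        |SELECT 1 AS a
        |""".stripMargin).show()
    spark.stop()
  }
}
```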

### Why are the changes needed?

Fix bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #36595 from wangyum/SPARK-39216.

Authored-by: Yuming Wang 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 85bb7bf008d0346feaedc2aab55857d8f1b19908)
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +++-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 25 ++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 94e9d3cdd14..02f9a9eb01c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1340,7 +1340,9 @@ object CombineUnions extends Rule[LogicalPlan] {
 while (stack.nonEmpty) {
   stack.pop() match {
 case p1 @ Project(_, p2: Project)
-if canCollapseExpressions(p1.projectList, p2.projectList, 
alwaysInline = false) =>
+if canCollapseExpressions(p1.projectList, p2.projectList, 
alwaysInline = false) &&
+  !p1.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) 
&&
+  !p2.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) 
=>
   val newProjectList = buildCleanedProjectList(p1.projectList, 
p2.projectList)
   stack.pushAll(Seq(p2.copy(projectList = newProjectList)))
 case Distinct(Union(children, byName, allowMissingCol))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 919fe88ec4b..0761f8e2749 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4478,6 +4478,31 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 ))
 }
   }
+
+  test("SPARK-39216: Don't collapse projects in CombineUnions if it 
hasCorrelatedSubquery") {
+checkAnswer(
+  sql(
+"""
+  |SELECT (SELECT IF(x, 1, 0)) AS a
+  |FROM (SELECT true) t(x)
+  |UNION
+  |SELECT 1 AS a
+""".stripMargin),
+  Seq(Row(1)))
+
+checkAnswer(
+  sql(
+"""
+  |SELECT x + 1
+  |FROM   (SELECT id
+  |   + (SELECT Max(id)
+  |  FROM   range(2)) AS x
+  |FROM   range(1)) t
+  |UNION
+  |SELECT 1 AS a
+""".stripMargin),
+  Seq(Row(2), Row(1)))
+  }
 }
 
 case class Foo(bar: Option[String])





[spark] branch master updated: [SPARK-39216][SQL] Do not collapse projects in CombineUnions if it hasCorrelatedSubquery

2022-05-19 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 85bb7bf008d [SPARK-39216][SQL] Do not collapse projects in 
CombineUnions if it hasCorrelatedSubquery
85bb7bf008d is described below

commit 85bb7bf008d0346feaedc2aab55857d8f1b19908
Author: Yuming Wang 
AuthorDate: Wed May 18 23:37:25 2022 -0700

[SPARK-39216][SQL] Do not collapse projects in CombineUnions if it 
hasCorrelatedSubquery

### What changes were proposed in this pull request?

Makes `CombineUnions` not collapse projects whose project list contains a correlated subquery 
(`SubqueryExpression.hasCorrelatedSubquery`). For example:
```sql
SELECT (SELECT IF(x, 1, 0)) AS a
FROM (SELECT true) t(x)
UNION
SELECT 1 AS a
```

It will throw an exception:
```
java.lang.IllegalStateException: Couldn't find x#4 in []
```

### Why are the changes needed?

Fix bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #36595 from wangyum/SPARK-39216.

Authored-by: Yuming Wang 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +++-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 25 ++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2f93cf2d8c3..6b9746a880f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1357,7 +1357,9 @@ object CombineUnions extends Rule[LogicalPlan] {
 while (stack.nonEmpty) {
   stack.pop() match {
 case p1 @ Project(_, p2: Project)
-if canCollapseExpressions(p1.projectList, p2.projectList, 
alwaysInline = false) =>
+if canCollapseExpressions(p1.projectList, p2.projectList, 
alwaysInline = false) &&
+  !p1.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) 
&&
+  !p2.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) 
=>
   val newProjectList = buildCleanedProjectList(p1.projectList, 
p2.projectList)
   stack.pushAll(Seq(p2.copy(projectList = newProjectList)))
 case Distinct(Union(children, byName, allowMissingCol))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 72897d15302..2bfaa5b22d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4431,6 +4431,31 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
 ))
 }
   }
+
+  test("SPARK-39216: Don't collapse projects in CombineUnions if it 
hasCorrelatedSubquery") {
+checkAnswer(
+  sql(
+"""
+  |SELECT (SELECT IF(x, 1, 0)) AS a
+  |FROM (SELECT true) t(x)
+  |UNION
+  |SELECT 1 AS a
+""".stripMargin),
+  Seq(Row(1)))
+
+checkAnswer(
+  sql(
+"""
+  |SELECT x + 1
+  |FROM   (SELECT id
+  |   + (SELECT Max(id)
+  |  FROM   range(2)) AS x
+  |FROM   range(1)) t
+  |UNION
+  |SELECT 1 AS a
+""".stripMargin),
+  Seq(Row(2), Row(1)))
+  }
 }
 
 case class Foo(bar: Option[String])





[spark] branch master updated: [SPARK-28516][SQL][TEST][FOLLOWUP] Do not run PostgreSQL tests that are not supported by Spark yet

2022-05-19 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7be2ef9bace [SPARK-28516][SQL][TEST][FOLLOWUP] Do not run PostgreSQL 
tests that are not supported by Spark yet
7be2ef9bace is described below

commit 7be2ef9bacefca0b30ee08a2474887d6c1172e35
Author: Wenchen Fan 
AuthorDate: Wed May 18 23:19:44 2022 -0700

[SPARK-28516][SQL][TEST][FOLLOWUP] Do not run PostgreSQL tests that are not 
supported by Spark yet

### What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/36365, we added the `to_char` function and uncommented 
some PostgreSQL tests. However, PostgreSQL's `to_char` is more powerful, and many of the 
uncommented tests are not supported by Spark yet. Running them wastes test time, so this PR 
comments the unsupported tests out again, as before.
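
As a hedged illustration of the distinction, the sketch below (assuming a Spark build that includes the `to_char` function added in https://github.com/apache/spark/pull/36365, plus an illustrative local session and object name) runs one of the numeric formats that stays enabled in `int8.sql`, while PostgreSQL-only modifiers such as `FM`, `TH`, `SG`, and `L` remain commented out in the test file.

```scala
import org.apache.spark.sql.SparkSession

object ToCharFormatExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("to_char-formats")
      .getOrCreate()
    // Supported: plain digit/grouping format, as in the test that stays enabled.
    spark.sql("SELECT to_char(4567890123456789, '9,999,999,999,999,999')").show(false)
    // Formats using PostgreSQL-specific modifiers (FM, TH, SG, L, ...) are the
    // ones kept commented out in int8.sql because Spark does not support them yet.
    spark.stop()
  }
}
```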

### Why are the changes needed?

test cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #36605 from cloud-fan/test.

Authored-by: Wenchen Fan 
Signed-off-by: Dongjoon Hyun 
---
 .../resources/sql-tests/inputs/postgreSQL/int8.sql |  26 ++--
 .../sql-tests/inputs/postgreSQL/numeric.sql|  35 ++---
 .../sql-tests/results/postgreSQL/int8.sql.out  | 107 +--
 .../sql-tests/results/postgreSQL/numeric.sql.out   | 151 +
 4 files changed, 33 insertions(+), 286 deletions(-)

diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
index 28be31f9048..fac23b4a26f 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/int8.sql
@@ -102,12 +102,12 @@ SELECT min(q1), min(q2) FROM INT8_TBL;
 SELECT max(q1), max(q2) FROM INT8_TBL;
 
 -- TO_CHAR()
-
+-- some queries are commented out as the format string is not supported by 
Spark
 SELECT '' AS to_char_1, to_char(q1, '9G999G999G999G999G999'), to_char(q2, 
'9,999,999,999,999,999')
 FROM INT8_TBL;
 
-SELECT '' AS to_char_2, to_char(q1, '9G999G999G999G999G999D999G999'), 
to_char(q2, '9,999,999,999,999,999.999,999')
-FROM INT8_TBL;
+-- SELECT '' AS to_char_2, to_char(q1, '9G999G999G999G999G999D999G999'), 
to_char(q2, '9,999,999,999,999,999.999,999')
+-- FROM INT8_TBL;
 
 SELECT '' AS to_char_3, to_char( (q1 * -1), 'PR'), to_char( 
(q2 * -1), '.999PR')
 FROM INT8_TBL;
@@ -116,18 +116,18 @@ SELECT '' AS to_char_4, to_char( (q1 * -1), 
'S'), to_char( (q2 *
 FROM INT8_TBL;
 
 SELECT '' AS to_char_5,  to_char(q2, 'MI') FROM INT8_TBL;
-SELECT '' AS to_char_6,  to_char(q2, 'FMS')FROM INT8_TBL;
-SELECT '' AS to_char_7,  to_char(q2, 'FMTHPR') FROM INT8_TBL;
-SELECT '' AS to_char_8,  to_char(q2, 'SGth')   FROM INT8_TBL;
+-- SELECT '' AS to_char_6,  to_char(q2, 'FMS')FROM 
INT8_TBL;
+-- SELECT '' AS to_char_7,  to_char(q2, 'FMTHPR') FROM 
INT8_TBL;
+-- SELECT '' AS to_char_8,  to_char(q2, 'SGth')   FROM 
INT8_TBL;
 SELECT '' AS to_char_9,  to_char(q2, '0999')   FROM INT8_TBL;
 SELECT '' AS to_char_10, to_char(q2, 'S0999')  FROM INT8_TBL;
-SELECT '' AS to_char_11, to_char(q2, 'FM0999') FROM INT8_TBL;
-SELECT '' AS to_char_12, to_char(q2, 'FM.000') FROM INT8_TBL;
-SELECT '' AS to_char_13, to_char(q2, 'L.000')  FROM INT8_TBL;
-SELECT '' AS to_char_14, to_char(q2, 'FM.999') FROM INT8_TBL;
-SELECT '' AS to_char_15, to_char(q2, 'S 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 . 9 9 
9') FROM INT8_TBL;
-SELECT '' AS to_char_16, to_char(q2, E'9 "text"  "" 999 "\\"text 
between quote marks\\"" ') FROM INT8_TBL;
-SELECT '' AS to_char_17, to_char(q2, '99SG99') FROM INT8_TBL;
+-- SELECT '' AS to_char_11, to_char(q2, 'FM0999') FROM 
INT8_TBL;
+-- SELECT '' AS to_char_12, to_char(q2, 'FM.000') FROM 
INT8_TBL;
+-- SELECT '' AS to_char_13, to_char(q2, 'L.000')  FROM 
INT8_TBL;
+-- SELECT '' AS to_char_14, to_char(q2, 'FM.999') FROM 
INT8_TBL;
+-- SELECT '' AS to_char_15, to_char(q2, 'S 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 . 9 
9 9') FROM INT8_TBL;
+-- SELECT '' AS to_char_16, to_char(q2, E'9 "text"  "" 999 
"\\"text between quote marks\\"" ') FROM INT8_TBL;
+-- SELECT '' AS to_char_17, to_char(q2, '99SG99') FROM 
INT8_TBL;
 
 -- [SPARK-26218] Throw exception on overflow for integers
 -- check min/max values and overflow behavior
diff --git 

Error while running notifications feature from .asf.yaml in spark!

2022-05-19 Thread Apache Infrastructure


An error occurred while running notifications feature in .asf.yaml!:
Unterminated string starting at: line 1 column 24559 (char 24558)

