(spark) branch branch-3.5 updated: [SPARK-45847][SQL][TESTS] CliSuite flakiness due to non-sequential guarantee for stdout&stderr

2023-11-09 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new fbc150fbbb70 [SPARK-45847][SQL][TESTS] CliSuite flakiness due to 
non-sequential guarantee for stdout&stderr
fbc150fbbb70 is described below

commit fbc150fbbb702f18ca12c6e6dec3fe01dbe76612
Author: Kent Yao 
AuthorDate: Thu Nov 9 16:23:38 2023 +0800

[SPARK-45847][SQL][TESTS] CliSuite flakiness due to non-sequential 
guarantee for stdout&stderr

### What changes were proposed in this pull request?

In CliSuite, this PR adds a retry for tests that write errors to STDERR.

### Why are the changes needed?

To fix flaky tests such as the ones below:

https://github.com/chenhao-db/apache-spark/actions/runs/6791437199/job/18463313766

https://github.com/dongjoon-hyun/spark/actions/runs/6753670527/job/18361206900

```sql
[info]   Spark master: local, Application Id: local-1699402393189
[info]   spark-sql> /* SELECT /*+ HINT() 4; */;
[info]
[info]   [PARSE_SYNTAX_ERROR] Syntax error at or near ';'. SQLSTATE: 42601 
(line 1, pos 26)
[info]
[info]   == SQL ==
[info]   /* SELECT /*+ HINT() 4; */;
[info]   --^^^
[info]
[info]   spark-sql> /* SELECT /*+ HINT() 4; */ SELECT 1;
[info]   1
[info]   Time taken: 1.499 seconds, Fetched 1 row(s)
[info]
[info]   [UNCLOSED_BRACKETED_COMMENT] Found an unclosed bracketed comment. 
Please, append */ at the end of the comment. SQLSTATE: 42601
[info]   == SQL ==
[info]   /* Here is a unclosed bracketed comment SELECT 1;
[info]   spark-sql> /* Here is a unclosed bracketed comment SELECT 1;
[info]   spark-sql> /* SELECT /*+ HINT() */ 4; */;
[info]   spark-sql>
```

As the fragment above shows, the query on the 3rd line from the bottom, which came from STDOUT, was printed later than its error output, which came from STDERR.

In this scenario, the error output does not match anything and simply goes unnoticed, so the test eventually times out and fails.
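
For reference, the retry approach can be sketched as follows. This is a minimal, hypothetical helper written directly against ScalaTest, not the actual `testRetry` used by the Spark test suites; it only illustrates re-running a flaky test body a few times before reporting the last failure.

```scala
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical sketch of a retrying test helper; the real testRetry in the
// Spark test suites may differ in naming and behavior.
abstract class RetryFunSuite extends AnyFunSuite {
  protected def testRetry(name: String, attempts: Int = 3)(body: => Unit): Unit = {
    test(name) {
      var lastFailure: Throwable = null
      var succeeded = false
      var attempt = 0
      while (attempt < attempts && !succeeded) {
        attempt += 1
        try {
          body                // re-run the whole flaky test body
          succeeded = true
        } catch {
          case t: Throwable => lastFailure = t
        }
      }
      if (!succeeded) throw lastFailure
    }
  }
}
```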

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests and CI

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #43725 from yaooqinn/SPARK-45847.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
(cherry picked from commit 06d8cbe073499ff16bca3165e2de1192daad3984)
Signed-off-by: Kent Yao 
---
 .../apache/spark/sql/hive/thriftserver/CliSuite.scala | 19 +--
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 8ba9ea28a5a9..343b32e6227c 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -383,7 +383,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-11188 Analysis error reporting") {
+  testRetry("SPARK-11188 Analysis error reporting") {
 runCliWithin(timeout = 2.minute,
   errorResponses = Seq("AnalysisException"))(
   "select * from nonexistent_table;" -> "nonexistent_table"
@@ -551,7 +551,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SparkException with root cause will be printStacktrace") {
+  testRetry("SparkException with root cause will be printStacktrace") {
 // If it is not in silent mode, will print the stacktrace
 runCliWithin(
   1.minute,
@@ -575,8 +575,8 @@ class CliSuite extends SparkFunSuite {
 runCliWithin(1.minute)("SELECT MAKE_DATE(-44, 3, 15);" -> "-0044-03-15")
   }
 
-  test("SPARK-33100: Ignore a semicolon inside a bracketed comment in 
spark-sql") {
-runCliWithin(4.minute)(
+  testRetry("SPARK-33100: Ignore a semicolon inside a bracketed comment in 
spark-sql") {
+runCliWithin(1.minute)(
   "/* SELECT 'test';*/ SELECT 'test';" -> "test",
   ";;/* SELECT 'test';*/ SELECT 'test';" -> "test",
   "/* SELECT 'test';*/;; SELECT 'test';" -> "test",
@@ -623,8 +623,8 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-37555: spark-sql should pass last unclosed comment to backend") {
-runCliWithin(5.minute)(
+  testRetry("SPARK-37555: spark-sql should pass last unclosed comment to 
backend") {
+runCliWithin(1.minute)(
   // Only unclosed comment.
   "/* SELECT /*+ HINT() 4; */;".stripMargin -> "Syntax error at or near 
';'",
   // Unclosed nested bracketed comment.
@@ -637,7 +637,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-37694: delete [jar|file|ar

(spark) branch master updated: [SPARK-45847][SQL][TESTS] CliSuite flakiness due to non-sequential guarantee for stdout&stderr

2023-11-09 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 06d8cbe07349 [SPARK-45847][SQL][TESTS] CliSuite flakiness due to 
non-sequential guarantee for stdout&stderr
06d8cbe07349 is described below

commit 06d8cbe073499ff16bca3165e2de1192daad3984
Author: Kent Yao 
AuthorDate: Thu Nov 9 16:23:38 2023 +0800

[SPARK-45847][SQL][TESTS] CliSuite flakiness due to non-sequential 
guarantee for stdout&stderr

### What changes were proposed in this pull request?

In CliSuite, this PR adds a retry for tests that write errors to STDERR.

### Why are the changes needed?

To fix flaky tests such as the ones below:

https://github.com/chenhao-db/apache-spark/actions/runs/6791437199/job/18463313766

https://github.com/dongjoon-hyun/spark/actions/runs/6753670527/job/18361206900

```sql
[info]   Spark master: local, Application Id: local-1699402393189
[info]   spark-sql> /* SELECT /*+ HINT() 4; */;
[info]
[info]   [PARSE_SYNTAX_ERROR] Syntax error at or near ';'. SQLSTATE: 42601 
(line 1, pos 26)
[info]
[info]   == SQL ==
[info]   /* SELECT /*+ HINT() 4; */;
[info]   --^^^
[info]
[info]   spark-sql> /* SELECT /*+ HINT() 4; */ SELECT 1;
[info]   1
[info]   Time taken: 1.499 seconds, Fetched 1 row(s)
[info]
[info]   [UNCLOSED_BRACKETED_COMMENT] Found an unclosed bracketed comment. 
Please, append */ at the end of the comment. SQLSTATE: 42601
[info]   == SQL ==
[info]   /* Here is a unclosed bracketed comment SELECT 1;
[info]   spark-sql> /* Here is a unclosed bracketed comment SELECT 1;
[info]   spark-sql> /* SELECT /*+ HINT() */ 4; */;
[info]   spark-sql>
```

As the fragment above shows, the query on the 3rd line from the bottom, which came from STDOUT, was printed later than its error output, which came from STDERR.

In this scenario, the error output does not match anything and simply goes unnoticed, so the test eventually times out and fails.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests and CI

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #43725 from yaooqinn/SPARK-45847.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
---
 .../apache/spark/sql/hive/thriftserver/CliSuite.scala | 19 +--
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 5391965ded2e..4f0d4dff566c 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -383,7 +383,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-11188 Analysis error reporting") {
+  testRetry("SPARK-11188 Analysis error reporting") {
 runCliWithin(timeout = 2.minute,
   errorResponses = Seq("AnalysisException"))(
   "select * from nonexistent_table;" -> "nonexistent_table"
@@ -551,7 +551,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SparkException with root cause will be printStacktrace") {
+  testRetry("SparkException with root cause will be printStacktrace") {
 // If it is not in silent mode, will print the stacktrace
 runCliWithin(
   1.minute,
@@ -575,8 +575,8 @@ class CliSuite extends SparkFunSuite {
 runCliWithin(1.minute)("SELECT MAKE_DATE(-44, 3, 15);" -> "-0044-03-15")
   }
 
-  test("SPARK-33100: Ignore a semicolon inside a bracketed comment in 
spark-sql") {
-runCliWithin(4.minute)(
+  testRetry("SPARK-33100: Ignore a semicolon inside a bracketed comment in 
spark-sql") {
+runCliWithin(1.minute)(
   "/* SELECT 'test';*/ SELECT 'test';" -> "test",
   ";;/* SELECT 'test';*/ SELECT 'test';" -> "test",
   "/* SELECT 'test';*/;; SELECT 'test';" -> "test",
@@ -623,8 +623,8 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-37555: spark-sql should pass last unclosed comment to backend") {
-runCliWithin(5.minute)(
+  testRetry("SPARK-37555: spark-sql should pass last unclosed comment to 
backend") {
+runCliWithin(1.minute)(
   // Only unclosed comment.
   "/* SELECT /*+ HINT() 4; */;".stripMargin -> "Syntax error at or near 
';'",
   // Unclosed nested bracketed comment.
@@ -637,7 +637,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-37694: delete [jar|file|archive] shall use spark sql processor") 
{
+  testRetry("SPARK-37694: delete [jar|file|archive] shall use spark

(spark) branch master updated: [SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE

2023-11-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5ac88b12f86b [SPARK-44886][SQL] Introduce CLUSTER BY clause for 
CREATE/REPLACE TABLE
5ac88b12f86b is described below

commit 5ac88b12f86b306e7612591154c26aebabb957a8
Author: Terry Kim 
AuthorDate: Thu Nov 9 19:30:12 2023 +0800

[SPARK-44886][SQL] Introduce CLUSTER BY clause for CREATE/REPLACE TABLE

### What changes were proposed in this pull request?
This PR proposes to introduce the `CLUSTER BY` SQL clause to the CREATE/REPLACE TABLE SQL syntax:

```
CREATE TABLE tbl(a int, b string) CLUSTER BY (a, b)
```

This doesn't introduce a default implementation for clustering, but it's up 
to the catalog/datasource implementation to utilize the clustering information 
(e.g., Delta, Iceberg, etc.).
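
As a quick illustration (a sketch only, assuming an active `SparkSession` named `spark`; the table name and `USING parquet` provider are made up here, and whether clustering is honored depends on the catalog/datasource):

```scala
// Sketch: exercise the new CLUSTER BY clause through spark.sql.
spark.sql("CREATE TABLE tbl (a INT, b STRING) USING parquet CLUSTER BY (a, b)")

// Per the new error conditions added below, CLUSTER BY cannot be combined with
// PARTITIONED BY or CLUSTERED BY ... INTO BUCKETS:
//   SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED
//   SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED
```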

### Why are the changes needed?
To introduce the concept of clustering to datasources.

### Does this PR introduce _any_ user-facing change?

Yes, this introduces a new SQL keyword.

### How was this patch tested?
Added extensive unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42577 from imback82/cluster_by.

Lead-authored-by: Terry Kim 
Co-authored-by: Terry Kim 
Signed-off-by: Wenchen Fan 
---
 .../src/main/resources/error/error-classes.json|  12 +++
 docs/sql-error-conditions.md   |  12 +++
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |   5 +
 .../spark/sql/errors/QueryParsingErrors.scala  |   8 ++
 .../spark/sql/catalyst/catalog/interface.scala |  63 +++-
 .../spark/sql/catalyst/parser/AstBuilder.scala |  46 +++--
 .../sql/connector/catalog/CatalogV2Implicits.scala |  26 -
 .../sql/connector/expressions/expressions.scala|  36 +++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 110 -
 .../sql/connector/catalog/InMemoryBaseTable.scala  |   1 +
 .../expressions/TransformExtractorSuite.scala  |  43 +++-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |   8 +-
 .../spark/sql/execution/SparkSqlParser.scala   |   3 +-
 .../datasources/v2/V2SessionCatalog.scala  |   8 +-
 .../apache/spark/sql/internal/CatalogImpl.scala|   3 +-
 .../command/CreateTableClusterBySuiteBase.scala|  83 
 .../command/v1/CreateTableClusterBySuite.scala |  51 ++
 .../command/v2/CreateTableClusterBySuite.scala |  50 ++
 .../command/CreateTableClusterBySuite.scala|  39 
 19 files changed, 583 insertions(+), 24 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index c38171c3d9e6..26f6c0240afb 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2963,6 +2963,18 @@
 ],
 "sqlState" : "42601"
   },
+  "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED" : {
+"message" : [
+  "Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS."
+],
+"sqlState" : "42908"
+  },
+  "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED" : {
+"message" : [
+  "Cannot specify both CLUSTER BY and PARTITIONED BY."
+],
+"sqlState" : "42908"
+  },
   "SPECIFY_PARTITION_IS_NOT_ALLOWED" : {
 "message" : [
   "A CREATE TABLE without explicit column list cannot specify PARTITIONED 
BY.",
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 8a5faa15dc9c..2cb433b19fa5 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -1852,6 +1852,18 @@ A CREATE TABLE without explicit column list cannot 
specify bucketing information
 Please use the form with explicit column list and specify bucketing 
information.
 Alternatively, allow bucketing information to be inferred by omitting the 
clause.
 
+### SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED
+
+[SQLSTATE: 
42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both CLUSTER BY and CLUSTERED BY INTO BUCKETS.
+
+### SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED
+
+[SQLSTATE: 
42908](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Cannot specify both CLUSTER BY and PARTITIONED BY.
+
 ### SPECIFY_PARTITION_IS_NOT_ALLOWED
 
 [SQLSTATE: 
42601](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
diff --git 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 
b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 84a31dafed98..bd449a4e194e 100644
--- 
a/sql/api/src/main/antlr4/org/apache/spark/sql/catalys

(spark) branch master updated: [SPARK-45815][SQL][STREAMING] Provide an interface for other Streaming sources to add `_metadata` columns

2023-11-09 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1e93c408e19f [SPARK-45815][SQL][STREAMING] Provide an interface for 
other Streaming sources to add `_metadata` columns
1e93c408e19f is described below

commit 1e93c408e19f4ce8cec8220fd5eb6c1cb76468ff
Author: Yaohua Zhao 
AuthorDate: Thu Nov 9 19:35:51 2023 +0800

[SPARK-45815][SQL][STREAMING] Provide an interface for other Streaming 
sources to add `_metadata` columns

### What changes were proposed in this pull request?

Currently, only the native V1 file-based streaming source can read the 
`_metadata` column: 
https://github.com/apache/spark/blob/370870b7a0303e4a2c4b3dea1b479b4fcbc93f8d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala#L63

Our goal is to create an interface that allows other streaming sources to 
add `_metadata` columns. For instance, we would like the Delta Streaming 
source, which you can find here: 
https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala#L49,
 to extend this interface and provide the `_metadata` column for its underlying 
storage format, such as Parquet.
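
A rough sketch of how a third-party source might opt in is below. The `getMetadataOutput` signature is inferred from the `StreamingRelation` call site in this diff (session, options, user-specified schema), so treat it as an assumption rather than the exact interface; the provider and column names are hypothetical.

```scala
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.streaming.Source
import org.apache.spark.sql.sources.{StreamSourceProvider, SupportsStreamSourceMetadataColumns}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical provider: only the metadata-column hook is fleshed out here.
class MyMetadataAwareProvider extends StreamSourceProvider
    with SupportsStreamSourceMetadataColumns {

  override def sourceSchema(
      sqlContext: SQLContext,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): (String, StructType) = ???   // elided

  override def createSource(
      sqlContext: SQLContext,
      metadataPath: String,
      schema: Option[StructType],
      providerName: String,
      parameters: Map[String, String]): Source = ???                 // elided

  // Assumed signature, based on how StreamingRelation invokes it in this diff.
  override def getMetadataOutput(
      spark: SparkSession,
      options: Map[String, String],
      userSpecifiedSchema: Option[StructType]): Seq[AttributeReference] = {
    Seq(AttributeReference("_metadata_source_file", StringType, nullable = true)())
  }
}
```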

### Why are the changes needed?
A generic interface to enable other streaming sources to expose and add 
`_metadata` columns.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43692 from Yaohua628/spark-45815.

Authored-by: Yaohua Zhao 
Signed-off-by: Wenchen Fan 
---
 .../execution/streaming/StreamingRelation.scala| 11 
 .../org/apache/spark/sql/sources/interfaces.scala  | 31 ++
 2 files changed, 37 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index 135d46c5291e..c5d5a79d3454 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -29,6 +29,7 @@ import 
org.apache.spark.sql.connector.read.streaming.SparkDataStream
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat}
+import org.apache.spark.sql.sources.SupportsStreamSourceMetadataColumns
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
@@ -60,11 +61,11 @@ case class StreamingRelation(dataSource: DataSource, 
sourceName: String, output:
   override def newInstance(): LogicalPlan = this.copy(output = 
output.map(_.newInstance()))
 
   override lazy val metadataOutput: Seq[AttributeReference] = {
-dataSource.providingClass match {
-  // If the dataSource provided class is a same or subclass of FileFormat 
class
-  case f if classOf[FileFormat].isAssignableFrom(f) =>
-metadataOutputWithOutConflicts(
-  
Seq(dataSource.providingInstance().asInstanceOf[FileFormat].createFileMetadataCol()))
+dataSource.providingInstance() match {
+  case f: FileFormat => 
metadataOutputWithOutConflicts(Seq(f.createFileMetadataCol()))
+  case s: SupportsStreamSourceMetadataColumns =>
+metadataOutputWithOutConflicts(s.getMetadataOutput(
+  dataSource.sparkSession, dataSource.options, 
dataSource.userSpecifiedSchema))
   case _ => Nil
 }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 63e57c6804e1..d194ae77e968 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -309,3 +309,34 @@ trait InsertableRelation {
 trait CatalystScan {
   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): 
RDD[Row]
 }
+
+/**
+ * Implemented by StreamSourceProvider objects that can generate file metadata 
columns.
+ * This trait extends the basic StreamSourceProvider by allowing the addition 
of metadata
+ * columns to the schema of the Stream Data Source.
+ */
+trait SupportsStreamSourceMetadataColumns extends StreamSourceProvider {
+
+  /**
+   * Returns the metadata columns that should be added to the schema of the 
Stream Source.
+   * These metadata columns supplement the columns
+   * defined in the sourceSchema() of the StreamSourceProvider.
+   *
+   * The final schema for the Stream Source, therefore, 

(spark) branch master updated: [SPARK-45867][CORE] Support `spark.worker.idPattern`

2023-11-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7bc96e8e3767 [SPARK-45867][CORE] Support `spark.worker.idPattern`
7bc96e8e3767 is described below

commit 7bc96e8e37672483a07088dbbdcf3610a497af1d
Author: Dongjoon Hyun 
AuthorDate: Thu Nov 9 13:59:44 2023 -0800

[SPARK-45867][CORE] Support `spark.worker.idPattern`

### What changes were proposed in this pull request?

This PR aims to support `spark.worker.idPattern`.

### Why are the changes needed?

To allow users to customize the worker IDs if they want.
- From: `worker-20231109183042-[fe80::1%lo0]-39729`
- To: `my-worker-20231109183042-[fe80::1%lo0]`

For example,
```
$ cat conf/spark-defaults.conf
spark.worker.idPattern worker-%2$s
```
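
To make the formatting explicit (an illustration, not code from the patch): the pattern receives the timestamp, host, and port, in that order, via Java's `String.format`, so positional specifiers such as `%2$s` can select individual fields.

```scala
// Arguments are passed as (timestamp, host, port); the default pattern keeps all three.
val default = "worker-%s-%s-%d".format("20231109183042", "[fe80::1%lo0]", 39729)
// => "worker-20231109183042-[fe80::1%lo0]-39729"

// A custom pattern (hypothetical) that drops the port; note the new config
// rejects patterns whose formatted output would contain whitespace.
val custom = "my-worker-%1$s-%2$s".format("20231109183042", "[fe80::1%lo0]", 39729)
// => "my-worker-20231109183042-[fe80::1%lo0]"
```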

(screenshot: https://github.com/apache/spark/assets/9700541/bc367e77-c19d-44f1-bbc5-3f4c5edec33d)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43740 from dongjoon-hyun/SPARK-45867.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/deploy/worker/Worker.scala  |  3 ++-
 .../org/apache/spark/internal/config/Worker.scala  | 11 +++
 .../org/apache/spark/deploy/worker/WorkerSuite.scala   | 18 --
 3 files changed, 29 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 44082ae78794..ddbba55e00b4 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -62,6 +62,7 @@ private[deploy] class Worker(
 
   private val host = rpcEnv.address.host
   private val port = rpcEnv.address.port
+  private val workerIdPattern = conf.get(config.Worker.WORKER_ID_PATTERN)
 
   Utils.checkHost(host)
   assert (port > 0)
@@ -813,7 +814,7 @@ private[deploy] class Worker(
   }
 
   private def generateWorkerId(): String = {
-"worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
+workerIdPattern.format(createDateFormat.format(new Date), host, port)
   }
 
   override def onStop(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index fda3a57546b6..f160470edd8f 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -89,4 +89,15 @@ private[spark] object Worker {
   .version("3.2.0")
   .stringConf
   .createWithDefaultString("PWR")
+
+  val WORKER_ID_PATTERN = ConfigBuilder("spark.worker.idPattern")
+.internal()
+.doc("The pattern for worker ID generation based on Java `String.format` 
method. The " +
+  "default value is `worker-%s-%s-%d` which represents the existing worker 
id string, e.g.," +
+  " `worker-20231109183042-[fe80::1%lo0]-39729`. Please be careful to 
generate unique IDs")
+.version("4.0.0")
+.stringConf
+.checkValue(!_.format("2023110900", "host", 0).exists(_.isWhitespace),
+  "Whitespace is not allowed.")
+.createWithDefaultString("worker-%s-%s-%d")
 }
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index a07d4f76905a..1b2d92af4b02 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -29,7 +29,7 @@ import org.mockito.Answers.RETURNS_SMART_NULLS
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
@@ -49,7 +49,7 @@ import 
org.apache.spark.resource.TestResourceIDs.{WORKER_FPGA_ID, WORKER_GPU_ID}
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.util.Utils
 
-class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter with 
PrivateMethodTester {
 
   import org.apache.spark.deploy.DeployTestUtils._
 
@@ -62,6 +62,8 @@ class WorkerSuite extends SparkFunSuite with Matchers with 
BeforeAndAfter {
 
   implicit val formats = 

(spark) branch master updated: [SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` command

2023-11-09 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ce818ba96953 [SPARK-45731][SQL] Also update partition statistics with 
`ANALYZE TABLE` command
ce818ba96953 is described below

commit ce818ba969537cf9eb16865a88148407a5992e98
Author: Chao Sun 
AuthorDate: Thu Nov 9 15:56:47 2023 -0800

[SPARK-45731][SQL] Also update partition statistics with `ANALYZE TABLE` 
command

### What changes were proposed in this pull request?

Also update partition statistics (e.g., total size in bytes, row count) 
with `ANALYZE TABLE` command.

### Why are the changes needed?

Currently, when an `ANALYZE TABLE` command is triggered against a partitioned table, only table stats are updated, but not partition stats. Spark users who want to update the latter have to use a different, more verbose syntax: `ANALYZE TABLE ... PARTITION(...)`.

Given that `ANALYZE TABLE` already calculates the total size for all the partitions internally, it makes sense to also update partition stats using that result. This way, Spark users do not need to remember two different syntaxes.

In addition, when using `ANALYZE TABLE` with a scan (i.e., when `NOSCAN` is NOT specified), we can also calculate the row count for all the partitions and update the stats accordingly.

The above behavior is controlled via a new flag 
`spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled`, which by 
default is turned off.
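
A short sketch of the opt-in usage, assuming an active `SparkSession` named `spark` and a hypothetical partitioned table `sales`:

```scala
// Opt in; the flag is off by default because scanning partitions makes the
// command more expensive.
spark.conf.set("spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled", "true")

// With the flag on, this also refreshes per-partition statistics; add NOSCAN to
// compute sizes only and skip row counts.
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")

// Previously, per-partition stats required the more verbose partition form, e.g.:
// spark.sql("ANALYZE TABLE sales PARTITION (dt) COMPUTE STATISTICS")
```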

### Does this PR introduce _any_ user-facing change?

Not by default. When `spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled` is turned on, Spark will also update partition stats with the `ANALYZE TABLE` command on a partitioned table.

### How was this patch tested?

Added a unit test for this feature.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43629 from sunchao/SPARK-45731.

Authored-by: Chao Sun 
Signed-off-by: Chao Sun 
---
 .../org/apache/spark/sql/internal/SQLConf.scala| 13 
 .../command/AnalyzePartitionCommand.scala  | 50 ++---
 .../spark/sql/execution/command/CommandUtils.scala | 87 ++
 .../apache/spark/sql/hive/StatisticsSuite.scala| 78 +++
 4 files changed, 170 insertions(+), 58 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index ecc3e6e101fc..ff6ab7b541a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2664,6 +2664,16 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED =
+
buildConf("spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled")
+  .doc("When this config is enabled, Spark will also update partition 
statistics in analyze " +
+"table command (i.e., ANALYZE TABLE .. COMPUTE STATISTICS [NOSCAN]). 
Note the command " +
+"will also become more expensive. When this config is disabled, Spark 
will only " +
+"update table level statistics.")
+  .version("4.0.0")
+  .booleanConf
+  .createWithDefault(false)
+
   val CBO_ENABLED =
 buildConf("spark.sql.cbo.enabled")
   .doc("Enables CBO for estimation of plan statistics when set true.")
@@ -5113,6 +5123,9 @@ class SQLConf extends Serializable with Logging with 
SqlApiConf {
 
   def autoSizeUpdateEnabled: Boolean = 
getConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED)
 
+  def updatePartStatsInAnalyzeTableEnabled: Boolean =
+getConf(SQLConf.UPDATE_PART_STATS_IN_ANALYZE_TABLE_ENABLED)
+
   def joinReorderEnabled: Boolean = getConf(SQLConf.JOIN_REORDER_ENABLED)
 
   def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index c2b227d6cad7..7fe4c73abf90 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -17,15 +17,12 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{Column, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, 
ExternalCatalogUti

(spark) branch master updated (ce818ba96953 -> d9c5f9d6d42a)

2023-11-09 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from ce818ba96953 [SPARK-45731][SQL] Also update partition statistics with 
`ANALYZE TABLE` command
 add d9c5f9d6d42a [SPARK-45798][CONNECT] Assert server-side session ID

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/connect/client/ArtifactSuite.scala   |   6 +-
 .../src/main/protobuf/spark/connect/base.proto |  69 +-
 .../spark/sql/connect/client/ArtifactManager.scala |  10 +-
 .../client/CustomSparkConnectBlockingStub.scala|  34 ++-
 .../connect/client/CustomSparkConnectStub.scala|   6 +-
 .../ExecutePlanResponseReattachableIterator.scala  |  19 +-
 .../connect/client/GrpcExceptionConverter.scala|  10 +-
 .../sql/connect/client/ResponseValidator.scala | 111 +
 .../sql/connect/client/SparkConnectClient.scala|   7 +-
 .../sql/connect/client/SparkConnectStubState.scala |  43 
 .../artifact/SparkConnectArtifactManager.scala |   2 +-
 .../execution/ExecuteResponseObserver.scala|   1 +
 .../execution/SparkConnectPlanExecution.scala  |  11 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  |   7 +-
 .../spark/sql/connect/service/SessionHolder.scala  |   7 +
 .../service/SparkConnectAddArtifactsHandler.scala  |   2 +
 .../service/SparkConnectAnalyzeHandler.scala   |   4 +-
 .../SparkConnectArtifactStatusesHandler.scala  |   5 +
 .../service/SparkConnectConfigHandler.scala|   5 +-
 .../service/SparkConnectInterruptHandler.scala |   1 +
 .../SparkConnectReleaseExecuteHandler.scala|   1 +
 .../SparkConnectReleaseSessionHandler.scala|   3 +
 .../spark/sql/connect/utils/MetricGenerator.scala  |   8 +-
 .../spark/sql/connect/SparkConnectServerTest.scala |   7 +-
 .../connect/artifact/ArtifactManagerSuite.scala|  18 +-
 .../service/ArtifactStatusesHandlerSuite.scala |   7 +-
 .../service/SparkConnectServiceE2ESuite.scala  |   2 +
 python/pyspark/sql/connect/client/core.py  |  92 
 python/pyspark/sql/connect/proto/base_pb2.py   | 260 ++---
 python/pyspark/sql/connect/proto/base_pb2.pyi  | 165 +++--
 30 files changed, 671 insertions(+), 252 deletions(-)
 create mode 100644 
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ResponseValidator.scala
 create mode 100644 
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectStubState.scala





(spark) branch master updated: [SPARK-45756][CORE] Support `spark.master.useAppNameAsAppId.enabled`

2023-11-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a566099133ff [SPARK-45756][CORE] Support 
`spark.master.useAppNameAsAppId.enabled`
a566099133ff is described below

commit a566099133ff38cd1b2cd2fe64879bf0ba75fa9b
Author: Dongjoon Hyun 
AuthorDate: Thu Nov 9 18:34:43 2023 -0800

[SPARK-45756][CORE] Support `spark.master.useAppNameAsAppId.enabled`

### What changes were proposed in this pull request?

This PR aims to support `spark.master.useAppNameAsAppId.enabled` as an experimental feature in Spark Standalone clusters.

### Why are the changes needed?

This allows the users to control the appID completely.
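
For illustration only (hypothetical app name, not code from the patch): the application ID is derived from the user-provided app name by lowercasing it and stripping whitespace, as the `createApplication` change in the diff below shows.

```scala
// Mirrors desc.name.toLowerCase().replaceAll("\\s+", "") from the diff below.
val appName = "My Spark Shell"
val appId = appName.toLowerCase().replaceAll("\\s+", "")
// => "mysparkshell"
```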

(screenshot: https://github.com/apache/spark/assets/9700541/ad2b89ce-9d7d-4144-bd52-f29b94051103)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual tests with the following procedure.
```
$ SPARK_MASTER_OPTS="-Dspark.master.useAppNameAsAppId.enabled=true" 
sbin/start-master.sh
$ bin/spark-shell --master spark://max.local:7077
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43743 from dongjoon-hyun/SPARK-45756.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/deploy/master/Master.scala|  7 ++-
 .../scala/org/apache/spark/internal/config/package.scala |  8 
 .../org/apache/spark/deploy/master/MasterSuite.scala | 16 
 3 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index dbb647252c5f..b3fbec1830e4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -120,6 +120,7 @@ private[deploy] class Master(
   private val defaultCores = conf.get(DEFAULT_CORES)
   val reverseProxy = conf.get(UI_REVERSE_PROXY)
   val historyServerUrl = conf.get(MASTER_UI_HISTORY_SERVER_URL)
+  val useAppNameAsAppId = conf.get(MASTER_USE_APP_NAME_AS_APP_ID)
 
   // Alternative application submission gateway that is stable across Spark 
versions
   private val restServerEnabled = conf.get(MASTER_REST_SERVER_ENABLED)
@@ -1041,7 +1042,11 @@ private[deploy] class Master(
   ApplicationInfo = {
 val now = System.currentTimeMillis()
 val date = new Date(now)
-val appId = newApplicationId(date)
+val appId = if (useAppNameAsAppId) {
+  desc.name.toLowerCase().replaceAll("\\s+", "")
+} else {
+  newApplicationId(date)
+}
 new ApplicationInfo(now, appId, desc, date, driver, defaultCores)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index bbadf91fc41c..b2bf30863a91 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1846,6 +1846,14 @@ package object config {
   .stringConf
   .createOptional
 
+  private[spark] val MASTER_USE_APP_NAME_AS_APP_ID =
+ConfigBuilder("spark.master.useAppNameAsAppId.enabled")
+  .internal()
+  .doc("(Experimental) If true, Spark master uses the user-provided 
appName for appId.")
+  .version("4.0.0")
+  .booleanConf
+  .createWithDefault(false)
+
   private[spark] val IO_COMPRESSION_SNAPPY_BLOCKSIZE =
 ConfigBuilder("spark.io.compression.snappy.blockSize")
   .doc("Block size in bytes used in Snappy compression, in the case when " 
+
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 4f8457f930e4..2e54673649c7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -804,6 +804,7 @@ class MasterSuite extends SparkFunSuite
   private val _state = PrivateMethod[RecoveryState.Value](Symbol("state"))
   private val _newDriverId = PrivateMethod[String](Symbol("newDriverId"))
   private val _newApplicationId = 
PrivateMethod[String](Symbol("newApplicationId"))
+  private val _createApplication = 
PrivateMethod[ApplicationInfo](Symbol("createApplication"))
 
   private val workerInfo = makeWorkerInfo(4096, 10)
   private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
@@ -1275,6 +1276,21 @@ class MasterSuite extends SparkFunSuite
   assert(master.invokePrivate(_newApplicationId(submitDate)) === s"${i % 
1000}")
 }
   }
+
+  test("SPARK-45756: Use appName for appId") {
+val conf = new Sp

(spark) branch master updated (a566099133f -> 2085ac52175)

2023-11-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from a566099133f [SPARK-45756][CORE] Support 
`spark.master.useAppNameAsAppId.enabled`
 add 2085ac52175 [SPARK-45850][BUILD] Upgrade oracle jdbc driver to 
23.3.0.23.09

No new revisions were added by this update.

Summary of changes:
 connector/docker-integration-tests/pom.xml | 2 +-
 pom.xml| 4 ++--
 sql/core/pom.xml   | 2 +-
 3 files changed, 4 insertions(+), 4 deletions(-)





(spark-docker) branch master updated: Add support for java 17 from spark 3.5.0

2023-11-09 Thread yikun
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 6f68fe0  Add support for java 17 from spark 3.5.0
6f68fe0 is described below

commit 6f68fe0f7051c10f2bf43a50a7decfce2e97baf0
Author: vakarisbk 
AuthorDate: Fri Nov 10 11:33:39 2023 +0800

Add support for java 17 from spark 3.5.0

### What changes were proposed in this pull request?
1. Create Java 17 base images alongside the Java 11 images, starting from Spark 3.5.0
2. Change the Ubuntu version to 22.04 for `scala2.12-java17-*`

### Why are the changes needed?

Spark supports multiple Java versions, but the images are currently built 
only with Java 11.

### Does this PR introduce _any_ user-facing change?

New images would be available in the repositories.

### How was this patch tested?

Closes #56 from vakarisbk/master.

Authored-by: vakarisbk 
Signed-off-by: Yikun Jiang 
---
 .github/workflows/build_3.5.0.yaml |   3 +-
 .github/workflows/main.yml |  20 +++-
 .github/workflows/publish.yml  |   4 +-
 .github/workflows/test.yml |   3 +
 3.5.0/scala2.12-java17-python3-r-ubuntu/Dockerfile |  29 +
 3.5.0/scala2.12-java17-python3-ubuntu/Dockerfile   |  26 +
 3.5.0/scala2.12-java17-r-ubuntu/Dockerfile |  28 +
 3.5.0/scala2.12-java17-ubuntu/Dockerfile   |  79 +
 3.5.0/scala2.12-java17-ubuntu/entrypoint.sh| 130 +
 add-dockerfiles.sh |  23 +++-
 tools/ci_runner_cleaner/free_disk_space.sh |  53 +
 .../ci_runner_cleaner/free_disk_space_container.sh |  33 ++
 tools/template.py  |   2 +-
 versions.json  |  29 +
 14 files changed, 454 insertions(+), 8 deletions(-)

diff --git a/.github/workflows/build_3.5.0.yaml 
b/.github/workflows/build_3.5.0.yaml
index 6eb3ad6..9f2b2d6 100644
--- a/.github/workflows/build_3.5.0.yaml
+++ b/.github/workflows/build_3.5.0.yaml
@@ -31,11 +31,12 @@ jobs:
 strategy:
   matrix:
 image-type: ["all", "python", "scala", "r"]
+java: [11, 17]
 name: Run
 secrets: inherit
 uses: ./.github/workflows/main.yml
 with:
   spark: 3.5.0
   scala: 2.12
-  java: 11
+  java: ${{ matrix.java }}
   image-type: ${{ matrix.image-type }}
diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index fe755ed..145b529 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -79,6 +79,14 @@ jobs:
   - name: Checkout Spark Docker repository
 uses: actions/checkout@v3
 
+  - name: Free up disk space
+shell: 'script -q -e -c "bash {0}"'
+run: |
+  chmod +x tools/ci_runner_cleaner/free_disk_space_container.sh
+  tools/ci_runner_cleaner/free_disk_space_container.sh
+  chmod +x tools/ci_runner_cleaner/free_disk_space.sh
+  tools/ci_runner_cleaner/free_disk_space.sh
+
   - name: Prepare - Generate tags
 run: |
   case "${{ inputs.image-type }}" in
@@ -195,7 +203,8 @@ jobs:
   - name : Test - Run spark application for standalone cluster on docker
 run: testing/run_tests.sh --image-url $IMAGE_URL --scala-version ${{ 
inputs.scala }} --spark-version ${{ inputs.spark }}
 
-  - name: Test - Checkout Spark repository
+  - name: Test - Checkout Spark repository for Spark 3.3.0 (with 
fetch-depth 0)
+if: inputs.spark == '3.3.0'
 uses: actions/checkout@v3
 with:
   fetch-depth: 0
@@ -203,6 +212,14 @@ jobs:
   ref: v${{ inputs.spark }}
   path: ${{ github.workspace }}/spark
 
+  - name: Test - Checkout Spark repository 
+if: inputs.spark != '3.3.0'
+uses: actions/checkout@v3
+with:
+  repository: apache/spark
+  ref: v${{ inputs.spark }}
+  path: ${{ github.workspace }}/spark 
+
   - name: Test - Cherry pick commits
 # Apache Spark enable resource limited k8s IT since v3.3.1, 
cherry-pick patches for old release
 # https://github.com/apache/spark/pull/36087#issuecomment-1251756266
@@ -247,6 +264,7 @@ jobs:
   # TODO(SPARK-44495): Resume to use the latest minikube for 
k8s-integration-tests.
   curl -LO 
https://storage.googleapis.com/minikube/releases/v1.30.1/minikube-linux-amd64
   sudo install minikube-linux-amd64 /usr/local/bin/minikube
+  rm minikube-linux-amd64
   # Github Action limit cpu:2, memory: 6947MB, limit to 2U6G for 
better resource statistic
   minikube start --cpus 2 --memory 6144
 
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 8cf

(spark) branch branch-3.4 updated: [SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak

2023-11-09 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 259ac250017 [SPARK-45814][CONNECT][SQL][3.4] Make 
ArrowConverters.createEmptyArrowBatch call close() to avoid memory leak
259ac250017 is described below

commit 259ac250017bcc1805f6cb44a5e7eedf9e552a98
Author: xieshuaihu 
AuthorDate: Fri Nov 10 12:33:24 2023 +0800

[SPARK-45814][CONNECT][SQL][3.4] Make ArrowConverters.createEmptyArrowBatch 
call close() to avoid memory leak

### What changes were proposed in this pull request?

Make `ArrowBatchIterator` implement `AutoCloseable` and `ArrowConverters.createEmptyArrowBatch()` call `close()` to avoid a memory leak.
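
The pattern behind the fix, in a deliberately simplified sketch (hypothetical names, not the literal ArrowConverters code): an iterator that owns off-heap buffers also implements `AutoCloseable`, and callers that may never drive it to exhaustion close it explicitly.

```scala
// Hypothetical types: only the ownership/close pattern mirrors the fix.
trait CloseableBatchIterator extends Iterator[Array[Byte]] with AutoCloseable

def firstOrEmptyBatch(it: CloseableBatchIterator): Array[Byte] = {
  try {
    if (it.hasNext) it.next() else Array.emptyByteArray
  } finally {
    // Release buffers even when the iterator is never driven to the end,
    // e.g. when TaskContext.get returns None.
    it.close()
  }
}
```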

### Why are the changes needed?

`ArrowConverters.createEmptyArrowBatch` doesn't call `super.hasNext`; if `TaskContext.get` returns `None`, the memory allocated in `ArrowBatchIterator` is leaked.

In Spark Connect, `createEmptyArrowBatch` is called in 
[SparkConnectPlanner](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L2558)
 and 
[SparkConnectPlanExecution](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/SparkConnectPlanExecution.scala#L224),
 which causes a long-running driver to consume all off-heap memory sp [...]

This is the exception stack:
```
org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
at 
io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
at 
org.apache.arrow.memory.NettyAllocationManager.(NettyAllocationManager.java:77)
at 
org.apache.arrow.memory.NettyAllocationManager.(NettyAllocationManager.java:84)
at 
org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
at 
org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:354)
at 
org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:349)
at 
org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:337)
at 
org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:315)
at 
org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:279)
at 
org.apache.arrow.vector.BaseValueVector.allocFixedDataAndValidityBufs(BaseValueVector.java:192)
at 
org.apache.arrow.vector.BaseFixedWidthVector.allocateBytes(BaseFixedWidthVector.java:338)
at 
org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:308)
at 
org.apache.arrow.vector.BaseFixedWidthVector.allocateNew(BaseFixedWidthVector.java:273)
at 
org.apache.spark.sql.execution.arrow.ArrowWriter$.$anonfun$create$1(ArrowWriter.scala:44)
at 
scala.collection.StrictOptimizedIterableOps.map(StrictOptimizedIterableOps.scala:100)
at 
scala.collection.StrictOptimizedIterableOps.map$(StrictOptimizedIterableOps.scala:87)
at 
scala.collection.convert.JavaCollectionWrappers$JListWrapper.map(JavaCollectionWrappers.scala:103)
at 
org.apache.spark.sql.execution.arrow.ArrowWriter$.create(ArrowWriter.scala:43)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchIterator.(ArrowConverters.scala:93)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.(ArrowConverters.scala:138)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$$anon$1.(ArrowConverters.scala:231)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$.createEmptyArrowBatch(ArrowConverters.scala:229)
at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleSqlCommand(SparkConnectPlanner.scala:2481)
at 
org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2426)
at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:202)
at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:158)
at 
org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:132)
at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:189)
at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:189)
at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifa

(spark) branch branch-3.4 updated: [SPARK-45847][SQL][TESTS] CliSuite flakiness due to non-sequential guarantee for stdout&stderr

2023-11-09 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new c53caddb6b1 [SPARK-45847][SQL][TESTS] CliSuite flakiness due to 
non-sequential guarantee for stdout&stderr
c53caddb6b1 is described below

commit c53caddb6b16390c969b4d0f8ee1ce8bc280ba8a
Author: Kent Yao 
AuthorDate: Thu Nov 9 16:23:38 2023 +0800

[SPARK-45847][SQL][TESTS] CliSuite flakiness due to non-sequential 
guarantee for stdout&stderr

### What changes were proposed in this pull request?

In CliSuite, this PR adds a retry for tests that write errors to STDERR.

### Why are the changes needed?

To fix flaky tests such as the ones below:

https://github.com/chenhao-db/apache-spark/actions/runs/6791437199/job/18463313766

https://github.com/dongjoon-hyun/spark/actions/runs/6753670527/job/18361206900

```sql
[info]   Spark master: local, Application Id: local-1699402393189
[info]   spark-sql> /* SELECT /*+ HINT() 4; */;
[info]
[info]   [PARSE_SYNTAX_ERROR] Syntax error at or near ';'. SQLSTATE: 42601 
(line 1, pos 26)
[info]
[info]   == SQL ==
[info]   /* SELECT /*+ HINT() 4; */;
[info]   --^^^
[info]
[info]   spark-sql> /* SELECT /*+ HINT() 4; */ SELECT 1;
[info]   1
[info]   Time taken: 1.499 seconds, Fetched 1 row(s)
[info]
[info]   [UNCLOSED_BRACKETED_COMMENT] Found an unclosed bracketed comment. 
Please, append */ at the end of the comment. SQLSTATE: 42601
[info]   == SQL ==
[info]   /* Here is a unclosed bracketed comment SELECT 1;
[info]   spark-sql> /* Here is a unclosed bracketed comment SELECT 1;
[info]   spark-sql> /* SELECT /*+ HINT() */ 4; */;
[info]   spark-sql>
```

As the fragment above shows, the query on the 3rd line from the bottom, which came from STDOUT, was printed later than its error output, which came from STDERR.

In this scenario, the error output does not match anything and simply goes unnoticed, so the test eventually times out and fails.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests and CI

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #43725 from yaooqinn/SPARK-45847.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
(cherry picked from commit 06d8cbe073499ff16bca3165e2de1192daad3984)
Signed-off-by: Kent Yao 
---
 .../apache/spark/sql/hive/thriftserver/CliSuite.scala | 19 +--
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 651c6b7aafb..1b91442c228 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -388,7 +388,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-11188 Analysis error reporting") {
+  testRetry("SPARK-11188 Analysis error reporting") {
 runCliWithin(timeout = 2.minute,
   errorResponses = Seq("AnalysisException"))(
   "select * from nonexistent_table;" -> "nonexistent_table"
@@ -556,7 +556,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SparkException with root cause will be printStacktrace") {
+  testRetry("SparkException with root cause will be printStacktrace") {
 // If it is not in silent mode, will print the stacktrace
 runCliWithin(
   1.minute,
@@ -580,8 +580,8 @@ class CliSuite extends SparkFunSuite {
 runCliWithin(1.minute)("SELECT MAKE_DATE(-44, 3, 15);" -> "-0044-03-15")
   }
 
-  test("SPARK-33100: Ignore a semicolon inside a bracketed comment in 
spark-sql") {
-runCliWithin(4.minute)(
+  testRetry("SPARK-33100: Ignore a semicolon inside a bracketed comment in 
spark-sql") {
+runCliWithin(1.minute)(
   "/* SELECT 'test';*/ SELECT 'test';" -> "test",
   ";;/* SELECT 'test';*/ SELECT 'test';" -> "test",
   "/* SELECT 'test';*/;; SELECT 'test';" -> "test",
@@ -628,8 +628,8 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-37555: spark-sql should pass last unclosed comment to backend") {
-runCliWithin(5.minute)(
+  testRetry("SPARK-37555: spark-sql should pass last unclosed comment to 
backend") {
+runCliWithin(1.minute)(
   // Only unclosed comment.
   "/* SELECT /*+ HINT() 4; */;".stripMargin -> "Syntax error at or near 
';'",
   // Unclosed nested bracketed comment.
@@ -642,7 +642,7 @@ class CliSuite extends SparkFunSuite {
 )
   }
 
-  test("SPARK-37694: delete [jar|file|archiv