[spark] branch master updated: [SPARK-44044][SS] Improve Error message for Window functions with streaming

2023-06-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f406b54b2a8 [SPARK-44044][SS] Improve Error message for Window 
functions with streaming
f406b54b2a8 is described below

commit f406b54b2a899d03bae2e6f70eef7fedfed63d65
Author: Siying Dong 
AuthorDate: Sat Jul 1 08:51:22 2023 +0300

[SPARK-44044][SS] Improve Error message for Window functions with streaming

### What changes were proposed in this pull request?
Replace existing error message when non-time window function is used with 
streaming to include aggregation function and column. The error message looks 
like following now:

org.apache.spark.sql.AnalysisException: Window function is not supported in 
'row_number()' as column 'rn_col' on streaming DataFrames/Datasets. Structured 
Streaming only supports time-window aggregation using the `window` unction. 
(window specification: '(PARTITION BY col1 ORDER BY col2 ASC NULLS FIRST ROWS 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)')

Note that the message is a little bit unnatural as the existing unit test 
requires the exception follows the pattern that it includes "not supported", 
"streaming" "DataFrames" and "Dataset".

### Why are the changes needed?
The exiting error message is vague and a full logical plan is included. A 
user reports that they aren't able to identify what the problem is.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added a unit test

Closes #41578 from siying/window_error.

Lead-authored-by: Siying Dong 
Co-authored-by: Siying Dong 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json|  5 
 .../analysis/UnsupportedOperationChecker.scala | 17 ++---
 .../spark/sql/errors/QueryExecutionErrors.scala| 16 -
 .../analysis/UnsupportedOperationsSuite.scala  | 24 ++-
 .../apache/spark/sql/streaming/StreamSuite.scala   | 28 ++
 5 files changed, 80 insertions(+), 10 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index eabd5533e13..14bd3bc6bac 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1775,6 +1775,11 @@
 ],
 "sqlState" : "42000"
   },
+  "NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING" : {
+"message" : [
+  "Window function is not supported in  (as column 
) on streaming DataFrames/Datasets. Structured Streaming only 
supports time-window aggregation using the WINDOW function. (window 
specification: )"
+]
+  },
   "NOT_ALLOWED_IN_FROM" : {
 "message" : [
   "Not allowed in the FROM clause:"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index daa7c0d54b7..2a09d85d8f2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,11 +19,12 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, 
Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, 
LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, 
Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, 
LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, 
WindowExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
 
@@ -508,8 +509,18 @@ object UnsupportedOperationChecker extends Logging {
 case Sample(_, _, _, _, child) if child.isStreaming =>
   throwError("Sampling is not supported on streaming 
DataFrames/Datasets")
 
-case Window(_, _, _, child) if child.isStreaming =>
-  throwError("Non-time-based windows are not supported on 

[spark] branch master updated: [SPARK-43851][SQL] Support LCA in grouping expressions

2023-06-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9353d67f929 [SPARK-43851][SQL] Support LCA in grouping expressions
9353d67f929 is described below

commit 9353d67f9290bae1e7d7e16a2caf5256cc4e2f92
Author: Jia Fan 
AuthorDate: Sat Jul 1 08:48:10 2023 +0300

[SPARK-43851][SQL] Support LCA in grouping expressions

### What changes were proposed in this pull request?
This PR bring support lateral column alias reference in grouping 
expressions.

### Why are the changes needed?
add new feature for LCA

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
exist test

Closes #41804 from Hisoka-X/SPARK-43851_LCA_in_group.

Authored-by: Jia Fan 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json|  5 -
 ...r-conditions-unsupported-feature-error-class.md |  4 
 .../analysis/ResolveReferencesInAggregate.scala| 22 --
 .../column-resolution-aggregate.sql.out| 26 +-
 .../results/column-resolution-aggregate.sql.out| 16 -
 5 files changed, 29 insertions(+), 44 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 3cc35d668e0..eabd5533e13 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -2530,11 +2530,6 @@
   "Referencing lateral column alias  in the aggregate query both 
with window expressions and with having clause. Please rewrite the aggregate 
query by removing the having clause or removing lateral alias reference in the 
SELECT list."
 ]
   },
-  "LATERAL_COLUMN_ALIAS_IN_GROUP_BY" : {
-"message" : [
-  "Referencing a lateral column alias via GROUP BY alias/ALL is not 
supported yet."
-]
-  },
   "LATERAL_COLUMN_ALIAS_IN_WINDOW" : {
 "message" : [
   "Referencing a lateral column alias  in window expression 
."
diff --git a/docs/sql-error-conditions-unsupported-feature-error-class.md 
b/docs/sql-error-conditions-unsupported-feature-error-class.md
index 64d7eb347e5..78bf301c49d 100644
--- a/docs/sql-error-conditions-unsupported-feature-error-class.md
+++ b/docs/sql-error-conditions-unsupported-feature-error-class.md
@@ -65,10 +65,6 @@ Referencing a lateral column alias `` in the aggregate 
function ``
 
 Referencing lateral column alias `` in the aggregate query both with 
window expressions and with having clause. Please rewrite the aggregate query 
by removing the having clause or removing lateral alias reference in the SELECT 
list.
 
-## LATERAL_COLUMN_ALIAS_IN_GROUP_BY
-
-Referencing a lateral column alias via GROUP BY alias/ALL is not supported yet.
-
 ## LATERAL_COLUMN_ALIAS_IN_WINDOW
 
 Referencing a lateral column alias `` in window expression ``.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInAggregate.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInAggregate.scala
index 09ae87b071f..41bcb337c67 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInAggregate.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInAggregate.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{AliasHelper, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{AliasHelper, Attribute, 
Expression, LateralColumnAliasReference, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, AppendColumns, 
LogicalPlan}
 import 
org.apache.spark.sql.catalyst.trees.TreePattern.{LATERAL_COLUMN_ALIAS_REFERENCE,
 UNRESOLVED_ATTRIBUTE}
@@ -74,12 +73,6 @@ object ResolveReferencesInAggregate extends SQLConfHelper
 resolvedAggExprsWithOuter,
 resolveGroupByAlias(resolvedAggExprsWithOuter, 
resolvedGroupExprsNoOuter)
   ).map(resolveOuterRef)
-  // TODO: currently we don't support LCA in `groupingExpressions` yet.
-  if (resolved.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE))) {
-throw new AnalysisException(
-  errorClass = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GROUP_BY",
-  messageParameters = Map.empty)
-  }
   resolved
 } else {
   // Do not resolve columns in grouping expressions to outer references 
here, 

[spark] branch master updated: [SPARK-44060][SQL] Code-gen for build side outer shuffled hash join

2023-06-30 Thread huaxingao
This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2db8cfb3bd9 [SPARK-44060][SQL] Code-gen for build side outer shuffled 
hash join
2db8cfb3bd9 is described below

commit 2db8cfb3bd9bf5e85379c6d5ca414d36cfd9292d
Author: Szehon Ho 
AuthorDate: Fri Jun 30 22:04:22 2023 -0700

[SPARK-44060][SQL] Code-gen for build side outer shuffled hash join

### What changes were proposed in this pull request?
Codegen of shuffled hash join of build side outer join (ie, left outer join 
build left or right outer join build right)

 ### Why are the changes needed?
The implementation of https://github.com/apache/spark/pull/41398 was only 
for non-codegen version, and codegen was disabled in this scenario.

 ### Does this PR introduce _any_ user-facing change?
No

 ### How was this patch tested?
New unit test in WholeStageCodegenSuite

Closes #41614 from szehon-ho/same_side_outer_join_codegen_master.

Authored-by: Szehon Ho 
Signed-off-by: huaxingao 
---
 .../org/apache/spark/sql/internal/SQLConf.scala|   9 ++
 .../sql/execution/joins/ShuffledHashJoinExec.scala |  68 ++
 .../scala/org/apache/spark/sql/JoinSuite.scala | 146 +++--
 .../sql/execution/WholeStageCodegenSuite.scala |  89 +
 4 files changed, 217 insertions(+), 95 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index d60f5d170e7..270508139e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2182,6 +2182,15 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN =
+buildConf("spark.sql.codegen.join.buildSideOuterShuffledHashJoin.enabled")
+  .internal()
+  .doc("When true, enable code-gen for an OUTER shuffled hash join where 
outer side" +
+" is the build side.")
+  .version("3.5.0")
+  .booleanConf
+  .createWithDefault(true)
+
   val ENABLE_FULL_OUTER_SORT_MERGE_JOIN_CODEGEN =
 buildConf("spark.sql.codegen.join.fullOuterSortMergeJoin.enabled")
   .internal()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 8953bf19f35..974f6f9e50c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -340,8 +340,10 @@ case class ShuffledHashJoinExec(
 
   override def supportCodegen: Boolean = joinType match {
 case FullOuter => 
conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
-case LeftOuter if buildSide == BuildLeft => false
-case RightOuter if buildSide == BuildRight => false
+case LeftOuter if buildSide == BuildLeft =>
+  conf.getConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
+case RightOuter if buildSide == BuildRight =>
+  conf.getConf(SQLConf.ENABLE_BUILD_SIDE_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
 case _ => true
   }
 
@@ -362,9 +364,15 @@ case class ShuffledHashJoinExec(
   }
 
   override def doProduce(ctx: CodegenContext): String = {
-// Specialize `doProduce` code for full outer join, because full outer 
join needs to
-// iterate streamed and build side separately.
-if (joinType != FullOuter) {
+// Specialize `doProduce` code for full outer join and build-side outer 
join,
+// because we need to iterate streamed and build side separately.
+val specializedProduce = joinType match {
+  case FullOuter => true
+  case LeftOuter if buildSide == BuildLeft => true
+  case RightOuter if buildSide == BuildRight => true
+  case _ => false
+}
+if (!specializedProduce) {
   return super.doProduce(ctx)
 }
 
@@ -407,21 +415,24 @@ case class ShuffledHashJoinExec(
   case BuildLeft => buildResultVars ++ streamedResultVars
   case BuildRight => streamedResultVars ++ buildResultVars
 }
-val consumeFullOuterJoinRow = ctx.freshName("consumeFullOuterJoinRow")
-ctx.addNewFunction(consumeFullOuterJoinRow,
+val consumeOuterJoinRow = ctx.freshName("consumeOuterJoinRow")
+ctx.addNewFunction(consumeOuterJoinRow,
   s"""
- |private void $consumeFullOuterJoinRow() throws java.io.IOException {
+ |private void $consumeOuterJoinRow() throws java.io.IOException {
  |  ${metricTerm(ctx, "numOutputRows")}.add(1);
  |  ${consume(ctx, resultVars)}
  |}
 

[spark] branch branch-3.3 updated: [SPARK-42784] should still create subDir when the number of subDir in merge dir is less than conf

2023-06-30 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new e9b525e2054 [SPARK-42784] should still create subDir when the number 
of subDir in merge dir is less than conf
e9b525e2054 is described below

commit e9b525e205402ac458db682802771544ced86758
Author: meifencheng 
AuthorDate: Fri Jun 30 22:50:14 2023 -0500

[SPARK-42784] should still create subDir when the number of subDir in merge 
dir is less than conf

### What changes were proposed in this pull request?
Fixed a minor issue with diskBlockManager after push-based shuffle is 
enabled

### Why are the changes needed?
this bug will affect the efficiency of push based shuffle

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #40412 from Stove-hust/feature-42784.

Authored-by: meifencheng 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 35d51571a803b8fa7d14542236276425b517d3af)
Signed-off-by: Mridul Muralidharan 
---
 core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala   | 2 +-
 .../test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index e29f3fc1b80..b6f36fb6cd1 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -273,7 +273,7 @@ private[spark] class DiskBlockManager(
   Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
 try {
   val mergeDir = new File(rootDir, mergeDirName)
-  if (!mergeDir.exists()) {
+  if (!mergeDir.exists() || mergeDir.listFiles().length < 
subDirsPerLocalDir) {
 // This executor does not find merge_manager directory, it will 
try to create
 // the merge_manager directory and the sub directories.
 logDebug(s"Try to create $mergeDir and its sub dirs since the " +
diff --git 
a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 3e4002614ca..eb8fea827bf 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -110,8 +110,8 @@ class DiskBlockManagerSuite extends SparkFunSuite with 
BeforeAndAfterEach with B
 assert(Utils.getConfiguredLocalDirs(testConf).map(
   rootDir => new File(rootDir, DiskBlockManager.MERGE_DIRECTORY))
   .filter(mergeDir => mergeDir.exists()).length === 2)
-// mergeDir0 will be skipped as it already exists
-assert(mergeDir0.list().length === 0)
+// mergeDir0 can not be skipped even if it already exists
+assert(mergeDir0.list().length === 
testConf.get(config.DISKSTORE_SUB_DIRECTORIES))
 // Sub directories get created under mergeDir1
 assert(mergeDir1.list().length === 
testConf.get(config.DISKSTORE_SUB_DIRECTORIES))
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.4 updated: [SPARK-42784] should still create subDir when the number of subDir in merge dir is less than conf

2023-06-30 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 9ec84ca47d8 [SPARK-42784] should still create subDir when the number 
of subDir in merge dir is less than conf
9ec84ca47d8 is described below

commit 9ec84ca47d8624d8deb8c29a8b9b593bb7c27669
Author: meifencheng 
AuthorDate: Fri Jun 30 22:50:14 2023 -0500

[SPARK-42784] should still create subDir when the number of subDir in merge 
dir is less than conf

### What changes were proposed in this pull request?
Fixed a minor issue with diskBlockManager after push-based shuffle is 
enabled

### Why are the changes needed?
this bug will affect the efficiency of push based shuffle

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #40412 from Stove-hust/feature-42784.

Authored-by: meifencheng 
Signed-off-by: Mridul Muralidharan gmail.com>
(cherry picked from commit 35d51571a803b8fa7d14542236276425b517d3af)
Signed-off-by: Mridul Muralidharan 
---
 core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala   | 2 +-
 .../test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index a7ed9226c57..a3eac701a24 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -273,7 +273,7 @@ private[spark] class DiskBlockManager(
   Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
 try {
   val mergeDir = new File(rootDir, mergeDirName)
-  if (!mergeDir.exists()) {
+  if (!mergeDir.exists() || mergeDir.listFiles().length < 
subDirsPerLocalDir) {
 // This executor does not find merge_manager directory, it will 
try to create
 // the merge_manager directory and the sub directories.
 logDebug(s"Try to create $mergeDir and its sub dirs since the " +
diff --git 
a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index ac896c0b17a..48610cbc025 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -108,8 +108,8 @@ class DiskBlockManagerSuite extends SparkFunSuite {
 assert(Utils.getConfiguredLocalDirs(testConf).map(
   rootDir => new File(rootDir, DiskBlockManager.MERGE_DIRECTORY))
   .filter(mergeDir => mergeDir.exists()).length === 2)
-// mergeDir0 will be skipped as it already exists
-assert(mergeDir0.list().length === 0)
+// mergeDir0 can not be skipped even if it already exists
+assert(mergeDir0.list().length === 
testConf.get(config.DISKSTORE_SUB_DIRECTORIES))
 // Sub directories get created under mergeDir1
 assert(mergeDir1.list().length === 
testConf.get(config.DISKSTORE_SUB_DIRECTORIES))
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-42784] should still create subDir when the number of subDir in merge dir is less than conf

2023-06-30 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 35d51571a80 [SPARK-42784] should still create subDir when the number 
of subDir in merge dir is less than conf
35d51571a80 is described below

commit 35d51571a803b8fa7d14542236276425b517d3af
Author: meifencheng 
AuthorDate: Fri Jun 30 22:50:14 2023 -0500

[SPARK-42784] should still create subDir when the number of subDir in merge 
dir is less than conf

### What changes were proposed in this pull request?
Fixed a minor issue with diskBlockManager after push-based shuffle is 
enabled

### Why are the changes needed?
this bug will affect the efficiency of push based shuffle

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #40412 from Stove-hust/feature-42784.

Authored-by: meifencheng 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala   | 2 +-
 .../test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 971647be06e..0427fbd9b62 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -273,7 +273,7 @@ private[spark] class DiskBlockManager(
   Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
 try {
   val mergeDir = new File(rootDir, mergeDirName)
-  if (!mergeDir.exists()) {
+  if (!mergeDir.exists() || mergeDir.listFiles().length < 
subDirsPerLocalDir) {
 // This executor does not find merge_manager directory, it will 
try to create
 // the merge_manager directory and the sub directories.
 logDebug(s"Try to create $mergeDir and its sub dirs since the " +
diff --git 
a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index ac896c0b17a..48610cbc025 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -108,8 +108,8 @@ class DiskBlockManagerSuite extends SparkFunSuite {
 assert(Utils.getConfiguredLocalDirs(testConf).map(
   rootDir => new File(rootDir, DiskBlockManager.MERGE_DIRECTORY))
   .filter(mergeDir => mergeDir.exists()).length === 2)
-// mergeDir0 will be skipped as it already exists
-assert(mergeDir0.list().length === 0)
+// mergeDir0 can not be skipped even if it already exists
+assert(mergeDir0.list().length === 
testConf.get(config.DISKSTORE_SUB_DIRECTORIES))
 // Sub directories get created under mergeDir1
 assert(mergeDir1.list().length === 
testConf.get(config.DISKSTORE_SUB_DIRECTORIES))
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [MINOR] Fix Typo in `build/mvn` script

2023-06-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 482e852b853 [MINOR] Fix Typo in `build/mvn` script
482e852b853 is described below

commit 482e852b85345fa758a03e06be29fc506d33fe27
Author: slfan1989 
AuthorDate: Fri Jun 30 18:03:27 2023 -0700

[MINOR] Fix Typo in `build/mvn` script

### What changes were proposed in this pull request?

I'm trying to compile the Spark code and I noticed a spelling mistake in 
the Maven file. I would like to fix this minor issue.

### Why are the changes needed?

fix typo.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

No unit test required.

Closes #41810 from slfan1989/fix-typo.

Authored-by: slfan1989 
Signed-off-by: Dongjoon Hyun 
---
 build/mvn | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/build/mvn b/build/mvn
index aee9358fe44..3179099304c 100755
--- a/build/mvn
+++ b/build/mvn
@@ -92,7 +92,7 @@ install_app() {
   if [ -f "${local_checksum}" ]; then
 echo "  ${local_tarball}" >> ${local_checksum} # two spaces + file are 
important!
 # Assuming SHA512 here for now
-echo "Veryfing checksum from ${local_checksum}" 1>&2
+echo "Verifying checksum from ${local_checksum}" 1>&2
 if ! shasum -a 512 -c "${local_checksum}" > /dev/null ; then
   echo "Bad checksum from ${remote_checksum}"
   exit 2


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` pass on Java 21 except `RemoteSparkSession`-based tests

2023-06-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8c635a0fa558 [SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` 
pass on Java 21 except `RemoteSparkSession`-based tests
8c635a0fa558 is described below

commit 8c635a0fa5584b35d6dd2e5fb774a2a8de7201a2
Author: yangjie01 
AuthorDate: Fri Jun 30 17:30:20 2023 -0700

[SPARK-44259][CONNECT][TESTS] Make `connect-client-jvm` pass on Java 21 
except `RemoteSparkSession`-based tests

### What changes were proposed in this pull request?
This pr ignore all tests inherit `RemoteSparkSession` as default for Java 
21 by override the `test` function in `RemoteSparkSession`,  they are all 
arrow-based tests due to the use of arrow data format for rpc communication in 
connect.

```
23/06/30 11:45:41 ERROR SparkConnectService: Error during: execute. UserId: 
. SessionId: e7479b73-d02c-47e9-85c8-40b3e9315561.
java.lang.UnsupportedOperationException: sun.misc.Unsafe or 
java.nio.DirectByteBuffer.(long, int) not available
at 
org.apache.arrow.memory.util.MemoryUtil.directBuffer(MemoryUtil.java:174)
at 
org.apache.arrow.memory.ArrowBuf.getDirectBuffer(ArrowBuf.java:229)
at org.apache.arrow.memory.ArrowBuf.nioBuffer(ArrowBuf.java:224)
at 
org.apache.arrow.vector.ipc.WriteChannel.write(WriteChannel.java:133)
at 
org.apache.arrow.vector.ipc.message.MessageSerializer.writeBatchBuffers(MessageSerializer.java:303)
at 
org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:276)
at 
org.apache.arrow.vector.ipc.message.MessageSerializer.serialize(MessageSerializer.java:237)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.$anonfun$next$3(ArrowConverters.scala:174)
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:181)
at 
org.apache.spark.sql.execution.arrow.ArrowConverters$ArrowBatchWithSchemaIterator.next(ArrowConverters.scala:128)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at 
org.apache.spark.sql.connect.service.SparkConnectStreamHandler$.processAsArrowBatches(SparkConnectStreamHandler.scala:178)
at 
org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handlePlan(SparkConnectStreamHandler.scala:104)
at 
org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1(SparkConnectStreamHandler.scala:86)
at 
org.apache.spark.sql.connect.service.SparkConnectStreamHandler.$anonfun$handle$1$adapted(SparkConnectStreamHandler.scala:53)
at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$3(SessionHolder.scala:152)
at 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:857)
at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:152)
at 
org.apache.spark.JobArtifactSet$.withActive(JobArtifactSet.scala:109)
at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withContext$1(SessionHolder.scala:122)
at 
org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:209)
at 
org.apache.spark.sql.connect.service.SessionHolder.withContext(SessionHolder.scala:121)
at 
org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:151)
at 
org.apache.spark.sql.connect.service.SessionHolder.withSessionBasedPythonPaths(SessionHolder.scala:137)
at 
org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:150)
at 
org.apache.spark.sql.connect.service.SparkConnectStreamHandler.handle(SparkConnectStreamHandler.scala:53)
at 
org.apache.spark.sql.connect.service.SparkConnectService.executePlan(SparkConnectService.scala:166)
at 
org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:584)
at 
org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
at 
org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346)
at 

[spark] branch master updated: [SPARK-44256][BUILD] Upgrade rocksdbjni to 8.3.2

2023-06-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6d8d11be6a4 [SPARK-44256][BUILD] Upgrade rocksdbjni to 8.3.2
6d8d11be6a4 is described below

commit 6d8d11be6a4586f959b33726293f2014c3b98dfa
Author: panbingkun 
AuthorDate: Fri Jun 30 17:26:30 2023 -0700

[SPARK-44256][BUILD] Upgrade rocksdbjni to 8.3.2

### What changes were proposed in this pull request?
This pr aims to upgrade rocksdbjni from 8.1.1.1 to 8.3.2.

### Why are the changes needed?
The release notes: https://github.com/facebook/rocksdb/releases/tag/v8.3.2
- Bug Fixes
Reduced cases of illegally using Env::Default() during static destruction 
by never destroying the internal PosixEnv itself (except for builds checking 
for memory leaks). (https://github.com/facebook/rocksdb/pull/11538)

- Performance Improvements
Fixed higher read QPS during DB::Open() reading files created prior to 
https://github.com/facebook/rocksdb/pull/11406, especially when reading many 
small file (size < 52 MB) during DB::Open() and partitioned filter or index is 
used.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA.
- Manual test: `org.apache.spark.util.kvstore.RocksDBBenchmark`

**A.8.1.1.1**
```
count   meanmin max 
95th
dbClose 4   0.472   0.413   
0.527   0.527
dbCreation  4   263.388 42.032  
922.916 922.916
naturalIndexCreateIterator  10240.005   0.001   
2.646   0.004
naturalIndexDescendingCreateIterator10240.002   0.002   
0.055   0.002
naturalIndexDescendingIteration 10240.003   0.003   
0.021   0.004
naturalIndexIteration   10240.009   0.003   
3.156   0.012
randomDeleteIndexed 10240.017   0.013   
0.381   0.023
randomDeletesNoIndex10240.010   0.009   
0.032   0.011
randomUpdatesIndexed10240.066   0.025   
19.900  0.074
randomUpdatesNoIndex10240.017   0.015   
0.380   0.019
randomWritesIndexed 10240.097   0.024   
52.970  0.093
randomWritesNoIndex 10240.019   0.015   
1.101   0.021
refIndexCreateIterator  10240.002   0.002   
0.044   0.002
refIndexDescendingCreateIterator10240.001   0.001   
0.013   0.001
refIndexDescendingIteration 10240.004   0.003   
0.070   0.005
refIndexIteration   10240.005   0.003   
0.230   0.005
sequentialDeleteIndexed 10240.016   0.013   
0.104   0.022
sequentialDeleteNoIndex 10240.011   0.009   
0.044   0.011
sequentialUpdatesIndexed10240.027   0.019   
0.660   0.050
sequentialUpdatesNoIndex10240.025   0.016   
0.523   0.033
sequentialWritesIndexed 10240.030   0.020   
1.526   0.040
sequentialWritesNoIndex 10240.030   0.017   
4.410   0.035
```
**B.8.3.2**
```
count   meanmin max 
95th
dbClose 4   0.488   0.424   
0.556   0.556
dbCreation  4   241.375 35.710  
850.488 850.488
naturalIndexCreateIterator  10240.004   0.001   
1.555   0.006
naturalIndexDescendingCreateIterator10240.002   0.002   
0.064   0.002
naturalIndexDescendingIteration 10240.004   0.003   
0.035   0.004
naturalIndexIteration   10240.011   0.003   
4.464   0.012
randomDeleteIndexed 10240.018   0.013   
0.505   0.024
randomDeletesNoIndex10240.010   0.009   
0.025   0.011
randomUpdatesIndexed10240.065   0.024   
20.210  0.077
randomUpdatesNoIndex10240.019   0.015   
0.449   0.027
randomWritesIndexed 10240.087   0.026   
38.782  0.096
randomWritesNoIndex 10240.019   0.016   
1.040   0.019
refIndexCreateIterator  10240.002   0.002   
0.051   0.002
refIndexDescendingCreateIterator10240.001   0.001   
0.013   0.001

[spark] branch master updated: [SPARK-41487][SQL] Assign name to _LEGACY_ERROR_TEMP_1020

2023-06-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 706829d9731 [SPARK-41487][SQL] Assign name to _LEGACY_ERROR_TEMP_1020
706829d9731 is described below

commit 706829d97312c6812bf791d9893d0a70d81676ae
Author: itholic 
AuthorDate: Fri Jun 30 21:25:04 2023 +0300

[SPARK-41487][SQL] Assign name to _LEGACY_ERROR_TEMP_1020

### What changes were proposed in this pull request?

This PR proposes to assign name to _LEGACY_ERROR_TEMP_1020, 
"INVALID_USAGE_OF_STAR_OR_REGEX".

### Why are the changes needed?

We should assign proper name to _LEGACY_ERROR_TEMP_*

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

`./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*`

Closes #39702 from itholic/LEGACY_1020.

Authored-by: itholic 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json| 11 +--
 .../spark/sql/catalyst/analysis/Analyzer.scala |  2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  2 +-
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala | 86 --
 .../catalyst/analysis/ResolveSubquerySuite.scala   |  6 +-
 .../org/apache/spark/sql/DataFrameSuite.scala  | 11 ++-
 .../scala/org/apache/spark/sql/DatasetSuite.scala  |  4 +-
 7 files changed, 82 insertions(+), 40 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index abe88db1267..3cc35d668e0 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1596,6 +1596,12 @@
   "The url is invalid: . If necessary set  to \"false\" 
to bypass this error."
 ]
   },
+  "INVALID_USAGE_OF_STAR_OR_REGEX" : {
+"message" : [
+  "Invalid usage of  in ."
+],
+"sqlState" : "42000"
+  },
   "INVALID_VIEW_TEXT" : {
 "message" : [
   "The view  cannot be displayed due to invalid view text: 
. This may be caused by an unauthorized modification of the view or 
an incorrect query syntax. Please check your query syntax and verify that the 
view has not been tampered with."
@@ -3169,11 +3175,6 @@
   " is a permanent view, which is not supported by streaming 
reading API such as `DataStreamReader.table` yet."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1020" : {
-"message" : [
-  "Invalid usage of  in ."
-]
-  },
   "_LEGACY_ERROR_TEMP_1021" : {
 "message" : [
   "count(.*) is not allowed. Please use count(*) or expand 
the columns manually, e.g. count(col1, col2)."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 32cec909401..b61dbae686b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1897,7 +1897,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   })
 // count(*) has been replaced by count(1)
 case o if containsStar(o.children) =>
-  throw QueryCompilationErrors.invalidStarUsageError(s"expression 
'${o.prettyName}'",
+  throw QueryCompilationErrors.invalidStarUsageError(s"expression 
`${o.prettyName}`",
 extractStar(o.children))
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 94cbf880b57..e02708105d2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -475,7 +475,7 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
 }
 val elem = Seq(starMsg, resExprMsg).flatten.mkString(" and ")
 new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1020",
+  errorClass = "INVALID_USAGE_OF_STAR_OR_REGEX",
   messageParameters = Map("elem" -> elem, "prettyName" -> prettyName))
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index f994c03..fdaeadc5445 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -332,10 +332,12 @@ class AnalysisErrorSuite extends AnalysisTest 

[spark] branch master updated: [SPARK-43986][SQL] Create error classes for HyperLogLog function call failures

2023-06-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ab67f461987 [SPARK-43986][SQL] Create error classes for HyperLogLog 
function call failures
ab67f461987 is described below

commit ab67f4619873f21b5dcf7f67658afce7e1028657
Author: Daniel Tenedorio 
AuthorDate: Fri Jun 30 19:44:14 2023 +0300

[SPARK-43986][SQL] Create error classes for HyperLogLog function call 
failures

### What changes were proposed in this pull request?

This PR creates error classes for HyperLogLog function call failures.

### Why are the changes needed?

These replace previous Java exceptions or other cases, in order to improve 
the user experience and bring consistency with other parts of Spark.

### Does this PR introduce _any_ user-facing change?

Yes, error messages change slightly.

### How was this patch tested?

This PR also adds SQL query test files for the HLL functions.

Closes #41486 from dtenedor/hll-error-classes.

Authored-by: Daniel Tenedorio 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json|  15 +
 .../aggregate/datasketchesAggregates.scala |  71 +++--
 .../expressions/datasketchesExpressions.scala  |  29 +-
 .../spark/sql/errors/QueryExecutionErrors.scala|  26 ++
 .../sql-tests/analyzer-results/hll.sql.out | 215 +
 .../src/test/resources/sql-tests/inputs/hll.sql|  76 +
 .../test/resources/sql-tests/results/hll.sql.out   | 262 
 .../apache/spark/sql/DataFrameAggregateSuite.scala | 338 -
 8 files changed, 850 insertions(+), 182 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index db6b9a97012..abe88db1267 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -782,6 +782,21 @@
   "The expression  cannot be used as a grouping expression 
because its data type  is not an orderable data type."
 ]
   },
+  "HLL_INVALID_INPUT_SKETCH_BUFFER" : {
+"message" : [
+  "Invalid call to ; only valid HLL sketch buffers are supported 
as inputs (such as those produced by the `hll_sketch_agg` function)."
+]
+  },
+  "HLL_INVALID_LG_K" : {
+"message" : [
+  "Invalid call to ; the `lgConfigK` value must be between  
and , inclusive: ."
+]
+  },
+  "HLL_UNION_DIFFERENT_LG_K" : {
+"message" : [
+  "Sketches have different `lgConfigK` values:  and . Set the 
`allowDifferentLgConfigK` parameter to true to call  with different 
`lgConfigK` values."
+]
+  },
   "IDENTIFIER_TOO_MANY_NAME_PARTS" : {
 "message" : [
   " is not a valid identifier as it has more than 2 name 
parts."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
index 8b24efe12b4..17c69f798d8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
@@ -17,23 +17,23 @@
 
 package org.apache.spark.sql.catalyst.expressions.aggregate
 
-import org.apache.datasketches.SketchesArgumentException
 import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
 import org.apache.datasketches.memory.Memory
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, ExpressionDescription, Literal}
 import org.apache.spark.sql.catalyst.trees.BinaryLike
+import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, 
DataType, IntegerType, LongType, StringType, TypeCollection}
 import org.apache.spark.unsafe.types.UTF8String
 
 
 /**
- * The HllSketchAgg function utilizes a Datasketches HllSketch instance to
- * count a probabilistic approximation of the number of unique values in
- * a given column, and outputs the binary representation of the HllSketch.
+ * The HllSketchAgg function utilizes a Datasketches HllSketch instance to 
count a probabilistic
+ * approximation of the number of unique values in a given column, and outputs 
the binary
+ * representation of the HllSketch.
  *
- * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more 
information
+ * See [[https://datasketches.apache.org/docs/HLL/HLL.html]] for more 
information.
  *
  * @param left child expression against which unique counting will occur
  * @param right the log-base-2 of K, 

[spark] branch master updated: [SPARK-44260][SQL] Assign names to the error class _LEGACY_ERROR_TEMP_[1215-1245-2329] & Use checkError() to check Exception in *CharVarchar*Suite

2023-06-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3fb9a2c6135 [SPARK-44260][SQL] Assign names to the error class 
_LEGACY_ERROR_TEMP_[1215-1245-2329] & Use checkError() to check Exception in 
*CharVarchar*Suite
3fb9a2c6135 is described below

commit 3fb9a2c6135d49cc7b80546c0f228d7d2bc78bf6
Author: panbingkun 
AuthorDate: Fri Jun 30 18:36:46 2023 +0300

[SPARK-44260][SQL] Assign names to the error class 
_LEGACY_ERROR_TEMP_[1215-1245-2329] & Use checkError() to check Exception in 
*CharVarchar*Suite

### What changes were proposed in this pull request?
The pr aims to:
1.Assign clear error class names for some logic in 
`CharVarcharCodegenUtils` that directly uses exceptions
- EXCEED_LIMIT_LENGTH

2.Assign names to the error class
- _LEGACY_ERROR_TEMP_1215 -> UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING
- _LEGACY_ERROR_TEMP_1245 -> NOT_SUPPORTED_CHANGE_COLUMN
- _LEGACY_ERROR_TEMP_2329  -> merge to 
NOT_SUPPORTED_CHANGE_COLUMN(_LEGACY_ERROR_TEMP_1245)

3.Use checkError() to check Exception in `*CharVarchar*Suite`

### Why are the changes needed?
The changes improve the error framework.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Update UT
- Pass GA
- Manually test.

Closes #41768 from panbingkun/CharVarchar_checkError.

Authored-by: panbingkun 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json|  30 +--
 .../spark/sql/jdbc/v2/DB2IntegrationSuite.scala|  19 +-
 .../sql/jdbc/v2/MsSqlServerIntegrationSuite.scala  |  19 +-
 .../spark/sql/jdbc/v2/MySQLIntegrationSuite.scala  |  19 +-
 .../spark/sql/jdbc/v2/OracleIntegrationSuite.scala |  19 +-
 .../sql/jdbc/v2/PostgresIntegrationSuite.scala |  19 +-
 .../sql/catalyst/util/CharVarcharCodegenUtils.java |   3 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala  |  11 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  17 +-
 .../spark/sql/errors/QueryExecutionErrors.scala|   7 +
 .../apache/spark/sql/execution/command/ddl.scala   |   3 +-
 .../analyzer-results/change-column.sql.out |  11 +-
 .../sql-tests/analyzer-results/charvarchar.sql.out |  11 +-
 .../sql-tests/results/change-column.sql.out|  11 +-
 .../sql-tests/results/charvarchar.sql.out  |  11 +-
 .../apache/spark/sql/CharVarcharTestSuite.scala| 291 +
 .../spark/sql/connector/AlterTableTests.scala  |  25 +-
 .../execution/command/CharVarcharDDLTestBase.scala | 120 +++--
 .../spark/sql/HiveCharVarcharTestSuite.scala   |  12 +-
 19 files changed, 494 insertions(+), 164 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 1b2a1ce305a..db6b9a97012 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -680,6 +680,11 @@
   "The event time  has the invalid type , but 
expected \"TIMESTAMP\"."
 ]
   },
+  "EXCEED_LIMIT_LENGTH" : {
+"message" : [
+  "Exceeds char/varchar type length limitation: ."
+]
+  },
   "EXPRESSION_TYPE_IS_NOT_ORDERABLE" : {
 "message" : [
   "Column expression  cannot be sorted because its type  
is not orderable."
@@ -1817,6 +1822,11 @@
 },
 "sqlState" : "42000"
   },
+  "NOT_SUPPORTED_CHANGE_COLUMN" : {
+"message" : [
+  "ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing 's 
column  with type  to  with type ."
+]
+  },
   "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE" : {
 "message" : [
   " is not supported for v2 tables."
@@ -2351,6 +2361,11 @@
 ],
 "sqlState" : "0A000"
   },
+  "UNSUPPORTED_CHAR_OR_VARCHAR_AS_STRING" : {
+"message" : [
+  "The char/varchar type can't be used in the table schema. If you want 
Spark treat them as string type as same as Spark 3.0 and earlier, please set 
\"spark.sql.legacy.charVarcharAsString\" to \"true\"."
+]
+  },
   "UNSUPPORTED_DATASOURCE_FOR_DIRECT_QUERY" : {
 "message" : [
   "Unsupported data source type for direct query on files: 
"
@@ -3875,11 +3890,6 @@
   "Found different window function type in ."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1215" : {
-"message" : [
-  "char/varchar type can only be used in the table schema. You can set 
 to true, so that Spark treat them as string type as same as Spark 3.0 
and earlier."
-]
-  },
   "_LEGACY_ERROR_TEMP_1218" : {
 "message" : [
   " should be converted to HadoopFsRelation."
@@ -3955,11 +3965,6 @@
   "CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty 
directory . To allow overwriting the existing non-empty directory, 
set '' to true."
  

[spark] branch master updated: [SPARK-44257][BUILD] Update some maven plugins & scalafmt to newest version

2023-06-30 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 05f5dccbd34 [SPARK-44257][BUILD] Update some maven plugins & scalafmt 
to newest version
05f5dccbd34 is described below

commit 05f5dccbd34218c7d399228529853bdb1595f3a2
Author: panbingkun 
AuthorDate: Fri Jun 30 09:14:22 2023 -0500

[SPARK-44257][BUILD] Update some maven plugins & scalafmt to newest version

### What changes were proposed in this pull request?
The pr aims to update some maven plugins & scalafmt to newest version, 
include:
- maven-clean-plugin from 3.2.0 to 3.3.1
- maven-shade-plugin from 3.4.1 to 3.5.0
- scalafmt from 3.7.4 to 3.7.5

### Why are the changes needed?
1.maven-clean-plugin

https://github.com/apache/maven-clean-plugin/releases/tag/maven-clean-plugin-3.3.1

2.maven-shade-plugin

https://github.com/apache/maven-shade-plugin/releases/tag/maven-shade-plugin-3.5.0

3.scalafmt
https://github.com/scalameta/scalafmt/releases/tag/v3.7.5
Router: make sure to indent comments after lambda 
(https://github.com/scalameta/scalafmt/pull/3556) kitbellew
Fix proposed version syntax 
(https://github.com/scalameta/scalafmt/pull/3555) JD557

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #41803 from panbingkun/SPARK-44257.

Authored-by: panbingkun 
Signed-off-by: Sean Owen 
---
 .../src/main/scala/org/apache/spark/sql/Dataset.scala| 16 +++-
 .../scala/org/apache/spark/sql/catalog/Catalog.scala |  7 +++
 .../org/apache/spark/sql/internal/CatalogImpl.scala  |  7 +++
 dev/.scalafmt.conf   |  2 +-
 pom.xml  |  4 ++--
 5 files changed, 16 insertions(+), 20 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index eba425ce127..b959974dc30 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -535,7 +535,7 @@ class Dataset[T] private[sql] (
   assert(result.schema.size == 1)
   // scalastyle:off println
   println(result.toArray.head)
-// scalastyle:on println
+  // scalastyle:on println
 }
   }
 
@@ -2214,10 +2214,9 @@ class Dataset[T] private[sql] (
* tied to this Spark application.
*
* Global temporary view is cross-session. Its lifetime is the lifetime of 
the Spark
-   * application,
-   * i.e. it will be automatically dropped when the application terminates. 
It's tied to a system
-   * preserved database `global_temp`, and we must use the qualified name to 
refer a global temp
-   * view, e.g. `SELECT * FROM global_temp.view1`.
+   * application, i.e. it will be automatically dropped when the application 
terminates. It's tied
+   * to a system preserved database `global_temp`, and we must use the 
qualified name to refer a
+   * global temp view, e.g. `SELECT * FROM global_temp.view1`.
*
* @throws AnalysisException
*   if the view name is invalid or already exists
@@ -2235,10 +2234,9 @@ class Dataset[T] private[sql] (
* temporary view is tied to this Spark application.
*
* Global temporary view is cross-session. Its lifetime is the lifetime of 
the Spark
-   * application,
-   * i.e. it will be automatically dropped when the application terminates. 
It's tied to a system
-   * preserved database `global_temp`, and we must use the qualified name to 
refer a global temp
-   * view, e.g. `SELECT * FROM global_temp.view1`.
+   * application, i.e. it will be automatically dropped when the application 
terminates. It's tied
+   * to a system preserved database `global_temp`, and we must use the 
qualified name to refer a
+   * global temp view, e.g. `SELECT * FROM global_temp.view1`.
*
* @group basic
* @since 3.4.0
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 268f162cbfa..11c3f4e3d18 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -543,10 +543,9 @@ abstract class Catalog {
* cached before, then it will also be uncached.
*
* Global temporary view is cross-session. Its lifetime is the lifetime of 
the Spark
-   * application,
-   * i.e. it will be automatically dropped when the application terminates. 
It's tied to a system
-   

[spark] branch master updated: [SPARK-41599] Memory leak in FileSystem.CACHE when submitting apps to secure cluster using InProcessLauncher

2023-06-30 Thread srowen
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7971e1c6a7c [SPARK-41599] Memory leak in FileSystem.CACHE when 
submitting apps to secure cluster using InProcessLauncher
7971e1c6a7c is described below

commit 7971e1c6a7c074c65829c2bdfad857a33e0a7a5d
Author: Xieming LI 
AuthorDate: Fri Jun 30 08:20:04 2023 -0500

[SPARK-41599] Memory leak in FileSystem.CACHE when submitting apps to 
secure cluster using InProcessLauncher

### What changes were proposed in this pull request?

Using `FileSystem.closeAllForUGI` to close the cache to prevent memory leak.

### Why are the changes needed?

There seems to be a memory leak in FileSystem.CACHE when submitting apps to 
secure cluster using InProcessLauncher.
For more detail, see 
[SPARK-41599](https://issues.apache.org/jira/browse/SPARK-41599)

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

I have tested the patch with my code which uses inProcessLauncher.
Confirmed that the memory leak issue is mitigated.

https://github.com/apache/spark/assets/4378066/cfdef4d3-cb43-464c-bb46-de60f3b91622;>

I will be very helpful if I can have some feedback and I will add some test 
cases if required.

Closes #41692 from risyomei/fix-SPARK-41599.

Authored-by: Xieming LI 
Signed-off-by: Sean Owen 
---
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 2 ++
 .../apache/spark/deploy/security/HadoopDelegationTokenManager.scala   | 4 
 2 files changed, 6 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8f9477385e7..60253ed5fda 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -186,6 +186,8 @@ private[spark] class SparkSubmit extends Logging {
   } else {
 throw e
   }
+  } finally {
+FileSystem.closeAllForUGI(proxyUser)
   }
 }
   } else {
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
index 6ce195b6c7a..54a24927ded 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManager.scala
@@ -26,6 +26,7 @@ import java.util.concurrent.{ScheduledExecutorService, 
TimeUnit}
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 
 import org.apache.spark.SparkConf
@@ -149,6 +150,9 @@ private[spark] class HadoopDelegationTokenManager(
   creds.addAll(newTokens)
 }
   })
+  if(!currentUser.equals(freshUGI)) {
+FileSystem.closeAllForUGI(freshUGI)
+  }
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch branch-3.3 updated: [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions

2023-06-30 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
 new 84620f2b877 [SPARK-44241][CORE] Mistakenly set 
io.connectionTimeout/connectionCreationTimeout to zero or negative will cause 
incessant executor cons/destructions
84620f2b877 is described below

commit 84620f2b877b9ea52b95343ca46d069a906e28a9
Author: Kent Yao 
AuthorDate: Fri Jun 30 18:33:16 2023 +0800

[SPARK-44241][CORE] Mistakenly set 
io.connectionTimeout/connectionCreationTimeout to zero or negative will cause 
incessant executor cons/destructions

### What changes were proposed in this pull request?

This PR makes zero when io.connectionTimeout/connectionCreationTimeout is 
negative. Zero here means
- connectionCreationTimeout = 0,an unlimited CONNNETION_TIMEOUT for 
connection establishment
- connectionTimeout=0, `IdleStateHandler` for triggering `IdleStateEvent` 
is disabled.

### Why are the changes needed?

1. This PR fixes a bug when connectionCreationTimeout is 0, which means 
unlimited to netty, but ChannelFuture.await(0) fails directly and 
inappropriately.
2. This PR fixes a bug when connectionCreationTimeout is less than 0, which 
causes meaningless transport client reconnections and endless executor 
reconstructions

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new unit tests

Closes #41785 from yaooqinn/SPARK-44241.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
(cherry picked from commit 38645fa470b5af7c2e41efa4fb092bdf2463fbbd)
Signed-off-by: Kent Yao 
---
 .../network/client/TransportClientFactory.java | 16 +--
 .../apache/spark/network/util/TransportConf.java   |  4 +--
 .../client/TransportClientFactorySuite.java| 33 +++---
 3 files changed, 44 insertions(+), 9 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 43408d43e57..188e4ba0f8e 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -249,12 +249,13 @@ public class TransportClientFactory implements Closeable {
 logger.debug("Creating new connection to {}", address);
 
 Bootstrap bootstrap = new Bootstrap();
+int connCreateTimeout = conf.connectionCreationTimeoutMs();
 bootstrap.group(workerGroup)
   .channel(socketChannelClass)
   // Disable Nagle's Algorithm since we don't want packets to wait
   .option(ChannelOption.TCP_NODELAY, true)
   .option(ChannelOption.SO_KEEPALIVE, true)
-  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
conf.connectionCreationTimeoutMs())
+  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout)
   .option(ChannelOption.ALLOCATOR, pooledAllocator);
 
 if (conf.receiveBuf() > 0) {
@@ -280,10 +281,19 @@ public class TransportClientFactory implements Closeable {
 // Connect to the remote server
 long preConnect = System.nanoTime();
 ChannelFuture cf = bootstrap.connect(address);
-if (!cf.await(conf.connectionCreationTimeoutMs())) {
+
+if (connCreateTimeout <= 0) {
+  cf.awaitUninterruptibly();
+  assert cf.isDone();
+  if (cf.isCancelled()) {
+throw new IOException(String.format("Connecting to %s cancelled", 
address));
+  } else if (!cf.isSuccess()) {
+throw new IOException(String.format("Failed to connect to %s", 
address), cf.cause());
+  }
+} else if (!cf.await(connCreateTimeout)) {
   throw new IOException(
 String.format("Connecting to %s timed out (%s ms)",
-  address, conf.connectionCreationTimeoutMs()));
+  address, connCreateTimeout));
 } else if (cf.cause() != null) {
   throw new IOException(String.format("Failed to connect to %s", address), 
cf.cause());
 }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 9dedd5d9849..7c2a408d86d 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -103,7 +103,7 @@ public class TransportConf {
   conf.get("spark.network.timeout", "120s"));
 long defaultTimeoutMs = JavaUtils.timeStringAsSec(
   conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS 
+ "s")) * 1000;
-return (int) defaultTimeoutMs;
+return 

[spark] branch branch-3.4 updated: [SPARK-44241][CORE] Mistakenly set io.connectionTimeout/connectionCreationTimeout to zero or negative will cause incessant executor cons/destructions

2023-06-30 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new fe971de447d [SPARK-44241][CORE] Mistakenly set 
io.connectionTimeout/connectionCreationTimeout to zero or negative will cause 
incessant executor cons/destructions
fe971de447d is described below

commit fe971de447d55bf357c161a9f1930e822e38fa45
Author: Kent Yao 
AuthorDate: Fri Jun 30 18:33:16 2023 +0800

[SPARK-44241][CORE] Mistakenly set 
io.connectionTimeout/connectionCreationTimeout to zero or negative will cause 
incessant executor cons/destructions

### What changes were proposed in this pull request?

This PR makes zero when io.connectionTimeout/connectionCreationTimeout is 
negative. Zero here means
- connectionCreationTimeout = 0,an unlimited CONNNETION_TIMEOUT for 
connection establishment
- connectionTimeout=0, `IdleStateHandler` for triggering `IdleStateEvent` 
is disabled.

### Why are the changes needed?

1. This PR fixes a bug when connectionCreationTimeout is 0, which means 
unlimited to netty, but ChannelFuture.await(0) fails directly and 
inappropriately.
2. This PR fixes a bug when connectionCreationTimeout is less than 0, which 
causes meaningless transport client reconnections and endless executor 
reconstructions

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new unit tests

Closes #41785 from yaooqinn/SPARK-44241.

Authored-by: Kent Yao 
Signed-off-by: Kent Yao 
(cherry picked from commit 38645fa470b5af7c2e41efa4fb092bdf2463fbbd)
Signed-off-by: Kent Yao 
---
 .../network/client/TransportClientFactory.java | 16 +--
 .../apache/spark/network/util/TransportConf.java   |  4 +--
 .../client/TransportClientFactorySuite.java| 33 +++---
 3 files changed, 44 insertions(+), 9 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 6fb9923cd3d..3df72e65c2a 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -245,12 +245,13 @@ public class TransportClientFactory implements Closeable {
 logger.debug("Creating new connection to {}", address);
 
 Bootstrap bootstrap = new Bootstrap();
+int connCreateTimeout = conf.connectionCreationTimeoutMs();
 bootstrap.group(workerGroup)
   .channel(socketChannelClass)
   // Disable Nagle's Algorithm since we don't want packets to wait
   .option(ChannelOption.TCP_NODELAY, true)
   .option(ChannelOption.SO_KEEPALIVE, true)
-  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
conf.connectionCreationTimeoutMs())
+  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connCreateTimeout)
   .option(ChannelOption.ALLOCATOR, pooledAllocator);
 
 if (conf.receiveBuf() > 0) {
@@ -276,10 +277,19 @@ public class TransportClientFactory implements Closeable {
 // Connect to the remote server
 long preConnect = System.nanoTime();
 ChannelFuture cf = bootstrap.connect(address);
-if (!cf.await(conf.connectionCreationTimeoutMs())) {
+
+if (connCreateTimeout <= 0) {
+  cf.awaitUninterruptibly();
+  assert cf.isDone();
+  if (cf.isCancelled()) {
+throw new IOException(String.format("Connecting to %s cancelled", 
address));
+  } else if (!cf.isSuccess()) {
+throw new IOException(String.format("Failed to connect to %s", 
address), cf.cause());
+  }
+} else if (!cf.await(connCreateTimeout)) {
   throw new IOException(
 String.format("Connecting to %s timed out (%s ms)",
-  address, conf.connectionCreationTimeoutMs()));
+  address, connCreateTimeout));
 } else if (cf.cause() != null) {
   throw new IOException(String.format("Failed to connect to %s", address), 
cf.cause());
 }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index bbfb99168da..deac78ffedd 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -103,7 +103,7 @@ public class TransportConf {
   conf.get("spark.network.timeout", "120s"));
 long defaultTimeoutMs = JavaUtils.timeStringAsSec(
   conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS 
+ "s")) * 1000;
-return (int) defaultTimeoutMs;
+return 

[spark] branch master updated (91c45812520 -> 38645fa470b)

2023-06-30 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 91c45812520 [SPARK-43922][SQL] Add named parameter support in parser 
for function calls
 add 38645fa470b [SPARK-44241][CORE] Mistakenly set 
io.connectionTimeout/connectionCreationTimeout to zero or negative will cause 
incessant executor cons/destructions

No new revisions were added by this update.

Summary of changes:
 .../network/client/TransportClientFactory.java | 16 +--
 .../apache/spark/network/util/TransportConf.java   |  4 +--
 .../client/TransportClientFactorySuite.java| 33 +++---
 3 files changed, 44 insertions(+), 9 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43922][SQL] Add named parameter support in parser for function calls

2023-06-30 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 91c45812520 [SPARK-43922][SQL] Add named parameter support in parser 
for function calls
91c45812520 is described below

commit 91c458125203d2feefd1e7443a9315c480dfaa00
Author: Richard Yu 
AuthorDate: Fri Jun 30 13:09:12 2023 +0300

[SPARK-43922][SQL] Add named parameter support in parser for function calls

### What changes were proposed in this pull request?
We plan on adding two new tokens called ```namedArgumentExpression``` and 
```functionArgument``` which would enable this feature. When parsing this 
logic, we also make changes to ASTBuilder such that it can detect if the 
argument passed is a named argument or a positional one.

Here is the link for the design document:

https://docs.google.com/document/d/1uOTX0MICxqu8fNanIsiyB8FV68CceGGpa8BJLP2u9o4/edit

### Why are the changes needed?
This is part of a larger project to implement named parameter support for 
user defined functions, built-in functions, and table valued functions.

### Does this PR introduce _any_ user-facing change?
Yes, the user would be able to call functions with argument lists that 
contain named arguments.

### How was this patch tested?
We add tests in the PlanParserSuite that will verify that the plan parsed 
is as intended.

Closes #41796 from learningchess2003/43922-new.

Authored-by: Richard Yu 
Signed-off-by: Max Gekk 
---
 .../src/main/resources/error/error-classes.json|   5 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4  |   1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4 |  14 ++-
 .../expressions/NamedArgumentExpression.scala  |  58 ++
 .../spark/sql/catalyst/parser/AstBuilder.scala |  37 +--
 .../spark/sql/errors/QueryCompilationErrors.scala  |   9 ++
 .../org/apache/spark/sql/internal/SQLConf.scala|   7 ++
 .../catalyst/parser/ExpressionParserSuite.scala|  18 +++
 .../sql/catalyst/parser/PlanParserSuite.scala  |  29 +
 .../named-function-arguments.sql.out   | 112 +++
 .../sql-tests/inputs/named-function-arguments.sql  |   5 +
 .../results/named-function-arguments.sql.out   | 122 +
 .../spark/sql/errors/QueryParsingErrorsSuite.scala |  38 ++-
 13 files changed, 443 insertions(+), 12 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json 
b/common/utils/src/main/resources/error/error-classes.json
index 6db8c5e3bf1..1b2a1ce305a 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -1708,6 +1708,11 @@
   "Not allowed to implement multiple UDF interfaces, UDF class 
."
 ]
   },
+  "NAMED_ARGUMENTS_SUPPORT_DISABLED" : {
+"message" : [
+  "Cannot call function  because named argument references 
are not enabled here. In this case, the named argument reference was 
. Set \"spark.sql.allowNamedFunctionArguments\" to \"true\" to turn 
on feature."
+]
+  },
   "NESTED_AGGREGATE_FUNCTION" : {
 "message" : [
   "It is not allowed to use an aggregate function in the argument of 
another aggregate function. Please use the inner aggregate function in a 
sub-query."
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 6c9b3a71266..fb440ef8d37 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -443,6 +443,7 @@ CONCAT_PIPE: '||';
 HAT: '^';
 COLON: ':';
 ARROW: '->';
+FAT_ARROW : '=>';
 HENT_START: '/*+';
 HENT_END: '*/';
 QUESTION: '?';
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index d1e672e9472..ab6c0d0861f 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -789,7 +789,7 @@ inlineTable
 ;
 
 functionTable
-: funcName=functionName LEFT_PAREN (expression (COMMA expression)*)? 
RIGHT_PAREN tableAlias
+: funcName=functionName LEFT_PAREN (functionArgument (COMMA 
functionArgument)*)? RIGHT_PAREN tableAlias
 ;
 
 tableAlias
@@ -862,6 +862,15 @@ expression
 : booleanExpression
 ;
 
+namedArgumentExpression
+: key=identifier FAT_ARROW value=expression
+;
+
+functionArgument
+: expression
+| namedArgumentExpression
+;
+
 expressionSeq
 : expression (COMMA expression)*
 ;