[spark] branch master updated: [SPARK-41539][SQL] Remap stats and constraints against output in logical plan for LogicalRDD

2022-12-20 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 074e1b39d27 [SPARK-41539][SQL] Remap stats and constraints against 
output in logical plan for LogicalRDD
074e1b39d27 is described below

commit 074e1b39d279f12ff8d822a03741f33f159f5df8
Author: Jungtaek Lim 
AuthorDate: Wed Dec 21 15:02:18 2022 +0900

[SPARK-41539][SQL] Remap stats and constraints against output in logical 
plan for LogicalRDD

### What changes were proposed in this pull request?

This PR proposes to remap stats and constraints against the output in 
logical for LogicalRDD, like we remap stats and constraints against the "new" 
output when we call newInstance.

### Why are the changes needed?

The output in logical plan and optimized plan can be "slightly" different 
(we observed the difference of exprId), and then the query can fail due to the 
invalid attribute reference(s) in stats and constraints for LogicalRDD.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified test cases.

Closes #39082 from HeartSaVioR/SPARK-41539.

Authored-by: Jungtaek Lim 
Signed-off-by: Jungtaek Lim 
---
 .../apache/spark/sql/execution/ExistingRDD.scala   | 89 +-
 .../org/apache/spark/sql/DataFrameSuite.scala  | 29 ++-
 2 files changed, 98 insertions(+), 20 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 3acadee5fb4..3dcf0efaadd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Encoder, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
@@ -103,6 +104,8 @@ case class LogicalRDD(
 originConstraints: Option[ExpressionSet] = None)
   extends LeafNode with MultiInstanceRelation {
 
+  import LogicalRDD._
+
   override protected final def otherCopyArgs: Seq[AnyRef] =
 session :: originStats :: originConstraints :: Nil
 
@@ -122,22 +125,8 @@ case class LogicalRDD(
   case e: Attribute => rewrite.getOrElse(e, e)
 }.asInstanceOf[SortOrder])
 
-val rewrittenStatistics = originStats.map { s =>
-  Statistics(
-s.sizeInBytes,
-s.rowCount,
-AttributeMap[ColumnStat](s.attributeStats.map {
-  case (attr, v) => (rewrite.getOrElse(attr, attr), v)
-}),
-s.isRuntime
-  )
-}
-
-val rewrittenConstraints = originConstraints.map { c =>
-  c.map(_.transform {
-case e: Attribute => rewrite.getOrElse(e, e)
-  })
-}
+val rewrittenStatistics = originStats.map(rewriteStatistics(_, rewrite))
+val rewrittenConstraints = originConstraints.map(rewriteConstraints(_, 
rewrite))
 
 LogicalRDD(
   output.map(rewrite),
@@ -163,7 +152,7 @@ case class LogicalRDD(
   override lazy val constraints: ExpressionSet = 
originConstraints.getOrElse(ExpressionSet())
 }
 
-object LogicalRDD {
+object LogicalRDD extends Logging {
   /**
* Create a new LogicalRDD based on existing Dataset. Stats and constraints 
are inherited from
* origin Dataset.
@@ -183,16 +172,80 @@ object LogicalRDD {
   }
 }
 
+val logicalPlan = originDataset.logicalPlan
 val optimizedPlan = originDataset.queryExecution.optimizedPlan
 val executedPlan = originDataset.queryExecution.executedPlan
 
+val (stats, constraints) = rewriteStatsAndConstraints(logicalPlan, 
optimizedPlan)
+
 LogicalRDD(
   originDataset.logicalPlan.output,
   rdd,
   firstLeafPartitioning(executedPlan.outputPartitioning),
   executedPlan.outputOrdering,
   isStreaming
-)(originDataset.sparkSession, Some(optimizedPlan.stats), 
Some(optimizedPlan.constraints))
+)(originDataset.sparkSession, stats, constraints)
+  }
+
+  private[sql] def buildOutputAssocForRewrite(
+  source: Seq[Attribute],
+  destination: Seq[Attribute]): Option[Map[Attribute, Attribute]] = {
+// We check the name and type, allowing nullability, exprId, metadata, 
qualifier be different
+// E.g. This could happen during optimization phase.
+val rewrite = source.zip(destination).flatMap { case (attr1, attr2) =>
+  if (attr1.name == attr2.name && attr1.dataType == attr2.dataType) {
+Some(attr1 -> attr2)
+  } else {
+None
+  }
+}.toMap
+
+if (rewrite.size == source.size) {
+  Some(rewrite)
+} else {
+  None
+}
+  }
+
+  private[sql] def 

[spark] branch master updated: [SPARK-41634][BUILD] Upgrade `minimatch` to 3.1.2

2022-12-20 Thread sarutak
This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4539260f4ac [SPARK-41634][BUILD] Upgrade `minimatch` to 3.1.2
4539260f4ac is described below

commit 4539260f4ac346f22ce1a47ca9e94e3181803490
Author: Bjørn 
AuthorDate: Wed Dec 21 13:49:45 2022 +0900

[SPARK-41634][BUILD] Upgrade `minimatch` to 3.1.2

### What changes were proposed in this pull request?
Upgrade `minimatch` to 3.1.2

$ npm -v
9.1.2

$ npm install

added 118 packages, and audited 119 packages in 2s

15 packages are looking for funding
 run `npm fund` for details

found 0 vulnerabilities

### Why are the changes needed?
[CVE-2022-3517](https://nvd.nist.gov/vuln/detail/CVE-2022-3517)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA

Closes #39143 from bjornjorgensen/upgrade-minimatch.

Authored-by: Bjørn 
Signed-off-by: Kousuke Saruta 
---
 dev/package-lock.json | 15 ---
 dev/package.json  |  3 ++-
 2 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/dev/package-lock.json b/dev/package-lock.json
index c2a61b389ac..104a3fb7854 100644
--- a/dev/package-lock.json
+++ b/dev/package-lock.json
@@ -6,7 +6,8 @@
 "": {
   "devDependencies": {
 "ansi-regex": "^5.0.1",
-"eslint": "^7.25.0"
+"eslint": "^7.25.0",
+"minimatch": "^3.1.2"
   }
 },
 "node_modules/@babel/code-frame": {
@@ -853,9 +854,9 @@
   }
 },
 "node_modules/minimatch": {
-  "version": "3.0.4",
-  "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.0.4.tgz;,
-  "integrity": 
"sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==",
+  "version": "3.1.2",
+  "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz;,
+  "integrity": 
"sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==",
   "dev": true,
   "dependencies": {
 "brace-expansion": "^1.1.7"
@@ -1931,9 +1932,9 @@
   }
 },
 "minimatch": {
-  "version": "3.0.4",
-  "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.0.4.tgz;,
-  "integrity": 
"sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==",
+  "version": "3.1.2",
+  "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz;,
+  "integrity": 
"sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==",
   "dev": true,
   "requires": {
 "brace-expansion": "^1.1.7"
diff --git a/dev/package.json b/dev/package.json
index f975bdde831..4e4a4bf1bca 100644
--- a/dev/package.json
+++ b/dev/package.json
@@ -1,6 +1,7 @@
 {
   "devDependencies": {
 "eslint": "^7.25.0",
-"ansi-regex": "^5.0.1"
+"ansi-regex": "^5.0.1",
+"minimatch": "^3.1.2"
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41587][BUILD] Upgrade `org.scalatestplus:selenium-4-4` to `org.scalatestplus:selenium-4-7`

2022-12-20 Thread sarutak
This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ee2e582ff19 [SPARK-41587][BUILD] Upgrade 
`org.scalatestplus:selenium-4-4` to `org.scalatestplus:selenium-4-7`
ee2e582ff19 is described below

commit ee2e582ff195fa11047545f43d1cb0ebd20a7091
Author: yangjie01 
AuthorDate: Wed Dec 21 13:40:40 2022 +0900

[SPARK-41587][BUILD] Upgrade `org.scalatestplus:selenium-4-4` to 
`org.scalatestplus:selenium-4-7`

### What changes were proposed in this pull request?
This pr aims upgrade `org.scalatestplus:selenium-4-4` to 
`org.scalatestplus:selenium-4-7`:

- `org.scalatestplus:selenium-4-4` -> `org.scalatestplus:selenium-4-7`
- `selenium-java`: 4.4.0 -> 4.7.1
- `htmlunit-driver`: 3.64.0 -> 4.7.0
- `htmlunit` -> 2.64.0 -> 2.67.0

And all upgraded dependencies versions are matched.

### Why are the changes needed?
The release notes as follows:

- 
https://github.com/scalatest/scalatestplus-selenium/releases/tag/release-3.2.14.0-for-selenium-4.7

### Does this PR introduce _any_ user-facing change?
No, just for test

### How was this patch tested?

- Pass Github Actions
- Manual test:
   - ChromeUISeleniumSuite

```
build/sbt -Dguava.version=31.1-jre 
-Dspark.test.webdriver.chrome.driver=/path/to/chromedriver 
-Dtest.default.exclude.tags="" -Phive -Phive-thriftserver "core/testOnly 
org.apache.spark.ui.ChromeUISeleniumSuite"
```

```
[info] ChromeUISeleniumSuite:
Starting ChromeDriver 108.0.5359.71 
(1e0e3868ee06e91ad636a874420e3ca3ae3756ac-refs/branch-heads/5359{#1016}) on 
port 13600
Only local connections are allowed.
Please see https://chromedriver.chromium.org/security-considerations for 
suggestions on keeping ChromeDriver safe.
ChromeDriver was started successfully.
[info] - SPARK-31534: text for tooltip should be escaped (2 seconds, 702 
milliseconds)
[info] - SPARK-31882: Link URL for Stage DAGs should not depend on paged 
table. (824 milliseconds)
[info] - SPARK-31886: Color barrier execution mode RDD correctly (313 
milliseconds)
[info] - Search text for paged tables should not be saved (1 second, 745 
milliseconds)
[info] Run completed in 10 seconds, 266 milliseconds.
[info] Total number of tests run: 4
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 23 s, completed 2022-12-19 19:41:26
```

   - RocksDBBackendChromeUIHistoryServerSuite

```
build/sbt -Dguava.version=31.1-jre 
-Dspark.test.webdriver.chrome.driver=/path/to/chromedriver 
-Dtest.default.exclude.tags="" -Phive -Phive-thriftserver "core/testOnly 
org.apache.spark.deploy.history.RocksDBBackendChromeUIHistoryServerSuite"
```

```
[info] RocksDBBackendChromeUIHistoryServerSuite:
Starting ChromeDriver 108.0.5359.71 
(1e0e3868ee06e91ad636a874420e3ca3ae3756ac-refs/branch-heads/5359{#1016}) on 
port 2201
Only local connections are allowed.
Please see https://chromedriver.chromium.org/security-considerations for 
suggestions on keeping ChromeDriver safe.
ChromeDriver was started successfully.
[info] - ajax rendered relative links are prefixed with uiRoot 
(spark.ui.proxyBase) (2 seconds, 362 milliseconds)
[info] Run completed in 10 seconds, 254 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 24 s, completed 2022-12-19 19:40:42
```

Closes #39129 from LuciferYang/selenium-47.

Authored-by: yangjie01 
Signed-off-by: Kousuke Saruta 
---
 pom.xml | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5ae26570e2d..f09207c660f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -205,9 +205,9 @@
 
 4.9.3
 1.1
-4.4.0
-3.64.0
-2.64.0
+4.7.1
+4.7.0
+2.67.0
 1.8
 1.1.0
 1.5.0
@@ -416,7 +416,7 @@
 
 
   org.scalatestplus
-  selenium-4-4_${scala.binary.version}
+  selenium-4-7_${scala.binary.version}
   test
 
 
@@ -1144,7 +1144,7 @@
   
   
 org.scalatestplus
-selenium-4-4_${scala.binary.version}
+selenium-4-7_${scala.binary.version}
 3.2.14.0
 test
   


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (e23983a32df -> 7efc6f493fe)

2022-12-20 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from e23983a32df [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` 
implementation
 add 7efc6f493fe [SPARK-41426][UI] Protobuf serializer for 
ResourceProfileWrapper

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/status/protobuf/store_types.proto |  6 +++-
 ...plicationEnvironmentInfoWrapperSerializer.scala |  4 +--
 .../protobuf/KVStoreProtobufSerializer.scala   |  3 ++
 ...cala => ResourceProfileWrapperSerializer.scala} | 27 +++---
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 42 ++
 5 files changed, 64 insertions(+), 18 deletions(-)
 copy 
core/src/main/scala/org/apache/spark/status/protobuf/{ExecutorMetricsSerializer.scala
 => ResourceProfileWrapperSerializer.scala} (54%)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (801e07996a4 -> e23983a32df)

2022-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 801e07996a4 [SPARK-41192][CORE] Remove unscheduled speculative tasks 
when task finished to obtain better dynamic
 add e23983a32df [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` 
implementation

No new revisions were added by this update.

Summary of changes:
 .../main/protobuf/spark/connect/expressions.proto  |  12 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |  34 +
 python/pyspark/sql/connect/column.py   |  29 
 python/pyspark/sql/connect/functions.py| 149 -
 .../pyspark/sql/connect/proto/expressions_pb2.py   |  69 ++
 .../pyspark/sql/connect/proto/expressions_pb2.pyi  |  42 ++
 .../sql/tests/connect/test_connect_function.py |  32 +
 7 files changed, 302 insertions(+), 65 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic

2022-12-20 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 801e07996a4 [SPARK-41192][CORE] Remove unscheduled speculative tasks 
when task finished to obtain better dynamic
801e07996a4 is described below

commit 801e07996a4d4ea448b6fc468cc6c9d6904ceef2
Author: wangyazhi 
AuthorDate: Tue Dec 20 21:35:37 2022 -0600

[SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished 
to obtain better dynamic

### What changes were proposed in this pull request?
ExecutorAllocationManager only record count for speculative task, 
`stageAttemptToNumSpeculativeTasks` increment when speculative task submit, and 
only decrement when speculative task end.
If task finished before speculative task start, the speculative task will 
never be scheduled, which will cause leak of 
`stageAttemptToNumSpeculativeTasks` and mislead the calculation of target 
executors.

This PR fixes the issue by add task index in 
`SparkListenerSpeculativeTaskSubmitted` event, and record speculative task with 
task index when submitted, task index should be removed when speculative task 
start or task finished(whether it is speculative or not)

### Why are the changes needed?
To fix idle executors caused by pending speculative task from task that has 
been finished

### Does this PR introduce _any_ user-facing change?
DeveloperApi `SparkListenerSpeculativeTaskSubmitted` add taskIndex with 
default value -1

### How was this patch tested?
Add a comprehensive unit test.
Pass the GA

Closes #38711 from toujours33/SPARK-41192.

Lead-authored-by: wangyazhi 
Co-authored-by: toujours33 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   | 38 +
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 14 ++--
 .../apache/spark/scheduler/DAGSchedulerEvent.scala |  2 +-
 .../org/apache/spark/scheduler/SparkListener.scala | 16 +++-
 .../apache/spark/scheduler/TaskSetManager.scala|  2 +-
 .../spark/ExecutorAllocationManagerSuite.scala | 97 --
 .../spark/scheduler/TaskSetManagerSuite.scala  |  2 +-
 7 files changed, 139 insertions(+), 32 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 204ffc39a11..f06312c15cf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager(
 // Should be 0 when no stages are active.
 private val stageAttemptToNumRunningTask = new 
mutable.HashMap[StageAttempt, Int]
 private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, 
mutable.HashSet[Int]]
-// Number of speculative tasks pending/running in each stageAttempt
-private val stageAttemptToNumSpeculativeTasks = new 
mutable.HashMap[StageAttempt, Int]
-// The speculative tasks started in each stageAttempt
+// Map from each stageAttempt to a set of running speculative task indexes
+// TODO(SPARK-41192): We simply need an Int for this.
 private val stageAttemptToSpeculativeTaskIndices =
+  new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]()
+// Map from each stageAttempt to a set of pending speculative task indexes
+private val stageAttemptToPendingSpeculativeTasks =
   new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
 
 private val resourceProfileIdToStageAttempt =
@@ -722,7 +724,7 @@ private[spark] class ExecutorAllocationManager(
 // because the attempt may still have running tasks,
 // even after another attempt for the stage is submitted.
 stageAttemptToNumTasks -= stageAttempt
-stageAttemptToNumSpeculativeTasks -= stageAttempt
+stageAttemptToPendingSpeculativeTasks -= stageAttempt
 stageAttemptToTaskIndices -= stageAttempt
 stageAttemptToSpeculativeTaskIndices -= stageAttempt
 stageAttemptToExecutorPlacementHints -= stageAttempt
@@ -733,7 +735,9 @@ private[spark] class ExecutorAllocationManager(
 
 // If this is the last stage with pending tasks, mark the scheduler 
queue as empty
 // This is needed in case the stage is aborted for any reason
-if (stageAttemptToNumTasks.isEmpty && 
stageAttemptToNumSpeculativeTasks.isEmpty) {
+if (stageAttemptToNumTasks.isEmpty
+  && stageAttemptToPendingSpeculativeTasks.isEmpty
+  && stageAttemptToSpeculativeTaskIndices.isEmpty) {
   allocationManager.onSchedulerQueueEmpty()
 }
   }
@@ -751,6 +755,8 @@ private[spark] class ExecutorAllocationManager(
 if 

[spark] branch master updated (940946515bd -> fd6d226528e)

2022-12-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 940946515bd [SPARK-41440][CONNECT][PYTHON] Implement 
`DataFrame.randomSplit`
 add fd6d226528e [SPARK-41631][SQL] Support implicit lateral column alias 
resolution on Aggregate

No new revisions were added by this update.

Summary of changes:
 core/src/main/resources/error/error-classes.json   |   5 +
 .../spark/sql/catalyst/analysis/Analyzer.scala |  32 +-
 .../ResolveLateralColumnAliasReference.scala   | 107 +++-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  15 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|   2 +-
 .../apache/spark/sql/LateralColumnAliasSuite.scala | 613 ++---
 .../scala/org/apache/spark/sql/QueryTest.scala |   2 +-
 7 files changed, 674 insertions(+), 102 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

2022-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 940946515bd [SPARK-41440][CONNECT][PYTHON] Implement 
`DataFrame.randomSplit`
940946515bd is described below

commit 940946515bd199930051be89f9fd557a35f2af0d
Author: Jiaan Geng 
AuthorDate: Wed Dec 21 11:31:00 2022 +0900

[SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`

### What changes were proposed in this pull request?
Implement `DataFrame.randomSplit` with a proto message

Implement `DataFrame.randomSplit` for scala API
Implement `DataFrame.randomSplit` for python API

### Why are the changes needed?
for Connect API coverage

### Does this PR introduce _any_ user-facing change?
'No'. New API

### How was this patch tested?
New test cases.

Closes #39017 from beliefer/SPARK-41440.

Authored-by: Jiaan Geng 
Signed-off-by: Hyukjin Kwon 
---
 .../main/protobuf/spark/connect/relations.proto|  4 ++
 .../org/apache/spark/sql/connect/dsl/package.scala | 34 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  | 24 +++-
 .../connect/planner/SparkConnectProtoSuite.scala   | 16 +
 python/pyspark/sql/connect/dataframe.py| 57 +
 python/pyspark/sql/connect/plan.py |  3 +
 python/pyspark/sql/connect/proto/relations_pb2.py  | 72 +++---
 python/pyspark/sql/connect/proto/relations_pb2.pyi | 18 ++
 .../sql/tests/connect/test_connect_basic.py| 15 +
 .../sql/tests/connect/test_connect_plan_only.py| 39 
 10 files changed, 244 insertions(+), 38 deletions(-)

diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 2f83db1176a..42471821634 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -317,6 +317,10 @@ message Sample {
 
   // (Optional) The random seed.
   optional int64 seed = 5;
+
+  // (Optional) Explicitly sort the underlying plan to make the ordering 
deterministic.
+  // This flag is only used to randomly splits DataFrame with the provided 
weights.
+  optional bool force_stable_sort = 6;
 }
 
 // Relation of type [[Range]] that generates a sequence of integers.
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 8211dc21bde..bce8d390fcb 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -27,6 +27,7 @@ import org.apache.spark.connect.proto.SetOperation.SetOpType
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.connect.planner.DataTypeProtoConverter
 import 
org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.toConnectProtoValue
+import org.apache.spark.util.Utils
 
 /**
  * A collection of implicit conversions that create a DSL for constructing 
connect protos.
@@ -775,6 +776,39 @@ package object dsl {
   valueColumnName: String): Relation =
 unpivot(ids, variableColumnName, valueColumnName)
 
+  def randomSplit(weights: Array[Double], seed: Long): Array[Relation] = {
+require(
+  weights.forall(_ >= 0),
+  s"Weights must be nonnegative, but got ${weights.mkString("[", ",", 
"]")}")
+require(
+  weights.sum > 0,
+  s"Sum of weights must be positive, but got ${weights.mkString("[", 
",", "]")}")
+
+val sum = weights.toSeq.sum
+val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
+normalizedCumWeights
+  .sliding(2)
+  .map { x =>
+Relation
+  .newBuilder()
+  .setSample(
+Sample
+  .newBuilder()
+  .setInput(logicalPlan)
+  .setLowerBound(x(0))
+  .setUpperBound(x(1))
+  .setWithReplacement(false)
+  .setSeed(seed)
+  .setForceStableSort(true)
+  .build())
+  .build()
+  }
+  .toArray
+  }
+
+  def randomSplit(weights: Array[Double]): Array[Relation] =
+randomSplit(weights, Utils.random.nextLong)
+
   private def createSetOperation(
   left: Relation,
   right: Relation,
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 

[spark] branch master updated (d33a59c940f -> ff68d0ef945)

2022-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d33a59c940f [SPARK-41396][SQL][PROTOBUF] OneOf field support and 
recursion checks
 add ff68d0ef945 [SPARK-41566][BUILD] Upgrade `netty`  to 4.1.86.Final

No new revisions were added by this update.

Summary of changes:
 dev/deps/spark-deps-hadoop-2-hive-2.3 | 36 +--
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 36 +--
 pom.xml   |  6 +-
 3 files changed, 41 insertions(+), 37 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark-docker] branch master updated: [SPARK-40520] Add support to generate DOI mainifest

2022-12-20 Thread yikun
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 7bb8661  [SPARK-40520] Add support to generate DOI mainifest
7bb8661 is described below

commit 7bb8661f7d57356f94fd5874696df1b1c058cb0b
Author: Yikun Jiang 
AuthorDate: Wed Dec 21 10:15:44 2022 +0800

[SPARK-40520] Add support to generate DOI mainifest

### What changes were proposed in this pull request?
This patch add support to generate DOI mainifest from versions.json.

### Why are the changes needed?
To help generate DOI mainifest

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```shell
$ flake8 ./tools/manifest.py --max-line-length=100
$ black ./tools/manifest.py
All done! ✨  ✨
1 file left unchanged.
```

```shell
$ tools/manifest.py manifest
Maintainers: Apache Spark Developers  (ApacheSpark)
GitRepo: https://github.com/apache/spark-docker.git

Tags: 3.3.1-scala2.12-java11-python3-ubuntu, 3.3.1-python3, 3.3.1, python3, 
latest
Architectures: amd64, arm64v8
GitCommit: 496edb6dee0ade08bc5d180d7a6da0ff8b5d91ff
Directory: ./3.3.1/scala2.12-java11-python3-ubuntu

Tags: 3.3.1-scala2.12-java11-r-ubuntu, 3.3.1-r, r
Architectures: amd64, arm64v8
GitCommit: 496edb6dee0ade08bc5d180d7a6da0ff8b5d91ff
Directory: ./3.3.1/scala2.12-java11-r-ubuntu

// ... ...
```

Closes #27 from Yikun/SPARK-40520.

Authored-by: Yikun Jiang 
Signed-off-by: Yikun Jiang 
---
 tools/manifest.py | 34 --
 1 file changed, 32 insertions(+), 2 deletions(-)

diff --git a/tools/manifest.py b/tools/manifest.py
index fbfad6f..13bc631 100755
--- a/tools/manifest.py
+++ b/tools/manifest.py
@@ -19,7 +19,33 @@
 
 from argparse import ArgumentParser
 import json
-from statistics import mode
+import subprocess
+
+
+def run_cmd(cmd):
+if isinstance(cmd, list):
+return subprocess.check_output(cmd).decode("utf-8")
+else:
+return subprocess.check_output(cmd.split(" ")).decode("utf-8")
+
+
+def generate_manifest(versions):
+output = (
+"Maintainers: Apache Spark Developers  
(@ApacheSpark)\n"
+"GitRepo: https://github.com/apache/spark-docker.git\n\n;
+)
+git_commit = run_cmd("git rev-parse HEAD").replace("\n", "")
+content = (
+"Tags: %s\n"
+"Architectures: amd64, arm64v8\n"
+"GitCommit: %s\n"
+"Directory: ./%s\n\n"
+)
+for version in versions:
+tags = ", ".join(version["tags"])
+path = version["path"]
+output += content % (tags, git_commit, path)
+return output
 
 
 def parse_opts():
@@ -27,7 +53,7 @@ def parse_opts():
 
 parser.add_argument(
 dest="mode",
-choices=["tags"],
+choices=["tags", "manifest"],
 type=str,
 help="The print mode of script",
 )
@@ -76,6 +102,10 @@ def main():
 # Get matched version's tags
 tags = versions[0]["tags"] if versions else []
 print(",".join(["%s:%s" % (image, t) for t in tags]))
+elif mode == "manifest":
+with open(version_file, "r") as f:
+versions = json.load(f).get("versions")
+print(generate_manifest(versions))
 
 
 if __name__ == "__main__":


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

2022-12-20 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d33a59c940f [SPARK-41396][SQL][PROTOBUF] OneOf field support and 
recursion checks
d33a59c940f is described below

commit d33a59c940f0e8f0b93d91cc9e700c2cb533d54e
Author: SandishKumarHN 
AuthorDate: Wed Dec 21 09:37:15 2022 +0800

[SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks

Oneof fields allow a message to contain one and only one of a defined set 
of field types, while recursive fields provide a way to define messages that 
can refer to themselves, allowing for the creation of complex and nested data 
structures.  with this change users will be able to use protobuf OneOf fields 
with spark-protobuf, making it a more complete and useful tool for processing 
protobuf data.

**Support for circularReferenceDepth:**
The `recursive.fields.max.depth` parameter can be specified in the 
from_protobuf options to control the maximum allowed recursion depth for a 
field. Setting `recursive.fields.max.depth` to 0 drops all-recursive fields, 
setting it to 1 allows it to be recursed once, and setting it to 2 allows it to 
be recursed twice. Attempting to set the `recursive.fields.max.depth` to a 
value greater than 10 is not allowed. If the `recursive.fields.max.depth` is 
not specified, it will default to -1;  [...]
SQL Schema for the protobuf message
 ```
message Person {
 string name = 1;
 Person bff = 2
}
```
will vary based on the value of `recursive.fields.max.depth`.
```
0: struct
1: struct>
2: struct>> ...
```

### What changes were proposed in this pull request?

- Add support for protobuf oneof field
- Stop recursion at the first level when a recursive field is encountered. 
(instead of throwing an error)

### Why are the changes needed?

Stop recursion at the first level and handle nulltype in deserilization.

### Does this PR introduce _any_ user-facing change?

NA
### How was this patch tested?

Added Unit tests for OneOf field support and recursion checks.
Tested full support for nested OneOf fields and message types using real 
data from Kafka on a real cluster

cc: rangadi mposdev21

Closes #38922 from SandishKumarHN/SPARK-41396.

Authored-by: SandishKumarHN 
Signed-off-by: Wenchen Fan 
---
 .../sql/protobuf/ProtobufDataToCatalyst.scala  |   2 +-
 .../spark/sql/protobuf/ProtobufDeserializer.scala  |   8 +-
 .../spark/sql/protobuf/utils/ProtobufOptions.scala |   8 +
 .../sql/protobuf/utils/SchemaConverters.scala  |  69 ++-
 .../test/resources/protobuf/functions_suite.desc   | Bin 6678 -> 8739 bytes
 .../test/resources/protobuf/functions_suite.proto  |  85 ++-
 .../sql/protobuf/ProtobufFunctionsSuite.scala  | 576 -
 core/src/main/resources/error/error-classes.json   |   2 +-
 8 files changed, 721 insertions(+), 29 deletions(-)

diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
index c0997b1bd06..da44f94d5ea 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala
@@ -39,7 +39,7 @@ private[protobuf] case class ProtobufDataToCatalyst(
   override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
 
   override lazy val dataType: DataType = {
-val dt = SchemaConverters.toSqlType(messageDescriptor).dataType
+val dt = SchemaConverters.toSqlType(messageDescriptor, 
protobufOptions).dataType
 parseMode match {
   // With PermissiveMode, the output Catalyst row might contain columns of 
null values for
   // corrupt records, even if some of the columns are not nullable in the 
user-provided schema.
diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
index 46366ba268b..224e22c0f52 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala
@@ -156,6 +156,9 @@ private[sql] class ProtobufDeserializer(
 (protoType.getJavaType, catalystType) match {
 
   case (null, NullType) => (updater, ordinal, _) => 
updater.setNullAt(ordinal)
+  // It is possible that this will result in data being dropped, This is 
intentional,
+  // to catch recursive fields and drop them as necessary.
+  case (MESSAGE, NullType) => 

[spark] branch master updated: [MINOR] Fix some typos

2022-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 52e4b31903c [MINOR] Fix some typos
52e4b31903c is described below

commit 52e4b31903cde37bef24a5abf808b11615845867
Author: Liu Chunbo 
AuthorDate: Wed Dec 21 10:36:40 2022 +0900

[MINOR] Fix some typos

What changes were proposed in this pull request?
Fix some typos in the code comments.
Why are the changes needed?
Modify these two comment mistakes and make code comments more standardized.
Does this PR introduce any user-facing change?
No
How was this patch tested?
No test required

Closes #39111 from for08/SPARK-41560.

Authored-by: Liu Chunbo 
Signed-off-by: Hyukjin Kwon 
---
 .../src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java| 2 +-
 .../main/java/org/apache/spark/network/protocol/MessageWithHeader.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
index 2d573f51243..4dd8cec2900 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java
@@ -68,7 +68,7 @@ public abstract class ManagedBuffer {
   public abstract ManagedBuffer release();
 
   /**
-   * Convert the buffer into an Netty object, used to write the data out. The 
return value is either
+   * Convert the buffer into a Netty object, used to write the data out. The 
return value is either
* a {@link io.netty.buffer.ByteBuf} or a {@link 
io.netty.channel.FileRegion}.
*
* If this method returns a ByteBuf, then that buffer's reference count will 
be incremented and
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index 19eeddb842c..dfcb1c642eb 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -140,7 +140,7 @@ class MessageWithHeader extends AbstractFileRegion {
 // SPARK-24578: cap the sub-region's size of returned nio buffer to 
improve the performance
 // for the case that the passed-in buffer has too many components.
 int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
-// If the ByteBuf holds more then one ByteBuffer we should better call 
nioBuffers(...)
+// If the ByteBuf holds more than one ByteBuffer we should better call 
nioBuffers(...)
 // to eliminate extra memory copies.
 int written = 0;
 if (buf.nioBufferCount() == 1) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated (7ed4d448daa -> 746167a1638)

2022-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 7ed4d448daa [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder 
Expression
 add 746167a1638 [SPARK-41584][BUILD] Upgrade RoaringBitmap to 0.9.36

No new revisions were added by this update.

Summary of changes:
 core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt | 10 +-
 core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt | 10 +-
 core/benchmarks/MapStatusesConvertBenchmark-results.txt   |  8 
 dev/deps/spark-deps-hadoop-2-hive-2.3 |  4 ++--
 dev/deps/spark-deps-hadoop-3-hive-2.3 |  4 ++--
 pom.xml   |  2 +-
 6 files changed, 19 insertions(+), 19 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression

2022-12-20 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7ed4d448daa [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder 
Expression
7ed4d448daa is described below

commit 7ed4d448daab372b2c5cc846f1f66f70f7fd574c
Author: dengziming 
AuthorDate: Wed Dec 21 08:56:03 2022 +0800

[SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression

### What changes were proposed in this pull request?
in #39090 we moved `SortOrder` proto from relations to expressions, in this 
PR we add logic to handle it.

### Why are the changes needed?
[](https://github.com/dengziming/spark/pull/new/SortOrder)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
A unit test and existing tests.

Closes #39138 from dengziming/SortOrder.

Authored-by: dengziming 
Signed-off-by: Ruifeng Zheng 
---
 .../sql/connect/planner/SparkConnectPlanner.scala  |  5 ++-
 .../connect/planner/SparkConnectPlannerSuite.scala | 46 +-
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 3bdf3654c68..9fe9acd354d 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -480,6 +480,7 @@ class SparkConnectPlanner(session: SparkSession) {
   case proto.Expression.ExprTypeCase.CAST => transformCast(exp.getCast)
   case proto.Expression.ExprTypeCase.UNRESOLVED_REGEX =>
 transformUnresolvedRegex(exp.getUnresolvedRegex)
+  case proto.Expression.ExprTypeCase.SORT_ORDER => 
transformSortOrder(exp.getSortOrder)
   case _ =>
 throw InvalidPlanInput(
   s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not 
supported")
@@ -699,10 +700,10 @@ class SparkConnectPlanner(session: SparkSession) {
 logical.Sort(
   child = transformRelation(sort.getInput),
   global = sort.getIsGlobal,
-  order = 
sort.getOrderList.asScala.toSeq.map(transformSortOrderExpression))
+  order = sort.getOrderList.asScala.toSeq.map(transformSortOrder))
   }
 
-  private def transformSortOrderExpression(order: proto.Expression.SortOrder) 
= {
+  private def transformSortOrder(order: proto.Expression.SortOrder) = {
 expressions.SortOrder(
   child = transformExpression(order.getChild),
   direction = order.getDirection match {
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
index 2e0aa018467..93cb97b4421 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala
@@ -24,7 +24,7 @@ import com.google.protobuf.ByteString
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.Expression.{Alias, ExpressionString, 
UnresolvedStar}
-import org.apache.spark.sql.{AnalysisException, Dataset}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.logical
@@ -661,4 +661,48 @@ class SparkConnectPlannerSuite extends SparkFunSuite with 
SparkConnectPlanTest {
 .build())
 intercept[AnalysisException](Dataset.ofRows(spark, logical))
   }
+
+  test("transform SortOrder") {
+val input = proto.Relation
+  .newBuilder()
+  .setSql(
+proto.SQL
+  .newBuilder()
+  .setQuery("SELECT id FROM VALUES 
(5),(1),(2),(6),(4),(3),(7),(9),(8),(null) AS tab(id)")
+  .build())
+
+val relation = proto.Relation
+  .newBuilder()
+  .setSort(
+proto.Sort
+  .newBuilder()
+  .setInput(input)
+  .setIsGlobal(false)
+  .addOrder(
+proto.Expression.SortOrder
+  .newBuilder()
+  .setDirectionValue(
+
proto.Expression.SortOrder.SortDirection.SORT_DIRECTION_ASCENDING_VALUE)
+  
.setNullOrdering(proto.Expression.SortOrder.NullOrdering.SORT_NULLS_FIRST)
+  .setChild(proto.Expression
+.newBuilder()
+.setExpressionString(
+  

[spark] branch master updated: [SPARK-41528][CONNECT] Merge namespace of Spark Connect and PySpark API

2022-12-20 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 764edaf8b2e [SPARK-41528][CONNECT] Merge namespace of Spark Connect 
and PySpark API
764edaf8b2e is described below

commit 764edaf8b2e1c42a32e7bfa058cf8ee26ce02a9e
Author: Hyukjin Kwon 
AuthorDate: Wed Dec 21 09:13:07 2022 +0900

[SPARK-41528][CONNECT] Merge namespace of Spark Connect and PySpark API

### What changes were proposed in this pull request?

This PR proposes to merge namespaces between Spark Connect and PySpark with 
adding an CLI option `--remote` and `spark.remote` configuration as a symmetry 
of `--master` and `spark.master`.

### Why are the changes needed?

In order to provide the same user experience to the end users, see also the 
design document attached 
([here](https://docs.google.com/document/d/10XJFHnzH8a1cQq9iDf9KORAveK6uy6mBvWA8zZP7rjc/edit?usp=sharing)).

### Does this PR introduce _any_ user-facing change?

Yes, users now can use Spark Connect as below:

```
$ ./bin/pyspark --remote ...
$ ./bin/pyspark --conf spark.remote ...

...

>>> # **Same as regular PySpark from here**
... # Do something with `spark` that is a remote client
... spark.range(1)
```

```
$ ./bin/spark-submit --remote ... app.py
$ ./bin/spark-submit --conf spark.remote ... app.py

...

# **Same as regular PySpark from here**
# app.py
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Do something with `spark` that is a remote client
```

See the design document attached 
([here](https://docs.google.com/document/d/10XJFHnzH8a1cQq9iDf9KORAveK6uy6mBvWA8zZP7rjc/edit?usp=sharing)).

### How was this patch tested?

Reusing PySpark unittests of DataFrame and functions.

Closes #39041 from HyukjinKwon/prototype_merged_pyspark.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/deploy/PythonRunner.scala |   1 +
 .../org/apache/spark/deploy/SparkSubmit.scala  |  31 ++-
 .../apache/spark/deploy/SparkSubmitArguments.scala |  30 ++-
 dev/sparktestsupport/modules.py|   2 +
 .../spark/launcher/AbstractCommandBuilder.java |   1 +
 .../apache/spark/launcher/AbstractLauncher.java|  15 ++
 .../spark/launcher/SparkSubmitCommandBuilder.java  |  15 ++
 .../spark/launcher/SparkSubmitOptionParser.java|   2 +
 .../source/reference/pyspark.sql/spark_session.rst |   1 +
 python/pyspark/context.py  |   4 +
 python/pyspark/shell.py|  65 +++--
 python/pyspark/sql/connect/session.py  |   4 +-
 python/pyspark/sql/functions.py| 264 ++-
 python/pyspark/sql/observation.py  |   6 +-
 python/pyspark/sql/session.py  |  83 +-
 .../sql/tests/connect/test_parity_dataframe.py | 237 +
 .../sql/tests/connect/test_parity_functions.py | 292 +
 python/pyspark/sql/tests/test_dataframe.py |   6 +-
 python/pyspark/sql/tests/test_functions.py |  25 +-
 python/pyspark/sql/utils.py|  61 -
 python/pyspark/sql/window.py   |  14 +-
 python/pyspark/testing/connectutils.py |   4 +
 22 files changed, 1095 insertions(+), 68 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index c3f73ed745d..c3cb6831e39 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -74,6 +74,7 @@ object PythonRunner {
 // Launch Python process
 val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ 
otherArgs).asJava)
 val env = builder.environment()
+sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", 
url))
 env.put("PYTHONPATH", pythonPath)
 // This is equivalent to setting the -u flag; we use it because ipython 
doesn't support -u:
 env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a 
non-empty string
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 73acfedd8bc..745836dfbef 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -229,15 +229,20 @@ private[spark] class SparkSubmit extends Logging {
 var childMainClass = ""
 
 // Set the cluster manager
-val clusterManager: Int = args.master match {
-  case "yarn" => 

[spark] branch master updated: [SPARK-41568][SQL] Assign name to _LEGACY_ERROR_TEMP_1236

2022-12-20 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2440f699797 [SPARK-41568][SQL] Assign name to _LEGACY_ERROR_TEMP_1236
2440f699797 is described below

commit 2440f6997978ca033579a311caea561140ef76d5
Author: panbingkun 
AuthorDate: Tue Dec 20 21:16:43 2022 +0300

[SPARK-41568][SQL] Assign name to _LEGACY_ERROR_TEMP_1236

### What changes were proposed in this pull request?
In the PR, I propose to assign the name `UNSUPPORTED_FEATURE.ANALYZE_VIEW` 
to the error class `_LEGACY_ERROR_TEMP_1236`.

### Why are the changes needed?
Proper names of error classes should improve user experience with Spark SQL.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

Closes #39119 from panbingkun/LEGACY_ERROR_TEMP_1236.

Lead-authored-by: panbingkun 
Co-authored-by: Maxim Gekk 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   | 10 -
 .../spark/sql/errors/QueryCompilationErrors.scala  |  2 +-
 .../spark/sql/StatisticsCollectionSuite.scala  | 24 +-
 3 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 30b0a5ce8f3..b5e846a8a89 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1309,6 +1309,11 @@
   "The ANALYZE TABLE FOR COLUMNS command does not support the type 
 of the column  in the table ."
 ]
   },
+  "ANALYZE_VIEW" : {
+"message" : [
+  "The ANALYZE TABLE command does not support views."
+]
+  },
   "CATALOG_OPERATION" : {
 "message" : [
   "Catalog  does not support ."
@@ -2895,11 +2900,6 @@
   "Partition spec is invalid. The spec () must match the 
partition spec () defined in table ''."
 ]
   },
-  "_LEGACY_ERROR_TEMP_1236" : {
-"message" : [
-  "ANALYZE TABLE is not supported on views."
-]
-  },
   "_LEGACY_ERROR_TEMP_1237" : {
 "message" : [
   "The list of partition columns with values in partition specification 
for table '' in database '' is not a prefix of the list of 
partition columns defined in the table schema. Expected a prefix of 
[], but got []."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 2ddd0704565..b0cf8f6876c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2302,7 +2302,7 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase {
 
   def analyzeTableNotSupportedOnViewsError(): Throwable = {
 new AnalysisException(
-  errorClass = "_LEGACY_ERROR_TEMP_1236",
+  errorClass = "UNSUPPORTED_FEATURE.ANALYZE_VIEW",
   messageParameters = Map.empty)
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index dda1cc5b52b..2ab8bb25a8b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -63,22 +63,26 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
   }
 
   test("analyzing views is not supported") {
-def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
-  val err = intercept[AnalysisException] {
-sql(analyzeCommand)
-  }
-  assert(err.message.contains("ANALYZE TABLE is not supported"))
-}
-
 val tableName = "tbl"
 withTable(tableName) {
   spark.range(10).write.saveAsTable(tableName)
   val viewName = "view"
   withView(viewName) {
 sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
-
-assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
-assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS 
FOR COLUMNS id")
+checkError(
+  exception = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+  },
+  errorClass = "UNSUPPORTED_FEATURE.ANALYZE_VIEW",
+  parameters = Map.empty
+)
+checkError(
+  exception = intercept[AnalysisException] {
+sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
+  },
+  errorClass = "UNSUPPORTED_FEATURE.ANALYZE_VIEW",
+  parameters = Map.empty
+ 

[spark] branch master updated: [SPARK-41582][CORE][SQL] Reuse `INVALID_TYPED_LITERAL` instead of `_LEGACY_ERROR_TEMP_0022`

2022-12-20 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9840a0327a3 [SPARK-41582][CORE][SQL] Reuse `INVALID_TYPED_LITERAL` 
instead of `_LEGACY_ERROR_TEMP_0022`
9840a0327a3 is described below

commit 9840a0327a3f242877759c97d2e7bbf8b4ac1072
Author: yangjie01 
AuthorDate: Tue Dec 20 18:15:08 2022 +0300

[SPARK-41582][CORE][SQL] Reuse `INVALID_TYPED_LITERAL` instead of 
`_LEGACY_ERROR_TEMP_0022`

### What changes were proposed in this pull request?
This pr aims reuse `INVALID_TYPED_LITERAL` instead of 
`_LEGACY_ERROR_TEMP_0022`.

### Why are the changes needed?
Proper names of error classes to improve user experience with Spark SQL.

### Does this PR introduce _any_ user-facing change?
Yes, the PR changes user-facing error message.

### How was this patch tested?
Pass GitHub Actions

Closes #39122 from LuciferYang/SPARK-41582.

Authored-by: yangjie01 
Signed-off-by: Max Gekk 
---
 core/src/main/resources/error/error-classes.json   |   5 -
 .../spark/sql/catalyst/parser/AstBuilder.scala | 130 ++---
 .../spark/sql/errors/QueryParsingErrors.scala  |   9 --
 .../catalyst/parser/ExpressionParserSuite.scala|   8 +-
 .../sql-tests/results/ansi/literals.sql.out|   6 +-
 .../resources/sql-tests/results/literals.sql.out   |   6 +-
 6 files changed, 77 insertions(+), 87 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 68034a5221e..30b0a5ce8f3 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -1663,11 +1663,6 @@
   "Function trim doesn't support with type . Please use BOTH, 
LEADING or TRAILING as trim type."
 ]
   },
-  "_LEGACY_ERROR_TEMP_0022" : {
-"message" : [
-  "."
-]
-  },
   "_LEGACY_ERROR_TEMP_0023" : {
 "message" : [
   "Numeric literal  does not fit in range 
[, ] for type ."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 545d5d97d88..ea752a420d5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2379,76 +2379,72 @@ class AstBuilder extends 
SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   specialTs.getOrElse(toLiteral(stringToTimestamp(_, zoneId), 
TimestampType))
 }
 
-try {
-  valueType match {
-case "DATE" =>
-  val zoneId = getZoneId(conf.sessionLocalTimeZone)
-  val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, 
DateType))
-  specialDate.getOrElse(toLiteral(stringToDate, DateType))
-case "TIMESTAMP_NTZ" =>
-  convertSpecialTimestampNTZ(value, 
getZoneId(conf.sessionLocalTimeZone))
-.map(Literal(_, TimestampNTZType))
-.getOrElse(toLiteral(stringToTimestampWithoutTimeZone, 
TimestampNTZType))
-case "TIMESTAMP_LTZ" =>
-  constructTimestampLTZLiteral(value)
-case "TIMESTAMP" =>
-  SQLConf.get.timestampType match {
-case TimestampNTZType =>
-  convertSpecialTimestampNTZ(value, 
getZoneId(conf.sessionLocalTimeZone))
-.map(Literal(_, TimestampNTZType))
-.getOrElse {
-  val containsTimeZonePart =
-
DateTimeUtils.parseTimestampString(UTF8String.fromString(value))._2.isDefined
-  // If the input string contains time zone part, return a 
timestamp with local time
-  // zone literal.
-  if (containsTimeZonePart) {
-constructTimestampLTZLiteral(value)
-  } else {
-toLiteral(stringToTimestampWithoutTimeZone, 
TimestampNTZType)
-  }
+valueType match {
+  case "DATE" =>
+val zoneId = getZoneId(conf.sessionLocalTimeZone)
+val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, 
DateType))
+specialDate.getOrElse(toLiteral(stringToDate, DateType))
+  case "TIMESTAMP_NTZ" =>
+convertSpecialTimestampNTZ(value, getZoneId(conf.sessionLocalTimeZone))
+  .map(Literal(_, TimestampNTZType))
+  .getOrElse(toLiteral(stringToTimestampWithoutTimeZone, 
TimestampNTZType))
+  case "TIMESTAMP_LTZ" =>
+constructTimestampLTZLiteral(value)
+  case "TIMESTAMP" =>
+SQLConf.get.timestampType match {
+  case TimestampNTZType =>
+convertSpecialTimestampNTZ(value, 
getZoneId(conf.sessionLocalTimeZone))
+