[spark] branch master updated: [SPARK-41539][SQL] Remap stats and constraints against output in logical plan for LogicalRDD
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 074e1b39d27 [SPARK-41539][SQL] Remap stats and constraints against output in logical plan for LogicalRDD 074e1b39d27 is described below commit 074e1b39d279f12ff8d822a03741f33f159f5df8 Author: Jungtaek Lim AuthorDate: Wed Dec 21 15:02:18 2022 +0900 [SPARK-41539][SQL] Remap stats and constraints against output in logical plan for LogicalRDD ### What changes were proposed in this pull request? This PR proposes to remap stats and constraints against the output in logical for LogicalRDD, like we remap stats and constraints against the "new" output when we call newInstance. ### Why are the changes needed? The output in logical plan and optimized plan can be "slightly" different (we observed the difference of exprId), and then the query can fail due to the invalid attribute reference(s) in stats and constraints for LogicalRDD. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Modified test cases. Closes #39082 from HeartSaVioR/SPARK-41539. Authored-by: Jungtaek Lim Signed-off-by: Jungtaek Lim --- .../apache/spark/sql/execution/ExistingRDD.scala | 89 +- .../org/apache/spark/sql/DataFrameSuite.scala | 29 ++- 2 files changed, 98 insertions(+), 20 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 3acadee5fb4..3dcf0efaadd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution +import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Encoder, SparkSession} import org.apache.spark.sql.catalyst.InternalRow @@ -103,6 +104,8 @@ case class LogicalRDD( originConstraints: Option[ExpressionSet] = None) extends LeafNode with MultiInstanceRelation { + import LogicalRDD._ + override protected final def otherCopyArgs: Seq[AnyRef] = session :: originStats :: originConstraints :: Nil @@ -122,22 +125,8 @@ case class LogicalRDD( case e: Attribute => rewrite.getOrElse(e, e) }.asInstanceOf[SortOrder]) -val rewrittenStatistics = originStats.map { s => - Statistics( -s.sizeInBytes, -s.rowCount, -AttributeMap[ColumnStat](s.attributeStats.map { - case (attr, v) => (rewrite.getOrElse(attr, attr), v) -}), -s.isRuntime - ) -} - -val rewrittenConstraints = originConstraints.map { c => - c.map(_.transform { -case e: Attribute => rewrite.getOrElse(e, e) - }) -} +val rewrittenStatistics = originStats.map(rewriteStatistics(_, rewrite)) +val rewrittenConstraints = originConstraints.map(rewriteConstraints(_, rewrite)) LogicalRDD( output.map(rewrite), @@ -163,7 +152,7 @@ case class LogicalRDD( override lazy val constraints: ExpressionSet = originConstraints.getOrElse(ExpressionSet()) } -object LogicalRDD { +object LogicalRDD extends Logging { /** * Create a new LogicalRDD based on existing Dataset. Stats and constraints are inherited from * origin Dataset. 
@@ -183,16 +172,80 @@ object LogicalRDD { } } +val logicalPlan = originDataset.logicalPlan val optimizedPlan = originDataset.queryExecution.optimizedPlan val executedPlan = originDataset.queryExecution.executedPlan +val (stats, constraints) = rewriteStatsAndConstraints(logicalPlan, optimizedPlan) + LogicalRDD( originDataset.logicalPlan.output, rdd, firstLeafPartitioning(executedPlan.outputPartitioning), executedPlan.outputOrdering, isStreaming -)(originDataset.sparkSession, Some(optimizedPlan.stats), Some(optimizedPlan.constraints)) +)(originDataset.sparkSession, stats, constraints) + } + + private[sql] def buildOutputAssocForRewrite( + source: Seq[Attribute], + destination: Seq[Attribute]): Option[Map[Attribute, Attribute]] = { +// We check the name and type, allowing nullability, exprId, metadata, qualifier be different +// E.g. This could happen during optimization phase. +val rewrite = source.zip(destination).flatMap { case (attr1, attr2) => + if (attr1.name == attr2.name && attr1.dataType == attr2.dataType) { +Some(attr1 -> attr2) + } else { +None + } +}.toMap + +if (rewrite.size == source.size) { + Some(rewrite) +} else { + None +} + } + + private[sql] def
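Editor's note: the change above factors the inline stats/constraints rewriting into helpers on the LogicalRDD companion object, and the diff is truncated before those helpers are fully shown. A minimal sketch of that rewrite step, reconstructed from the removed inline code visible in the diff (names and placement here are illustrative, not the committed implementation):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, ExpressionSet}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}

object LogicalRDDRewriteSketch {
  // Remap column statistics so they reference the destination plan's attributes.
  def rewriteStatistics(s: Statistics, rewrite: Map[Attribute, Attribute]): Statistics =
    Statistics(
      s.sizeInBytes,
      s.rowCount,
      AttributeMap[ColumnStat](s.attributeStats.toSeq.map {
        case (attr, stat) => (rewrite.getOrElse(attr, attr), stat)
      }),
      s.isRuntime)

  // Remap attribute references inside every constraint expression.
  def rewriteConstraints(c: ExpressionSet, rewrite: Map[Attribute, Attribute]): ExpressionSet =
    c.map(_.transform { case a: Attribute => rewrite.getOrElse(a, a) })
}
```

The point of the fix is that the rewrite map is now built against the logical plan's output (via buildOutputAssocForRewrite) instead of assuming the optimized plan's exprIds match, so the inherited stats and constraints no longer carry dangling attribute references.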
[spark] branch master updated: [SPARK-41634][BUILD] Upgrade `minimatch` to 3.1.2
This is an automated email from the ASF dual-hosted git repository. sarutak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4539260f4ac [SPARK-41634][BUILD] Upgrade `minimatch` to 3.1.2 4539260f4ac is described below commit 4539260f4ac346f22ce1a47ca9e94e3181803490 Author: Bjørn AuthorDate: Wed Dec 21 13:49:45 2022 +0900 [SPARK-41634][BUILD] Upgrade `minimatch` to 3.1.2 ### What changes were proposed in this pull request? Upgrade `minimatch` to 3.1.2 $ npm -v 9.1.2 $ npm install added 118 packages, and audited 119 packages in 2s 15 packages are looking for funding run `npm fund` for details found 0 vulnerabilities ### Why are the changes needed? [CVE-2022-3517](https://nvd.nist.gov/vuln/detail/CVE-2022-3517) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA Closes #39143 from bjornjorgensen/upgrade-minimatch. Authored-by: Bjørn Signed-off-by: Kousuke Saruta --- dev/package-lock.json | 15 --- dev/package.json | 3 ++- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/dev/package-lock.json b/dev/package-lock.json index c2a61b389ac..104a3fb7854 100644 --- a/dev/package-lock.json +++ b/dev/package-lock.json @@ -6,7 +6,8 @@ "": { "devDependencies": { "ansi-regex": "^5.0.1", -"eslint": "^7.25.0" +"eslint": "^7.25.0", +"minimatch": "^3.1.2" } }, "node_modules/@babel/code-frame": { @@ -853,9 +854,9 @@ } }, "node_modules/minimatch": { - "version": "3.0.4", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.0.4.tgz;, - "integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==", + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz;, + "integrity": "sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==", "dev": true, "dependencies": { "brace-expansion": "^1.1.7" @@ -1931,9 +1932,9 @@ } }, "minimatch": { - "version": "3.0.4", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.0.4.tgz;, - "integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==", + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz;, + "integrity": "sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==", "dev": true, "requires": { "brace-expansion": "^1.1.7" diff --git a/dev/package.json b/dev/package.json index f975bdde831..4e4a4bf1bca 100644 --- a/dev/package.json +++ b/dev/package.json @@ -1,6 +1,7 @@ { "devDependencies": { "eslint": "^7.25.0", -"ansi-regex": "^5.0.1" +"ansi-regex": "^5.0.1", +"minimatch": "^3.1.2" } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41587][BUILD] Upgrade `org.scalatestplus:selenium-4-4` to `org.scalatestplus:selenium-4-7`
This is an automated email from the ASF dual-hosted git repository. sarutak pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ee2e582ff19 [SPARK-41587][BUILD] Upgrade `org.scalatestplus:selenium-4-4` to `org.scalatestplus:selenium-4-7` ee2e582ff19 is described below commit ee2e582ff195fa11047545f43d1cb0ebd20a7091 Author: yangjie01 AuthorDate: Wed Dec 21 13:40:40 2022 +0900 [SPARK-41587][BUILD] Upgrade `org.scalatestplus:selenium-4-4` to `org.scalatestplus:selenium-4-7` ### What changes were proposed in this pull request? This pr aims upgrade `org.scalatestplus:selenium-4-4` to `org.scalatestplus:selenium-4-7`: - `org.scalatestplus:selenium-4-4` -> `org.scalatestplus:selenium-4-7` - `selenium-java`: 4.4.0 -> 4.7.1 - `htmlunit-driver`: 3.64.0 -> 4.7.0 - `htmlunit` -> 2.64.0 -> 2.67.0 And all upgraded dependencies versions are matched. ### Why are the changes needed? The release notes as follows: - https://github.com/scalatest/scalatestplus-selenium/releases/tag/release-3.2.14.0-for-selenium-4.7 ### Does this PR introduce _any_ user-facing change? No, just for test ### How was this patch tested? - Pass Github Actions - Manual test: - ChromeUISeleniumSuite ``` build/sbt -Dguava.version=31.1-jre -Dspark.test.webdriver.chrome.driver=/path/to/chromedriver -Dtest.default.exclude.tags="" -Phive -Phive-thriftserver "core/testOnly org.apache.spark.ui.ChromeUISeleniumSuite" ``` ``` [info] ChromeUISeleniumSuite: Starting ChromeDriver 108.0.5359.71 (1e0e3868ee06e91ad636a874420e3ca3ae3756ac-refs/branch-heads/5359{#1016}) on port 13600 Only local connections are allowed. Please see https://chromedriver.chromium.org/security-considerations for suggestions on keeping ChromeDriver safe. ChromeDriver was started successfully. [info] - SPARK-31534: text for tooltip should be escaped (2 seconds, 702 milliseconds) [info] - SPARK-31882: Link URL for Stage DAGs should not depend on paged table. (824 milliseconds) [info] - SPARK-31886: Color barrier execution mode RDD correctly (313 milliseconds) [info] - Search text for paged tables should not be saved (1 second, 745 milliseconds) [info] Run completed in 10 seconds, 266 milliseconds. [info] Total number of tests run: 4 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. [success] Total time: 23 s, completed 2022-12-19 19:41:26 ``` - RocksDBBackendChromeUIHistoryServerSuite ``` build/sbt -Dguava.version=31.1-jre -Dspark.test.webdriver.chrome.driver=/path/to/chromedriver -Dtest.default.exclude.tags="" -Phive -Phive-thriftserver "core/testOnly org.apache.spark.deploy.history.RocksDBBackendChromeUIHistoryServerSuite" ``` ``` [info] RocksDBBackendChromeUIHistoryServerSuite: Starting ChromeDriver 108.0.5359.71 (1e0e3868ee06e91ad636a874420e3ca3ae3756ac-refs/branch-heads/5359{#1016}) on port 2201 Only local connections are allowed. Please see https://chromedriver.chromium.org/security-considerations for suggestions on keeping ChromeDriver safe. ChromeDriver was started successfully. [info] - ajax rendered relative links are prefixed with uiRoot (spark.ui.proxyBase) (2 seconds, 362 milliseconds) [info] Run completed in 10 seconds, 254 milliseconds. [info] Total number of tests run: 1 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. 
[success] Total time: 24 s, completed 2022-12-19 19:40:42 ``` Closes #39129 from LuciferYang/selenium-47. Authored-by: yangjie01 Signed-off-by: Kousuke Saruta --- pom.xml | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 5ae26570e2d..f09207c660f 100644 --- a/pom.xml +++ b/pom.xml @@ -205,9 +205,9 @@ 4.9.3 1.1 -4.4.0 -3.64.0 -2.64.0 +4.7.1 +4.7.0 +2.67.0 1.8 1.1.0 1.5.0 @@ -416,7 +416,7 @@ org.scalatestplus - selenium-4-4_${scala.binary.version} + selenium-4-7_${scala.binary.version} test @@ -1144,7 +1144,7 @@ org.scalatestplus -selenium-4-4_${scala.binary.version} +selenium-4-7_${scala.binary.version} 3.2.14.0 test - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (e23983a32df -> 7efc6f493fe)
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e23983a32df [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation add 7efc6f493fe [SPARK-41426][UI] Protobuf serializer for ResourceProfileWrapper No new revisions were added by this update. Summary of changes: .../apache/spark/status/protobuf/store_types.proto | 6 +++- ...plicationEnvironmentInfoWrapperSerializer.scala | 4 +-- .../protobuf/KVStoreProtobufSerializer.scala | 3 ++ ...cala => ResourceProfileWrapperSerializer.scala} | 27 +++--- .../protobuf/KVStoreProtobufSerializerSuite.scala | 42 ++ 5 files changed, 64 insertions(+), 18 deletions(-) copy core/src/main/scala/org/apache/spark/status/protobuf/{ExecutorMetricsSerializer.scala => ResourceProfileWrapperSerializer.scala} (54%) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (801e07996a4 -> e23983a32df)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 801e07996a4 [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic add e23983a32df [SPARK-41434][CONNECT][PYTHON] Initial `LambdaFunction` implementation No new revisions were added by this update. Summary of changes: .../main/protobuf/spark/connect/expressions.proto | 12 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 34 + python/pyspark/sql/connect/column.py | 29 python/pyspark/sql/connect/functions.py| 149 - .../pyspark/sql/connect/proto/expressions_pb2.py | 69 ++ .../pyspark/sql/connect/proto/expressions_pb2.pyi | 42 ++ .../sql/tests/connect/test_connect_function.py | 32 + 7 files changed, 302 insertions(+), 65 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 801e07996a4 [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic 801e07996a4 is described below commit 801e07996a4d4ea448b6fc468cc6c9d6904ceef2 Author: wangyazhi AuthorDate: Tue Dec 20 21:35:37 2022 -0600 [SPARK-41192][CORE] Remove unscheduled speculative tasks when task finished to obtain better dynamic ### What changes were proposed in this pull request? ExecutorAllocationManager only record count for speculative task, `stageAttemptToNumSpeculativeTasks` increment when speculative task submit, and only decrement when speculative task end. If task finished before speculative task start, the speculative task will never be scheduled, which will cause leak of `stageAttemptToNumSpeculativeTasks` and mislead the calculation of target executors. This PR fixes the issue by add task index in `SparkListenerSpeculativeTaskSubmitted` event, and record speculative task with task index when submitted, task index should be removed when speculative task start or task finished(whether it is speculative or not) ### Why are the changes needed? To fix idle executors caused by pending speculative task from task that has been finished ### Does this PR introduce _any_ user-facing change? DeveloperApi `SparkListenerSpeculativeTaskSubmitted` add taskIndex with default value -1 ### How was this patch tested? Add a comprehensive unit test. Pass the GA Closes #38711 from toujours33/SPARK-41192. Lead-authored-by: wangyazhi Co-authored-by: toujours33 Signed-off-by: Mridul Muralidharan gmail.com> --- .../apache/spark/ExecutorAllocationManager.scala | 38 + .../org/apache/spark/scheduler/DAGScheduler.scala | 14 ++-- .../apache/spark/scheduler/DAGSchedulerEvent.scala | 2 +- .../org/apache/spark/scheduler/SparkListener.scala | 16 +++- .../apache/spark/scheduler/TaskSetManager.scala| 2 +- .../spark/ExecutorAllocationManagerSuite.scala | 97 -- .../spark/scheduler/TaskSetManagerSuite.scala | 2 +- 7 files changed, 139 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 204ffc39a11..f06312c15cf 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -643,10 +643,12 @@ private[spark] class ExecutorAllocationManager( // Should be 0 when no stages are active. private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int] private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]] -// Number of speculative tasks pending/running in each stageAttempt -private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int] -// The speculative tasks started in each stageAttempt +// Map from each stageAttempt to a set of running speculative task indexes +// TODO(SPARK-41192): We simply need an Int for this. 
private val stageAttemptToSpeculativeTaskIndices = + new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]() +// Map from each stageAttempt to a set of pending speculative task indexes +private val stageAttemptToPendingSpeculativeTasks = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]] private val resourceProfileIdToStageAttempt = @@ -722,7 +724,7 @@ private[spark] class ExecutorAllocationManager( // because the attempt may still have running tasks, // even after another attempt for the stage is submitted. stageAttemptToNumTasks -= stageAttempt -stageAttemptToNumSpeculativeTasks -= stageAttempt +stageAttemptToPendingSpeculativeTasks -= stageAttempt stageAttemptToTaskIndices -= stageAttempt stageAttemptToSpeculativeTaskIndices -= stageAttempt stageAttemptToExecutorPlacementHints -= stageAttempt @@ -733,7 +735,9 @@ private[spark] class ExecutorAllocationManager( // If this is the last stage with pending tasks, mark the scheduler queue as empty // This is needed in case the stage is aborted for any reason -if (stageAttemptToNumTasks.isEmpty && stageAttemptToNumSpeculativeTasks.isEmpty) { +if (stageAttemptToNumTasks.isEmpty + && stageAttemptToPendingSpeculativeTasks.isEmpty + && stageAttemptToSpeculativeTaskIndices.isEmpty) { allocationManager.onSchedulerQueueEmpty() } } @@ -751,6 +755,8 @@ private[spark] class ExecutorAllocationManager( if
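Editor's note: to make the bookkeeping change above concrete, here is a minimal, self-contained sketch (plain Scala, not Spark's actual ExecutorAllocationManager types) of the idea: pending speculative copies are tracked by task index so that a finished task can retract a speculative request that was never scheduled.

```scala
import scala.collection.mutable

class SpeculationBookkeepingSketch {
  // Speculative copies that were requested but have not started yet.
  private val pendingSpeculative = mutable.HashSet.empty[Int]
  // Speculative copies currently running.
  private val runningSpeculative = mutable.HashSet.empty[Int]

  def onSpeculativeTaskSubmitted(taskIndex: Int): Unit =
    pendingSpeculative += taskIndex

  def onSpeculativeTaskStart(taskIndex: Int): Unit = {
    pendingSpeculative -= taskIndex
    runningSpeculative += taskIndex
  }

  // Called when any copy of the task finishes: also drop a still-pending speculative
  // request for the same index, which previously leaked and inflated the executor target.
  def onTaskEnd(taskIndex: Int): Unit = {
    pendingSpeculative -= taskIndex
    runningSpeculative -= taskIndex
  }

  // Speculative demand that feeds the dynamic-allocation executor target.
  def speculativeTaskDemand: Int = pendingSpeculative.size + runningSpeculative.size
}
```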
[spark] branch master updated (940946515bd -> fd6d226528e)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 940946515bd [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit` add fd6d226528e [SPARK-41631][SQL] Support implicit lateral column alias resolution on Aggregate No new revisions were added by this update. Summary of changes: core/src/main/resources/error/error-classes.json | 5 + .../spark/sql/catalyst/analysis/Analyzer.scala | 32 +- .../ResolveLateralColumnAliasReference.scala | 107 +++- .../spark/sql/errors/QueryCompilationErrors.scala | 15 +- .../org/apache/spark/sql/internal/SQLConf.scala| 2 +- .../apache/spark/sql/LateralColumnAliasSuite.scala | 613 ++--- .../scala/org/apache/spark/sql/QueryTest.scala | 2 +- 7 files changed, 674 insertions(+), 102 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
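Editor's note: the commit summary above only lists the touched files, so here is a hypothetical illustration (not taken from the commit) of what "implicit lateral column alias resolution on Aggregate" enables: an alias defined earlier in a SELECT list can be referenced by a later expression, now also when the query is an Aggregate. The table, columns, and config key below are assumptions.

```scala
import org.apache.spark.sql.SparkSession

object LateralColumnAliasExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("lca-demo").getOrCreate()
    // Config name is an assumption; in some builds the feature may already be on by default.
    spark.conf.set("spark.sql.lateralColumnAlias.enableImplicitResolution", "true")

    spark.range(100).selectExpr("id % 3 AS dept", "id AS salary").createOrReplaceTempView("emp")

    // `avg_salary` is defined and then reused as a lateral column alias in the same SELECT,
    // which with this change also resolves inside a GROUP BY (Aggregate) query.
    spark.sql(
      """SELECT dept, avg(salary) AS avg_salary, avg_salary * 1.10 AS next_year_budget
        |FROM emp
        |GROUP BY dept""".stripMargin).show()

    spark.stop()
  }
}
```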
[spark] branch master updated: [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 940946515bd [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit` 940946515bd is described below commit 940946515bd199930051be89f9fd557a35f2af0d Author: Jiaan Geng AuthorDate: Wed Dec 21 11:31:00 2022 +0900 [SPARK-41440][CONNECT][PYTHON] Implement `DataFrame.randomSplit` ### What changes were proposed in this pull request? Implement `DataFrame.randomSplit` with a proto message Implement `DataFrame.randomSplit` for scala API Implement `DataFrame.randomSplit` for python API ### Why are the changes needed? for Connect API coverage ### Does this PR introduce _any_ user-facing change? 'No'. New API ### How was this patch tested? New test cases. Closes #39017 from beliefer/SPARK-41440. Authored-by: Jiaan Geng Signed-off-by: Hyukjin Kwon --- .../main/protobuf/spark/connect/relations.proto| 4 ++ .../org/apache/spark/sql/connect/dsl/package.scala | 34 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 24 +++- .../connect/planner/SparkConnectProtoSuite.scala | 16 + python/pyspark/sql/connect/dataframe.py| 57 + python/pyspark/sql/connect/plan.py | 3 + python/pyspark/sql/connect/proto/relations_pb2.py | 72 +++--- python/pyspark/sql/connect/proto/relations_pb2.pyi | 18 ++ .../sql/tests/connect/test_connect_basic.py| 15 + .../sql/tests/connect/test_connect_plan_only.py| 39 10 files changed, 244 insertions(+), 38 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto index 2f83db1176a..42471821634 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto @@ -317,6 +317,10 @@ message Sample { // (Optional) The random seed. optional int64 seed = 5; + + // (Optional) Explicitly sort the underlying plan to make the ordering deterministic. + // This flag is only used to randomly splits DataFrame with the provided weights. + optional bool force_stable_sort = 6; } // Relation of type [[Range]] that generates a sequence of integers. diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 8211dc21bde..bce8d390fcb 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -27,6 +27,7 @@ import org.apache.spark.connect.proto.SetOperation.SetOpType import org.apache.spark.sql.SaveMode import org.apache.spark.sql.connect.planner.DataTypeProtoConverter import org.apache.spark.sql.connect.planner.LiteralValueProtoConverter.toConnectProtoValue +import org.apache.spark.util.Utils /** * A collection of implicit conversions that create a DSL for constructing connect protos. 
@@ -775,6 +776,39 @@ package object dsl { valueColumnName: String): Relation = unpivot(ids, variableColumnName, valueColumnName) + def randomSplit(weights: Array[Double], seed: Long): Array[Relation] = { +require( + weights.forall(_ >= 0), + s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}") +require( + weights.sum > 0, + s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}") + +val sum = weights.toSeq.sum +val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) +normalizedCumWeights + .sliding(2) + .map { x => +Relation + .newBuilder() + .setSample( +Sample + .newBuilder() + .setInput(logicalPlan) + .setLowerBound(x(0)) + .setUpperBound(x(1)) + .setWithReplacement(false) + .setSeed(seed) + .setForceStableSort(true) + .build()) + .build() + } + .toArray + } + + def randomSplit(weights: Array[Double]): Array[Relation] = +randomSplit(weights, Utils.random.nextLong) + private def createSetOperation( left: Relation, right: Relation, diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
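Editor's note: the weight-normalization logic added to the Scala DSL above is easy to miss in the diff, so here is a standalone sketch (plain Scala, no Spark or Connect classes) of how randomSplit weights become per-split sample bounds via normalized cumulative weights and sliding windows.

```scala
object RandomSplitBoundsSketch {
  // Convert split weights into (lowerBound, upperBound) pairs over [0, 1].
  def bounds(weights: Array[Double]): Seq[(Double, Double)] = {
    require(weights.forall(_ >= 0), "weights must be nonnegative")
    require(weights.sum > 0, "sum of weights must be positive")
    val sum = weights.sum
    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0)(_ + _)
    normalizedCumWeights.sliding(2).map(w => (w(0), w(1))).toSeq
  }

  def main(args: Array[String]): Unit = {
    // Weights 1:2:3 yield approximately (0.0, 0.167), (0.167, 0.5), (0.5, 1.0);
    // each pair becomes the lower/upper bound of one Sample relation sharing the same seed.
    println(bounds(Array(1.0, 2.0, 3.0)))
  }
}
```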
[spark] branch master updated (d33a59c940f -> ff68d0ef945)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from d33a59c940f [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks add ff68d0ef945 [SPARK-41566][BUILD] Upgrade `netty` to 4.1.86.Final No new revisions were added by this update. Summary of changes: dev/deps/spark-deps-hadoop-2-hive-2.3 | 36 +-- dev/deps/spark-deps-hadoop-3-hive-2.3 | 36 +-- pom.xml | 6 +- 3 files changed, 41 insertions(+), 37 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark-docker] branch master updated: [SPARK-40520] Add support to generate DOI manifest
[spark-docker] branch master updated: [SPARK-40520] Add support to generate DOI manifest
This is an automated email from the ASF dual-hosted git repository. yikun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark-docker.git The following commit(s) were added to refs/heads/master by this push: new 7bb8661 [SPARK-40520] Add support to generate DOI mainifest 7bb8661 is described below commit 7bb8661f7d57356f94fd5874696df1b1c058cb0b Author: Yikun Jiang AuthorDate: Wed Dec 21 10:15:44 2022 +0800 [SPARK-40520] Add support to generate DOI mainifest ### What changes were proposed in this pull request? This patch add support to generate DOI mainifest from versions.json. ### Why are the changes needed? To help generate DOI mainifest ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? ```shell $ flake8 ./tools/manifest.py --max-line-length=100 $ black ./tools/manifest.py All done! ✨ ✨ 1 file left unchanged. ``` ```shell $ tools/manifest.py manifest Maintainers: Apache Spark Developers (ApacheSpark) GitRepo: https://github.com/apache/spark-docker.git Tags: 3.3.1-scala2.12-java11-python3-ubuntu, 3.3.1-python3, 3.3.1, python3, latest Architectures: amd64, arm64v8 GitCommit: 496edb6dee0ade08bc5d180d7a6da0ff8b5d91ff Directory: ./3.3.1/scala2.12-java11-python3-ubuntu Tags: 3.3.1-scala2.12-java11-r-ubuntu, 3.3.1-r, r Architectures: amd64, arm64v8 GitCommit: 496edb6dee0ade08bc5d180d7a6da0ff8b5d91ff Directory: ./3.3.1/scala2.12-java11-r-ubuntu // ... ... ``` Closes #27 from Yikun/SPARK-40520. Authored-by: Yikun Jiang Signed-off-by: Yikun Jiang --- tools/manifest.py | 34 -- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/tools/manifest.py b/tools/manifest.py index fbfad6f..13bc631 100755 --- a/tools/manifest.py +++ b/tools/manifest.py @@ -19,7 +19,33 @@ from argparse import ArgumentParser import json -from statistics import mode +import subprocess + + +def run_cmd(cmd): +if isinstance(cmd, list): +return subprocess.check_output(cmd).decode("utf-8") +else: +return subprocess.check_output(cmd.split(" ")).decode("utf-8") + + +def generate_manifest(versions): +output = ( +"Maintainers: Apache Spark Developers (@ApacheSpark)\n" +"GitRepo: https://github.com/apache/spark-docker.git\n\n; +) +git_commit = run_cmd("git rev-parse HEAD").replace("\n", "") +content = ( +"Tags: %s\n" +"Architectures: amd64, arm64v8\n" +"GitCommit: %s\n" +"Directory: ./%s\n\n" +) +for version in versions: +tags = ", ".join(version["tags"]) +path = version["path"] +output += content % (tags, git_commit, path) +return output def parse_opts(): @@ -27,7 +53,7 @@ def parse_opts(): parser.add_argument( dest="mode", -choices=["tags"], +choices=["tags", "manifest"], type=str, help="The print mode of script", ) @@ -76,6 +102,10 @@ def main(): # Get matched version's tags tags = versions[0]["tags"] if versions else [] print(",".join(["%s:%s" % (image, t) for t in tags])) +elif mode == "manifest": +with open(version_file, "r") as f: +versions = json.load(f).get("versions") +print(generate_manifest(versions)) if __name__ == "__main__": - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d33a59c940f [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks d33a59c940f is described below commit d33a59c940f0e8f0b93d91cc9e700c2cb533d54e Author: SandishKumarHN AuthorDate: Wed Dec 21 09:37:15 2022 +0800 [SPARK-41396][SQL][PROTOBUF] OneOf field support and recursion checks Oneof fields allow a message to contain one and only one of a defined set of field types, while recursive fields provide a way to define messages that can refer to themselves, allowing for the creation of complex and nested data structures. with this change users will be able to use protobuf OneOf fields with spark-protobuf, making it a more complete and useful tool for processing protobuf data. **Support for circularReferenceDepth:** The `recursive.fields.max.depth` parameter can be specified in the from_protobuf options to control the maximum allowed recursion depth for a field. Setting `recursive.fields.max.depth` to 0 drops all-recursive fields, setting it to 1 allows it to be recursed once, and setting it to 2 allows it to be recursed twice. Attempting to set the `recursive.fields.max.depth` to a value greater than 10 is not allowed. If the `recursive.fields.max.depth` is not specified, it will default to -1; [...] SQL Schema for the protobuf message ``` message Person { string name = 1; Person bff = 2 } ``` will vary based on the value of `recursive.fields.max.depth`. ``` 0: struct 1: struct> 2: struct>> ... ``` ### What changes were proposed in this pull request? - Add support for protobuf oneof field - Stop recursion at the first level when a recursive field is encountered. (instead of throwing an error) ### Why are the changes needed? Stop recursion at the first level and handle nulltype in deserilization. ### Does this PR introduce _any_ user-facing change? NA ### How was this patch tested? Added Unit tests for OneOf field support and recursion checks. Tested full support for nested OneOf fields and message types using real data from Kafka on a real cluster cc: rangadi mposdev21 Closes #38922 from SandishKumarHN/SPARK-41396. 
Authored-by: SandishKumarHN Signed-off-by: Wenchen Fan --- .../sql/protobuf/ProtobufDataToCatalyst.scala | 2 +- .../spark/sql/protobuf/ProtobufDeserializer.scala | 8 +- .../spark/sql/protobuf/utils/ProtobufOptions.scala | 8 + .../sql/protobuf/utils/SchemaConverters.scala | 69 ++- .../test/resources/protobuf/functions_suite.desc | Bin 6678 -> 8739 bytes .../test/resources/protobuf/functions_suite.proto | 85 ++- .../sql/protobuf/ProtobufFunctionsSuite.scala | 576 - core/src/main/resources/error/error-classes.json | 2 +- 8 files changed, 721 insertions(+), 29 deletions(-) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala index c0997b1bd06..da44f94d5ea 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDataToCatalyst.scala @@ -39,7 +39,7 @@ private[protobuf] case class ProtobufDataToCatalyst( override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) override lazy val dataType: DataType = { -val dt = SchemaConverters.toSqlType(messageDescriptor).dataType +val dt = SchemaConverters.toSqlType(messageDescriptor, protobufOptions).dataType parseMode match { // With PermissiveMode, the output Catalyst row might contain columns of null values for // corrupt records, even if some of the columns are not nullable in the user-provided schema. diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala index 46366ba268b..224e22c0f52 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala @@ -156,6 +156,9 @@ private[sql] class ProtobufDeserializer( (protoType.getJavaType, catalystType) match { case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal) + // It is possible that this will result in data being dropped, This is intentional, + // to catch recursive fields and drop them as necessary. + case (MESSAGE, NullType) =>
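Editor's note: as a usage-level companion to the schema examples above, here is a hedged sketch of reading protobuf binary data with the new recursion option. The descriptor path and column name are placeholders, and the assumption that from_protobuf accepts an options map with the key recursive.fields.max.depth follows the commit description; "Person" is the recursive message from the example above.

```scala
import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.protobuf.functions.from_protobuf

object RecursiveProtobufSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("proto-demo").getOrCreate()

    // Placeholder binary column; in practice this would come from Kafka or files.
    val binary = spark.range(1).select(lit(Array.emptyByteArray).as("value"))

    val parsed = binary.select(
      from_protobuf(
        col("value"),
        "Person",                                        // message with a recursive `bff` field
        "/path/to/functions_suite.desc",                 // placeholder descriptor file path
        Map("recursive.fields.max.depth" -> "1").asJava  // allow one level of recursion
      ).as("person"))

    // With max depth 1, the expected schema is struct<name: string, bff: struct<name: string>>.
    parsed.printSchema()
    spark.stop()
  }
}
```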
[spark] branch master updated: [MINOR] Fix some typos
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 52e4b31903c [MINOR] Fix some typos 52e4b31903c is described below commit 52e4b31903cde37bef24a5abf808b11615845867 Author: Liu Chunbo AuthorDate: Wed Dec 21 10:36:40 2022 +0900 [MINOR] Fix some typos What changes were proposed in this pull request? Fix some typos in the code comments. Why are the changes needed? Modify these two comment mistakes and make code comments more standardized. Does this PR introduce any user-facing change? No How was this patch tested? No test required Closes #39111 from for08/SPARK-41560. Authored-by: Liu Chunbo Signed-off-by: Hyukjin Kwon --- .../src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java| 2 +- .../main/java/org/apache/spark/network/protocol/MessageWithHeader.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java index 2d573f51243..4dd8cec2900 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java @@ -68,7 +68,7 @@ public abstract class ManagedBuffer { public abstract ManagedBuffer release(); /** - * Convert the buffer into an Netty object, used to write the data out. The return value is either + * Convert the buffer into a Netty object, used to write the data out. The return value is either * a {@link io.netty.buffer.ByteBuf} or a {@link io.netty.channel.FileRegion}. * * If this method returns a ByteBuf, then that buffer's reference count will be incremented and diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java index 19eeddb842c..dfcb1c642eb 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java +++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java @@ -140,7 +140,7 @@ class MessageWithHeader extends AbstractFileRegion { // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance // for the case that the passed-in buffer has too many components. int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT); -// If the ByteBuf holds more then one ByteBuffer we should better call nioBuffers(...) +// If the ByteBuf holds more than one ByteBuffer we should better call nioBuffers(...) // to eliminate extra memory copies. int written = 0; if (buf.nioBufferCount() == 1) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (7ed4d448daa -> 746167a1638)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 7ed4d448daa [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression add 746167a1638 [SPARK-41584][BUILD] Upgrade RoaringBitmap to 0.9.36 No new revisions were added by this update. Summary of changes: core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt | 10 +- core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt | 10 +- core/benchmarks/MapStatusesConvertBenchmark-results.txt | 8 dev/deps/spark-deps-hadoop-2-hive-2.3 | 4 ++-- dev/deps/spark-deps-hadoop-3-hive-2.3 | 4 ++-- pom.xml | 2 +- 6 files changed, 19 insertions(+), 19 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7ed4d448daa [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression 7ed4d448daa is described below commit 7ed4d448daab372b2c5cc846f1f66f70f7fd574c Author: dengziming AuthorDate: Wed Dec 21 08:56:03 2022 +0800 [SPARK-41334][CONNECT][FOLLOWUP] Handle SortOrder Expression ### What changes were proposed in this pull request? in #39090 we moved `SortOrder` proto from relations to expressions, in this PR we add logic to handle it. ### Why are the changes needed? [](https://github.com/dengziming/spark/pull/new/SortOrder) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? A unit test and existing tests. Closes #39138 from dengziming/SortOrder. Authored-by: dengziming Signed-off-by: Ruifeng Zheng --- .../sql/connect/planner/SparkConnectPlanner.scala | 5 ++- .../connect/planner/SparkConnectPlannerSuite.scala | 46 +- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 3bdf3654c68..9fe9acd354d 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -480,6 +480,7 @@ class SparkConnectPlanner(session: SparkSession) { case proto.Expression.ExprTypeCase.CAST => transformCast(exp.getCast) case proto.Expression.ExprTypeCase.UNRESOLVED_REGEX => transformUnresolvedRegex(exp.getUnresolvedRegex) + case proto.Expression.ExprTypeCase.SORT_ORDER => transformSortOrder(exp.getSortOrder) case _ => throw InvalidPlanInput( s"Expression with ID: ${exp.getExprTypeCase.getNumber} is not supported") @@ -699,10 +700,10 @@ class SparkConnectPlanner(session: SparkSession) { logical.Sort( child = transformRelation(sort.getInput), global = sort.getIsGlobal, - order = sort.getOrderList.asScala.toSeq.map(transformSortOrderExpression)) + order = sort.getOrderList.asScala.toSeq.map(transformSortOrder)) } - private def transformSortOrderExpression(order: proto.Expression.SortOrder) = { + private def transformSortOrder(order: proto.Expression.SortOrder) = { expressions.SortOrder( child = transformExpression(order.getChild), direction = order.getDirection match { diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala index 2e0aa018467..93cb97b4421 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala @@ -24,7 +24,7 @@ import com.google.protobuf.ByteString import org.apache.spark.SparkFunSuite import org.apache.spark.connect.proto import org.apache.spark.connect.proto.Expression.{Alias, ExpressionString, UnresolvedStar} -import org.apache.spark.sql.{AnalysisException, Dataset} +import org.apache.spark.sql.{AnalysisException, Dataset, Row} import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, UnsafeProjection} import org.apache.spark.sql.catalyst.plans.logical @@ -661,4 +661,48 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest { .build()) intercept[AnalysisException](Dataset.ofRows(spark, logical)) } + + test("transform SortOrder") { +val input = proto.Relation + .newBuilder() + .setSql( +proto.SQL + .newBuilder() + .setQuery("SELECT id FROM VALUES (5),(1),(2),(6),(4),(3),(7),(9),(8),(null) AS tab(id)") + .build()) + +val relation = proto.Relation + .newBuilder() + .setSort( +proto.Sort + .newBuilder() + .setInput(input) + .setIsGlobal(false) + .addOrder( +proto.Expression.SortOrder + .newBuilder() + .setDirectionValue( + proto.Expression.SortOrder.SortDirection.SORT_DIRECTION_ASCENDING_VALUE) + .setNullOrdering(proto.Expression.SortOrder.NullOrdering.SORT_NULLS_FIRST) + .setChild(proto.Expression +.newBuilder() +.setExpressionString( +
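Editor's note: the diff above is truncated inside the direction match of transformSortOrder. Purely as an illustration (not the committed match body), the enum mapping it performs could look like the sketch below; the descending and nulls-last enum names are assumptions inferred from the ascending and nulls-first names visible in the new test.

```scala
import org.apache.spark.connect.proto
import org.apache.spark.sql.catalyst.expressions.{Ascending, Descending, NullOrdering, NullsFirst, NullsLast, SortDirection}

object SortOrderMappingSketch {
  def direction(order: proto.Expression.SortOrder): SortDirection =
    order.getDirection match {
      case proto.Expression.SortOrder.SortDirection.SORT_DIRECTION_DESCENDING => Descending
      case _ => Ascending // ascending (or unspecified) falls back to the default
    }

  def nullOrdering(order: proto.Expression.SortOrder): NullOrdering =
    order.getNullOrdering match {
      case proto.Expression.SortOrder.NullOrdering.SORT_NULLS_LAST => NullsLast
      case _ => NullsFirst
    }
}
```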
[spark] branch master updated: [SPARK-41528][CONNECT] Merge namespace of Spark Connect and PySpark API
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 764edaf8b2e [SPARK-41528][CONNECT] Merge namespace of Spark Connect and PySpark API 764edaf8b2e is described below commit 764edaf8b2e1c42a32e7bfa058cf8ee26ce02a9e Author: Hyukjin Kwon AuthorDate: Wed Dec 21 09:13:07 2022 +0900 [SPARK-41528][CONNECT] Merge namespace of Spark Connect and PySpark API ### What changes were proposed in this pull request? This PR proposes to merge namespaces between Spark Connect and PySpark with adding an CLI option `--remote` and `spark.remote` configuration as a symmetry of `--master` and `spark.master`. ### Why are the changes needed? In order to provide the same user experience to the end users, see also the design document attached ([here](https://docs.google.com/document/d/10XJFHnzH8a1cQq9iDf9KORAveK6uy6mBvWA8zZP7rjc/edit?usp=sharing)). ### Does this PR introduce _any_ user-facing change? Yes, users now can use Spark Connect as below: ``` $ ./bin/pyspark --remote ... $ ./bin/pyspark --conf spark.remote ... ... >>> # **Same as regular PySpark from here** ... # Do something with `spark` that is a remote client ... spark.range(1) ``` ``` $ ./bin/spark-submit --remote ... app.py $ ./bin/spark-submit --conf spark.remote ... app.py ... # **Same as regular PySpark from here** # app.py from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() # Do something with `spark` that is a remote client ``` See the design document attached ([here](https://docs.google.com/document/d/10XJFHnzH8a1cQq9iDf9KORAveK6uy6mBvWA8zZP7rjc/edit?usp=sharing)). ### How was this patch tested? Reusing PySpark unittests of DataFrame and functions. Closes #39041 from HyukjinKwon/prototype_merged_pyspark. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/deploy/PythonRunner.scala | 1 + .../org/apache/spark/deploy/SparkSubmit.scala | 31 ++- .../apache/spark/deploy/SparkSubmitArguments.scala | 30 ++- dev/sparktestsupport/modules.py| 2 + .../spark/launcher/AbstractCommandBuilder.java | 1 + .../apache/spark/launcher/AbstractLauncher.java| 15 ++ .../spark/launcher/SparkSubmitCommandBuilder.java | 15 ++ .../spark/launcher/SparkSubmitOptionParser.java| 2 + .../source/reference/pyspark.sql/spark_session.rst | 1 + python/pyspark/context.py | 4 + python/pyspark/shell.py| 65 +++-- python/pyspark/sql/connect/session.py | 4 +- python/pyspark/sql/functions.py| 264 ++- python/pyspark/sql/observation.py | 6 +- python/pyspark/sql/session.py | 83 +- .../sql/tests/connect/test_parity_dataframe.py | 237 + .../sql/tests/connect/test_parity_functions.py | 292 + python/pyspark/sql/tests/test_dataframe.py | 6 +- python/pyspark/sql/tests/test_functions.py | 25 +- python/pyspark/sql/utils.py| 61 - python/pyspark/sql/window.py | 14 +- python/pyspark/testing/connectutils.py | 4 + 22 files changed, 1095 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index c3f73ed745d..c3cb6831e39 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -74,6 +74,7 @@ object PythonRunner { // Launch Python process val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava) val env = builder.environment() +sparkConf.getOption("spark.remote").foreach(url => env.put("SPARK_REMOTE", url)) env.put("PYTHONPATH", pythonPath) // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 73acfedd8bc..745836dfbef 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -229,15 +229,20 @@ private[spark] class SparkSubmit extends Logging { var childMainClass = "" // Set the cluster manager -val clusterManager: Int = args.master match { - case "yarn" =>
[spark] branch master updated: [SPARK-41568][SQL] Assign name to _LEGACY_ERROR_TEMP_1236
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2440f699797 [SPARK-41568][SQL] Assign name to _LEGACY_ERROR_TEMP_1236 2440f699797 is described below commit 2440f6997978ca033579a311caea561140ef76d5 Author: panbingkun AuthorDate: Tue Dec 20 21:16:43 2022 +0300 [SPARK-41568][SQL] Assign name to _LEGACY_ERROR_TEMP_1236 ### What changes were proposed in this pull request? In the PR, I propose to assign the name `UNSUPPORTED_FEATURE.ANALYZE_VIEW` to the error class `_LEGACY_ERROR_TEMP_1236`. ### Why are the changes needed? Proper names of error classes should improve user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA. Closes #39119 from panbingkun/LEGACY_ERROR_TEMP_1236. Lead-authored-by: panbingkun Co-authored-by: Maxim Gekk Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 10 - .../spark/sql/errors/QueryCompilationErrors.scala | 2 +- .../spark/sql/StatisticsCollectionSuite.scala | 24 +- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 30b0a5ce8f3..b5e846a8a89 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -1309,6 +1309,11 @@ "The ANALYZE TABLE FOR COLUMNS command does not support the type of the column in the table ." ] }, + "ANALYZE_VIEW" : { +"message" : [ + "The ANALYZE TABLE command does not support views." +] + }, "CATALOG_OPERATION" : { "message" : [ "Catalog does not support ." @@ -2895,11 +2900,6 @@ "Partition spec is invalid. The spec () must match the partition spec () defined in table ''." ] }, - "_LEGACY_ERROR_TEMP_1236" : { -"message" : [ - "ANALYZE TABLE is not supported on views." -] - }, "_LEGACY_ERROR_TEMP_1237" : { "message" : [ "The list of partition columns with values in partition specification for table '' in database '' is not a prefix of the list of partition columns defined in the table schema. Expected a prefix of [], but got []." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 2ddd0704565..b0cf8f6876c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -2302,7 +2302,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase { def analyzeTableNotSupportedOnViewsError(): Throwable = { new AnalysisException( - errorClass = "_LEGACY_ERROR_TEMP_1236", + errorClass = "UNSUPPORTED_FEATURE.ANALYZE_VIEW", messageParameters = Map.empty) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index dda1cc5b52b..2ab8bb25a8b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -63,22 +63,26 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared } test("analyzing views is not supported") { -def assertAnalyzeUnsupported(analyzeCommand: String): Unit = { - val err = intercept[AnalysisException] { -sql(analyzeCommand) - } - assert(err.message.contains("ANALYZE TABLE is not supported")) -} - val tableName = "tbl" withTable(tableName) { spark.range(10).write.saveAsTable(tableName) val viewName = "view" withView(viewName) { sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName") - -assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") -assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") +checkError( + exception = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") + }, + errorClass = "UNSUPPORTED_FEATURE.ANALYZE_VIEW", + parameters = Map.empty +) +checkError( + exception = intercept[AnalysisException] { +sql(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") + }, + errorClass = "UNSUPPORTED_FEATURE.ANALYZE_VIEW", + parameters = Map.empty +
[spark] branch master updated: [SPARK-41582][CORE][SQL] Reuse `INVALID_TYPED_LITERAL` instead of `_LEGACY_ERROR_TEMP_0022`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9840a0327a3 [SPARK-41582][CORE][SQL] Reuse `INVALID_TYPED_LITERAL` instead of `_LEGACY_ERROR_TEMP_0022` 9840a0327a3 is described below commit 9840a0327a3f242877759c97d2e7bbf8b4ac1072 Author: yangjie01 AuthorDate: Tue Dec 20 18:15:08 2022 +0300 [SPARK-41582][CORE][SQL] Reuse `INVALID_TYPED_LITERAL` instead of `_LEGACY_ERROR_TEMP_0022` ### What changes were proposed in this pull request? This pr aims reuse `INVALID_TYPED_LITERAL` instead of `_LEGACY_ERROR_TEMP_0022`. ### Why are the changes needed? Proper names of error classes to improve user experience with Spark SQL. ### Does this PR introduce _any_ user-facing change? Yes, the PR changes user-facing error message. ### How was this patch tested? Pass GitHub Actions Closes #39122 from LuciferYang/SPARK-41582. Authored-by: yangjie01 Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 5 - .../spark/sql/catalyst/parser/AstBuilder.scala | 130 ++--- .../spark/sql/errors/QueryParsingErrors.scala | 9 -- .../catalyst/parser/ExpressionParserSuite.scala| 8 +- .../sql-tests/results/ansi/literals.sql.out| 6 +- .../resources/sql-tests/results/literals.sql.out | 6 +- 6 files changed, 77 insertions(+), 87 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 68034a5221e..30b0a5ce8f3 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -1663,11 +1663,6 @@ "Function trim doesn't support with type . Please use BOTH, LEADING or TRAILING as trim type." ] }, - "_LEGACY_ERROR_TEMP_0022" : { -"message" : [ - "." -] - }, "_LEGACY_ERROR_TEMP_0023" : { "message" : [ "Numeric literal does not fit in range [, ] for type ." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 545d5d97d88..ea752a420d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -2379,76 +2379,72 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit specialTs.getOrElse(toLiteral(stringToTimestamp(_, zoneId), TimestampType)) } -try { - valueType match { -case "DATE" => - val zoneId = getZoneId(conf.sessionLocalTimeZone) - val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, DateType)) - specialDate.getOrElse(toLiteral(stringToDate, DateType)) -case "TIMESTAMP_NTZ" => - convertSpecialTimestampNTZ(value, getZoneId(conf.sessionLocalTimeZone)) -.map(Literal(_, TimestampNTZType)) -.getOrElse(toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType)) -case "TIMESTAMP_LTZ" => - constructTimestampLTZLiteral(value) -case "TIMESTAMP" => - SQLConf.get.timestampType match { -case TimestampNTZType => - convertSpecialTimestampNTZ(value, getZoneId(conf.sessionLocalTimeZone)) -.map(Literal(_, TimestampNTZType)) -.getOrElse { - val containsTimeZonePart = - DateTimeUtils.parseTimestampString(UTF8String.fromString(value))._2.isDefined - // If the input string contains time zone part, return a timestamp with local time - // zone literal. 
- if (containsTimeZonePart) { -constructTimestampLTZLiteral(value) - } else { -toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType) - } +valueType match { + case "DATE" => +val zoneId = getZoneId(conf.sessionLocalTimeZone) +val specialDate = convertSpecialDate(value, zoneId).map(Literal(_, DateType)) +specialDate.getOrElse(toLiteral(stringToDate, DateType)) + case "TIMESTAMP_NTZ" => +convertSpecialTimestampNTZ(value, getZoneId(conf.sessionLocalTimeZone)) + .map(Literal(_, TimestampNTZType)) + .getOrElse(toLiteral(stringToTimestampWithoutTimeZone, TimestampNTZType)) + case "TIMESTAMP_LTZ" => +constructTimestampLTZLiteral(value) + case "TIMESTAMP" => +SQLConf.get.timestampType match { + case TimestampNTZType => +convertSpecialTimestampNTZ(value, getZoneId(conf.sessionLocalTimeZone)) +