[spark] branch master updated: [SPARK-40992][CONNECT] Support toDF(columnNames) in Connect DSL
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e1382c566b7 [SPARK-40992][CONNECT] Support toDF(columnNames) in Connect DSL e1382c566b7 is described below commit e1382c566b7b2ba324fec1aed6556325ebe43f7b Author: Rui Wang AuthorDate: Wed Nov 9 15:48:24 2022 +0800 [SPARK-40992][CONNECT] Support toDF(columnNames) in Connect DSL ### What changes were proposed in this pull request? Add `RenameColumns` to proto to support the implementation for `toDF(columnNames: String*)` which renames the input relation to a different set of column names. ### Why are the changes needed? Improve API coverage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT Closes #38475 from amaliujia/SPARK-40992. Authored-by: Rui Wang Signed-off-by: Wenchen Fan --- .../main/protobuf/spark/connect/relations.proto| 12 ++ .../org/apache/spark/sql/connect/dsl/package.scala | 10 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 9 ++ .../connect/planner/SparkConnectProtoSuite.scala | 4 + python/pyspark/sql/connect/proto/relations_pb2.py | 126 +++-- python/pyspark/sql/connect/proto/relations_pb2.pyi | 44 +++ 6 files changed, 143 insertions(+), 62 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index dd03bd86940..cce9f3b939e 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -47,6 +47,7 @@ message Relation { Range range = 15; SubqueryAlias subquery_alias = 16; Repartition repartition = 17; +RenameColumns rename_columns = 18; StatFunction stat_function = 100; @@ -274,3 +275,14 @@ message StatFunction { } } +// Rename columns on the input relation. 
+message RenameColumns { + // Required. The input relation. + Relation input = 1; + + // Required. + // + // The number of columns of the input relation must be equal to the length + // of this field. If this is not true, an exception will be returned. + repeated string column_names = 2; +} diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 3e68b101057..d6f7a6756c3 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -457,6 +457,16 @@ package object dsl { .build() } + def toDF(columnNames: String*): Relation = +Relation + .newBuilder() + .setRenameColumns( +RenameColumns + .newBuilder() + .setInput(logicalPlan) + .addAllColumnNames(columnNames.asJava)) + .build() + private def createSetOperation( left: Relation, right: Relation, diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 3bbdbf80276..87716c702b5 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -69,6 +69,8 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { case proto.Relation.RelTypeCase.REPARTITION => transformRepartition(rel.getRepartition) case proto.Relation.RelTypeCase.STAT_FUNCTION => transformStatFunction(rel.getStatFunction) + case proto.Relation.RelTypeCase.RENAME_COLUMNS => +transformRenameColumns(rel.getRenameColumns) case proto.Relation.RelTypeCase.RELTYPE_NOT_SET => throw new IndexOutOfBoundsException("Expected Relation to be set, but is empty.") case _ => throw 
InvalidPlanInput(s"${rel.getUnknown} not supported.") @@ -133,6 +135,13 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { } } + private def transformRenameColumns(rel: proto.RenameColumns): LogicalPlan = { +Dataset + .ofRows(session, transformRelation(rel.getInput)) + .toDF(rel.getColumnNamesList.asScala.toSeq: _*) + .logicalPlan + } + private def transformDeduplicate(rel: proto.Deduplicate): LogicalPlan = { if (!rel.hasInput) { throw InvalidPlanInput("Deduplicate needs a plan input") diff --git
[spark] branch master updated: [SPARK-41056][R] Fix new R_LIBS_SITE behavior introduced in R 4.2
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a2dbfeefa98 [SPARK-41056][R] Fix new R_LIBS_SITE behavior introduced in R 4.2 a2dbfeefa98 is described below commit a2dbfeefa98e561734c10fc25cd44017882313d8 Author: Hyukjin Kwon AuthorDate: Wed Nov 9 16:41:07 2022 +0900 [SPARK-41056][R] Fix new R_LIBS_SITE behavior introduced in R 4.2 ### What changes were proposed in this pull request? This PR proposes to keep the `R_LIBS_SITE` as was. It has been changed from R 4.2. ### Why are the changes needed? To keep the behaviour same as the previous. This especially affects the external libraries installed in SparkR worker sides. Especially this can break the user-installed libraries. See the paths below: **R 4.2** ```r # R > Sys.getenv("R_LIBS_SITE") [1] "/usr/local/lib/R/site-library/:/usr/lib/R/site-library:/usr/lib/R/library'" # R --vanilla > Sys.getenv("R_LIBS_SITE") [1] "/usr/lib/R/site-library" ``` **R 4.1** ```r # R > Sys.getenv("R_LIBS_SITE") [1] "/usr/local/lib/R/site-library:/usr/lib/R/site-library:/usr/lib/R/library" # R --vanilla > Sys.getenv("R_LIBS_SITE") [1] "/usr/local/lib/R/site-library:/usr/lib/R/site-library:/usr/lib/R/library" ``` ### Does this PR introduce _any_ user-facing change? Yes. With R 4.2, user-installed libraries won't be found in SparkR workers. ### How was this patch tested? Manually tested, unittest added. It's difficult to add an e2e test. I also manually tested `getROptions` in the Scala shell. Closes #38570 from HyukjinKwon/SPARK-41056. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/api/r/BaseRRunner.scala | 12 +++- .../org/apache/spark/api/r/BaseRRunnerSuite.scala | 36 ++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala b/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala index fdfe5f5b41d..0f93873c06a 100644 --- a/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/r/BaseRRunner.scala @@ -278,6 +278,16 @@ private[r] object BaseRRunner { thread } + private[r] def getROptions(rCommand: String): String = Try { +val result = scala.sys.process.Process(Seq(rCommand, "--version")).!! +"([0-9]+)\\.([0-9]+)\\.([0-9]+)".r.findFirstMatchIn(result).map { m => + val major = m.group(1).toInt + val minor = m.group(2).toInt + val shouldUseNoRestore = major > 4 || major == 4 && minor >= 2 + if (shouldUseNoRestore) "--no-restore" else "--vanilla" +}.getOrElse("--vanilla") + }.getOrElse("--vanilla") + private def createRProcess(port: Int, script: String): BufferedStreamThread = { // "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command", // but kept here for backward compatibility. 
@@ -286,7 +296,7 @@ private[r] object BaseRRunner { rCommand = sparkConf.get(R_COMMAND).orElse(Some(rCommand)).get val rConnectionTimeout = sparkConf.get(R_BACKEND_CONNECTION_TIMEOUT) -val rOptions = "--vanilla" +val rOptions = getROptions(rCommand) val rLibDir = RUtils.sparkRPackagePath(isDriver = false) val rExecScript = rLibDir(0) + "/SparkR/worker/" + script val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript)) diff --git a/core/src/test/scala/org/apache/spark/api/r/BaseRRunnerSuite.scala b/core/src/test/scala/org/apache/spark/api/r/BaseRRunnerSuite.scala new file mode 100644 index 000..01dd7df3a1a --- /dev/null +++ b/core/src/test/scala/org/apache/spark/api/r/BaseRRunnerSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.r + +import org.apache.spark.SparkFunSuite +import org.apache.spark.TestUtils.testCommandAvailable +import
[spark] branch master updated: [SPARK-41058][CONNECT] Remove unused import in commands.proto
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6858ba95bf7 [SPARK-41058][CONNECT] Remove unused import in commands.proto 6858ba95bf7 is described below commit 6858ba95bf7ecf37b2bee540cad3b9317f13781b Author: dengziming AuthorDate: Wed Nov 9 14:42:28 2022 +0800 [SPARK-41058][CONNECT] Remove unused import in commands.proto ### What changes were proposed in this pull request? expressions.proto is not used in commands.proto ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? github CI Closes #38491 from dengziming/minor-import. Authored-by: dengziming Signed-off-by: Wenchen Fan --- .../src/main/protobuf/spark/connect/commands.proto | 1 - .../sql/connect/planner/SparkConnectPlanner.scala | 7 - .../service/SparkConnectStreamHandler.scala| 3 --- python/pyspark/sql/connect/proto/commands_pb2.py | 31 +++--- 4 files changed, 15 insertions(+), 27 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/commands.proto b/connector/connect/src/main/protobuf/spark/connect/commands.proto index bc8bb478122..79c6cffdf60 100644 --- a/connector/connect/src/main/protobuf/spark/connect/commands.proto +++ b/connector/connect/src/main/protobuf/spark/connect/commands.proto @@ -17,7 +17,6 @@ syntax = 'proto3'; -import "spark/connect/expressions.proto"; import "spark/connect/relations.proto"; import "spark/connect/types.proto"; diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 6a5808bc77f..3bbdbf80276 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ 
b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.connect.planner -import scala.annotation.elidable.byName import scala.collection.JavaConverters._ import org.apache.spark.connect.proto @@ -49,12 +48,6 @@ class SparkConnectPlanner(plan: proto.Relation, session: SparkSession) { // The root of the query plan is a relation and we apply the transformations to it. private def transformRelation(rel: proto.Relation): LogicalPlan = { -val common = if (rel.hasCommon) { - Some(rel.getCommon) -} else { - None -} - rel.getRelTypeCase match { case proto.Relation.RelTypeCase.READ => transformReadRel(rel.getRead) case proto.Relation.RelTypeCase.PROJECT => transformProject(rel.getProject) diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala index a429823c02f..58fc6237867 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala @@ -31,7 +31,6 @@ import org.apache.spark.sql.connect.command.SparkConnectCommandPlanner import org.apache.spark.sql.connect.planner.SparkConnectPlanner import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec} -import org.apache.spark.sql.internal.SQLConf class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) extends Logging { @@ -58,8 +57,6 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte } def processRows(clientId: String, rows: DataFrame): Unit = { -val timeZoneId = SQLConf.get.sessionLocalTimeZone - // Only process up to 10MB of data. 
val sb = new StringBuilder var rowCount = 0 diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index 905c621011f..fa05b6ff76c 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -28,13 +28,12 @@ from google.protobuf import symbol_database as _symbol_database _sym_db = _symbol_database.Default() -from pyspark.sql.connect.proto import expressions_pb2 as spark_dot_connect_dot_expressions__pb2 from pyspark.sql.connect.proto import relations_pb2 as spark_dot_connect_dot_relations__pb2 from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__pb2 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( -
[spark] branch master updated (0add57a1c02 -> 2071c960fc1)
This is an automated email from the ASF dual-hosted git repository. srowen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 0add57a1c02 [SPARK-41035][SQL] Don't patch foldable children of aggregate functions in `RewriteDistinctAggregates` add 2071c960fc1 [SPARK-41039][BUILD] Upgrade `scala-parallel-collections` to 1.0.4 for Scala 2.13 No new revisions were added by this update. Summary of changes: pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark-docker] branch master updated: [SPARK-40569][TESTS] Add smoke test in standalone cluster for spark-docker
This is an automated email from the ASF dual-hosted git repository. yikun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark-docker.git The following commit(s) were added to refs/heads/master by this push: new 52152c1 [SPARK-40569][TESTS] Add smoke test in standalone cluster for spark-docker 52152c1 is described below commit 52152c1b6d70acc2e7c5e32bffe0265b55df7b6f Author: Qian.Sun AuthorDate: Wed Nov 9 09:34:47 2022 +0800 [SPARK-40569][TESTS] Add smoke test in standalone cluster for spark-docker ### What changes were proposed in this pull request? This PR aims to add smoke test in standalone cluster for spark-docker repo. ### Why are the changes needed? Verify spark docker works normally in standalone cluster. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New test in GA. Closes #21 from dcoliversun/SPARK-40569. Authored-by: Qian.Sun Signed-off-by: Yikun Jiang --- .github/workflows/main.yml | 3 + testing/run_tests.sh | 25 ++ testing/testing.sh | 207 + 3 files changed, 235 insertions(+) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 08bba68..accf8ae 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -110,6 +110,9 @@ jobs: platforms: linux/amd64,linux/arm64 push: true + - name : Test - Run spark application for standalone cluster on docker +run: testing/run_tests.sh --image-url $IMAGE_URL --scala-version ${{ matrix.scala_version }} --spark-version ${{ matrix.spark_version }} + - name: Test - Checkout Spark repository uses: actions/checkout@v3 with: diff --git a/testing/run_tests.sh b/testing/run_tests.sh new file mode 100755 index 000..c612dcd --- /dev/null +++ b/testing/run_tests.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +set -eo errexit + +SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd) + +. "${SCRIPT_DIR}/testing.sh" + +echo "Test successfully finished" diff --git a/testing/testing.sh b/testing/testing.sh new file mode 100755 index 000..d399d6d --- /dev/null +++ b/testing/testing.sh @@ -0,0 +1,207 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# This test script runs a simple smoke test in standalone cluster: +# - create docker network +# - start up a master +# - start up a worker +# - wait for the web UI endpoint to return successfully +# - run a simple smoke test in standalone cluster +# - clean up test resource + +CURL_TIMEOUT=1 +CURL_COOLDOWN=1 +CURL_MAX_TRIES=30 + +NETWORK_NAME=spark-net-bridge + +SUBMIT_CONTAINER_NAME=spark-submit +MASTER_CONTAINER_NAME=spark-master +WORKER_CONTAINER_NAME=spark-worker +SPARK_MASTER_PORT=7077 +SPARK_MASTER_WEBUI_CONTAINER_PORT=8080 +SPARK_MASTER_WEBUI_HOST_PORT=8080 +SPARK_WORKER_WEBUI_CONTAINER_PORT=8081 +SPARK_WORKER_WEBUI_HOST_PORT=8081 + +SCALA_VERSION="2.12" +SPARK_VERSION="3.3.0" +IMAGE_URL= + +# Create a new docker bridge network +function create_network() { + if [ ! -z $(docker network ls --filter name=^${NETWORK_NAME}$ --format="{{ .Name }}") ]; then +# bridge network already exists, need to kill containers attached to the network and remove network +cleanup +remove_network + fi + docker network create --driver bridge
[spark] branch master updated (4f096dba9d2 -> 0add57a1c02)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 4f096dba9d2 [SPARK-40852][CONNECT][PYTHON] Introduce `StatFunction` in proto and implement `DataFrame.summary` add 0add57a1c02 [SPARK-41035][SQL] Don't patch foldable children of aggregate functions in `RewriteDistinctAggregates` No new revisions were added by this update. Summary of changes: .../sql/catalyst/optimizer/RewriteDistinctAggregates.scala | 2 +- .../scala/org/apache/spark/sql/DataFrameAggregateSuite.scala | 10 ++ 2 files changed, 11 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40852][CONNECT][PYTHON] Introduce `StatFunction` in proto and implement `DataFrame.summary`
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4f096dba9d2 [SPARK-40852][CONNECT][PYTHON] Introduce `StatFunction` in proto and implement `DataFrame.summary` 4f096dba9d2 is described below commit 4f096dba9d2c28cfd8595ac58417025fdb2d7073 Author: Ruifeng Zheng AuthorDate: Wed Nov 9 09:19:50 2022 +0800 [SPARK-40852][CONNECT][PYTHON] Introduce `StatFunction` in proto and implement `DataFrame.summary` ### What changes were proposed in this pull request? Implement `DataFrame.summary` there is a set of DataFrame APIs implemented in [`StatFunctions`](https://github.com/apache/spark/blob/9cae423075145d3dd81d53f4b82d4f2af6fe7c15/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala), [`DataFrameStatFunctions`](https://github.com/apache/spark/blob/b69c26833c99337bb17922f21dd72ee3a12e0c0a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala) and [`DataFrameNaFunctions`](https://github.com/apache/spark/blob/5d74ace648422e7a [...] 1. depend on Catalyst's analysis (most of them); ~~2. implemented in RDD operations (like `summary`,`approxQuantile`);~~ (resolved by reimpl) ~~3. internally trigger jobs (like `summary`);~~ (resolved by reimpl) This PR introduced a new proto `StatFunction` to support `StatFunctions` method ### Why are the changes needed? for Connect API coverage ### Does this PR introduce _any_ user-facing change? yes, new API ### How was this patch tested? added UT Closes #38318 from zhengruifeng/connect_df_summary. 
Authored-by: Ruifeng Zheng Signed-off-by: Wenchen Fan --- .../main/protobuf/spark/connect/relations.proto| 20 .../org/apache/spark/sql/connect/dsl/package.scala | 16 +++ .../sql/connect/planner/SparkConnectPlanner.scala | 18 +++- .../connect/planner/SparkConnectProtoSuite.scala | 6 ++ python/pyspark/sql/connect/dataframe.py| 10 ++ python/pyspark/sql/connect/plan.py | 38 +++ python/pyspark/sql/connect/proto/relations_pb2.py | 120 +++-- python/pyspark/sql/connect/proto/relations_pb2.pyi | 68 .../sql/tests/connect/test_connect_plan_only.py| 15 +++ 9 files changed, 252 insertions(+), 59 deletions(-) diff --git a/connector/connect/src/main/protobuf/spark/connect/relations.proto b/connector/connect/src/main/protobuf/spark/connect/relations.proto index 36113e2a30c..dd03bd86940 100644 --- a/connector/connect/src/main/protobuf/spark/connect/relations.proto +++ b/connector/connect/src/main/protobuf/spark/connect/relations.proto @@ -48,6 +48,8 @@ message Relation { SubqueryAlias subquery_alias = 16; Repartition repartition = 17; +StatFunction stat_function = 100; + Unknown unknown = 999; } } @@ -254,3 +256,21 @@ message Repartition { // Optional. Default value is false. bool shuffle = 3; } + +// StatFunction +message StatFunction { + // Required. The input relation. + Relation input = 1; + // Required. The function and its parameters. 
+ oneof function { +Summary summary = 2; + +Unknown unknown = 999; + } + + // StatFunctions.summary + message Summary { +repeated string statistics = 1; + } +} + diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 2755727de11..3e68b101057 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -441,6 +441,22 @@ package object dsl { Repartition.newBuilder().setInput(logicalPlan).setNumPartitions(num).setShuffle(true)) .build() + def summary(statistics: String*): Relation = { +Relation + .newBuilder() + .setStatFunction( +proto.StatFunction + .newBuilder() + .setInput(logicalPlan) + .setSummary( +proto.StatFunction.Summary + .newBuilder() + .addAllStatistics(statistics.toSeq.asJava) + .build()) + .build()) + .build() + } + private def createSetOperation( left: Relation, right: Relation, diff --git a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 1615fc56ab6..6a5808bc77f 100644 --- a/connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++
[spark] branch master updated (4eef44ee9fb -> 8ee12bb3d46)
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 4eef44ee9fb [SPARK-41050][BUILD] Upgrade scalafmt from 3.5.9 to 3.6.1 add 8ee12bb3d46 Revert "[SPARK-38550][SQL][CORE] Use a disk-based store to save more debug information for live UI" No new revisions were added by this update. Summary of changes: .../org/apache/spark/internal/config/Status.scala | 8 -- .../org/apache/spark/status/AppStatusStore.scala | 27 + docs/monitoring.md | 11 -- .../org/apache/spark/sql/internal/SQLConf.scala| 11 -- .../spark/sql/diagnostic/DiagnosticListener.scala | 112 - .../spark/sql/diagnostic/DiagnosticStore.scala | 71 - .../spark/sql/execution/QueryExecution.scala | 6 +- .../apache/spark/sql/execution/SQLExecution.scala | 6 +- .../spark/sql/execution/ui/SQLListener.scala | 6 +- .../apache/spark/sql/internal/SharedState.scala| 7 -- .../status/api/v1/sql/ApiSqlRootResource.scala | 11 -- .../status/api/v1/sql/SQLDiagnosticResource.scala | 67 .../org/apache/spark/status/api/v1/sql/api.scala | 10 -- 13 files changed, 8 insertions(+), 345 deletions(-) delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticListener.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/diagnostic/DiagnosticStore.scala delete mode 100644 sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SQLDiagnosticResource.scala - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41050][BUILD] Upgrade scalafmt from 3.5.9 to 3.6.1
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4eef44ee9fb [SPARK-41050][BUILD] Upgrade scalafmt from 3.5.9 to 3.6.1 4eef44ee9fb is described below commit 4eef44ee9fb2ec90580cfb1c1933ce2460a187ee Author: panbingkun AuthorDate: Tue Nov 8 18:22:31 2022 -0600 [SPARK-41050][BUILD] Upgrade scalafmt from 3.5.9 to 3.6.1 ### What changes were proposed in this pull request? The pr aims to upgrade scalafmt from 3.5.9 to 3.6.1 ### Why are the changes needed? A. Release note: > https://github.com/scalameta/scalafmt/releases B. V3.5.9 VS V3.6.1 > https://github.com/scalameta/scalafmt/compare/v3.5.9...v3.6.1 C. Bring bug fix: https://user-images.githubusercontent.com/15246973/200554901-ac6678f8-a865-4aae-bace-5a6ba4fc9804.png;> https://user-images.githubusercontent.com/15246973/200554977-1ad218df-d8b0-426f-ac71-0697852bbaec.png;> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually run: sh ./dev/scalafmt Closes #38559 from panbingkun/upgrade_scalafmt_3_6_1. Authored-by: panbingkun Signed-off-by: Sean Owen --- dev/.scalafmt.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/.scalafmt.conf b/dev/.scalafmt.conf index f9c908b6680..e06ea5bbfd2 100644 --- a/dev/.scalafmt.conf +++ b/dev/.scalafmt.conf @@ -32,4 +32,4 @@ fileOverride { runner.dialect = scala213 } } -version = 3.5.9 +version = 3.6.1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41043][SQL] Rename the error class `_LEGACY_ERROR_TEMP_2429` to `NUM_COLUMNS_MISMATCH`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new be74ee79d5f [SPARK-41043][SQL] Rename the error class `_LEGACY_ERROR_TEMP_2429` to `NUM_COLUMNS_MISMATCH` be74ee79d5f is described below commit be74ee79d5f6a9bad02f5254fa3c32308ea7263f Author: Max Gekk AuthorDate: Tue Nov 8 23:25:19 2022 +0300 [SPARK-41043][SQL] Rename the error class `_LEGACY_ERROR_TEMP_2429` to `NUM_COLUMNS_MISMATCH` ### What changes were proposed in this pull request? In the PR, I propose to assign the proper name `NUM_COLUMNS_MISMATCH ` to the legacy error class `_LEGACY_ERROR_TEMP_2429 `, and modify test suite to use `checkError()` which checks the error class name, context and etc. ### Why are the changes needed? Proper name improves user experience w/ Spark SQL. ### Does this PR introduce _any_ user-facing change? Yes, the PR changes an user-facing error message. ### How was this patch tested? By running the modified tests: ``` $ build/sbt "test:testOnly *DataFrameSetOperationsSuite" $ PYSPARK_PYTHON=python3 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite" ``` Closes #38537 from MaxGekk/columns-num-mismatch. 
Authored-by: Max Gekk Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 10 +- .../spark/sql/catalyst/analysis/CheckAnalysis.scala| 14 +++--- .../resources/sql-tests/results/except-all.sql.out | 10 +- .../resources/sql-tests/results/intersect-all.sql.out | 10 +- .../sql-tests/results/udf/udf-except-all.sql.out | 10 +- .../sql-tests/results/udf/udf-intersect-all.sql.out| 10 +- .../apache/spark/sql/DataFrameSetOperationsSuite.scala | 18 +++--- 7 files changed, 43 insertions(+), 39 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 5107dd1778a..57fe79ef184 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -758,6 +758,11 @@ ], "sqlState" : "22005" }, + "NUM_COLUMNS_MISMATCH" : { +"message" : [ + " can only be performed on tables with the same number of columns, but the first table has columns and the table has columns." +] + }, "ORDER_BY_POS_OUT_OF_RANGE" : { "message" : [ "ORDER BY position is not in select list (valid range is [1, ])." @@ -5033,11 +5038,6 @@ "The sum of the LIMIT clause and the OFFSET clause must not be greater than the maximum 32-bit integer value (2,147,483,647) but found limit = , offset = ." ] }, - "_LEGACY_ERROR_TEMP_2429" : { -"message" : [ - " can only be performed on tables with the same number of columns, but the first table has columns and the table has columns." -] - }, "_LEGACY_ERROR_TEMP_2430" : { "message" : [ " can only be performed on tables with compatible column types. The column of the table is type which is not compatible with at the same column of the first table." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 544bb3cc301..9e41bcebe47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_WINDOW_EXPRESSION import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, StringUtils, TypeUtils} import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement} -import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.util.SchemaUtils @@ -38,7 +38,7 @@ import org.apache.spark.util.Utils /** * Throws user facing errors when passed invalid queries that fail to analyze. */ -trait CheckAnalysis extends PredicateHelper with LookupCatalog { +trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsBase { protected def isView(nameParts: Seq[String]): Boolean @@ -541,12 +541,12 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { // Check the number of columns if (child.output.length != ref.length) { e.failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2429", +
[spark] branch master updated: [SPARK-41051][CORE] Optimize ProcfsMetrics file acquisition
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1d29403bf0f [SPARK-41051][CORE] Optimize ProcfsMetrics file acquisition 1d29403bf0f is described below commit 1d29403bf0fbeaebb427b8d8fa863744d689074c Author: mask AuthorDate: Tue Nov 8 12:24:20 2022 -0600 [SPARK-41051][CORE] Optimize ProcfsMetrics file acquisition What changes were proposed in this pull request? Reuse variables from declared procfs files instead of duplicate code Why are the changes needed? The cost of looking up the config is often insignificant, but there reduce some duplicate code Does this PR introduce any user-facing change? No. How was this patch tested? Existing unit tests. Closes #41051 from sus/[SPARK-41051](https://issues.apache.org/jira/browse/SPARK-41051). Closes #38563 from Narcasserun/optimizer_val. Authored-by: mask Signed-off-by: Mridul gmail.com> --- core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala index 547d9df4e4f..2c9ccbc5d86 100644 --- a/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala +++ b/core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala @@ -170,7 +170,7 @@ private[spark] class ProcfsMetricsGetter(procfsDir: String = "/proc/") extends L try { val pidDir = new File(procfsDir, pid.toString) def openReader(): BufferedReader = { -val f = new File(new File(procfsDir, pid.toString), procfsStatFile) +val f = new File(pidDir, procfsStatFile) new BufferedReader(new InputStreamReader(new FileInputStream(f), UTF_8)) } Utils.tryWithResource(openReader) { in => - To unsubscribe, e-mail: 
commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org
[spark] branch master updated: [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0b770e18bd7 [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case 0b770e18bd7 is described below commit 0b770e18bd7250a8215f2e593fa38d3094f7106c Author: Kun Wan AuthorDate: Tue Nov 8 12:12:34 2022 -0600 [SPARK-40096][CORE][TESTS][FOLLOW-UP] Fix flaky test case ### What changes were proposed in this pull request? This PR is a followup of https://github.com/apache/spark/pull/37533 that fix the flaky test case. ### Why are the changes needed? The test case is flaky, and will failure due to some unexpected error. https://github.com/apache/spark/pull/37989 https://github.com/apache/spark/actions/runs/3145115911/jobs/5112006948 https://github.com/apache/spark/actions/runs/3146198025/jobs/5114387367 ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? CI in this PR should verify that. Closes #38091 from wankunde/SPARK-40096-2. 
Authored-by: Kun Wan Signed-off-by: Mridul gmail.com> --- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 - 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 59e725e2b75..f4e67eba40d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -4514,14 +4514,13 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti initPushBasedShuffleConfs(conf) sc.conf.set("spark.shuffle.push.results.timeout", "1s") - val myScheduler = new MyDAGScheduler( + val scheduler = new DAGScheduler( sc, taskScheduler, sc.listenerBus, mapOutputTracker, blockManagerMaster, -sc.env, -shuffleMergeFinalize = false) +sc.env) val mergerLocs = Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) val timeoutSecs = 1 @@ -4554,9 +4553,9 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti val shuffleMapRdd = new MyRDD(sc, 1, Nil) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2)) shuffleDep.setMergerLocs(mergerLocs) - val shuffleStage = myScheduler.createShuffleMapStage(shuffleDep, 0) + val shuffleStage = scheduler.createShuffleMapStage(shuffleDep, 0) - myScheduler.finalizeShuffleMerge(shuffleStage, registerMergeResults) + scheduler.finalizeShuffleMerge(shuffleStage, registerMergeResults) sendRequestsLatch.await() verify(blockStoreClient, times(2)) .finalizeShuffleMerge(any(), any(), any(), any(), any()) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41040][SS] Fix self-union streaming query failure when using readStream.table
This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7074e4fee7e [SPARK-41040][SS] Fix self-union streaming query failure when using readStream.table 7074e4fee7e is described below commit 7074e4fee7e6944013cfaa3c0c2a1458cce8a72d Author: Shixiong Zhu AuthorDate: Tue Nov 8 08:31:24 2022 -0800 [SPARK-41040][SS] Fix self-union streaming query failure when using readStream.table ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/36963 added a check to disallow any source setting `CatalogTable` in the batch plan. However, this check is not safe to enforce: - In a self-union query, the batch plan created by the source will be shared by multiple nodes in the plan. When we transform the plan, the batch plan will be visited multiple times. Hence, the first visit will set the `CatalogTable` and the second visit will try to set it again and fail the query. - A source built by arbitrary developers can set `CatalogTable` in the batch plan. We should not fail as it would break an existing source. This PR fixes the issue by removing the check and set `CatalogTable` only if the batch plan doesn't have one. ### Why are the changes needed? Fix a bug in master. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? The new added unit test Closes #38553 from zsxwing/SPARK-41040. 
Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu --- .../sql/execution/streaming/MicroBatchExecution.scala | 18 +++--- .../sql/streaming/test/DataStreamTableAPISuite.scala | 13 + 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 7ed19b35114..051e45c71e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -621,10 +621,22 @@ class MicroBatchExecution( if (hasFileMetadata) { newRelation = newRelation.withMetadataColumns() } - catalogTable.foreach { table => -assert(newRelation.catalogTable.isEmpty, + // If the catalog table is not set in the batch plan generated by the source, we will + // pick up the one from `StreamingExecutionRelation`. Otherwise, we will skip this + // step. The skipping can happen in the following cases: + // - We re-visit the same `StreamingExecutionRelation`. For example, self-union will + // share the same `StreamingExecutionRelation` and `transform` will visit it twice. + // This is safe to skip. + // - A source that sets the catalog table explicitly. We will pick up the one provided + // by the source directly to maintain the same behavior. 
+ if (newRelation.catalogTable.isEmpty) { +catalogTable.foreach { table => + newRelation = newRelation.copy(catalogTable = Some(table)) +} + } else if (catalogTable.exists(_ ne newRelation.catalogTable.get)) { +// Output a warning if `catalogTable` is provided by the source rather than engine +logWarning( s"Source $source should not produce the information of catalog table by its own.") -newRelation = newRelation.copy(catalogTable = Some(table)) } newRelation } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala index 0d1242fbb19..6bbf2239dbf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala @@ -484,6 +484,19 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter { } } + test("SPARK-41040: self-union using readStream.table should not fail") { +withTable("self_union_table") { + spark.range(10).write.format("parquet").saveAsTable("self_union_table") + val df = spark.readStream.format("parquet").table("self_union_table") + val q = df.union(df).writeStream.format("noop").start() + try { +q.processAllAvailable() + } finally { +q.stop() + } +} + } + private def
[spark] branch master updated: [SPARK-41045][SQL] Pre-compute to eliminate ScalaReflection calls after deserializer is created
This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ef402edff91 [SPARK-41045][SQL] Pre-compute to eliminate ScalaReflection calls after deserializer is created ef402edff91 is described below commit ef402edff91377d37c0c1b8d40921ed7bd9f7160 Author: Shixiong Zhu AuthorDate: Tue Nov 8 08:18:50 2022 -0800 [SPARK-41045][SQL] Pre-compute to eliminate ScalaReflection calls after deserializer is created ### What changes were proposed in this pull request? Currently when `ScalaReflection` returns a deserializer, for a few complex types, such as array, map, udt, etc, it creates functions that may still touch `ScalaReflection` after the deserializer is created. `ScalaReflection` is a performance bottleneck for multiple threads as it holds multiple global locks. We can refactor `ScalaReflection.deserializerFor` to pre-compute everything that needs to touch `ScalaReflection` before creating the deserializer. After this, once the deserializer is created, it can be reused by multiple threads without touching `ScalaReflection.deserializerFor` any more and it will be much faster. ### Why are the changes needed? Optimize `ScalaReflection.deserializerFor` to make deserializers faster under multiple threads. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? This is refactoring `deserializerFor` to optimize the code. Existing tests should already cover the correctness. Closes #38556 from zsxwing/scala-ref. 
Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu --- .../sql/catalyst/DeserializerBuildHelper.scala | 5 +- .../spark/sql/catalyst/JavaTypeInference.scala | 8 +- .../spark/sql/catalyst/ScalaReflection.scala | 157 +++-- 3 files changed, 85 insertions(+), 85 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala index 0d3b9977e4f..7051c2d2264 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala @@ -49,10 +49,9 @@ object DeserializerBuildHelper { dataType: DataType, nullable: Boolean, walkedTypePath: WalkedTypePath, - funcForCreatingDeserializer: (Expression, WalkedTypePath) => Expression): Expression = { + funcForCreatingDeserializer: Expression => Expression): Expression = { val casted = upCastToExpectedType(expr, dataType, walkedTypePath) -expressionWithNullSafety(funcForCreatingDeserializer(casted, walkedTypePath), - nullable, walkedTypePath) +expressionWithNullSafety(funcForCreatingDeserializer(casted), nullable, walkedTypePath) } def expressionWithNullSafety( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala index dccaf1c4835..827807055ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -218,9 +218,7 @@ object JavaTypeInference { // Assumes we are deserializing the first column of a row. 
deserializerForWithNullSafetyAndUpcast(GetColumnByOrdinal(0, dataType), dataType, - nullable = nullable, walkedTypePath, (casted, walkedTypePath) => { -deserializerFor(typeToken, casted, walkedTypePath) - }) + nullable = nullable, walkedTypePath, deserializerFor(typeToken, _, walkedTypePath)) } private def deserializerFor( @@ -280,7 +278,7 @@ object JavaTypeInference { dataType, nullable = elementNullable, newTypePath, -(casted, typePath) => deserializerFor(typeToken.getComponentType, casted, typePath)) +deserializerFor(typeToken.getComponentType, _, newTypePath)) } val arrayData = UnresolvedMapObjects(mapFunction, path) @@ -309,7 +307,7 @@ object JavaTypeInference { dataType, nullable = elementNullable, newTypePath, -(casted, typePath) => deserializerFor(et, casted, typePath)) +deserializerFor(et, _, newTypePath)) } UnresolvedMapObjects(mapFunction, path, customCollectionCls = Some(c)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index 12093b9f4b2..d895a0fbe19 100644 ---
[spark] branch master updated (0d435411ec5 -> fabea7101ea)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 0d435411ec5 [SPARK-41029][SQL] Optimize constructor use of `GenericArrayData` for Scala 2.13 add fabea7101ea [SPARK-41042][SQL] Rename `PARSE_CHAR_MISSING_LENGTH` to `DATATYPE_MISSING_SIZE` No new revisions were added by this update. Summary of changes: core/src/main/resources/error/error-classes.json | 12 ++-- .../org/apache/spark/sql/errors/QueryParsingErrors.scala | 2 +- .../apache/spark/sql/catalyst/parser/ErrorParserSuite.scala | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41029][SQL] Optimize constructor use of `GenericArrayData` for Scala 2.13
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0d435411ec5 [SPARK-41029][SQL] Optimize constructor use of `GenericArrayData` for Scala 2.13 0d435411ec5 is described below commit 0d435411ec5c69e6fd94636986f9749abbcf09a1 Author: yangjie01 AuthorDate: Tue Nov 8 08:42:35 2022 -0600 [SPARK-41029][SQL] Optimize constructor use of `GenericArrayData` for Scala 2.13 ### What changes were proposed in this pull request? This pr change to use a more appropriate constructor when the input is `ArrayBuffer` or `Empty Collection` to improve the construction performance of `GenericArrayData` with Scala 2.13. ### Why are the changes needed? Minor performance improvement. `GenericArrayData ` has the following constructor https://github.com/apache/spark/blob/57d492556768eb341f525ce7eb5c934089fa9e7e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GenericArrayData.scala#L30 When the input type is `ArrayBuffer`, the following code is similar in Spark ``` new GenericArrayData(arrayBuffer.toSeq) ``` For Scala 2.12, there will be no performance gap between `new GenericArrayData(arrayBuffer.toSeq)` and `new GenericArrayData(arrayBuffer)`. However, when Scala 2.13 is used, there will be a performance gap, because 'toSeq' will cause a redundant memory copy. 
For the following test case: ```scala val valuesPerIteration: Long = 1000 * 1000 * 10 val buffer = if (bufferSize == 0) { ArrayBuffer.empty[Any] } else { ArrayBuffer.fill[Any](bufferSize)(() => 1) } val benchmark = new Benchmark(s"constructor with buffer size = $bufferSize", valuesPerIteration, output = output) benchmark.addCase("toSeq and construct") { _ => var n = 0 while (n < valuesPerIteration) { new GenericArrayData(buffer.toSeq) n += 1 } } benchmark.addCase("construct directly") { _ => var n = 0 while (n < valuesPerIteration) { new GenericArrayData(buffer) n += 1 } } ``` When bufferSize=10, there is a performance gap of more than 5 times between a and b, and the performance gap increases almost linearly with the increase of bufferSize There will be more than 5 times performance gap between `new GenericArrayData(buffer.toSeq)` and `new GenericArrayData(buffer)` when `bufferSize = 10` and the performance gap will increase with the increase of bufferSize. ``` OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1022-azure Intel(R) Xeon(R) Platinum 8370C CPU 2.80GHz constructor with buffer size = 10:Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative toSeq and construct2617 2622 7 3.8 261.7 1.0X construct directly 399406 11 25.1 39.9 6.6X OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1022-azure Intel(R) Xeon(R) Platinum 8370C CPU 2.80GHz constructor with buffer size = 100: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative toSeq and construct 12512 12554 60 0.81251.2 1.0X construct directly 779781 2 12.8 77.9 16.1X OpenJDK 64-Bit Server VM 1.8.0_345-b01 on Linux 5.15.0-1022-azure Intel(R) Xeon(R) Platinum 8370C CPU 2.80GHz constructor with buffer size = 1000: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative toSeq and construct 108882 109400 732 0.1 10888.2 1.0X construct directly 5717 5731 20 1.7 571.7 19.0X ``` We can safely change `new GenericArrayData(buffer.toSeq)` to `new GenericArrayData(buffer)` 
due to `ArrayBuffer` is still `scala.collection.Seq` in Scala 2.13. On the other hand, when input is an empty set, using `Array.empty` is 10% faster than using `Seq.empty`. ### Does this PR introduce _any_ user-facing
[spark] branch master updated: [SPARK-41014][PYTHON][DOC] Improve documentation and typing of groupby and cogroup applyInPandas
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0eaa8e1e76a [SPARK-41014][PYTHON][DOC] Improve documentation and typing of groupby and cogroup applyInPandas 0eaa8e1e76a is described below commit 0eaa8e1e76ab6ecdd3b51d751857e50530ccdeb6 Author: Enrico Minack AuthorDate: Tue Nov 8 20:17:31 2022 +0900 [SPARK-41014][PYTHON][DOC] Improve documentation and typing of groupby and cogroup applyInPandas ### What changes were proposed in this pull request? Documentation of method `applyInPandas` for grouby and cogroup should mention in the main description that there are two allowed signatures for the provided function. The Examples state that this is possible, and the parameters sections states that for cogroup. Also type information for `PandasCogroupedMapFunction` should mention the three-argument callable alternative. ### Why are the changes needed? Better documenting these alternative to find this piece of information quicker. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Untested Closes #38509 from EnricoMi/branch-pyspark-group-and-cogroup-doc-and-typing. 
Authored-by: Enrico Minack Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/pandas/_typing/__init__.pyi | 5 - python/pyspark/sql/pandas/group_ops.py | 15 ++- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/pandas/_typing/__init__.pyi b/python/pyspark/sql/pandas/_typing/__init__.pyi index acca8c00f2a..69279727ca9 100644 --- a/python/pyspark/sql/pandas/_typing/__init__.pyi +++ b/python/pyspark/sql/pandas/_typing/__init__.pyi @@ -336,6 +336,9 @@ PandasMapIterFunction = Callable[[Iterable[DataFrameLike]], Iterable[DataFrameLi ArrowMapIterFunction = Callable[[Iterable[pyarrow.RecordBatch]], Iterable[pyarrow.RecordBatch]] -PandasCogroupedMapFunction = Callable[[DataFrameLike, DataFrameLike], DataFrameLike] +PandasCogroupedMapFunction = Union[ +Callable[[DataFrameLike, DataFrameLike], DataFrameLike], +Callable[[Any, DataFrameLike, DataFrameLike], DataFrameLike], +] GroupedMapPandasUserDefinedFunction = NewType("GroupedMapPandasUserDefinedFunction", FunctionType) diff --git a/python/pyspark/sql/pandas/group_ops.py b/python/pyspark/sql/pandas/group_ops.py index c34a285144f..bca96eaf205 100644 --- a/python/pyspark/sql/pandas/group_ops.py +++ b/python/pyspark/sql/pandas/group_ops.py @@ -114,7 +114,9 @@ class PandasGroupedOpsMixin: as a `DataFrame`. The function should take a `pandas.DataFrame` and return another -`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame` +`pandas.DataFrame`. Alternatively, the user can pass a function that takes +a tuple of the grouping key(s) and a `pandas.DataFrame`. +For each group, all columns are passed together as a `pandas.DataFrame` to the user-function and the returned `pandas.DataFrame` are combined as a :class:`DataFrame`. @@ -129,8 +131,9 @@ class PandasGroupedOpsMixin: Parameters -- func : function -a Python native function that takes a `pandas.DataFrame`, and outputs a -`pandas.DataFrame`. 
+a Python native function that takes a `pandas.DataFrame` and outputs a +`pandas.DataFrame`, or that takes one tuple (grouping keys) and a +`pandas.DataFrame` and outputs a `pandas.DataFrame`. schema : :class:`pyspark.sql.types.DataType` or str the return type of the `func` in PySpark. The value can be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string. @@ -377,7 +380,9 @@ class PandasCogroupedOps: as a `DataFrame`. The function should take two `pandas.DataFrame`\\s and return another -`pandas.DataFrame`. For each side of the cogroup, all columns are passed together as a +`pandas.DataFrame`. Alternatively, the user can pass a function that takes +a tuple of the grouping key(s) and the two `pandas.DataFrame`\\s. +For each side of the cogroup, all columns are passed together as a `pandas.DataFrame` to the user-function and the returned `pandas.DataFrame` are combined as a :class:`DataFrame`. @@ -394,7 +399,7 @@ class PandasCogroupedOps: func : function a Python native function that takes two `pandas.DataFrame`\\s, and outputs a `pandas.DataFrame`, or that takes one tuple (grouping keys) and two -pandas ``DataFrame``\\s, and outputs a pandas ``DataFrame``. +``pandas.DataFrame``\\s, and outputs a
[spark] branch master updated: [SPARK-40994][DOCS][SQL] Add code example in JDBC data source with partitionColumn
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6754a243284 [SPARK-40994][DOCS][SQL] Add code example in JDBC data source with partitionColumn 6754a243284 is described below commit 6754a243284b41c0092508e7409323e43ef2ba25 Author: Cheng Su AuthorDate: Tue Nov 8 20:15:16 2022 +0900 [SPARK-40994][DOCS][SQL] Add code example in JDBC data source with partitionColumn ### What changes were proposed in this pull request? [For setting `partitionColumn, lowerBound, upperBound` in JDBC data source](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html), it'd better to have a code example to guide users for how to use it. Other sections are having code examples, so we'd better add it for these options as well. The added example is from our own error message - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala#L141-L148 . ### Why are the changes needed? Better user documentation. ### Does this PR introduce _any_ user-facing change? Yes, the documentation itself. ### How was this patch tested? Verified the documentation rendered as expected. https://user-images.githubusercontent.com/4629931/199451458-a6df1f53-810b-4a41-9a1a-516608b12d1a.png;> Closes #38480 from c21/jdbc. Authored-by: Cheng Su Signed-off-by: Hyukjin Kwon --- docs/sql-data-sources-jdbc.md | 13 - 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md index 833e0805ec6..1ce411db190 100644 --- a/docs/sql-data-sources-jdbc.md +++ b/docs/sql-data-sources-jdbc.md @@ -149,7 +149,18 @@ logging into the data sources. partitionColumn must be a numeric, date, or timestamp column from the table in question. 
Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be - partitioned and returned. This option applies only to reading. + partitioned and returned. This option applies only to reading. + Example: + + spark.read.format("jdbc") + .option("url", jdbcUrl) + .option("dbtable", "(select c1, c2 from t1) as subq") + .option("partitionColumn", "c1") + .option("lowerBound", "1") + .option("upperBound", "100") + .option("numPartitions", "3") + .load() + read - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-40798][SQL][TESTS][FOLLOW-UP] Disable ANSI at the test case for DSv2
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5ae42456f3a [SPARK-40798][SQL][TESTS][FOLLOW-UP] Disable ANSI at the test case for DSv2 5ae42456f3a is described below commit 5ae42456f3aa96b9a7966f4c20582afc89294c06 Author: Hyukjin Kwon AuthorDate: Tue Nov 8 20:13:18 2022 +0900 [SPARK-40798][SQL][TESTS][FOLLOW-UP] Disable ANSI at the test case for DSv2 ### What changes were proposed in this pull request? This PR disables ANSI at the test case for DSv2 because such partition casting wasn't supported in the legacy behaviour as well (with ANSI mode on). See also https://github.com/apache/spark/pull/38547#discussion_r1016111632 ### Why are the changes needed? To recover the ANSI enabled tests. Currently it fails as below: https://github.com/apache/spark/actions/runs/3406894388/jobs/5665999638 ``` - ALTER TABLE .. ADD PARTITION using V2 catalog V2 command: SPARK-40798: Alter partition should verify partition value - legacy *** FAILED *** (18 milliseconds) org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value 'aaa' of the type "STRING" cannot be cast to "INT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error. 
== SQL(line 1, position 1) == ALTER TABLE test_catalog.ns.tbl ADD PARTITION (p='aaa') ^^^ at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:161) at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51) at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toIntExact(UTF8StringUtils.scala:34) at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castToInt$2(Cast.scala:927) at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castToInt$2$adapted(Cast.scala:927) at org.apache.spark.sql.catalyst.expressions.Cast.buildCast(Cast.scala:588) at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castToInt$1(Cast.scala:927) at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:1285) at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:526) at org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec$.$anonfun$convertToPartIdent$1(ResolvePartitionSpec.scala:83) at scala.collection.immutable.List.map(List.scala:293) at org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec$.convertToPartIdent(ResolvePartitionSpec.scala:79) at org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec$.org$apache$spark$sql$catalyst$analysis$ResolvePartitionSpec$$resolvePartitionSpec(ResolvePartitionSpec.scala:72) at org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec$$anonfun$apply$2$$anonfun$applyOrElse$1.applyOrElse(ResolvePartitionSpec.scala:49) at org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec$$anonfun$apply$2$$anonfun$applyOrElse$1.applyOrElse(ResolvePartitionSpec.scala:42) ``` ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Manually tested locally. Closes #38547 from HyukjinKwon/SPARK-40798-followup2. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- .../spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala index a9ab11e483f..c33d9b0101a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala @@ -129,7 +129,9 @@ class AlterTableAddPartitionSuite withNamespaceAndTable("ns", "tbl") { t => sql(s"CREATE TABLE $t (c int) $defaultUsing PARTITIONED BY (p int)") - withSQLConf(SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true") { + withSQLConf( + SQLConf.SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION.key -> "true", + SQLConf.ANSI_ENABLED.key -> "false") { sql(s"ALTER TABLE $t ADD PARTITION (p='aaa')") checkPartitions(t, Map("p" -> defaultPartitionName)) sql(s"ALTER TABLE $t DROP PARTITION (p=null)")
[spark] branch master updated: [SPARK-40973][SQL] Rename `_LEGACY_ERROR_TEMP_0055` to `UNCLOSED_BRACKETED_COMMENT`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 31b923d50fa [SPARK-40973][SQL] Rename `_LEGACY_ERROR_TEMP_0055` to `UNCLOSED_BRACKETED_COMMENT` 31b923d50fa is described below commit 31b923d50fa6176312f7217069c0055cd778788f Author: itholic AuthorDate: Tue Nov 8 13:34:27 2022 +0300 [SPARK-40973][SQL] Rename `_LEGACY_ERROR_TEMP_0055` to `UNCLOSED_BRACKETED_COMMENT` ### What changes were proposed in this pull request? This PR proposes to introduce new error class `UNCLOSED_BRACKETED_COMMENT`, by updating the existing legacy temp error class `_LEGACY_ERROR_TEMP_0055 `. ### Why are the changes needed? We should use appropriate error class name that matches the error message. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? The existing CI should pass. Closes #38447 from itholic/LEGACY_0055. Authored-by: itholic Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 10 +- .../org/apache/spark/sql/catalyst/parser/ParseDriver.scala | 5 - .../scala/org/apache/spark/sql/errors/QueryParsingErrors.scala | 10 +++--- .../org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala | 4 ++-- sql/core/src/test/resources/sql-tests/results/comments.sql.out | 4 ++-- .../org/apache/spark/sql/execution/SparkSqlParserSuite.scala | 4 ++-- .../org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 3 ++- 7 files changed, 24 insertions(+), 16 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 107bf5ebd5a..e28e5208784 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -938,6 +938,11 @@ "Unable to convert SQL type to Protobuf type ." 
] }, + "UNCLOSED_BRACKETED_COMMENT" : { +"message" : [ + "Found an unclosed bracketed comment. Please, append */ at the end of the comment." +] + }, "UNKNOWN_PROTOBUF_MESSAGE_TYPE" : { "message" : [ "Attempting to treat as a Message, but it was " @@ -1567,11 +1572,6 @@ "It is not allowed to add database prefix `` for the TEMPORARY view name." ] }, - "_LEGACY_ERROR_TEMP_0055" : { -"message" : [ - "Unclosed bracketed comment" -] - }, "_LEGACY_ERROR_TEMP_0056" : { "message" : [ "Invalid time travel spec: ." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index 10a213373ad..727d35d5c91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -429,7 +429,10 @@ case class UnclosedCommentProcessor( val failedToken = tokenStream.get(tokenStream.size() - 2) assert(failedToken.getType() == SqlBaseParser.BRACKETED_COMMENT) val position = Origin(Option(failedToken.getLine), Option(failedToken.getCharPositionInLine)) - throw QueryParsingErrors.unclosedBracketedCommentError(command, position) + throw QueryParsingErrors.unclosedBracketedCommentError( +command = command, +start = Origin(Option(failedToken.getStartIndex)), +stop = Origin(Option(failedToken.getStopIndex))) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala index 1fce265bece..0fcf6edcbdf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala @@ -601,9 +601,13 @@ private[sql] object QueryParsingErrors extends QueryErrorsBase { ctx) } - def unclosedBracketedCommentError(command: String, position: Origin): 
Throwable = { -new ParseException(Some(command), "Unclosed bracketed comment", position, position, - Some("_LEGACY_ERROR_TEMP_0055")) + def unclosedBracketedCommentError(command: String, start: Origin, stop: Origin): Throwable = { +new ParseException( + command = Some(command), + start = start, + stop = stop, + errorClass = "UNCLOSED_BRACKETED_COMMENT", + messageParameters = Map.empty) } def invalidTimeTravelSpec(reason: String, ctx: ParserRuleContext): Throwable = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
[spark] branch master updated (75643f4e9b0 -> 3bbf0f35b7a)
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 75643f4e9b0 [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator add 3bbf0f35b7a [SPARK-41027][SQL] Use `UNEXPECTED_INPUT_TYPE` instead of `MAP_FROM_ENTRIES_WRONG_TYPE` No new revisions were added by this update. Summary of changes: core/src/main/resources/error/error-classes.json | 5 - .../spark/sql/catalyst/expressions/collectionOperations.scala | 9 + .../sql/catalyst/expressions/CollectionExpressionsSuite.scala | 10 ++ .../scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala | 9 + 4 files changed, 16 insertions(+), 17 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 75643f4e9b0 [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator 75643f4e9b0 is described below commit 75643f4e9b0622f8d6848a155e23e5f44c9d Author: SandishKumarHN AuthorDate: Tue Nov 8 13:18:54 2022 +0300 [SPARK-41015][SQL][PROTOBUF] UnitTest null check for data generator ### What changes were proposed in this pull request? null check for data generator after type conversion. ### Why are the changes needed? To fix a test failure. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I have tested all the random numbers manually, current unit tests. Closes #38515 from SandishKumarHN/SPARK-41015-UTests. Authored-by: SandishKumarHN Signed-off-by: Max Gekk --- .../spark/sql/protobuf/utils/ProtobufUtils.scala | 2 +- .../src/test/resources/protobuf/basicmessage.proto | 1 + .../resources/protobuf/basicmessage_noimports.desc | 18 + .../ProtobufCatalystDataConversionSuite.scala | 13 .../sql/protobuf/ProtobufFunctionsSuite.scala | 14 + .../spark/sql/protobuf/ProtobufSerdeSuite.scala| 23 ++ .../spark/sql/errors/QueryCompilationErrors.scala | 4 ++-- 7 files changed, 68 insertions(+), 7 deletions(-) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala index 4bd59ddce6c..52870be5fbe 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala @@ -208,7 +208,7 @@ private[sql] object ProtobufUtils extends Logging { ).toList fileDescriptorList } catch { - case e: Descriptors.DescriptorValidationException => + case e: 
Exception => throw QueryCompilationErrors.failedParsingDescriptorError(descFilePath, e) } } diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto index 4252f349cf0..8f4c1bb8eae 100644 --- a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto +++ b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto @@ -17,6 +17,7 @@ // cd connector/protobuf/src/test/resources/protobuf // protoc --java_out=./ basicmessage.proto // protoc --include_imports --descriptor_set_out=basicmessage.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto +// protoc --descriptor_set_out=basicmessage_noimports.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto syntax = "proto3"; diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc b/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc new file mode 100644 index 000..26ba6552cb0 --- /dev/null +++ b/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc @@ -0,0 +1,18 @@ + +� +basicmessage.proto$org.apache.spark.sql.protobuf.protosnestedenum.proto"� +BasicMessage +id (Rid! +string_value ( RstringValue +int32_value (R +int32Value +int64_value (R +int64Value! 
+double_value (RdoubleValue +float_value (R +floatValue + +bool_value (R boolValue +bytes_value (R +bytesValueS +rnested_enum (20.org.apache.spark.sql.protobuf.protos.NestedEnumR rnestedEnumBBBasicMessageProtobproto3 \ No newline at end of file diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala index 271c5b0fec8..9f9b51006ca 100644 --- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala +++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala @@ -123,16 +123,21 @@ class ProtobufCatalystDataConversionSuite StringType -> ("StringMsg", "")) testingTypes.foreach { dt => -val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1) +val seed = scala.util.Random.nextInt(RandomDataGenerator.MAX_STR_LEN) test(s"single $dt with seed $seed") { val (messageName, defaultValue) = catalystTypesToProtoMessages(dt.fields(0).dataType) val rand = new scala.util.Random(seed) val generator = RandomDataGenerator.forType(dt, rand = rand).get - var data = generator() - while (data.asInstanceOf[Row].get(0)
[spark] branch master updated: [SPARK-40984][CORE][SQL] Use `NON_FOLDABLE_INPUT` instead of `FRAME_LESS_OFFSET_WITHOUT_FOLDABLE`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5c6c5e64ade [SPARK-40984][CORE][SQL] Use `NON_FOLDABLE_INPUT` instead of `FRAME_LESS_OFFSET_WITHOUT_FOLDABLE` 5c6c5e64ade is described below commit 5c6c5e64ade5310ce84ec9f6eda3a50074324903 Author: yangjie01 AuthorDate: Tue Nov 8 13:13:00 2022 +0300 [SPARK-40984][CORE][SQL] Use `NON_FOLDABLE_INPUT` instead of `FRAME_LESS_OFFSET_WITHOUT_FOLDABLE` ### What changes were proposed in this pull request? This pr aims replace `FRAME_LESS_OFFSET_WITHOUT_FOLDABLE` with `NON_FOLDABLE_INPUT` to clean up similar error subclass. ### Why are the changes needed? `FRAME_LESS_OFFSET_WITHOUT_FOLDABLE` and `NON_FOLDABLE_INPUT` look similar, but `NON_FOLDABLE_INPUT` is more general ### Does this PR introduce _any_ user-facing change? Yes. The PR changes user-facing error messages. For example, the error message will change from ``` Offset expression "(- nonfoldableliteral())\" must be a literal. ``` to ``` the input offset should be a foldable "INT" expression; however, got "(- nonfoldableliteral())\". ``` ### How was this patch tested? Pass Github Actions Closes #38536 from LuciferYang/SPARK-40984. 
Authored-by: yangjie01 Signed-off-by: Max Gekk --- core/src/main/resources/error/error-classes.json | 5 - .../apache/spark/sql/catalyst/expressions/windowExpressions.scala | 6 -- .../spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala | 8 ++-- .../org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala | 7 +-- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index ceb3e4ed5b1..2cbcc165982 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -209,11 +209,6 @@ "Input to should all be the same type, but it's ." ] }, - "FRAME_LESS_OFFSET_WITHOUT_FOLDABLE" : { -"message" : [ - "Offset expression must be a literal." -] - }, "HASH_MAP_TYPE" : { "message" : [ "Input to the function cannot contain elements of the \"MAP\" type. In Spark, same maps may have different hashcode, thus hash expressions are prohibited on \"MAP\" elements. To restore previous behavior set \"spark.sql.legacy.allowHashOnMapType\" to \"true\"." 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index 511a124fd1a..353ab22b5a5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -452,9 +452,11 @@ sealed abstract class FrameLessOffsetWindowFunction check } else if (!offset.foldable) { DataTypeMismatch( -errorSubClass = "FRAME_LESS_OFFSET_WITHOUT_FOLDABLE", +errorSubClass = "NON_FOLDABLE_INPUT", messageParameters = Map( - "offset" -> toSQLExpr(offset) + "inputName" -> "offset", + "inputType" -> toSQLType(offset.dataType), + "inputExpr" -> toSQLExpr(offset) ) ) } else { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala index b192f12d569..256cf439b65 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ExpressionTypeCheckingSuite.scala @@ -770,8 +770,12 @@ class ExpressionTypeCheckingSuite extends SparkFunSuite with SQLHelper with Quer val lag = Lag(Literal(1), NonFoldableLiteral(10), Literal(null), true) assert(lag.checkInputDataTypes() == DataTypeMismatch( -errorSubClass = "FRAME_LESS_OFFSET_WITHOUT_FOLDABLE", -messageParameters = Map("offset" -> "\"(- nonfoldableliteral())\"") +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> "offset", + "inputType" -> "\"INT\"", + "inputExpr" -> "\"(- nonfoldableliteral())\"" +) )) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala index 5a543547708..990c1e1b2de
[spark] branch master updated: [SPARK-40663][SQL][FOLLOWUP] `SparkIllegalArgumentException` should accept `cause`
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 755ed4512dd [SPARK-40663][SQL][FOLLOWUP] `SparkIllegalArgumentException` should accept `cause` 755ed4512dd is described below commit 755ed4512dded18406f77dc4708d11c16dc99b21 Author: itholic AuthorDate: Tue Nov 8 13:03:45 2022 +0300 [SPARK-40663][SQL][FOLLOWUP] `SparkIllegalArgumentException` should accept `cause` ### What changes were proposed in this pull request? This PR proposes to enable `SparkIllegalArgumentException` accept `cause` parameter. ### Why are the changes needed? The original error message generated by `IllegalArgumentException` can accept the `cause` parameter, but `SparkIllegalArgumentException` cannot, so there it some regression was made that we should enable back. e.g. https://github.com/apache/spark/pull/38123/files#r1014645753 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? ``` ./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*" ``` Closes #38548 from itholic/SPARK-40633-followup. 
Authored-by: itholic Signed-off-by: Max Gekk --- core/src/main/scala/org/apache/spark/SparkException.scala | 5 +++-- .../scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala| 6 -- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkException.scala b/core/src/main/scala/org/apache/spark/SparkException.scala index 9eecb5e2727..03938444e12 100644 --- a/core/src/main/scala/org/apache/spark/SparkException.scala +++ b/core/src/main/scala/org/apache/spark/SparkException.scala @@ -258,9 +258,10 @@ private[spark] class SparkIllegalArgumentException( errorClass: String, messageParameters: Map[String, String], context: Array[QueryContext] = Array.empty, -summary: String = "") +summary: String = "", +cause: Throwable = null) extends IllegalArgumentException( -SparkThrowableHelper.getMessage(errorClass, messageParameters, summary)) +SparkThrowableHelper.getMessage(errorClass, messageParameters, summary), cause) with SparkThrowable { override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 73e0f580727..aad9a9ba51e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -367,7 +367,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { errorClass = "_LEGACY_ERROR_TEMP_2008", messageParameters = Map( "url" -> url.toString, -"ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key))) +"ansiConfig" -> toSQLConf(SQLConf.ANSI_ENABLED.key)), + cause = e) } def illegalUrlError(url: UTF8String): Throwable = { @@ -1226,7 +1227,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { stats: String, e: NumberFormatException): SparkIllegalArgumentException = { 
new SparkIllegalArgumentException( errorClass = "_LEGACY_ERROR_TEMP_2113", - messageParameters = Map("stats" -> stats)) + messageParameters = Map("stats" -> stats), + cause = e) } def statisticNotRecognizedError(stats: String): SparkIllegalArgumentException = { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-38959][SQL][FOLLOW-UP] Address feedback for RowLevelOperationRuntimeGroupFiltering
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7429223cfd6 [SPARK-38959][SQL][FOLLOW-UP] Address feedback for RowLevelOperationRuntimeGroupFiltering 7429223cfd6 is described below commit 7429223cfd6c53f9d847d58e43190d2a0311f6c4 Author: aokolnychyi AuthorDate: Tue Nov 8 16:24:59 2022 +0800 [SPARK-38959][SQL][FOLLOW-UP] Address feedback for RowLevelOperationRuntimeGroupFiltering ### What changes were proposed in this pull request? This PR is to address the feedback on PR #36304 after that change was merged. ### Why are the changes needed? These changes are needed for better code quality. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #38526 from aokolnychyi/spark-38959-follow-up. Authored-by: aokolnychyi Signed-off-by: Wenchen Fan --- .../dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala index 232c320bcd4..d9dad43532e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala @@ -78,9 +78,8 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic // clone the relation and assign new expr IDs to avoid conflicts matchingRowsPlan transformUpWithNewOutput { case r: DataSourceV2Relation if r eq relation => -val oldOutput = r.output -val newOutput = oldOutput.map(_.newInstance()) -r.copy(output = 
newOutput) -> oldOutput.zip(newOutput) +val newRelation = r.newInstance() +newRelation -> r.output.zip(newRelation.output) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org