[spark] branch master updated (2adb8e12f73 -> bd270830614)
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 2adb8e12f73 [SPARK-44000][SQL][FOLLOWUP] Add comments when picking build side for BNLJ add bd270830614 [SPARK-43935][SQL][PYTHON][CONNECT] Add xpath_* functions to Scala and Python No new revisions were added by this update. Summary of changes: .../scala/org/apache/spark/sql/functions.scala | 87 ++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 36 ++ .../explain-results/function_xpath.explain | 2 + .../explain-results/function_xpath_boolean.explain | 2 + .../explain-results/function_xpath_double.explain | 2 + .../explain-results/function_xpath_float.explain | 2 + .../explain-results/function_xpath_int.explain | 2 + .../explain-results/function_xpath_long.explain| 2 + .../explain-results/function_xpath_number.explain | 2 + .../explain-results/function_xpath_short.explain | 2 + .../explain-results/function_xpath_string.explain | 2 + .../query-tests/queries/function_xpath.json| 29 + .../query-tests/queries/function_xpath.proto.bin | Bin 0 -> 136 bytes .../queries/function_xpath_boolean.json| 29 + .../queries/function_xpath_boolean.proto.bin | Bin 0 -> 137 bytes .../query-tests/queries/function_xpath_double.json | 29 + .../queries/function_xpath_double.proto.bin| Bin 0 -> 136 bytes .../query-tests/queries/function_xpath_float.json | 29 + .../queries/function_xpath_float.proto.bin | Bin 0 -> 135 bytes .../query-tests/queries/function_xpath_int.json| 29 + .../queries/function_xpath_int.proto.bin | Bin 0 -> 132 bytes .../query-tests/queries/function_xpath_long.json | 29 + .../queries/function_xpath_long.proto.bin | Bin 0 -> 133 bytes .../query-tests/queries/function_xpath_number.json | 29 + .../queries/function_xpath_number.proto.bin| Bin 0 -> 136 bytes .../query-tests/queries/function_xpath_short.json | 29 + .../queries/function_xpath_short.proto.bin | Bin 0 -> 135 bytes 
.../query-tests/queries/function_xpath_string.json | 29 + .../queries/function_xpath_string.proto.bin| Bin 0 -> 136 bytes .../source/reference/pyspark.sql/functions.rst | 15 +++ python/pyspark/sql/connect/functions.py| 63 ++ python/pyspark/sql/functions.py| 133 + .../scala/org/apache/spark/sql/functions.scala | 97 +++ .../org/apache/spark/sql/XPathFunctionsSuite.scala | 17 +++ 34 files changed, 727 insertions(+) create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_boolean.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_double.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_float.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_int.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_long.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_number.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_short.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_string.explain create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath.proto.bin create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath_boolean.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath_boolean.proto.bin create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_double.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath_double.proto.bin create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath_float.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath_float.proto.bin create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath_int.json create mode 100644 connector/connect/common/src/test/resources/query-tests/queries/function_xpath_int.proto.bin create mode 100644
[spark] branch master updated (d88633ada5e -> 2adb8e12f73)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from d88633ada5e [SPARK-44000][SQL] Add hint to disable broadcasting and replicating one side of join add 2adb8e12f73 [SPARK-44000][SQL][FOLLOWUP] Add comments when picking build side for BNLJ No new revisions were added by this update. Summary of changes: .../main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 4 1 file changed, 4 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-44000][SQL] Add hint to disable broadcasting and replicating one side of join
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new d88633ada5e [SPARK-44000][SQL] Add hint to disable broadcasting and replicating one side of join d88633ada5e is described below commit d88633ada5eb73e8876acaa2c2a53b9596f2acdd Author: aokolnychyi AuthorDate: Wed Jun 7 20:15:05 2023 -0700 [SPARK-44000][SQL] Add hint to disable broadcasting and replicating one side of join ### What changes were proposed in this pull request? This PR adds a new internal join hint to disable broadcasting and replicating one side of join. ### Why are the changes needed? These changes are needed to disable broadcasting and replicating one side of join when it is not permitted, such as the cardinality check in MERGE operations in PR #41448. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This PR comes with tests. More tests are in #41448. Closes #41499 from aokolnychyi/spark-44000. 
Authored-by: aokolnychyi Signed-off-by: Dongjoon Hyun --- .../spark/sql/catalyst/optimizer/joins.scala | 34 +++- .../spark/sql/catalyst/plans/logical/hints.scala | 10 .../spark/sql/execution/SparkStrategies.scala | 29 +++--- .../scala/org/apache/spark/sql/JoinSuite.scala | 64 +- 4 files changed, 127 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 48b4007a897..8f03b93dce7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -341,6 +341,16 @@ trait JoinSelectionHelper { ) } + def getBroadcastNestedLoopJoinBuildSide(hint: JoinHint): Option[BuildSide] = { +if (hintToNotBroadcastAndReplicateLeft(hint)) { + Some(BuildRight) +} else if (hintToNotBroadcastAndReplicateRight(hint)) { + Some(BuildLeft) +} else { + None +} + } + def getSmallerSide(left: LogicalPlan, right: LogicalPlan): BuildSide = { if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft } @@ -413,11 +423,19 @@ trait JoinSelectionHelper { } def hintToNotBroadcastLeft(hint: JoinHint): Boolean = { -hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_HASH)) +hint.leftHint.flatMap(_.strategy).exists { + case NO_BROADCAST_HASH => true + case NO_BROADCAST_AND_REPLICATION => true + case _ => false +} } def hintToNotBroadcastRight(hint: JoinHint): Boolean = { -hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_HASH)) +hint.rightHint.flatMap(_.strategy).exists { + case NO_BROADCAST_HASH => true + case NO_BROADCAST_AND_REPLICATION => true + case _ => false +} } def hintToShuffleHashJoinLeft(hint: JoinHint): Boolean = { @@ -454,6 +472,18 @@ trait JoinSelectionHelper { hint.rightHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL)) } + def hintToNotBroadcastAndReplicate(hint: JoinHint): Boolean = { 
+hintToNotBroadcastAndReplicateLeft(hint) || hintToNotBroadcastAndReplicateRight(hint) + } + + def hintToNotBroadcastAndReplicateLeft(hint: JoinHint): Boolean = { +hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_AND_REPLICATION)) + } + + def hintToNotBroadcastAndReplicateRight(hint: JoinHint): Boolean = { +hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_AND_REPLICATION)) + } + private def getBuildSide( canBuildLeft: Boolean, canBuildRight: Boolean, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala index 5dc3eb707f6..b17bab7849b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala @@ -187,6 +187,16 @@ case object PREFER_SHUFFLE_HASH extends JoinStrategyHint { override def hintAliases: Set[String] = Set.empty } +/** + * An internal hint to prohibit broadcasting and replicating one side of a join. This hint is used + * by some rules where broadcasting or replicating a particular side of the join is not permitted, + * such as the cardinality check in MERGE operations. + */ +case object NO_BROADCAST_AND_REPLICATION extends JoinStrategyHint { + override def displayName: String = "no_broadcast_and_replication" + override def hintAliases: Set[String] = Set.empty +} + /** * The callback for implementing customized strategies
[spark] branch master updated: Revert "[SPARK-43840][INFRA] Switch `scala-213` GitHub Action Job to `scala-212`"
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new caf905d7b37 Revert "[SPARK-43840][INFRA] Switch `scala-213` GitHub Action Job to `scala-212`" caf905d7b37 is described below commit caf905d7b37198a40a299c17033063fe3dd3eb6a Author: yangjie01 AuthorDate: Wed Jun 7 20:08:09 2023 -0700 Revert "[SPARK-43840][INFRA] Switch `scala-213` GitHub Action Job to `scala-212`" ### What changes were proposed in this pull request? This PR reverts the change made in SPARK-43840. Spark 3.5.0 still uses Scala 2.12 by default, so pull requests still need a build check for Scala 2.13. ### Why are the changes needed? To restore the pipeline check for Scala 2.13. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #41506 from LuciferYang/r-43840. Authored-by: yangjie01 Signed-off-by: Dongjoon Hyun --- .github/workflows/build_and_test.yml | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 8aa0f42916e..a373b0e76e7 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -86,7 +86,7 @@ jobs: sparkr=`./dev/is-changed.py -m sparkr` tpcds=`./dev/is-changed.py -m sql` docker=`./dev/is-changed.py -m docker-integration-tests` - # 'build', 'scala-212', and 'java-11-17' are always true for now. + # 'build', 'scala-213', and 'java-11-17' are always true for now. # It does not save significant time and most of PRs trigger the build.
precondition=" { @@ -95,7 +95,7 @@ jobs: \"sparkr\": \"$sparkr\", \"tpcds-1g\": \"$tpcds\", \"docker-integration-tests\": \"$docker\", - \"scala-212\": \"true\", + \"scala-213\": \"true\", \"java-11-17\": \"true\", \"lint\" : \"true\", \"k8s-integration-tests\" : \"true\", @@ -728,10 +728,10 @@ jobs: ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Djava.version=${JAVA_VERSION/-ea} install rm -rf ~/.m2/repository/org/apache/spark - scala-212: + scala-213: needs: precondition -if: fromJson(needs.precondition.outputs.required).scala-212 == 'true' -name: Scala 2.12 build with SBT +if: fromJson(needs.precondition.outputs.required).scala-213 == 'true' +name: Scala 2.13 build with SBT runs-on: ubuntu-22.04 steps: - name: Checkout Spark repository @@ -761,9 +761,9 @@ jobs: uses: actions/cache@v3 with: path: ~/.cache/coursier -key: scala-212-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} +key: scala-213-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') }} restore-keys: | - scala-212-coursier- + scala-213-coursier- - name: Install Java 8 uses: actions/setup-java@v3 with: @@ -771,8 +771,8 @@ jobs: java-version: 8 - name: Build with SBT run: | -./dev/change-scala-version.sh 2.12 -./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.12 compile Test/compile +./dev/change-scala-version.sh 2.13 +./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests -Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile Test/compile # Any TPC-DS related updates on this job need to be applied to tpcds-1g-gen job of benchmark.yml as well tpcds-1g: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-43920][SQL][CONNECT] Create sql/api module
This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 15202e53195 [SPARK-43920][SQL][CONNECT] Create sql/api module 15202e53195 is described below commit 15202e531957849e9eaefcaa6fa1c522a8967d80 Author: Rui Wang AuthorDate: Wed Jun 7 20:53:02 2023 -0400 [SPARK-43920][SQL][CONNECT] Create sql/api module ### What changes were proposed in this pull request? We need a sql/api module to host public APIs such as DataType, Row, etc. This module can be shared between Catalyst and the Spark Connect client so that the client no longer needs to depend on Catalyst. ### Why are the changes needed? This is a step towards the Spark Connect client no longer depending on Catalyst. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? N/A Closes #41426 from amaliujia/add_sql_api. Authored-by: Rui Wang Signed-off-by: Herman van Hovell --- pom.xml | 1 + project/SparkBuild.scala | 6 +++--- sql/api/pom.xml | 45 + 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 175df1722e6..3c87da45bea 100644 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,7 @@ mllib-local tools streaming +sql/api sql/catalyst sql/core sql/hive diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c4c19c65bf1..023ce4ba81c 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -58,10 +58,10 @@ object BuildCommons { val allProjects@Seq( core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, -commonUtils, _* +commonUtils, sqlApi, _* ) = Seq( "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe", -"tags", "sketch", "kvstore", "common-utils" +"tags", "sketch", "kvstore", "common-utils", "sql-api" ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ 
streamingProjects ++ Seq(connectCommon, connect, connectClient) val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn, @@ -408,7 +408,7 @@ object SparkBuild extends PomBuild { Seq( spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn, unsafe, tags, tokenProviderKafka010, sqlKafka010, connectCommon, connect, connectClient, protobuf, - commonUtils + commonUtils, sqlApi ).contains(x) } diff --git a/sql/api/pom.xml b/sql/api/pom.xml new file mode 100644 index 000..9d100b1130e --- /dev/null +++ b/sql/api/pom.xml @@ -0,0 +1,45 @@ + + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + +org.apache.spark +spark-parent_2.12 +3.5.0-SNAPSHOT +../../pom.xml + + +spark-sql-api_2.12 +jar +Spark Project SQL API +https://spark.apache.org/ + +sql-api + + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + + \ No newline at end of file - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (0c7b4306c7c -> 22297345e45)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 0c7b4306c7c [SPARK-43540][K8S][CORE] Add working directory into classpath on the driver in K8S cluster mode add 22297345e45 [SPARK-44002][CONNECT] Fix the artifact statuses handler No new revisions were added by this update. Summary of changes: .../src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala | 8 +--- .../sql/connect/service/SparkConnectArtifactStatusesHandler.scala | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-43540][K8S][CORE] Add working directory into classpath on the driver in K8S cluster mode
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 0c7b4306c7c [SPARK-43540][K8S][CORE] Add working directory into classpath on the driver in K8S cluster mode 0c7b4306c7c is described below commit 0c7b4306c7c5fbdd6c54f8172f82e1d23e3b Author: fwang12 AuthorDate: Wed Jun 7 15:38:46 2023 -0700 [SPARK-43540][K8S][CORE] Add working directory into classpath on the driver in K8S cluster mode ### What changes were proposed in this pull request? Add the working directory to the classpath on the driver in K8S cluster mode. ### Why are the changes needed? After #37417, the files from spark.files and spark.jars are placed in the working directory. However, the Spark context classloader cannot access them, because the working directory is not on the classpath by default. This PR adds the current working directory to the classpath so that the classloader can access the spark.files and spark.jars placed there, for example a `hive-site.xml` uploaded via `spark.files`. ### Does this PR introduce _any_ user-facing change? Yes, users no longer need to add the working directory to the Spark classpath manually. ### How was this patch tested? UT. Closes #41201 from turboFei/work_dir_classpath.
Authored-by: fwang12 Signed-off-by: Dongjoon Hyun --- .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 12 +++- .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 12 .../docker/src/main/dockerfiles/spark/entrypoint.sh | 3 +++ 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e1d616b9b83..8f9477385e7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -414,6 +414,9 @@ private[spark] class SparkSubmit extends Logging { // directory too. // SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current // working directory +// SPARK-43540: add current working directory into driver classpath +val workingDirectory = "." +childClasspath += workingDirectory def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false): String = { val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI) @@ -423,13 +426,12 @@ private[spark] class SparkSubmit extends Logging { targetDir, sparkConf, hadoopConf) Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map { case (localResources, resolvedUri) => - val source = new File(localResources.getPath) + val source = new File(localResources.getPath).getCanonicalFile val dest = new File( -".", +workingDirectory, if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName) - logInfo( -s"Files $resolvedUri " + - s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}") +.getCanonicalFile + logInfo(s"Files $resolvedUri from $source to $dest") Utils.deleteRecursively(dest) if (isArchive) { Utils.unpack(source, dest) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 44c35ed70e0..8e2d6e6cf5f 
100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -1618,6 +1618,18 @@ class SparkSubmitSuite conf.get(k) should be (v) } } + + test("SPARK-43540: Add working directory into classpath on the driver in K8S cluster mode") { +val clArgs = Seq( + "--deploy-mode", "client", + "--master", "k8s://host:port", + "--class", "org.SomeClass", + "--conf", "spark.kubernetes.submitInDriver=true", + "/home/thejar.jar") +val appArgs = new SparkSubmitArguments(clArgs) +val (_, classpath, _, _) = submit.prepareSubmitEnvironment(appArgs) +assert(classpath.contains(".")) + } } object JarCreationTest extends Logging { diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 42f4df88f3d..f9561b9aa4e 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++
[spark] branch master updated: [SPARK-43933][SQL][PYTHON][CONNECT] Add linear regression aggregate functions to Scala and Python
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 2057eb7e203 [SPARK-43933][SQL][PYTHON][CONNECT] Add linear regression aggregate functions to Scala and Python 2057eb7e203 is described below commit 2057eb7e203c9fde3f4fa13d5f04225cf6e49a87 Author: Jiaan Geng AuthorDate: Wed Jun 7 22:56:13 2023 +0800 [SPARK-43933][SQL][PYTHON][CONNECT] Add linear regression aggregate functions to Scala and Python ### What changes were proposed in this pull request? Based on HyukjinKwon's suggestion, this PR adds linear regression aggregate functions to the Scala and Python APIs. These functions are listed below. - `regr_avgx` - `regr_avgy` - `regr_count` - `regr_intercept` - `regr_r2` - `regr_slope` - `regr_sxx` - `regr_sxy` - `regr_syy` ### Why are the changes needed? To add linear regression aggregate functions to the Scala and Python APIs. ### Does this PR introduce _any_ user-facing change? 'No'. New feature. ### How was this patch tested? New test cases. Closes #41474 from beliefer/SPARK-43933.
Authored-by: Jiaan Geng Signed-off-by: Ruifeng Zheng --- .../scala/org/apache/spark/sql/functions.scala | 82 ++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 36 +++ .../explain-results/function_regr_avgx.explain | 2 + .../explain-results/function_regr_avgy.explain | 2 + .../explain-results/function_regr_count.explain| 2 + .../function_regr_intercept.explain| 2 + .../explain-results/function_regr_r2.explain | 2 + .../explain-results/function_regr_slope.explain| 2 + .../explain-results/function_regr_sxx.explain | 2 + .../explain-results/function_regr_sxy.explain | 2 + .../explain-results/function_regr_syy.explain | 2 + .../query-tests/queries/function_regr_avgx.json| 29 +++ .../queries/function_regr_avgx.proto.bin | Bin 0 -> 185 bytes .../query-tests/queries/function_regr_avgy.json| 29 +++ .../queries/function_regr_avgy.proto.bin | Bin 0 -> 185 bytes .../query-tests/queries/function_regr_count.json | 29 +++ .../queries/function_regr_count.proto.bin | Bin 0 -> 186 bytes .../queries/function_regr_intercept.json | 29 +++ .../queries/function_regr_intercept.proto.bin | Bin 0 -> 190 bytes .../query-tests/queries/function_regr_r2.json | 29 +++ .../query-tests/queries/function_regr_r2.proto.bin | Bin 0 -> 183 bytes .../query-tests/queries/function_regr_slope.json | 29 +++ .../queries/function_regr_slope.proto.bin | Bin 0 -> 186 bytes .../query-tests/queries/function_regr_sxx.json | 29 +++ .../queries/function_regr_sxx.proto.bin| Bin 0 -> 184 bytes .../query-tests/queries/function_regr_sxy.json | 29 +++ .../queries/function_regr_sxy.proto.bin| Bin 0 -> 184 bytes .../query-tests/queries/function_regr_syy.json | 29 +++ .../queries/function_regr_syy.proto.bin| Bin 0 -> 184 bytes .../source/reference/pyspark.sql/functions.rst | 9 + python/pyspark/sql/connect/functions.py| 63 + python/pyspark/sql/functions.py| 280 + .../scala/org/apache/spark/sql/functions.scala | 83 ++ .../apache/spark/sql/DataFrameAggregateSuite.scala | 13 + 34 files changed, 845 insertions(+) diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index 60634388fa1..9179b88a26d 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -992,6 +992,88 @@ object functions { */ def var_pop(columnName: String): Column = var_pop(Column(columnName)) + /** + * Aggregate function: returns the average of the independent variable for non-null pairs in a + * group, where `y` is the dependent variable and `x` is the independent variable. + * + * @group agg_funcs + * @since 3.5.0 + */ + def regr_avgx(y: Column, x: Column): Column = Column.fn("regr_avgx", y, x) + + /** + * Aggregate function: returns the average of the dependent variable for non-null pairs in a + * group, where `y` is the dependent variable and `x` is the independent variable. + * + * @group agg_funcs + * @since 3.5.0 + */ + def regr_avgy(y: Column, x: Column): Column = Column.fn("regr_avgy", y, x) + + /** + * Aggregate function: returns the number of non-null number pairs in a group, where `y` is the + * dependent variable and `x` is the independent
[spark] branch master updated: [SPARK-43984][SQL][PROTOBUF] Change to use `foreach` when `map` doesn't produce results
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4ffa2722d13 [SPARK-43984][SQL][PROTOBUF] Change to use `foreach` when `map` doesn't produce results 4ffa2722d13 is described below commit 4ffa2722d13593e46ef1c8fd59a20c439a535f61 Author: yangjie01 AuthorDate: Wed Jun 7 21:02:54 2023 +0800 [SPARK-43984][SQL][PROTOBUF] Change to use `foreach` when `map` doesn't produce results ### What changes were proposed in this pull request? Similar to https://github.com/apache/spark/pull/36720, this PR changes Spark code to use `foreach` where the result of `map` is not used; these cases were introduced after Spark 3.4. ### Why are the changes needed? To use the appropriate API. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions. Closes #41482 from LuciferYang/SPARK-43984.
Authored-by: yangjie01 Signed-off-by: yangjie01 --- .../main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala | 2 +- sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala index b11284d1f28..611b753d024 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala @@ -173,7 +173,7 @@ private[sql] class ProtobufSerializer( case (MapType(kt, vt, valueContainsNull), MESSAGE) => var keyField: FieldDescriptor = null var valueField: FieldDescriptor = null -fieldDescriptor.getMessageType.getFields.asScala.map { field => +fieldDescriptor.getMessageType.getFields.asScala.foreach { field => field.getName match { case "key" => keyField = field diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 0f689ea975d..01684f52ab8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -63,7 +63,7 @@ private[hive] case class HiveSimpleUDF( // TODO: Finish input output types. 
override def eval(input: InternalRow): Any = { -children.zipWithIndex.map { +children.zipWithIndex.foreach { case (child, idx) => evaluator.setArg(idx, child.eval(input)) } evaluator.evaluate() @@ -135,7 +135,7 @@ private[hive] case class HiveGenericUDF( private lazy val evaluator = new HiveGenericUDFEvaluator(funcWrapper, children) override def eval(input: InternalRow): Any = { -children.zipWithIndex.map { +children.zipWithIndex.foreach { case (child, idx) => evaluator.setArg(idx, child.eval(input)) } evaluator.evaluate() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (fead8a7962a -> 89de4f79e7f)
This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from fead8a7962a [SPARK-43993][SQL][TESTS] Add tests for cache artifacts add 89de4f79e7f [SPARK-43790][PYTHON][CONNECT][ML] Add `copyFromLocalToFs` API No new revisions were added by this update. Summary of changes: .../artifact/SparkConnectArtifactManager.scala | 39 ++ .../apache/spark/sql/connect/config/Connect.scala | 15 + .../service/SparkConnectAddArtifactsHandler.scala | 7 +++- .../connect/artifact/ArtifactManagerSuite.scala| 18 +- python/pyspark/sql/connect/client/artifact.py | 33 +++--- python/pyspark/sql/connect/client/core.py | 3 ++ python/pyspark/sql/connect/session.py | 29 .../sql/tests/connect/client/test_artifact.py | 21 8 files changed, 158 insertions(+), 7 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-43993][SQL][TESTS] Add tests for cache artifacts
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new fead8a7962a [SPARK-43993][SQL][TESTS] Add tests for cache artifacts fead8a7962a is described below commit fead8a7962a717aae5cab9eef51eed2ac684f070 Author: Max Gekk AuthorDate: Wed Jun 7 16:00:49 2023 +0300 [SPARK-43993][SQL][TESTS] Add tests for cache artifacts ### What changes were proposed in this pull request? In the PR, I propose to add a test to check two methods of the artifact manager: - `isCachedArtifact()` - `cacheArtifact()` ### Why are the changes needed? To improve test coverage of Artifacts API. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? By running new test: ``` $ build/sbt "test:testOnly *.ArtifactSuite" ``` Closes #41493 from MaxGekk/test-cache-artifact. Authored-by: Max Gekk Signed-off-by: Max Gekk --- .../spark/sql/connect/client/ArtifactManager.scala | 2 +- .../spark/sql/connect/client/ArtifactSuite.scala | 14 .../connect/client/SparkConnectClientSuite.scala | 25 +- 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala index acd9f279c6d..6d0d16df946 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala @@ -108,7 +108,7 @@ class ArtifactManager( */ def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts)) - private def isCachedArtifact(hash: String): Boolean = { + private[client] def isCachedArtifact(hash: String): Boolean = { val artifactName = CACHE_PREFIX + "/" 
+ hash val request = proto.ArtifactStatusesRequest .newBuilder() diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala index 506ad3625b0..39ab0eef412 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._ import com.google.protobuf.ByteString import io.grpc.{ManagedChannel, Server} import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder} +import org.apache.commons.codec.digest.DigestUtils.sha256Hex import org.scalatest.BeforeAndAfterEach import org.apache.spark.connect.proto @@ -248,4 +249,17 @@ class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach { assertFileDataEquality(remainingArtifacts.get(0).getData, Paths.get(file3)) assertFileDataEquality(remainingArtifacts.get(1).getData, Paths.get(file4)) } + + test("cache an artifact and check its presence") { +val s = "Hello, World!" 
+val blob = s.getBytes("UTF-8") +val expectedHash = sha256Hex(blob) +assert(artifactManager.isCachedArtifact(expectedHash) === false) +val actualHash = artifactManager.cacheArtifact(blob) +assert(actualHash === expectedHash) +assert(artifactManager.isCachedArtifact(expectedHash) === true) + +val receivedRequests = service.getAndClearLatestAddArtifactRequests() +assert(receivedRequests.size == 1) + } } diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala index 7a0ad1a9e2a..7e0b687054d 100755 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.client import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ import scala.collection.mutable import io.grpc.{Server, StatusRuntimeException} @@ -26,7 +27,7 @@ import io.grpc.stub.StreamObserver import org.scalatest.BeforeAndAfterEach import org.apache.spark.connect.proto -import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc} +import org.apache.spark.connect.proto.{AddArtifactsRequest,
[spark] branch master updated (e5b006f89d9 -> 1a754913603)
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e5b006f89d9 [MINOR][PS][DOCS] Add `DataFrame.{from_dict, to_orc}` to API references add 1a754913603 [SPARK-43921][PROTOBUF] Generate Protobuf descriptor files at build time No new revisions were added by this update. Summary of changes: connector/protobuf/pom.xml | 27 +-- .../resources/protobuf/basicmessage_noimports.desc | 18 .../test/resources/protobuf/catalyst_types.desc| 48 .../test/resources/protobuf/functions_suite.desc | Bin 11190 -> 0 bytes .../test/resources/protobuf/proto2_messages.desc | Bin 929 -> 0 bytes .../src/test/resources/protobuf/serde_suite.desc | 27 --- .../ProtobufCatalystDataConversionSuite.scala | 4 +- .../sql/protobuf/ProtobufFunctionsSuite.scala | 15 +++ .../spark/sql/protobuf/ProtobufSerdeSuite.scala| 28 ++-- .../spark/sql/protobuf/ProtobufTestBase.scala | 50 + project/SparkBuild.scala | 7 ++- 11 files changed, 94 insertions(+), 130 deletions(-) delete mode 100644 connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc delete mode 100644 connector/protobuf/src/test/resources/protobuf/catalyst_types.desc delete mode 100644 connector/protobuf/src/test/resources/protobuf/functions_suite.desc delete mode 100644 connector/protobuf/src/test/resources/protobuf/proto2_messages.desc delete mode 100644 connector/protobuf/src/test/resources/protobuf/serde_suite.desc - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [MINOR][PS][DOCS] Add `DataFrame.{from_dict, to_orc}` to API references
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e5b006f89d9 [MINOR][PS][DOCS] Add `DataFrame.{from_dict, to_orc}` to API references e5b006f89d9 is described below commit e5b006f89d9e4a50d00e78ba03f2866fde12abb1 Author: Ruifeng Zheng AuthorDate: Wed Jun 7 20:57:43 2023 +0900 [MINOR][PS][DOCS] Add `DataFrame.{from_dict, to_orc}` to API references ### What changes were proposed in this pull request? Add `DataFrame.{from_dict, to_orc}` to API references Move `DataFrame.info` to attributes according to https://pandas.pydata.org/docs/reference/frame.html#attributes-and-underlying-data ### Why are the changes needed? add missing doc ### Does this PR introduce _any_ user-facing change? yes, doc updated ### How was this patch tested? CI and manually check Closes #41492 from zhengruifeng/doc_frame_missing_io. Authored-by: Ruifeng Zheng Signed-off-by: Hyukjin Kwon --- python/docs/source/reference/pyspark.pandas/frame.rst | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst b/python/docs/source/reference/pyspark.pandas/frame.rst index 4ab0ea199be..a8d114187b9 100644 --- a/python/docs/source/reference/pyspark.pandas/frame.rst +++ b/python/docs/source/reference/pyspark.pandas/frame.rst @@ -37,6 +37,7 @@ Attributes and underlying data :toctree: api/ DataFrame.index + DataFrame.info DataFrame.columns DataFrame.empty @@ -272,13 +273,14 @@ Serialization / IO / Conversion .. 
autosummary:: :toctree: api/ + DataFrame.from_dict DataFrame.from_records - DataFrame.info DataFrame.to_table DataFrame.to_delta DataFrame.to_parquet DataFrame.to_spark_io DataFrame.to_csv + DataFrame.to_orc DataFrame.to_pandas DataFrame.to_html DataFrame.to_numpy - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (e4c27f53e26 -> 41fd030145a)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from e4c27f53e26 [SPARK-43979][SQL] CollectedMetrics should be treated as the same one for self-join add 41fd030145a [SPARK-43376][SQL][FOLLOWUP] lazy construct subquery to improve reuse subquery No new revisions were added by this update. Summary of changes: .../execution/adaptive/AdaptiveSparkPlanExec.scala | 7 +++- .../adaptive/InsertAdaptiveSparkPlan.scala | 39 +++--- .../adaptive/PlanAdaptiveSubqueries.scala | 22 +++- .../execution/adaptive/ReuseAdaptiveSubquery.scala | 15 +++-- 4 files changed, 37 insertions(+), 46 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (5021638ee14 -> e4c27f53e26)
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 5021638ee14 [SPARK-43717][CONNECT] Scala client reduce agg cannot handle null partitions for scala primitive inputs add e4c27f53e26 [SPARK-43979][SQL] CollectedMetrics should be treated as the same one for self-join No new revisions were added by this update. Summary of changes: .../sql/catalyst/analysis/CheckAnalysis.scala | 25 +- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 16 ++ 2 files changed, 40 insertions(+), 1 deletion(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-43717][CONNECT] Scala client reduce agg cannot handle null partitions for scala primitive inputs
This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5021638ee14 [SPARK-43717][CONNECT] Scala client reduce agg cannot handle null partitions for scala primitive inputs 5021638ee14 is described below commit 5021638ee14758b92309942a1bcaed2b6554f810 Author: Zhen Li AuthorDate: Wed Jun 7 14:30:42 2023 +0800 [SPARK-43717][CONNECT] Scala client reduce agg cannot handle null partitions for scala primitive inputs ### What changes were proposed in this pull request? Scala client fails with NPE when running the following reduce agg: ``` spark.range(0, 5, 1, 10).as[Long].reduce(_ + _) == 10 ``` The reason is that `range` produces null partitions, and the Reduce encoder cannot set the default value correctly for partitions that contain Scala primitives. In the example, we expect 0 but receive null. This causes the codegen to wrongly assume the input is nullable and generate wrong code. ### Why are the changes needed? Bug fix ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit and Scala Client E2E tests. Closes #41264 from zhenlineo/fix-agg-null. 
Authored-by: Zhen Li Signed-off-by: yangjie01 --- .../spark/sql/UserDefinedFunctionE2ETestSuite.scala | 20 .../spark/sql/expressions/ReduceAggregator.scala | 13 - .../sql/expressions/ReduceAggregatorSuite.scala | 10 -- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala index b5bbee67803..ca1bcf3fe67 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala @@ -198,18 +198,30 @@ class UserDefinedFunctionE2ETestSuite extends RemoteSparkSession { assert(sum.get() == 0) // The value is not 45 } - test("Dataset reduce") { + test("Dataset reduce without null partition inputs") { val session: SparkSession = spark import session.implicits._ -assert(spark.range(10).map(_ + 1).reduce(_ + _) == 55) +assert(spark.range(0, 10, 1, 5).map(_ + 1).reduce(_ + _) == 55) } - test("Dataset reduce - java") { + test("Dataset reduce with null partition inputs") { +val session: SparkSession = spark +import session.implicits._ +assert(spark.range(0, 10, 1, 16).map(_ + 1).reduce(_ + _) == 55) + } + + test("Dataset reduce with null partition inputs - java to scala long type") { +val session: SparkSession = spark +import session.implicits._ +assert(spark.range(0, 5, 1, 10).as[Long].reduce(_ + _) == 10) + } + + test("Dataset reduce with null partition inputs - java") { val session: SparkSession = spark import session.implicits._ assert( spark -.range(10) +.range(0, 10, 1, 16) .map(_ + 1) .reduce(new ReduceFunction[Long] { override def call(v1: Long, v2: Long): Long = v1 + v2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala index 41306cd0a99..e897fdfe008 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala @@ -32,7 +32,18 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T) => T) @transient private val encoder = implicitly[Encoder[T]] - override def zero: (Boolean, T) = (false, null.asInstanceOf[T]) + private val _zero = encoder.clsTag.runtimeClass match { +case java.lang.Boolean.TYPE => false +case java.lang.Byte.TYPE => 0.toByte +case java.lang.Short.TYPE => 0.toShort +case java.lang.Integer.TYPE => 0 +case java.lang.Long.TYPE => 0L +case java.lang.Float.TYPE => 0f +case java.lang.Double.TYPE => 0d +case _ => null + } + + override def zero: (Boolean, T) = (false, _zero.asInstanceOf[T]) override def bufferEncoder: Encoder[(Boolean, T)] = ExpressionEncoder.tuple( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala index f65dcdf119c..c1071373287 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala +++