[spark] branch master updated (2adb8e12f73 -> bd270830614)

2023-06-07 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 2adb8e12f73 [SPARK-44000][SQL][FOLLOWUP] Add comments when picking 
build side for BNLJ
 add bd270830614 [SPARK-43935][SQL][PYTHON][CONNECT] Add xpath_* functions 
to Scala and Python

No new revisions were added by this update.
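
For reference, a minimal Scala sketch of the newly added xpath helpers, assuming a Spark build that includes this change; the local-mode session, sample XML, and column name are illustrative, not taken from the commit.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, xpath, xpath_boolean, xpath_int}

object XPathFunctionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq("<a><b>1</b><b>2</b></a>").toDF("xml")
    df.select(
      xpath($"xml", lit("a/b/text()")),     // all matches as an array of strings: ["1", "2"]
      xpath_int($"xml", lit("sum(a/b)")),   // numeric XPath result cast to int: 3
      xpath_boolean($"xml", lit("a/b"))     // true if the path matches at least one node
    ).show(truncate = false)

    spark.stop()
  }
}
```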

Summary of changes:
 .../scala/org/apache/spark/sql/functions.scala |  87 ++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  36 ++
 .../explain-results/function_xpath.explain |   2 +
 .../explain-results/function_xpath_boolean.explain |   2 +
 .../explain-results/function_xpath_double.explain  |   2 +
 .../explain-results/function_xpath_float.explain   |   2 +
 .../explain-results/function_xpath_int.explain |   2 +
 .../explain-results/function_xpath_long.explain|   2 +
 .../explain-results/function_xpath_number.explain  |   2 +
 .../explain-results/function_xpath_short.explain   |   2 +
 .../explain-results/function_xpath_string.explain  |   2 +
 .../query-tests/queries/function_xpath.json|  29 +
 .../query-tests/queries/function_xpath.proto.bin   | Bin 0 -> 136 bytes
 .../queries/function_xpath_boolean.json|  29 +
 .../queries/function_xpath_boolean.proto.bin   | Bin 0 -> 137 bytes
 .../query-tests/queries/function_xpath_double.json |  29 +
 .../queries/function_xpath_double.proto.bin| Bin 0 -> 136 bytes
 .../query-tests/queries/function_xpath_float.json  |  29 +
 .../queries/function_xpath_float.proto.bin | Bin 0 -> 135 bytes
 .../query-tests/queries/function_xpath_int.json|  29 +
 .../queries/function_xpath_int.proto.bin   | Bin 0 -> 132 bytes
 .../query-tests/queries/function_xpath_long.json   |  29 +
 .../queries/function_xpath_long.proto.bin  | Bin 0 -> 133 bytes
 .../query-tests/queries/function_xpath_number.json |  29 +
 .../queries/function_xpath_number.proto.bin| Bin 0 -> 136 bytes
 .../query-tests/queries/function_xpath_short.json  |  29 +
 .../queries/function_xpath_short.proto.bin | Bin 0 -> 135 bytes
 .../query-tests/queries/function_xpath_string.json |  29 +
 .../queries/function_xpath_string.proto.bin| Bin 0 -> 136 bytes
 .../source/reference/pyspark.sql/functions.rst |  15 +++
 python/pyspark/sql/connect/functions.py|  63 ++
 python/pyspark/sql/functions.py| 133 +
 .../scala/org/apache/spark/sql/functions.scala |  97 +++
 .../org/apache/spark/sql/XPathFunctionsSuite.scala |  17 +++
 34 files changed, 727 insertions(+)
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_boolean.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_double.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_float.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_int.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_long.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_number.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_short.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/explain-results/function_xpath_string.explain
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath.json
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath.proto.bin
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_boolean.json
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_boolean.proto.bin
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_double.json
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_double.proto.bin
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_float.json
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_float.proto.bin
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_int.json
 create mode 100644 
connector/connect/common/src/test/resources/query-tests/queries/function_xpath_int.proto.bin
 create mode 100644 

[spark] branch master updated (d88633ada5e -> 2adb8e12f73)

2023-06-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from d88633ada5e [SPARK-44000][SQL] Add hint to disable broadcasting and 
replicating one side of join
 add 2adb8e12f73 [SPARK-44000][SQL][FOLLOWUP] Add comments when picking 
build side for BNLJ

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/sql/execution/SparkStrategies.scala   | 4 
 1 file changed, 4 insertions(+)





[spark] branch master updated: [SPARK-44000][SQL] Add hint to disable broadcasting and replicating one side of join

2023-06-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d88633ada5e [SPARK-44000][SQL] Add hint to disable broadcasting and 
replicating one side of join
d88633ada5e is described below

commit d88633ada5eb73e8876acaa2c2a53b9596f2acdd
Author: aokolnychyi 
AuthorDate: Wed Jun 7 20:15:05 2023 -0700

[SPARK-44000][SQL] Add hint to disable broadcasting and replicating one 
side of join

### What changes were proposed in this pull request?

This PR adds a new internal join hint to disable broadcasting and replicating one side of a join.

### Why are the changes needed?

These changes are needed to disable broadcasting and replicating one side of a join when that is not permitted, such as for the cardinality check in MERGE operations in PR #41448.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This PR comes with tests. More tests are in #41448.
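
For context, a minimal sketch of how a planner rule could attach the new internal hint, using the Catalyst types touched by the diff below (`JoinHint`, `HintInfo`, `NO_BROADCAST_AND_REPLICATION`); the toy relations and the choice to protect the right side are illustrative, not code from this PR.

```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, Join, JoinHint, LocalRelation, NO_BROADCAST_AND_REPLICATION}
import org.apache.spark.sql.types.IntegerType

object NoBroadcastAndReplicationSketch {
  def main(args: Array[String]): Unit = {
    // Toy relations standing in for the two sides of the join built by a rule.
    val left = LocalRelation(AttributeReference("id", IntegerType)())
    val right = LocalRelation(AttributeReference("id", IntegerType)())

    // Forbid broadcasting/replicating the right side via the new internal hint.
    val hint = JoinHint(
      leftHint = None,
      rightHint = Some(HintInfo(strategy = Some(NO_BROADCAST_AND_REPLICATION))))

    val join = Join(left, right, Inner, condition = None, hint = hint)
    println(join.treeString)
  }
}
```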

Closes #41499 from aokolnychyi/spark-44000.

Authored-by: aokolnychyi 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/catalyst/optimizer/joins.scala   | 34 +++-
 .../spark/sql/catalyst/plans/logical/hints.scala   | 10 
 .../spark/sql/execution/SparkStrategies.scala  | 29 +++---
 .../scala/org/apache/spark/sql/JoinSuite.scala | 64 +-
 4 files changed, 127 insertions(+), 10 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
index 48b4007a897..8f03b93dce7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
@@ -341,6 +341,16 @@ trait JoinSelectionHelper {
 )
   }
 
+  def getBroadcastNestedLoopJoinBuildSide(hint: JoinHint): Option[BuildSide] = {
+    if (hintToNotBroadcastAndReplicateLeft(hint)) {
+      Some(BuildRight)
+    } else if (hintToNotBroadcastAndReplicateRight(hint)) {
+      Some(BuildLeft)
+    } else {
+      None
+    }
+  }
+
   def getSmallerSide(left: LogicalPlan, right: LogicalPlan): BuildSide = {
 if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else 
BuildLeft
   }
@@ -413,11 +423,19 @@ trait JoinSelectionHelper {
   }
 
   def hintToNotBroadcastLeft(hint: JoinHint): Boolean = {
-hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_HASH))
+hint.leftHint.flatMap(_.strategy).exists {
+  case NO_BROADCAST_HASH => true
+  case NO_BROADCAST_AND_REPLICATION => true
+  case _ => false
+}
   }
 
   def hintToNotBroadcastRight(hint: JoinHint): Boolean = {
-hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_HASH))
+hint.rightHint.flatMap(_.strategy).exists {
+  case NO_BROADCAST_HASH => true
+  case NO_BROADCAST_AND_REPLICATION => true
+  case _ => false
+}
   }
 
   def hintToShuffleHashJoinLeft(hint: JoinHint): Boolean = {
@@ -454,6 +472,18 @@ trait JoinSelectionHelper {
   hint.rightHint.exists(_.strategy.contains(SHUFFLE_REPLICATE_NL))
   }
 
+  def hintToNotBroadcastAndReplicate(hint: JoinHint): Boolean = {
+    hintToNotBroadcastAndReplicateLeft(hint) || hintToNotBroadcastAndReplicateRight(hint)
+  }
+
+  def hintToNotBroadcastAndReplicateLeft(hint: JoinHint): Boolean = {
+    hint.leftHint.exists(_.strategy.contains(NO_BROADCAST_AND_REPLICATION))
+  }
+
+  def hintToNotBroadcastAndReplicateRight(hint: JoinHint): Boolean = {
+    hint.rightHint.exists(_.strategy.contains(NO_BROADCAST_AND_REPLICATION))
+  }
+
   private def getBuildSide(
   canBuildLeft: Boolean,
   canBuildRight: Boolean,
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
index 5dc3eb707f6..b17bab7849b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala
@@ -187,6 +187,16 @@ case object PREFER_SHUFFLE_HASH extends JoinStrategyHint {
   override def hintAliases: Set[String] = Set.empty
 }
 
+/**
+ * An internal hint to prohibit broadcasting and replicating one side of a 
join. This hint is used
+ * by some rules where broadcasting or replicating a particular side of the 
join is not permitted,
+ * such as the cardinality check in MERGE operations.
+ */
+case object NO_BROADCAST_AND_REPLICATION extends JoinStrategyHint {
+  override def displayName: String = "no_broadcast_and_replication"
+  override def hintAliases: Set[String] = Set.empty
+}
+
 /**
  * The callback for implementing customized strategies 

[spark] branch master updated: Revert "[SPARK-43840][INFRA] Switch `scala-213` GitHub Action Job to `scala-212`"

2023-06-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new caf905d7b37 Revert "[SPARK-43840][INFRA] Switch `scala-213` GitHub 
Action Job to `scala-212`"
caf905d7b37 is described below

commit caf905d7b37198a40a299c17033063fe3dd3eb6a
Author: yangjie01 
AuthorDate: Wed Jun 7 20:08:09 2023 -0700

Revert "[SPARK-43840][INFRA] Switch `scala-213` GitHub Action Job to 
`scala-212`"

### What changes were proposed in this pull request?
This PR reverts the change of SPARK-43840. Spark 3.5.0 still uses Scala 2.12 as the default, so we still need a build check for Scala 2.13 on pull requests.

### Why are the changes needed?
Restore pipeline check for Scala 2.13.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #41506 from LuciferYang/r-43840.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 .github/workflows/build_and_test.yml | 18 +-
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 8aa0f42916e..a373b0e76e7 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -86,7 +86,7 @@ jobs:
   sparkr=`./dev/is-changed.py -m sparkr`
   tpcds=`./dev/is-changed.py -m sql`
   docker=`./dev/is-changed.py -m docker-integration-tests`
-  # 'build', 'scala-212', and 'java-11-17' are always true for now.
+  # 'build', 'scala-213', and 'java-11-17' are always true for now.
   # It does not save significant time and most of PRs trigger the 
build.
   precondition="
 {
@@ -95,7 +95,7 @@ jobs:
   \"sparkr\": \"$sparkr\",
   \"tpcds-1g\": \"$tpcds\",
   \"docker-integration-tests\": \"$docker\",
-  \"scala-212\": \"true\",
+  \"scala-213\": \"true\",
   \"java-11-17\": \"true\",
   \"lint\" : \"true\",
   \"k8s-integration-tests\" : \"true\",
@@ -728,10 +728,10 @@ jobs:
 ./build/mvn $MAVEN_CLI_OPTS -DskipTests -Pyarn -Pmesos -Pkubernetes 
-Pvolcano -Phive -Phive-thriftserver -Phadoop-cloud 
-Djava.version=${JAVA_VERSION/-ea} install
 rm -rf ~/.m2/repository/org/apache/spark
 
-  scala-212:
+  scala-213:
 needs: precondition
-if: fromJson(needs.precondition.outputs.required).scala-212 == 'true'
-name: Scala 2.12 build with SBT
+if: fromJson(needs.precondition.outputs.required).scala-213 == 'true'
+name: Scala 2.13 build with SBT
 runs-on: ubuntu-22.04
 steps:
 - name: Checkout Spark repository
@@ -761,9 +761,9 @@ jobs:
   uses: actions/cache@v3
   with:
 path: ~/.cache/coursier
-key: scala-212-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') 
}}
+key: scala-213-coursier-${{ hashFiles('**/pom.xml', '**/plugins.sbt') 
}}
 restore-keys: |
-  scala-212-coursier-
+  scala-213-coursier-
 - name: Install Java 8
   uses: actions/setup-java@v3
   with:
@@ -771,8 +771,8 @@ jobs:
 java-version: 8
 - name: Build with SBT
   run: |
-./dev/change-scala-version.sh 2.12
-./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive 
-Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests 
-Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.12 compile 
Test/compile
+./dev/change-scala-version.sh 2.13
+./build/sbt -Pyarn -Pmesos -Pkubernetes -Pvolcano -Phive 
-Phive-thriftserver -Phadoop-cloud -Pkinesis-asl -Pdocker-integration-tests 
-Pkubernetes-integration-tests -Pspark-ganglia-lgpl -Pscala-2.13 compile 
Test/compile
 
   # Any TPC-DS related updates on this job need to be applied to tpcds-1g-gen 
job of benchmark.yml as well
   tpcds-1g:





[spark] branch master updated: [SPARK-43920][SQL][CONNECT] Create sql/api module

2023-06-07 Thread hvanhovell
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 15202e53195 [SPARK-43920][SQL][CONNECT] Create sql/api module
15202e53195 is described below

commit 15202e531957849e9eaefcaa6fa1c522a8967d80
Author: Rui Wang 
AuthorDate: Wed Jun 7 20:53:02 2023 -0400

[SPARK-43920][SQL][CONNECT] Create sql/api module

### What changes were proposed in this pull request?

We need a sql/api module to host public APIs such as DataType, Row, etc. This 
module can be shared between Catalyst and the Spark Connect client so that the 
client no longer needs to depend on Catalyst.

### Why are the changes needed?

A step toward the Spark Connect client no longer needing to depend on Catalyst.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

N/A
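
As a hypothetical aside, a build.sbt fragment a Connect client could use once the module is published; the coordinates are assumed from the artifactId and version in the pom.xml added below.

```
// Hypothetical: depend only on the public API types instead of all of Catalyst.
libraryDependencies += "org.apache.spark" %% "spark-sql-api" % "3.5.0-SNAPSHOT"
```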

Closes #41426 from amaliujia/add_sql_api.

Authored-by: Rui Wang 
Signed-off-by: Herman van Hovell 
---
 pom.xml  |  1 +
 project/SparkBuild.scala |  6 +++---
 sql/api/pom.xml  | 45 +
 3 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index 175df1722e6..3c87da45bea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,6 +89,7 @@
 mllib-local
 tools
 streaming
+sql/api
 sql/catalyst
 sql/core
 sql/hive
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c4c19c65bf1..023ce4ba81c 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -58,10 +58,10 @@ object BuildCommons {
 
   val allProjects@Seq(
 core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, 
launcher, unsafe, tags, sketch, kvstore,
-commonUtils, _*
+commonUtils, sqlApi, _*
   ) = Seq(
 "core", "graphx", "mllib", "mllib-local", "repl", "network-common", 
"network-shuffle", "launcher", "unsafe",
-"tags", "sketch", "kvstore", "common-utils"
+"tags", "sketch", "kvstore", "common-utils", "sql-api"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ 
Seq(connectCommon, connect, connectClient)
 
   val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
@@ -408,7 +408,7 @@ object SparkBuild extends PomBuild {
 Seq(
   spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, 
networkYarn,
   unsafe, tags, tokenProviderKafka010, sqlKafka010, connectCommon, 
connect, connectClient, protobuf,
-  commonUtils
+  commonUtils, sqlApi
 ).contains(x)
   }
 
diff --git a/sql/api/pom.xml b/sql/api/pom.xml
new file mode 100644
index 000..9d100b1130e
--- /dev/null
+++ b/sql/api/pom.xml
@@ -0,0 +1,45 @@
+
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.12</artifactId>
+    <version>3.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-sql-api_2.12</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project SQL API</name>
+  <url>https://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>sql-api</sbt.project.name>
+  </properties>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>
\ No newline at end of file





[spark] branch master updated (0c7b4306c7c -> 22297345e45)

2023-06-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 0c7b4306c7c [SPARK-43540][K8S][CORE] Add working directory into 
classpath on the driver in K8S cluster mode
 add 22297345e45 [SPARK-44002][CONNECT] Fix the artifact statuses handler

No new revisions were added by this update.

Summary of changes:
 .../src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala  | 8 +---
 .../sql/connect/service/SparkConnectArtifactStatusesHandler.scala | 2 +-
 2 files changed, 6 insertions(+), 4 deletions(-)





[spark] branch master updated: [SPARK-43540][K8S][CORE] Add working directory into classpath on the driver in K8S cluster mode

2023-06-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0c7b4306c7c [SPARK-43540][K8S][CORE] Add working directory into 
classpath on the driver in K8S cluster mode
0c7b4306c7c is described below

commit 0c7b4306c7c5fbdd6c54f8172f82e1d23e3b
Author: fwang12 
AuthorDate: Wed Jun 7 15:38:46 2023 -0700

[SPARK-43540][K8S][CORE] Add working directory into classpath on the driver 
in K8S cluster mode

### What changes were proposed in this pull request?

This PR adds the working directory to the classpath on the driver in K8s cluster mode.

### Why are the changes needed?

After #37417, the files from spark.files and spark.jars are placed in the working 
directory, but the Spark context classloader cannot access them because the 
working directory is not on the classpath by default.
This PR adds the current working directory to the classpath so that the 
spark.files and spark.jars placed there become accessible to the classloader, 
for example the `hive-site.xml` uploaded via `spark.files`.

### Does this PR introduce _any_ user-facing change?

Yes, users no longer need to add the working directory to the Spark classpath 
manually.

### How was this patch tested?

UT.
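
For illustration, a small sketch of what the change enables on the driver, assuming a K8s cluster-mode submission that shipped `hive-site.xml` via `spark.files` so it landed in the working directory that is now on the classpath; the file name is just the example from the description.

```
object WorkingDirClasspathCheck {
  def main(args: Array[String]): Unit = {
    // With "." on the driver classpath, a file shipped via spark.files
    // (e.g. hive-site.xml) is visible to the context classloader.
    val url = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml")
    println(s"hive-site.xml resolved to: $url") // was null before this change
  }
}
```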

Closes #41201 from turboFei/work_dir_classpath.

Authored-by: fwang12 
Signed-off-by: Dongjoon Hyun 
---
 .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 12 +++-
 .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 12 
 .../docker/src/main/dockerfiles/spark/entrypoint.sh  |  3 +++
 3 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e1d616b9b83..8f9477385e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -414,6 +414,9 @@ private[spark] class SparkSubmit extends Logging {
 // directory too.
 // SPARK-33782 : This downloads all the files , jars , archiveFiles 
and pyfiles to current
 // working directory
+// SPARK-43540: add current working directory into driver classpath
+val workingDirectory = "."
+childClasspath += workingDirectory
 def downloadResourcesToCurrentDirectory(uris: String, isArchive: 
Boolean = false):
 String = {
   val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
@@ -423,13 +426,12 @@ private[spark] class SparkSubmit extends Logging {
 targetDir, sparkConf, hadoopConf)
   
Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
 case (localResources, resolvedUri) =>
-  val source = new File(localResources.getPath)
+  val source = new File(localResources.getPath).getCanonicalFile
   val dest = new File(
-".",
+workingDirectory,
 if (resolvedUri.getFragment != null) resolvedUri.getFragment 
else source.getName)
-  logInfo(
-s"Files  $resolvedUri " +
-  s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
+.getCanonicalFile
+  logInfo(s"Files $resolvedUri from $source to $dest")
   Utils.deleteRecursively(dest)
   if (isArchive) {
 Utils.unpack(source, dest)
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 44c35ed70e0..8e2d6e6cf5f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -1618,6 +1618,18 @@ class SparkSubmitSuite
   conf.get(k) should be (v)
 }
   }
+
+  test("SPARK-43540: Add working directory into classpath on the driver in K8S 
cluster mode") {
+val clArgs = Seq(
+  "--deploy-mode", "client",
+  "--master", "k8s://host:port",
+  "--class", "org.SomeClass",
+  "--conf", "spark.kubernetes.submitInDriver=true",
+  "/home/thejar.jar")
+val appArgs = new SparkSubmitArguments(clArgs)
+val (_, classpath, _, _) = submit.prepareSubmitEnvironment(appArgs)
+assert(classpath.contains("."))
+  }
 }
 
 object JarCreationTest extends Logging {
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 42f4df88f3d..f9561b9aa4e 100755
--- 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ 

[spark] branch master updated: [SPARK-43933][SQL][PYTHON][CONNECT] Add linear regression aggregate functions to Scala and Python

2023-06-07 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2057eb7e203 [SPARK-43933][SQL][PYTHON][CONNECT] Add linear regression 
aggregate functions to Scala and Python
2057eb7e203 is described below

commit 2057eb7e203c9fde3f4fa13d5f04225cf6e49a87
Author: Jiaan Geng 
AuthorDate: Wed Jun 7 22:56:13 2023 +0800

[SPARK-43933][SQL][PYTHON][CONNECT] Add linear regression aggregate 
functions to Scala and Python

### What changes were proposed in this pull request?
Based on HyukjinKwon's suggestion, this PR adds linear regression aggregate 
functions to the Scala and Python APIs.
These functions are listed below.

- `regr_avgx`

- `regr_avgy`

- `regr_count`

- `regr_intercept`

- `regr_r2`

- `regr_slope`

- `regr_sxx`

- `regr_sxy`

- `regr_syy`

### Why are the changes needed?
Add linear regression aggregate functions to the Scala and Python APIs.

### Does this PR introduce _any_ user-facing change?
'No'.
New feature.

### How was this patch tested?
New test cases.
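
For reference, a minimal Scala sketch of a few of the new aggregates, assuming a Spark build that includes this change; the local-mode session and sample data are illustrative only.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{regr_intercept, regr_r2, regr_slope}

object RegrFunctionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // y is the dependent variable, x the independent one (per the docs in the diff).
    val df = Seq((2.1, 1.0), (3.9, 2.0), (6.2, 3.0)).toDF("y", "x")
    df.select(
      regr_slope($"y", $"x"),      // slope of the least-squares fit
      regr_intercept($"y", $"x"),  // intercept of the fit
      regr_r2($"y", $"x")          // coefficient of determination
    ).show()

    spark.stop()
  }
}
```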

Closes #41474 from beliefer/SPARK-43933.

Authored-by: Jiaan Geng 
Signed-off-by: Ruifeng Zheng 
---
 .../scala/org/apache/spark/sql/functions.scala |  82 ++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  36 +++
 .../explain-results/function_regr_avgx.explain |   2 +
 .../explain-results/function_regr_avgy.explain |   2 +
 .../explain-results/function_regr_count.explain|   2 +
 .../function_regr_intercept.explain|   2 +
 .../explain-results/function_regr_r2.explain   |   2 +
 .../explain-results/function_regr_slope.explain|   2 +
 .../explain-results/function_regr_sxx.explain  |   2 +
 .../explain-results/function_regr_sxy.explain  |   2 +
 .../explain-results/function_regr_syy.explain  |   2 +
 .../query-tests/queries/function_regr_avgx.json|  29 +++
 .../queries/function_regr_avgx.proto.bin   | Bin 0 -> 185 bytes
 .../query-tests/queries/function_regr_avgy.json|  29 +++
 .../queries/function_regr_avgy.proto.bin   | Bin 0 -> 185 bytes
 .../query-tests/queries/function_regr_count.json   |  29 +++
 .../queries/function_regr_count.proto.bin  | Bin 0 -> 186 bytes
 .../queries/function_regr_intercept.json   |  29 +++
 .../queries/function_regr_intercept.proto.bin  | Bin 0 -> 190 bytes
 .../query-tests/queries/function_regr_r2.json  |  29 +++
 .../query-tests/queries/function_regr_r2.proto.bin | Bin 0 -> 183 bytes
 .../query-tests/queries/function_regr_slope.json   |  29 +++
 .../queries/function_regr_slope.proto.bin  | Bin 0 -> 186 bytes
 .../query-tests/queries/function_regr_sxx.json |  29 +++
 .../queries/function_regr_sxx.proto.bin| Bin 0 -> 184 bytes
 .../query-tests/queries/function_regr_sxy.json |  29 +++
 .../queries/function_regr_sxy.proto.bin| Bin 0 -> 184 bytes
 .../query-tests/queries/function_regr_syy.json |  29 +++
 .../queries/function_regr_syy.proto.bin| Bin 0 -> 184 bytes
 .../source/reference/pyspark.sql/functions.rst |   9 +
 python/pyspark/sql/connect/functions.py|  63 +
 python/pyspark/sql/functions.py| 280 +
 .../scala/org/apache/spark/sql/functions.scala |  83 ++
 .../apache/spark/sql/DataFrameAggregateSuite.scala |  13 +
 34 files changed, 845 insertions(+)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index 60634388fa1..9179b88a26d 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -992,6 +992,88 @@ object functions {
*/
   def var_pop(columnName: String): Column = var_pop(Column(columnName))
 
+  /**
+   * Aggregate function: returns the average of the independent variable for 
non-null pairs in a
+   * group, where `y` is the dependent variable and `x` is the independent 
variable.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def regr_avgx(y: Column, x: Column): Column = Column.fn("regr_avgx", y, x)
+
+  /**
+   * Aggregate function: returns the average of the independent variable for 
non-null pairs in a
+   * group, where `y` is the dependent variable and `x` is the independent 
variable.
+   *
+   * @group agg_funcs
+   * @since 3.5.0
+   */
+  def regr_avgy(y: Column, x: Column): Column = Column.fn("regr_avgy", y, x)
+
+  /**
+   * Aggregate function: returns the number of non-null number pairs in a 
group, where `y` is the
+   * dependent variable and `x` is the independent 

[spark] branch master updated: [SPARK-43984][SQL][PROTOBUF] Change to use `foreach` when `map` doesn't produce results

2023-06-07 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4ffa2722d13 [SPARK-43984][SQL][PROTOBUF] Change to use `foreach` when 
`map` doesn't produce results
4ffa2722d13 is described below

commit 4ffa2722d13593e46ef1c8fd59a20c439a535f61
Author: yangjie01 
AuthorDate: Wed Jun 7 21:02:54 2023 +0800

[SPARK-43984][SQL][PROTOBUF] Change to use `foreach` when `map` doesn't 
produce results

### What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/36720, this PR changes Spark 
code to use `foreach` where the result of `map` is not used; these are new 
cases introduced after Spark 3.4.

### Why are the changes needed?
Use the appropriate API.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass Github Actions.
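
As a general illustration of the pattern (not code from this PR): `map` allocates a result collection that is thrown away when the body runs only for its side effects, while `foreach` does not.

```
object ForeachVsMapSketch {
  def main(args: Array[String]): Unit = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[Int]

    // `map` builds a throwaway Seq[ArrayBuffer[Int]] because nobody uses the result.
    Seq(1, 2, 3).map(x => buf += x)
    buf.clear()

    // `foreach` has the same effect without allocating a result collection.
    Seq(1, 2, 3).foreach(x => buf += x)
    println(buf.mkString(","))  // 1,2,3
  }
}
```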

Closes #41482 from LuciferYang/SPARK-43984.

Authored-by: yangjie01 
Signed-off-by: yangjie01 
---
 .../main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala | 2 +-
 sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala  | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
index b11284d1f28..611b753d024 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala
@@ -173,7 +173,7 @@ private[sql] class ProtobufSerializer(
   case (MapType(kt, vt, valueContainsNull), MESSAGE) =>
 var keyField: FieldDescriptor = null
 var valueField: FieldDescriptor = null
-fieldDescriptor.getMessageType.getFields.asScala.map { field =>
+fieldDescriptor.getMessageType.getFields.asScala.foreach { field =>
   field.getName match {
 case "key" =>
   keyField = field
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
index 0f689ea975d..01684f52ab8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -63,7 +63,7 @@ private[hive] case class HiveSimpleUDF(
 
   // TODO: Finish input output types.
   override def eval(input: InternalRow): Any = {
-children.zipWithIndex.map {
+children.zipWithIndex.foreach {
   case (child, idx) => evaluator.setArg(idx, child.eval(input))
 }
 evaluator.evaluate()
@@ -135,7 +135,7 @@ private[hive] case class HiveGenericUDF(
   private lazy val evaluator = new HiveGenericUDFEvaluator(funcWrapper, 
children)
 
   override def eval(input: InternalRow): Any = {
-children.zipWithIndex.map {
+children.zipWithIndex.foreach {
   case (child, idx) => evaluator.setArg(idx, child.eval(input))
 }
 evaluator.evaluate()





[spark] branch master updated (fead8a7962a -> 89de4f79e7f)

2023-06-07 Thread weichenxu123
This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from fead8a7962a [SPARK-43993][SQL][TESTS] Add tests for cache artifacts
 add 89de4f79e7f [SPARK-43790][PYTHON][CONNECT][ML] Add `copyFromLocalToFs` 
API

No new revisions were added by this update.

Summary of changes:
 .../artifact/SparkConnectArtifactManager.scala | 39 ++
 .../apache/spark/sql/connect/config/Connect.scala  | 15 +
 .../service/SparkConnectAddArtifactsHandler.scala  |  7 +++-
 .../connect/artifact/ArtifactManagerSuite.scala| 18 +-
 python/pyspark/sql/connect/client/artifact.py  | 33 +++---
 python/pyspark/sql/connect/client/core.py  |  3 ++
 python/pyspark/sql/connect/session.py  | 29 
 .../sql/tests/connect/client/test_artifact.py  | 21 
 8 files changed, 158 insertions(+), 7 deletions(-)





[spark] branch master updated: [SPARK-43993][SQL][TESTS] Add tests for cache artifacts

2023-06-07 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new fead8a7962a [SPARK-43993][SQL][TESTS] Add tests for cache artifacts
fead8a7962a is described below

commit fead8a7962a717aae5cab9eef51eed2ac684f070
Author: Max Gekk 
AuthorDate: Wed Jun 7 16:00:49 2023 +0300

[SPARK-43993][SQL][TESTS] Add tests for cache artifacts

### What changes were proposed in this pull request?
In the PR, I propose to add a test to check two methods of the artifact 
manager:
- `isCachedArtifact()`
- `cacheArtifact()`

### Why are the changes needed?
To improve test coverage of Artifacts API.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
By running new test:
```
$ build/sbt "test:testOnly *.ArtifactSuite"
```

Closes #41493 from MaxGekk/test-cache-artifact.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 .../spark/sql/connect/client/ArtifactManager.scala |  2 +-
 .../spark/sql/connect/client/ArtifactSuite.scala   | 14 
 .../connect/client/SparkConnectClientSuite.scala   | 25 +-
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index acd9f279c6d..6d0d16df946 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -108,7 +108,7 @@ class ArtifactManager(
*/
   def addArtifacts(uris: Seq[URI]): Unit = 
addArtifacts(uris.flatMap(parseArtifacts))
 
-  private def isCachedArtifact(hash: String): Boolean = {
+  private[client] def isCachedArtifact(hash: String): Boolean = {
 val artifactName = CACHE_PREFIX + "/" + hash
 val request = proto.ArtifactStatusesRequest
   .newBuilder()
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
index 506ad3625b0..39ab0eef412 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.ByteString
 import io.grpc.{ManagedChannel, Server}
 import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.apache.commons.codec.digest.DigestUtils.sha256Hex
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.connect.proto
@@ -248,4 +249,17 @@ class ArtifactSuite extends ConnectFunSuite with 
BeforeAndAfterEach {
 assertFileDataEquality(remainingArtifacts.get(0).getData, Paths.get(file3))
 assertFileDataEquality(remainingArtifacts.get(1).getData, Paths.get(file4))
   }
+
+  test("cache an artifact and check its presence") {
+val s = "Hello, World!"
+val blob = s.getBytes("UTF-8")
+val expectedHash = sha256Hex(blob)
+assert(artifactManager.isCachedArtifact(expectedHash) === false)
+val actualHash = artifactManager.cacheArtifact(blob)
+assert(actualHash === expectedHash)
+assert(artifactManager.isCachedArtifact(expectedHash) === true)
+
+val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+assert(receivedRequests.size == 1)
+  }
 }
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index 7a0ad1a9e2a..7e0b687054d 100755
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.client
 
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import io.grpc.{Server, StatusRuntimeException}
@@ -26,7 +27,7 @@ import io.grpc.stub.StreamObserver
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.connect.proto
-import org.apache.spark.connect.proto.{AddArtifactsRequest, 
AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, 
ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
+import org.apache.spark.connect.proto.{AddArtifactsRequest, 

[spark] branch master updated (e5b006f89d9 -> 1a754913603)

2023-06-07 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from e5b006f89d9 [MINOR][PS][DOCS] Add `DataFrame.{from_dict, to_orc}` to 
API references
 add 1a754913603 [SPARK-43921][PROTOBUF] Generate Protobuf descriptor files 
at build time

No new revisions were added by this update.

Summary of changes:
 connector/protobuf/pom.xml |  27 +--
 .../resources/protobuf/basicmessage_noimports.desc |  18 
 .../test/resources/protobuf/catalyst_types.desc|  48 
 .../test/resources/protobuf/functions_suite.desc   | Bin 11190 -> 0 bytes
 .../test/resources/protobuf/proto2_messages.desc   | Bin 929 -> 0 bytes
 .../src/test/resources/protobuf/serde_suite.desc   |  27 ---
 .../ProtobufCatalystDataConversionSuite.scala  |   4 +-
 .../sql/protobuf/ProtobufFunctionsSuite.scala  |  15 +++
 .../spark/sql/protobuf/ProtobufSerdeSuite.scala|  28 ++--
 .../spark/sql/protobuf/ProtobufTestBase.scala  |  50 +
 project/SparkBuild.scala   |   7 ++-
 11 files changed, 94 insertions(+), 130 deletions(-)
 delete mode 100644 
connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc
 delete mode 100644 
connector/protobuf/src/test/resources/protobuf/catalyst_types.desc
 delete mode 100644 
connector/protobuf/src/test/resources/protobuf/functions_suite.desc
 delete mode 100644 
connector/protobuf/src/test/resources/protobuf/proto2_messages.desc
 delete mode 100644 
connector/protobuf/src/test/resources/protobuf/serde_suite.desc





[spark] branch master updated: [MINOR][PS][DOCS] Add `DataFrame.{from_dict, to_orc}` to API references

2023-06-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e5b006f89d9 [MINOR][PS][DOCS] Add `DataFrame.{from_dict, to_orc}` to 
API references
e5b006f89d9 is described below

commit e5b006f89d9e4a50d00e78ba03f2866fde12abb1
Author: Ruifeng Zheng 
AuthorDate: Wed Jun 7 20:57:43 2023 +0900

[MINOR][PS][DOCS] Add `DataFrame.{from_dict, to_orc}` to API references

### What changes were proposed in this pull request?
Add `DataFrame.{from_dict, to_orc}` to API references
Move `DataFrame.info` to attributes according to 
https://pandas.pydata.org/docs/reference/frame.html#attributes-and-underlying-data

### Why are the changes needed?
add missing doc

### Does this PR introduce _any_ user-facing change?
yes, doc updated

### How was this patch tested?
CI and manually check

Closes #41492 from zhengruifeng/doc_frame_missing_io.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 python/docs/source/reference/pyspark.pandas/frame.rst | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/python/docs/source/reference/pyspark.pandas/frame.rst 
b/python/docs/source/reference/pyspark.pandas/frame.rst
index 4ab0ea199be..a8d114187b9 100644
--- a/python/docs/source/reference/pyspark.pandas/frame.rst
+++ b/python/docs/source/reference/pyspark.pandas/frame.rst
@@ -37,6 +37,7 @@ Attributes and underlying data
:toctree: api/
 
DataFrame.index
+   DataFrame.info
DataFrame.columns
DataFrame.empty
 
@@ -272,13 +273,14 @@ Serialization / IO / Conversion
 .. autosummary::
:toctree: api/
 
+   DataFrame.from_dict
DataFrame.from_records
-   DataFrame.info
DataFrame.to_table
DataFrame.to_delta
DataFrame.to_parquet
DataFrame.to_spark_io
DataFrame.to_csv
+   DataFrame.to_orc
DataFrame.to_pandas
DataFrame.to_html
DataFrame.to_numpy





[spark] branch master updated (e4c27f53e26 -> 41fd030145a)

2023-06-07 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from e4c27f53e26 [SPARK-43979][SQL] CollectedMetrics should be treated as 
the same one for self-join
 add 41fd030145a [SPARK-43376][SQL][FOLLOWUP] lazy construct subquery to 
improve reuse subquery

No new revisions were added by this update.

Summary of changes:
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  7 +++-
 .../adaptive/InsertAdaptiveSparkPlan.scala | 39 +++---
 .../adaptive/PlanAdaptiveSubqueries.scala  | 22 +++-
 .../execution/adaptive/ReuseAdaptiveSubquery.scala | 15 +++--
 4 files changed, 37 insertions(+), 46 deletions(-)





[spark] branch master updated (5021638ee14 -> e4c27f53e26)

2023-06-07 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 5021638ee14 [SPARK-43717][CONNECT] Scala client reduce agg cannot 
handle null partitions for scala primitive inputs
 add e4c27f53e26 [SPARK-43979][SQL] CollectedMetrics should be treated as 
the same one for self-join

No new revisions were added by this update.

Summary of changes:
 .../sql/catalyst/analysis/CheckAnalysis.scala  | 25 +-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 16 ++
 2 files changed, 40 insertions(+), 1 deletion(-)





[spark] branch master updated: [SPARK-43717][CONNECT] Scala client reduce agg cannot handle null partitions for scala primitive inputs

2023-06-07 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5021638ee14 [SPARK-43717][CONNECT] Scala client reduce agg cannot 
handle null partitions for scala primitive inputs
5021638ee14 is described below

commit 5021638ee14758b92309942a1bcaed2b6554f810
Author: Zhen Li 
AuthorDate: Wed Jun 7 14:30:42 2023 +0800

[SPARK-43717][CONNECT] Scala client reduce agg cannot handle null 
partitions for scala primitive inputs

### What changes were proposed in this pull request?
Scala client fails with NPE when running the following reduce agg:
```
spark.range(0, 5, 1, 10).as[Long].reduce(_ + _) == 10
```
The reason is that `range` produces null partitions, and the reduce encoder 
cannot set the default value correctly for partitions that contain Scala 
primitives. In the example we expect 0 but receive null, which causes the 
codegen to wrongly assume the input is nullable and to generate incorrect code.

### Why are the changes needed?
Bug fix

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit and Scala Client E2E tests.
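
For reference, the repro from the description as a runnable sketch; the failure was observed through the Spark Connect Scala client, so the plain local-mode session used here is an assumption and only approximates that setup.

```
import org.apache.spark.sql.SparkSession

object ReduceNullPartitionRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // 5 rows spread over 10 partitions leaves several partitions empty; before the
    // fix the reducer's zero for Long was null and this failed with an NPE
    // instead of returning 10.
    val sum = spark.range(0, 5, 1, 10).as[Long].reduce(_ + _)
    assert(sum == 10)

    spark.stop()
  }
}
```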

Closes #41264 from zhenlineo/fix-agg-null.

Authored-by: Zhen Li 
Signed-off-by: yangjie01 
---
 .../spark/sql/UserDefinedFunctionE2ETestSuite.scala  | 20 
 .../spark/sql/expressions/ReduceAggregator.scala | 13 -
 .../sql/expressions/ReduceAggregatorSuite.scala  | 10 --
 3 files changed, 36 insertions(+), 7 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala
index b5bbee67803..ca1bcf3fe67 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/UserDefinedFunctionE2ETestSuite.scala
@@ -198,18 +198,30 @@ class UserDefinedFunctionE2ETestSuite extends 
RemoteSparkSession {
 assert(sum.get() == 0) // The value is not 45
   }
 
-  test("Dataset reduce") {
+  test("Dataset reduce without null partition inputs") {
 val session: SparkSession = spark
 import session.implicits._
-assert(spark.range(10).map(_ + 1).reduce(_ + _) == 55)
+assert(spark.range(0, 10, 1, 5).map(_ + 1).reduce(_ + _) == 55)
   }
 
-  test("Dataset reduce - java") {
+  test("Dataset reduce with null partition inputs") {
+val session: SparkSession = spark
+import session.implicits._
+assert(spark.range(0, 10, 1, 16).map(_ + 1).reduce(_ + _) == 55)
+  }
+
+  test("Dataset reduce with null partition inputs - java to scala long type") {
+val session: SparkSession = spark
+import session.implicits._
+assert(spark.range(0, 5, 1, 10).as[Long].reduce(_ + _) == 10)
+  }
+
+  test("Dataset reduce with null partition inputs - java") {
 val session: SparkSession = spark
 import session.implicits._
 assert(
   spark
-.range(10)
+.range(0, 10, 1, 16)
 .map(_ + 1)
 .reduce(new ReduceFunction[Long] {
   override def call(v1: Long, v2: Long): Long = v1 + v2
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
index 41306cd0a99..e897fdfe008 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/expressions/ReduceAggregator.scala
@@ -32,7 +32,18 @@ private[sql] class ReduceAggregator[T: Encoder](func: (T, T) 
=> T)
 
   @transient private val encoder = implicitly[Encoder[T]]
 
-  override def zero: (Boolean, T) = (false, null.asInstanceOf[T])
+  private val _zero = encoder.clsTag.runtimeClass match {
+case java.lang.Boolean.TYPE => false
+case java.lang.Byte.TYPE => 0.toByte
+case java.lang.Short.TYPE => 0.toShort
+case java.lang.Integer.TYPE => 0
+case java.lang.Long.TYPE => 0L
+case java.lang.Float.TYPE => 0f
+case java.lang.Double.TYPE => 0d
+case _ => null
+  }
+
+  override def zero: (Boolean, T) = (false, _zero.asInstanceOf[T])
 
   override def bufferEncoder: Encoder[(Boolean, T)] =
 ExpressionEncoder.tuple(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
index f65dcdf119c..c1071373287 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/expressions/ReduceAggregatorSuite.scala
+++