[spark] branch master updated (87ccfc24023 -> 09a43531d30)

2023-04-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 87ccfc24023 [SPARK-43196][YARN] Replace reflection w/ direct calling 
for `ContainerLaunchContext#setTokensConf`
 add 09a43531d30 Revert "[SPARK-42945][CONNECT] Support 
PYSPARK_JVM_STACKTRACE_ENABLED in Spark Connect"

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/connect/config/Connect.scala  |  10 --
 .../sql/connect/service/SparkConnectService.scala  | 106 ++---
 python/pyspark/errors/exceptions/connect.py|   4 -
 .../sql/tests/connect/test_connect_basic.py|  61 ++--
 python/pyspark/testing/connectutils.py |   6 +-
 5 files changed, 39 insertions(+), 148 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] branch master updated: [SPARK-43196][YARN] Replace reflection w/ direct calling for `ContainerLaunchContext#setTokensConf`

2023-04-19 Thread sunchao

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 87ccfc24023 [SPARK-43196][YARN] Replace reflection w/ direct calling 
for `ContainerLaunchContext#setTokensConf`
87ccfc24023 is described below

commit 87ccfc2402394d168a90c8df494482c7a1d6a552
Author: yangjie01 
AuthorDate: Wed Apr 19 21:09:18 2023 -0700

[SPARK-43196][YARN] Replace reflection w/ direct calling for 
`ContainerLaunchContext#setTokensConf`

### What changes were proposed in this pull request?
This PR replaces reflection with a direct call to 
`ContainerLaunchContext#setTokensConf` in the 
`o.a.s.deploy.yarn.Client#setTokenConf` function.

### Why are the changes needed?
SPARK-37205 uses reflection to call `ContainerLaunchContext#setTokensConf` 
for compatibility with Hadoop 2.7. Since SPARK-42452 removed support for 
Hadoop 2, we can call it directly instead of using reflection.
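
The trade-off described above can be illustrated with a small Python analogue. This is only a sketch and not Spark or YARN code: `FakeLaunchContext` and both helper functions are hypothetical names invented for the illustration. The reflective variant looks the method up by name at runtime and fails with a descriptive error if it is missing; the direct variant simply calls it, which is safe once every supported library version provides the method.

```python
class FakeLaunchContext:
    """Hypothetical stand-in for a container launch context (not the YARN API)."""

    def __init__(self):
        self.tokens_conf = None

    def set_tokens_conf(self, data: bytes) -> None:
        self.tokens_conf = data


def set_tokens_conf_reflectively(ctx, data: bytes) -> None:
    # Old style: resolve the method by name at runtime, so the caller still
    # loads against a library version that does not define it.
    method = getattr(ctx, "set_tokens_conf", None)
    if method is None:
        raise RuntimeError(
            f"Cannot find set_tokens_conf on {type(ctx).__name__}"
        )
    method(data)


def set_tokens_conf_directly(ctx, data: bytes) -> None:
    # New style: a plain call, valid once all supported versions have the method.
    ctx.set_tokens_conf(data)
```

The direct form is shorter and lets tooling catch a missing method earlier, at the cost of no longer tolerating versions that lack it.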

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GA.

Closes #40855 from LuciferYang/SPARK-43196.

Authored-by: yangjie01 
Signed-off-by: Chao Sun 
---
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala  | 15 +--
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 06d8b4e2509..7010067e1ae 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -77,10 +77,6 @@ private[spark] class Client(
 
   private val isClusterMode = sparkConf.get(SUBMIT_DEPLOY_MODE) == "cluster"
 
-  // ContainerLaunchContext.setTokensConf is only available in Hadoop 2.9+ and 
3.x, so here we use
-  // reflection to avoid compilation for Hadoop 2.7 profile.
-  private val SET_TOKENS_CONF_METHOD = "setTokensConf"
-
   private val isClientUnmanagedAMEnabled = sparkConf.get(YARN_UNMANAGED_AM) && 
!isClusterMode
   private var appMaster: ApplicationMaster = _
   private var stagingDirPath: Path = _
@@ -382,16 +378,7 @@ private[spark] class Client(
   }
   copy.write(dob);
 
-  // since this method was added in Hadoop 2.9 and 3.0, we use reflection 
here to avoid
-  // compilation error for Hadoop 2.7 profile.
-  val setTokensConfMethod = try {
-amContainer.getClass.getMethod(SET_TOKENS_CONF_METHOD, 
classOf[ByteBuffer])
-  } catch {
-case _: NoSuchMethodException =>
-  throw new SparkException(s"Cannot find setTokensConf method in 
${amContainer.getClass}." +
-  s" Please check YARN version and make sure it is 2.9+ or 3.x")
-  }
-  setTokensConfMethod.invoke(amContainer, ByteBuffer.wrap(dob.getData))
+  amContainer.setTokensConf(ByteBuffer.wrap(dob.getData))
 }
   }
 





[spark] branch master updated (78407a73aa8 -> 34d1c22980f)

2023-04-19 Thread sunchao

sunchao pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 78407a73aa8 [SPARK-43200][DOCS] Remove Hadoop 2 reference in docs
 add 34d1c22980f [SPARK-43191][CORE] Replace reflection w/ direct calling 
for Hadoop CallerContext

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/util/Utils.scala   | 36 --
 .../scala/org/apache/spark/util/UtilsSuite.scala   |  6 ++--
 2 files changed, 8 insertions(+), 34 deletions(-)





[spark] branch master updated: [SPARK-43200][DOCS] Remove Hadoop 2 reference in docs

2023-04-19 Thread sunchao

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 78407a73aa8 [SPARK-43200][DOCS] Remove Hadoop 2 reference in docs
78407a73aa8 is described below

commit 78407a73aa85f147bf2cba2f4c94d19afcd0877d
Author: Cheng Pan 
AuthorDate: Wed Apr 19 21:07:43 2023 -0700

[SPARK-43200][DOCS] Remove Hadoop 2 reference in docs

### What changes were proposed in this pull request?

Remove Hadoop 2 reference in docs.

### Why are the changes needed?

SPARK-42452 removed support for Hadoop 2.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review.

Closes #40857 from pan3793/SPARK-43200.

Authored-by: Cheng Pan 
Signed-off-by: Chao Sun 
---
 docs/cluster-overview.md | 2 +-
 docs/sql-data-sources-orc.md | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index 711ec6e697e..7da06a85208 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -66,7 +66,7 @@ The system currently supports several cluster managers:
   easy to set up a cluster.
 * [Apache Mesos](running-on-mesos.html) -- a general cluster manager that can 
also run Hadoop MapReduce
   and service applications. (Deprecated)
-* [Hadoop YARN](running-on-yarn.html) -- the resource manager in Hadoop 2 and 
3.
+* [Hadoop YARN](running-on-yarn.html) -- the resource manager in Hadoop 3.
 * [Kubernetes](running-on-kubernetes.html) -- an open-source system for 
automating deployment, scaling,
   and management of containerized applications.
 
diff --git a/docs/sql-data-sources-orc.md b/docs/sql-data-sources-orc.md
index b90f1e07178..4e492598f59 100644
--- a/docs/sql-data-sources-orc.md
+++ b/docs/sql-data-sources-orc.md
@@ -56,8 +56,7 @@ turned it off by default . You may enable it by
 
 ### Zstandard
 
-Spark supports both Hadoop 2 and 3. Since Spark 3.2, you can take advantage
-of Zstandard compression in ORC files on both Hadoop versions.
+Since Spark 3.2, you can take advantage of Zstandard compression in ORC files.
 Please see [Zstandard](https://facebook.github.io/zstd/) for the benefits.
 
 





[spark] branch master updated: [SPARK-43195][CORE] Remove unnecessary serializable wrapper in HadoopFSUtils

2023-04-19 Thread sunchao

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 29b40f6fd0c [SPARK-43195][CORE] Remove unnecessary serializable 
wrapper in HadoopFSUtils
29b40f6fd0c is described below

commit 29b40f6fd0c42e594e24b12c8b828913139e8f54
Author: Cheng Pan 
AuthorDate: Wed Apr 19 21:02:46 2023 -0700

[SPARK-43195][CORE] Remove unnecessary serializable wrapper in HadoopFSUtils

### What changes were proposed in this pull request?

Remove unnecessary serializable wrapper in `HadoopFSUtils`.

### Why are the changes needed?

`Path` and `FileStatus` became serializable in Hadoop 3. Since SPARK-42452 
removed support for Hadoop 2, we can remove those wrappers now.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI.

Closes #40854 from pan3793/SPARK-43195.

Authored-by: Cheng Pan 
Signed-off-by: Chao Sun 
---
 .../apache/spark/serializer/KryoSerializer.scala   |  4 --
 .../org/apache/spark/util/HadoopFSUtils.scala  | 73 ++
 2 files changed, 5 insertions(+), 72 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala 
b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index d79f6453bc5..1736088b498 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -510,10 +510,6 @@ private[serializer] object KryoSerializer {
   // SQL / ML / MLlib classes once and then re-use that filtered list in 
newInstance() calls.
   private lazy val loadableSparkClasses: Seq[Class[_]] = {
 Seq(
-  "org.apache.spark.util.HadoopFSUtils$SerializableBlockLocation",
-  "[Lorg.apache.spark.util.HadoopFSUtils$SerializableBlockLocation;",
-  "org.apache.spark.util.HadoopFSUtils$SerializableFileStatus",
-
   "org.apache.spark.sql.catalyst.expressions.BoundReference",
   "org.apache.spark.sql.catalyst.expressions.SortOrder",
   "[Lorg.apache.spark.sql.catalyst.expressions.SortOrder;",
diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala 
b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 01dc3ba68cc..bb03cac8f1f 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -102,14 +102,13 @@ private[spark] object HadoopFSUtils extends Logging {
 HiveCatalogMetrics.incrementParallelListingJobCount(1)
 
 val serializableConfiguration = new SerializableConfiguration(hadoopConf)
-val serializedPaths = paths.map(_.toString)
 
 // Set the number of parallelism to prevent following file listing from 
generating many tasks
 // in case of large #defaultParallelism.
 val numParallelism = Math.min(paths.size, parallelismMax)
 
 val previousJobDescription = 
sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
-val statusMap = try {
+try {
   val description = paths.size match {
 case 0 =>
   "Listing leaf files and directories 0 paths"
@@ -119,11 +118,10 @@ private[spark] object HadoopFSUtils extends Logging {
   s"Listing leaf files and directories for $s paths:${paths(0)}, 
..."
   }
   sc.setJobDescription(description)
-  sc
-.parallelize(serializedPaths, numParallelism)
-.mapPartitions { pathStrings =>
+  sc.parallelize(paths, numParallelism)
+.mapPartitions { pathsEachPartition =>
   val hadoopConf = serializableConfiguration.value
-  pathStrings.map(new Path(_)).toSeq.map { path =>
+  pathsEachPartition.map { path =>
 val leafFiles = listLeafFiles(
   path = path,
   hadoopConf = hadoopConf,
@@ -135,54 +133,11 @@ private[spark] object HadoopFSUtils extends Logging {
   parallelismThreshold = Int.MaxValue,
   parallelismMax = 0)
 (path, leafFiles)
-  }.iterator
-}.map { case (path, statuses) =>
-val serializableStatuses = statuses.map { status =>
-  // Turn FileStatus into SerializableFileStatus so we can send it 
back to the driver
-  val blockLocations = status match {
-case f: LocatedFileStatus =>
-  f.getBlockLocations.map { loc =>
-SerializableBlockLocation(
-  loc.getNames,
-  loc.getHosts,
-  loc.getOffset,
-  loc.getLength)
-  }
-
-case _ =>
-  Array.empty[SerializableBlockLocation]
-  }
-
-  SerializableFileStatus(
-status.getPath.toS

[spark] branch master updated: [SPARK-43129] Scala core API for streaming Spark Connect

2023-04-19 Thread gurwls223

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3814d15bd19 [SPARK-43129] Scala core API for streaming Spark Connect
3814d15bd19 is described below

commit 3814d15bd190bdae69c5c22b461e3f6145cca1f7
Author: Raghu Angadi 
AuthorDate: Thu Apr 20 08:33:36 2023 +0900

[SPARK-43129] Scala core API for streaming Spark Connect

### What changes were proposed in this pull request?
Implements the core streaming API in Scala for running streaming queries over 
Spark Connect.
This is functionally equivalent to the Python-side PR #40586.

There are no server-side changes here since they were made earlier in the Python 
PR.

We can run most streaming queries.
Notably, queries using `foreachBatch()` are not yet supported.

### Why are the changes needed?
This adds Structured Streaming support in Scala for Spark Connect.

### Does this PR introduce _any_ user-facing change?

Adds more streaming APIs to the Scala Spark Connect client.

### How was this patch tested?

  - Unit test
  - Manual testing

Closes #40783 from rangadi/scala-m1.

Authored-by: Raghu Angadi 
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/sql/streaming/Trigger.java| 180 ++
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  42 +++-
 .../scala/org/apache/spark/sql/SparkSession.scala  |  17 +-
 .../spark/sql/execution/streaming/Triggers.scala   | 115 +
 .../spark/sql/streaming/DataStreamReader.scala | 273 +
 .../spark/sql/streaming/DataStreamWriter.scala | 266 
 .../spark/sql/streaming/StreamingQuery.scala   | 245 ++
 .../spark/sql/streaming/StreamingQueryStatus.scala |  72 ++
 .../org/apache/spark/sql/streaming/progress.scala  |  22 ++
 .../CheckConnectJvmClientCompatibility.scala   |  36 ++-
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  81 ++
 dev/checkstyle-suppressions.xml|   2 +
 12 files changed, 1345 insertions(+), 6 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
 
b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
new file mode 100644
index 000..27ffe67d990
--- /dev/null
+++ 
b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.Duration;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
+import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
+
+/**
+ * Policy used to indicate how often results should be produced by a 
[[StreamingQuery]].
+ *
+ * @since 3.5.0
+ */
+@Evolving
+public class Trigger {
+  // This is a copy of the same class in sql/core/.../streaming/Trigger.java
+
+  /**
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(long intervalMs) {
+  return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * (Java-friendly)
+   * A trigger policy that runs a query periodically based on an interval in 
processing time.
+   * If `interval` is 0, the query will run as fast as possible.
+   *
+   * {{{
+   *import java.util.concurrent.TimeUnit
+   *df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 3.5.0
+   */
+  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
+  return ProcessingTimeTrigger.create(interval, timeUnit);

[spark] branch master updated (c291564c7d4 -> b9400c7807a)

2023-04-19 Thread gurwls223

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from c291564c7d4 [SPARK-42437][CONNECT][PYTHON][FOLLOW-UP] Storage level 
proto converters
 add b9400c7807a [SPARK-42945][CONNECT] Support 
PYSPARK_JVM_STACKTRACE_ENABLED in Spark Connect

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/connect/config/Connect.scala  |  10 ++
 .../sql/connect/service/SparkConnectService.scala  | 106 +++--
 python/pyspark/errors/exceptions/connect.py|   4 +
 .../sql/tests/connect/test_connect_basic.py|  61 ++--
 python/pyspark/testing/connectutils.py |   6 +-
 5 files changed, 148 insertions(+), 39 deletions(-)





[spark] branch master updated: [SPARK-42437][CONNECT][PYTHON][FOLLOW-UP] Storage level proto converters

2023-04-19 Thread ueshin

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c291564c7d4 [SPARK-42437][CONNECT][PYTHON][FOLLOW-UP] Storage level 
proto converters
c291564c7d4 is described below

commit c291564c7d493a9da5e2315d6bab28796dfce7ce
Author: khalidmammadov 
AuthorDate: Wed Apr 19 13:30:53 2023 -0700

[SPARK-42437][CONNECT][PYTHON][FOLLOW-UP] Storage level proto converters

### What changes were proposed in this pull request?
Adds converters between the proto message and `StorageLevel` to avoid code duplication.
This is a follow-up to https://github.com/apache/spark/pull/40015

### Why are the changes needed?
Code deduplication
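
As a rough illustration of the deduplication, the sketch below shows a pair of converters that map field by field in both directions. It is not the PySpark implementation: `StorageLevelSketch` and `StorageLevelProtoSketch` are hypothetical stand-ins for `StorageLevel` and `pb2.StorageLevel`, and only the field names are taken from the diff below.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StorageLevelSketch:
    """Hypothetical stand-in for pyspark.storagelevel.StorageLevel."""
    useDisk: bool
    useMemory: bool
    useOffHeap: bool
    deserialized: bool
    replication: int = 1


@dataclass(frozen=True)
class StorageLevelProtoSketch:
    """Hypothetical stand-in for the pb2.StorageLevel protobuf message."""
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int


def to_proto_sketch(level: StorageLevelSketch) -> StorageLevelProtoSketch:
    # Single place that knows how the camelCase fields map to snake_case ones.
    return StorageLevelProtoSketch(
        use_disk=level.useDisk,
        use_memory=level.useMemory,
        use_off_heap=level.useOffHeap,
        deserialized=level.deserialized,
        replication=level.replication,
    )


def from_proto_sketch(msg: StorageLevelProtoSketch) -> StorageLevelSketch:
    # Inverse mapping, so callers never rebuild the object field by field.
    return StorageLevelSketch(
        useDisk=msg.use_disk,
        useMemory=msg.use_memory,
        useOffHeap=msg.use_off_heap,
        deserialized=msg.deserialized,
        replication=msg.replication,
    )
```

With both directions in one module, call sites shrink to a single function call, and a round trip through the two converters should return an equal value.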

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes #40859 from khalidmammadov/storage_level_converter.

Authored-by: khalidmammadov 
Signed-off-by: Takuya UESHIN 
---
 python/pyspark/sql/connect/client.py | 19 +++
 python/pyspark/sql/connect/conversion.py | 24 
 python/pyspark/sql/connect/plan.py   | 11 ++-
 3 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/python/pyspark/sql/connect/client.py 
b/python/pyspark/sql/connect/client.py
index 780c5702477..60f3f1ac2ba 100644
--- a/python/pyspark/sql/connect/client.py
+++ b/python/pyspark/sql/connect/client.py
@@ -60,6 +60,7 @@ from google.protobuf import text_format
 from google.rpc import error_details_pb2
 
 from pyspark.resource.information import ResourceInformation
+from pyspark.sql.connect.conversion import storage_level_to_proto, 
proto_to_storage_level
 import pyspark.sql.connect.proto as pb2
 import pyspark.sql.connect.proto.base_pb2_grpc as grpc_lib
 import pyspark.sql.connect.types as types
@@ -469,13 +470,7 @@ class AnalyzeResult:
 elif pb.HasField("unpersist"):
 pass
 elif pb.HasField("get_storage_level"):
-storage_level = StorageLevel(
-useDisk=pb.get_storage_level.storage_level.use_disk,
-useMemory=pb.get_storage_level.storage_level.use_memory,
-useOffHeap=pb.get_storage_level.storage_level.use_off_heap,
-deserialized=pb.get_storage_level.storage_level.deserialized,
-replication=pb.get_storage_level.storage_level.replication,
-)
+storage_level = 
proto_to_storage_level(pb.get_storage_level.storage_level)
 else:
 raise SparkConnectException("No analyze result found!")
 
@@ -877,15 +872,7 @@ class SparkConnectClient(object):
 req.persist.relation.CopyFrom(cast(pb2.Relation, 
kwargs.get("relation")))
 if kwargs.get("storage_level", None) is not None:
 storage_level = cast(StorageLevel, kwargs.get("storage_level"))
-req.persist.storage_level.CopyFrom(
-pb2.StorageLevel(
-use_disk=storage_level.useDisk,
-use_memory=storage_level.useMemory,
-use_off_heap=storage_level.useOffHeap,
-deserialized=storage_level.deserialized,
-replication=storage_level.replication,
-)
-)
+
req.persist.storage_level.CopyFrom(storage_level_to_proto(storage_level))
 elif method == "unpersist":
 req.unpersist.relation.CopyFrom(cast(pb2.Relation, 
kwargs.get("relation")))
 if kwargs.get("blocking", None) is not None:
diff --git a/python/pyspark/sql/connect/conversion.py 
b/python/pyspark/sql/connect/conversion.py
index 5a31d1df67e..a6fe0c00e09 100644
--- a/python/pyspark/sql/connect/conversion.py
+++ b/python/pyspark/sql/connect/conversion.py
@@ -43,7 +43,9 @@ from pyspark.sql.types import (
 cast,
 )
 
+from pyspark.storagelevel import StorageLevel
 from pyspark.sql.connect.types import to_arrow_schema
+import pyspark.sql.connect.proto as pb2
 
 from typing import (
 Any,
@@ -486,3 +488,25 @@ class ArrowTableToRowsConversion:
 values = [field_converters[j](columnar_data[j][i]) for j in 
range(table.num_columns)]
 rows.append(_create_row(fields=schema.fieldNames(), values=values))
 return rows
+
+
+def storage_level_to_proto(storage_level: StorageLevel) -> pb2.StorageLevel:
+assert storage_level is not None and isinstance(storage_level, 
StorageLevel)
+return pb2.StorageLevel(
+use_disk=storage_level.useDisk,
+use_memory=storage_level.useMemory,
+use_off_heap=storage_level.useOffHeap,
+deserialized=storage_level.deserialized,
+replication=storage_level.replication,
+)
+
+
+def proto_to_storage_level(storage_level: pb2.StorageLevel) -> StorageLevel:
+assert storage_le

[spark-website] branch asf-site updated: 🚀 Including ApacheSparkMéxicoCity Meetup on community page 🚀

2023-04-19 Thread srowen

srowen pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new b2a33a5de6 🚀 Including ApacheSparkMéxicoCity Meetup on community page 🚀
b2a33a5de6 is described below

commit b2a33a5de6a1043ee93cd60722835131af81b0c5
Author: Juan Diaz 
AuthorDate: Wed Apr 19 14:31:02 2023 -0500

🚀 Including ApacheSparkMéxicoCity Meetup on community page 🚀

`While browsing the site, I found that it is missing Apache Spark 
México City. 
[https://www.meetup.com/es/apache-spark-mexicocity/](https://www.meetup.com/es/apache-spark-mexicocity/)

I would like to include the community on the following web page 
[https://spark.apache.org/community.html](https://spark.apache.org/community.html)

I changed the .md and the .html community files. I hope this helps.

Author: Juan Diaz juanchodishotmail.com`

Author: Juan Diaz 

Closes #459 from JuanPabloDiaz/asf-site.
---
 community.md| 5 -
 site/community.html | 5 -
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/community.md b/community.md
index 6c04ee5f83..e7fdec7c74 100644
--- a/community.md
+++ b/community.md
@@ -166,11 +166,14 @@ Spark Meetups are grass-roots events organized and hosted 
by individuals in the
 https://www.meetup.com/Apache-Spark-Maryland/";>Maryland Spark 
Meetup
   
   
-https://www.meetup.com/Mumbai-Spark-Meetup/";>Mumbai Spark 
Meetup
+https://www.meetup.com/es/apache-spark-mexicocity/";>México City 
Spark Meetup
   
   
 https://www.meetup.com/Apache-Spark-in-Moscow/";>Moscow Spark 
Meetup
   
+  
+https://www.meetup.com/Mumbai-Spark-Meetup/";>Mumbai Spark 
Meetup
+  
   
 https://www.meetup.com/Spark-NYC/";>NYC Spark Meetup
   
diff --git a/site/community.html b/site/community.html
index e712a48435..127822931d 100644
--- a/site/community.html
+++ b/site/community.html
@@ -294,11 +294,14 @@ vulnerabilities, and for information on known security 
issues.
 https://www.meetup.com/Apache-Spark-Maryland/";>Maryland Spark 
Meetup
   
   
-https://www.meetup.com/Mumbai-Spark-Meetup/";>Mumbai Spark 
Meetup
+https://www.meetup.com/es/apache-spark-mexicocity/";>México City 
Spark Meetup
   
   
 https://www.meetup.com/Apache-Spark-in-Moscow/";>Moscow Spark 
Meetup
   
+  
+https://www.meetup.com/Mumbai-Spark-Meetup/";>Mumbai Spark 
Meetup
+  
   
 https://www.meetup.com/Spark-NYC/";>NYC Spark Meetup
   





[GitHub] [spark-website] srowen closed pull request #459: 🚀 Including ApacheSparkMéxicoCity Meetup on community page 🚀

2023-04-19 Thread via GitHub


srowen closed pull request #459: 🚀 Including ApacheSparkMéxicoCity Meetup on 
community page 🚀
URL: https://github.com/apache/spark-website/pull/459


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [spark-website] JuanPabloDiaz opened a new pull request, #459: 🚀 Including ApacheSparkMéxicoCity Meetup on community page 🚀

2023-04-19 Thread via GitHub


JuanPabloDiaz opened a new pull request, #459:
URL: https://github.com/apache/spark-website/pull/459

   🚀 Including ApacheSparkMéxicoCity Meetup on community page 🚀
   
   `While browsing the site, I found that it is missing Apache Spark 
México City. 
[https://www.meetup.com/es/apache-spark-mexicocity/](https://www.meetup.com/es/apache-spark-mexicocity/)
   
   I would like to include the community on the following web page 
[https://spark.apache.org/community.html](https://spark.apache.org/community.html)
   
   I changed the .md and the .html community files. I hope this helps.
   
   Author: Juan Diaz juancho...@hotmail.com`





[GitHub] [spark-website] srowen commented on pull request #452: ApacheSparkMéxicoCity Meetup added

2023-04-19 Thread via GitHub


srowen commented on PR #452:
URL: https://github.com/apache/spark-website/pull/452#issuecomment-1515227663

   Just make the edit by hand in both files then, and push a commit to reopen.





[GitHub] [spark-website] JuanPabloDiaz commented on pull request #452: ApacheSparkMéxicoCity Meetup added

2023-04-19 Thread via GitHub


JuanPabloDiaz commented on PR #452:
URL: https://github.com/apache/spark-website/pull/452#issuecomment-1515225372

   Sorry, I tried my best but nothing works. I get multiple errors when 
I try to download the repo or link it with my local copy. I forked the repo and tried 
multiple ways too, but I could not figure it out. I am unable to edit both files 
locally and then open a pull request for you to see. 
   
   I installed Jekyll and was able to see the site locally but, again, I can't 
link my changes to GitHub.
   Is there another way I could try? I really want to contribute to the 
project.





[spark] branch master updated: [SPARK-43187][TEST] Remove workaround for MiniKdc's BindException

2023-04-19 Thread sunchao

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 77b72fc5535 [SPARK-43187][TEST] Remove workaround for MiniKdc's 
BindException
77b72fc5535 is described below

commit 77b72fc553569008d1000e628cdd26e07d325b72
Author: Cheng Pan 
AuthorDate: Wed Apr 19 10:26:40 2023 -0700

[SPARK-43187][TEST] Remove workaround for MiniKdc's BindException

### What changes were proposed in this pull request?

This PR basically reverts SPARK-31631, which aimed to address 
[HADOOP-12656](https://issues.apache.org/jira/browse/HADOOP-12656)

### Why are the changes needed?

Since [HADOOP-12656](https://issues.apache.org/jira/browse/HADOOP-12656) 
got fixed in Hadoop 2.8.0/3.0.0, and SPARK-42452 removed support for Hadoop2, 
we can remove this workaround now.
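
For readers unfamiliar with the removed workaround, it wrapped MiniKdc startup in a retry-until-timeout loop (ScalaTest's `eventually` with a 60 second timeout and 1 second interval, per the diff below). A minimal Python sketch of that pattern follows; the function name and parameters are hypothetical and are not part of Spark, Hadoop, or ScalaTest.

```python
import time


def eventually(action, timeout_s=60.0, interval_s=1.0,
               sleep=time.sleep, clock=time.monotonic):
    """Retry `action` until it succeeds or `timeout_s` has elapsed.

    Hypothetical helper for illustration; `sleep` and `clock` are injectable
    so the loop can be exercised without real waiting.
    """
    deadline = clock() + timeout_s
    while True:
        try:
            return action()
        except Exception:
            # Give up and surface the last error once the deadline has passed.
            if clock() >= deadline:
                raise
            sleep(interval_s)
```

Once the underlying race is fixed upstream, a loop like this only hides real failures and slows them down, which is presumably why the commit replaces it with a plain constructor call followed by `start()`.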

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GA.

Closes #40849 from pan3793/SPARK-43187.

Authored-by: Cheng Pan 
Signed-off-by: Chao Sun 
---
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 27 ++-
 .../HadoopDelegationTokenManagerSuite.scala| 30 ++
 2 files changed, 4 insertions(+), 53 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index 7c9c40883a5..4e11a66bc2e 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -26,7 +26,6 @@ import javax.security.auth.login.Configuration
 
 import scala.collection.JavaConverters._
 import scala.io.Source
-import scala.util.control.NonFatal
 
 import com.google.common.io.Files
 import kafka.api.Request
@@ -139,30 +138,8 @@ class KafkaTestUtils(
 val kdcDir = Utils.createTempDir()
 val kdcConf = MiniKdc.createConf()
 kdcConf.setProperty(MiniKdc.DEBUG, "true")
-// The port for MiniKdc service gets selected in the constructor, but will 
be bound
-// to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> 
KdcServer.start().
-// In meantime, when some other service might capture the port during this 
progress, and
-// cause BindException.
-// This makes our tests which have dedicated JVMs and rely on MiniKDC 
being flaky
-//
-// https://issues.apache.org/jira/browse/HADOOP-12656 get fixed in Hadoop 
2.8.0.
-//
-// The workaround here is to periodically repeat this process with a 
timeout , since we are
-// using Hadoop 2.7.4 as default.
-// https://issues.apache.org/jira/browse/SPARK-31631
-eventually(timeout(60.seconds), interval(1.second)) {
-  try {
-kdc = new MiniKdc(kdcConf, kdcDir)
-kdc.start()
-  } catch {
-case NonFatal(e) =>
-  if (kdc != null) {
-kdc.stop()
-kdc = null
-  }
-  throw e
-  }
-}
+kdc = new MiniKdc(kdcConf, kdcDir)
+kdc.start()
 // TODO https://issues.apache.org/jira/browse/SPARK-30037
 // Need to build spark's own MiniKDC and customize krb5.conf like Kafka
 rewriteKrb5Conf()
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
index d9d559509f4..275bca34598 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/security/HadoopDelegationTokenManagerSuite.scala
@@ -19,14 +19,10 @@ package org.apache.spark.deploy.security
 
 import java.security.PrivilegedExceptionAction
 
-import scala.util.control.NonFatal
-
 import org.apache.hadoop.conf.Configuration
 import 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION
 import org.apache.hadoop.minikdc.MiniKdc
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -92,30 +88,8 @@ class HadoopDelegationTokenManagerSuite extends 
SparkFunSuite {
   // krb5.conf. MiniKdc sets "java.security.krb5.conf" in start and 
removes it when stop called.
   val kdcDir = Utils.createTempDir()
   val kdcConf = MiniKdc.createConf()
-  // The port for MiniKdc service gets selected in the constructor, but 
will be bound
-  // to it later in MiniKdc.start() -> MiniKdc.initKDCServer() -> 
KdcServer.start().
-  // In m

[spark] branch master updated: [SPARK-43186][SQL][HIVE] Remove workaround for FileSinkDesc

2023-04-19 Thread sunchao
This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 2097581dfb2 [SPARK-43186][SQL][HIVE] Remove workaround for FileSinkDesc
2097581dfb2 is described below

commit 2097581dfb2baf9b78e978aea2b7342e55923212
Author: Cheng Pan 
AuthorDate: Wed Apr 19 10:25:41 2023 -0700

[SPARK-43186][SQL][HIVE] Remove workaround for FileSinkDesc

### What changes were proposed in this pull request?

Remove `org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc`, which was used 
to work around the serialization issue of `org.apache.hadoop.hive.ql.plan.FileSinkDesc`.

### Why are the changes needed?

[HIVE-6171](https://issues.apache.org/jira/browse/HIVE-6171) changed 
`FileSinkDesc`'s property from `String dirName` to `Path dirName`, but 
`Path` was not serializable until 
[HADOOP-13519](https://issues.apache.org/jira/browse/HADOOP-13519) (fixed 
in Hadoop 3.0.0).

Since SPARK-42452 removed support for Hadoop2, we can remove this 
workaround now.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GA.

Closes #40848 from pan3793/SPARK-43186.

Authored-by: Cheng Pan 
Signed-off-by: Chao Sun 
---
 .../scala/org/apache/spark/sql/hive/HiveShim.scala | 53 --
 .../spark/sql/hive/execution/HiveFileFormat.scala  |  2 +-
 .../hive/execution/InsertIntoHiveDirCommand.scala  |  4 +-
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  4 +-
 4 files changed, 5 insertions(+), 58 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index 6605d297010..0b9c55f6083 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -20,22 +20,18 @@ package org.apache.spark.sql.hive
 import java.rmi.server.UID
 
 import scala.collection.JavaConverters._
-import scala.language.implicitConversions
 
 import com.google.common.base.Objects
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities
 import org.apache.hadoop.hive.ql.exec.UDF
-import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
 import org.apache.hadoop.hive.serde2.avro.{AvroGenericRecordWritable, 
AvroSerdeUtils}
 import 
org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector
 import org.apache.hadoop.io.Writable
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.Decimal
 import org.apache.spark.util.Utils
 
@@ -215,53 +211,4 @@ private[hive] object HiveShim {
   }
 }
   }
-
-  /*
-   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member 
path is not.
-   * Fix it through wrapper.
-   */
-  implicit def wrapperToFileSinkDesc(w: ShimFileSinkDesc): FileSinkDesc = {
-val f = new FileSinkDesc(new Path(w.dir), w.tableInfo, w.compressed)
-f.setCompressCodec(w.compressCodec)
-f.setCompressType(w.compressType)
-f.setTableInfo(w.tableInfo)
-f.setDestTableId(w.destTableId)
-f
-  }
-
-  /*
-   * Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member 
path is not.
-   * Fix it through wrapper.
-   */
-  private[hive] class ShimFileSinkDesc(
-  var dir: String,
-  var tableInfo: TableDesc,
-  var compressed: Boolean)
-extends Serializable with Logging {
-var compressCodec: String = _
-var compressType: String = _
-var destTableId: Int = _
-
-def setCompressed(compressed: Boolean): Unit = {
-  this.compressed = compressed
-}
-
-def getDirName(): String = dir
-
-def setDestTableId(destTableId: Int): Unit = {
-  this.destTableId = destTableId
-}
-
-def setTableInfo(tableInfo: TableDesc): Unit = {
-  this.tableInfo = tableInfo
-}
-
-def setCompressCodec(intermediateCompressorCodec: String): Unit = {
-  compressCodec = intermediateCompressorCodec
-}
-
-def setCompressType(intermediateCompressType: String): Unit = {
-  compressType = intermediateCompressType
-}
-  }
 }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index 7dc1fbb433c..29734c4de34 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverte

[spark] branch master updated: [SPARK-43137][SQL] Improve ArrayInsert if the position is foldable and positive

2023-04-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8db31aa2392 [SPARK-43137][SQL] Improve ArrayInsert if the position is 
foldable and positive
8db31aa2392 is described below

commit 8db31aa2392a0469767c6dfcdcd7e0e036832313
Author: Jiaan Geng 
AuthorDate: Wed Apr 19 22:12:23 2023 +0800

[SPARK-43137][SQL] Improve ArrayInsert if the position is foldable and 
positive

### What changes were proposed in this pull request?
Currently, Spark supports `array_insert` and `array_prepend`. Inserting an 
element at the head of an array is a common operation. We want 
`array_prepend` to reuse the implementation of `array_insert`, but its 
performance seems slightly worse when the position is foldable and positive.
The reason is that the code always checks whether the position is negative or 
positive, which makes the code long. Code that is too long can cause JIT 
compilation to fail.
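
As a rough illustration of the positive-position path only, the following Python sketch mirrors the logic in the diff: the new length is the larger of `numElements + 1` and the 1-based position, and elements at or after the insertion point shift right by one. The function name `array_insert_positive` is hypothetical, and Python lists stand in for Spark's `ArrayData`; this is not Spark's implementation.

```python
from typing import Any, List, Optional


def array_insert_positive(arr: List[Any], pos: int, item: Any) -> List[Optional[Any]]:
    """Insert `item` at 1-based position `pos` (pos > 0), padding gaps with None."""
    if pos <= 0:
        raise ValueError("this sketch only covers the positive-position path")
    # The result holds every original element plus the new one, or reaches
    # out to `pos` when the position lies beyond the end of the array.
    new_length = max(len(arr) + 1, pos)
    result: List[Optional[Any]] = [None] * new_length
    idx = pos - 1  # convert to a 0-based index
    for i, v in enumerate(arr):
        if i >= idx:
            result[i + 1] = v  # shift elements at or after the insertion point
        else:
            result[i] = v
    result[idx] = item
    return result
```

Because the position is known to be positive up front, no sign check or left-extension branch is needed in this path.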

### Why are the changes needed?
Improve ArrayInsert if the position is foldable and positive.

### Does this PR introduce _any_ user-facing change?
No.
This only changes the internal implementation.

### How was this patch tested?
Existing test cases.

Closes #40833 from beliefer/SPARK-43137_new.

Authored-by: Jiaan Geng 
Signed-off-by: Wenchen Fan 
---
 .../expressions/collectionOperations.scala | 262 +
 1 file changed, 162 insertions(+), 100 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 2d637a1923e..beed5a6e365 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -4806,6 +4806,17 @@ case class ArrayInsert(srcArrayExpr: Expression, 
posExpr: Expression, itemExpr:
 }
   }
 
+  private lazy val positivePos = if (second.foldable) {
+val pos = second.eval().asInstanceOf[Int]
+if (pos > 0) {
+  Some(pos)
+} else {
+  None
+}
+  } else {
+None
+  }
+
   override def eval(input: InternalRow): Any = {
 val value1 = first.eval(input)
 if (value1 != null) {
@@ -4819,21 +4830,9 @@ case class ArrayInsert(srcArrayExpr: Expression, 
posExpr: Expression, itemExpr:
   }
 
   override def nullSafeEval(arr: Any, pos: Any, item: Any): Any = {
-var posInt = pos.asInstanceOf[Int]
-if (posInt == 0) {
-  throw QueryExecutionErrors.invalidIndexOfZeroError(getContextOrNull())
-}
 val baseArr = arr.asInstanceOf[ArrayData]
-val arrayElementType = dataType.asInstanceOf[ArrayType].elementType
-
-val newPosExtendsArrayLeft = (posInt < 0) && (-posInt > 
baseArr.numElements())
-
-if (newPosExtendsArrayLeft) {
-  // special case- if the new position is negative but larger than the 
current array size
-  // place the new item at start of array, place the current array 
contents at the end
-  // and fill the newly created array elements inbetween with a null
-
-  val newArrayLength = -posInt + 1
+if (positivePos.isDefined) {
+  val newArrayLength = math.max(baseArr.numElements() + 1, positivePos.get)
 
   if (newArrayLength > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
 throw 
QueryExecutionErrors.concatArraysWithElementsExceedLimitError(newArrayLength)
@@ -4841,48 +4840,81 @@ case class ArrayInsert(srcArrayExpr: Expression, 
posExpr: Expression, itemExpr:
 
   val newArray = new Array[Any](newArrayLength)
 
-  baseArr.foreach(arrayElementType, (i, v) => {
-// current position, offset by new item + new null array elements
-val elementPosition = i + 1 + math.abs(posInt + baseArr.numElements())
-newArray(elementPosition) = v
+  val posInt = positivePos.get - 1
+  baseArr.foreach(elementType, (i, v) => {
+if (i >= posInt) {
+  newArray(i + 1) = v
+} else {
+  newArray(i) = v
+}
   })
 
-  newArray(0) = item
+  newArray(posInt) = item
 
-  return new GenericArrayData(newArray)
+  new GenericArrayData(newArray)
 } else {
-  if (posInt < 0) {
-posInt = posInt + baseArr.numElements()
-  } else if (posInt > 0) {
-posInt = posInt - 1
+  var posInt = pos.asInstanceOf[Int]
+  if (posInt == 0) {
+throw QueryExecutionErrors.invalidIndexOfZeroError(getContextOrNull())
   }
 
-  val newArrayLength = math.max(baseArr.numElements() + 1, posInt + 1)
+  val newPosExtendsArrayLeft = (posInt < 0) && (-posInt > 
baseArr.numElements())
 
-  if (newArrayLength > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
-   

[spark] branch branch-3.4 updated: [SPARK-37829][SQL] Dataframe.joinWith outer-join should return a null value for unmatched row

2023-04-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 217f1748224 [SPARK-37829][SQL] Dataframe.joinWith outer-join should 
return a null value for unmatched row
217f1748224 is described below

commit 217f1748224b6ade306eb5f0782e0af085378c55
Author: --global 
AuthorDate: Wed Apr 19 22:05:04 2023 +0800

[SPARK-37829][SQL] Dataframe.joinWith outer-join should return a null value 
for unmatched row

### What changes were proposed in this pull request?

When doing an outer join with joinWith on DataFrames, unmatched rows return 
Row objects with null fields instead of a single null value. This is not the 
expected behavior; it is a regression introduced in [this 
commit](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59).
This pull request aims to fix the regression. Note that this is not a full 
rollback of that commit: the "schema" variable is not added back.

```
case class ClassData(a: String, b: Int)
val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDF
val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDF

left.joinWith(right, left("b") === right("b"), "left_outer").collect
```

```
Wrong results (current behavior): Array(([a,1],[null,null]), ([b,2],[x,2]))
Correct results: Array(([a,1],null), ([b,2],[x,2]))
```
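
The intended semantics can be modelled outside Spark with a small Python sketch. It assumes each side of the joined pair arrives as either a dict of fields or `None`, and the names `deserialize_child` and `deserialize_pair` are hypothetical stand-ins for the per-child deserializers. It only illustrates the null-safe wrapping idea and is not the encoder code itself.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ClassData:
    a: str
    b: int


def deserialize_child(struct: Optional[dict]) -> Optional[ClassData]:
    # Null-safe wrapper: a null input struct yields None instead of an
    # object whose fields are all null.
    if struct is None:
        return None
    return ClassData(struct["a"], struct["b"])


def deserialize_pair(
    row: Tuple[Optional[dict], Optional[dict]]
) -> Tuple[Optional[ClassData], Optional[ClassData]]:
    left, right = row
    return (deserialize_child(left), deserialize_child(right))
```

With this wrapping, an unmatched right side in a left outer join surfaces as `None` for the whole object.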

### Why are the changes needed?

We need to address the regression mentioned above. It results in unexpected 
behavior changes in the Dataframe joinWith API between versions 2.4.8 and 
3.0.0+. This could potentially cause data correctness issues for users who 
expect the old behavior when using Spark 3.0.0+.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test (the same test as in the previous [closed pull 
request](https://github.com/apache/spark/pull/35140), credit to Clément de Groc).
Ran the sql-core and sql-catalyst submodules locally with ./build/mvn clean 
package -pl sql/core,sql/catalyst

Closes #40755 from kings129/encoder_bug_fix.

Authored-by: --global 
Signed-off-by: Wenchen Fan 
(cherry picked from commit 74ce620901a958a1ddd76360e2faed7d3a111d4e)
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/encoders/ExpressionEncoder.scala  | 19 ++---
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 45 ++
 2 files changed, 58 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index faa165c298d..8f7583c48fc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -97,22 +97,29 @@ object ExpressionEncoder {
 }
 val newSerializer = CreateStruct(serializers)
 
+def nullSafe(input: Expression, result: Expression): Expression = {
+  If(IsNull(input), Literal.create(null, result.dataType), result)
+}
+
 val newDeserializerInput = GetColumnByOrdinal(0, newSerializer.dataType)
-val deserializers = encoders.zipWithIndex.map { case (enc, index) =>
+val childrenDeserializers = encoders.zipWithIndex.map { case (enc, index) 
=>
   val getColExprs = enc.objDeserializer.collect { case c: 
GetColumnByOrdinal => c }.distinct
   assert(getColExprs.size == 1, "object deserializer should have only one 
" +
 s"`GetColumnByOrdinal`, but there are ${getColExprs.size}")
 
   val input = GetStructField(newDeserializerInput, index)
-  enc.objDeserializer.transformUp {
+  val childDeserializer = enc.objDeserializer.transformUp {
 case GetColumnByOrdinal(0, _) => input
   }
-}
-val newDeserializer = NewInstance(cls, deserializers, ObjectType(cls), 
propagateNull = false)
 
-def nullSafe(input: Expression, result: Expression): Expression = {
-  If(IsNull(input), Literal.create(null, result.dataType), result)
+  if (enc.objSerializer.nullable) {
+nullSafe(input, childDeserializer)
+  } else {
+childDeserializer
+  }
 }
+val newDeserializer =
+  NewInstance(cls, childrenDeserializers, ObjectType(cls), propagateNull = 
false)
 
 new ExpressionEncoder[Any](
   nullSafe(newSerializerInput, newSerializer),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 86e640a4fa8..f8f6845afca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/s

[spark] branch master updated: [SPARK-37829][SQL] Dataframe.joinWith outer-join should return a null value for unmatched row

2023-04-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 74ce620901a [SPARK-37829][SQL] Dataframe.joinWith outer-join should 
return a null value for unmatched row
74ce620901a is described below

commit 74ce620901a958a1ddd76360e2faed7d3a111d4e
Author: --global 
AuthorDate: Wed Apr 19 22:05:04 2023 +0800

[SPARK-37829][SQL] Dataframe.joinWith outer-join should return a null value 
for unmatched row

### What changes were proposed in this pull request?

When doing an outer join with joinWith on DataFrames, unmatched rows return 
Row objects with null fields instead of a single null value. This is not the 
expected behavior; it is a regression introduced in [this 
commit](https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59).
This pull request aims to fix the regression. Note that this is not a full 
rollback of that commit: the "schema" variable is not added back.

```
case class ClassData(a: String, b: Int)
val left = Seq(ClassData("a", 1), ClassData("b", 2)).toDF
val right = Seq(ClassData("x", 2), ClassData("y", 3)).toDF

left.joinWith(right, left("b") === right("b"), "left_outer").collect
```

```
Wrong results (current behavior): Array(([a,1],[null,null]), ([b,2],[x,2]))
Correct results: Array(([a,1],null), ([b,2],[x,2]))
```

### Why are the changes needed?

We need to address the regression mentioned above. It results in unexpected 
behavior changes in the Dataframe joinWith API between versions 2.4.8 and 
3.0.0+. This could potentially cause data correctness issues for users who 
expect the old behavior when using Spark 3.0.0+.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test (the same test as in the previous [closed pull 
request](https://github.com/apache/spark/pull/35140), credit to Clément de Groc).
Ran the sql-core and sql-catalyst submodules locally with ./build/mvn clean 
package -pl sql/core,sql/catalyst

Closes #40755 from kings129/encoder_bug_fix.

Authored-by: --global 
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/encoders/ExpressionEncoder.scala  | 19 ++---
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 45 ++
 2 files changed, 58 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index faa165c298d..8f7583c48fc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -97,22 +97,29 @@ object ExpressionEncoder {
 }
 val newSerializer = CreateStruct(serializers)
 
+def nullSafe(input: Expression, result: Expression): Expression = {
+  If(IsNull(input), Literal.create(null, result.dataType), result)
+}
+
 val newDeserializerInput = GetColumnByOrdinal(0, newSerializer.dataType)
-val deserializers = encoders.zipWithIndex.map { case (enc, index) =>
+val childrenDeserializers = encoders.zipWithIndex.map { case (enc, index) 
=>
   val getColExprs = enc.objDeserializer.collect { case c: 
GetColumnByOrdinal => c }.distinct
   assert(getColExprs.size == 1, "object deserializer should have only one 
" +
 s"`GetColumnByOrdinal`, but there are ${getColExprs.size}")
 
   val input = GetStructField(newDeserializerInput, index)
-  enc.objDeserializer.transformUp {
+  val childDeserializer = enc.objDeserializer.transformUp {
 case GetColumnByOrdinal(0, _) => input
   }
-}
-val newDeserializer = NewInstance(cls, deserializers, ObjectType(cls), 
propagateNull = false)
 
-def nullSafe(input: Expression, result: Expression): Expression = {
-  If(IsNull(input), Literal.create(null, result.dataType), result)
+  if (enc.objSerializer.nullable) {
+nullSafe(input, childDeserializer)
+  } else {
+childDeserializer
+  }
 }
+val newDeserializer =
+  NewInstance(cls, childrenDeserializers, ObjectType(cls), propagateNull = 
false)
 
 new ExpressionEncoder[Any](
   nullSafe(newSerializerInput, newSerializer),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 4aca7c8a5a6..75cee407819 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.config.MAX_RESULT_SIZE
 import org

[GitHub] [spark-website] Yikun commented on pull request #458: Update Spark Docker Images publish workflow

2023-04-19 Thread via GitHub


Yikun commented on PR #458:
URL: https://github.com/apache/spark-website/pull/458#issuecomment-1514708629

   I hit an issue when pushing the `apache/spark` image using GitHub Actions; 
it seems the Docker Hub token is invalid. I have already raised an issue at 
[INFRA-24476](https://issues.apache.org/jira/projects/INFRA/issues/INFRA-24476) 
(it seems similar to 
[INFRA-24459](https://issues.apache.org/jira/projects/INFRA/issues/INFRA-24459)).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[spark] branch master updated: [SPARK-43176][CONNECT][PYTHON][TESTS] Deduplicate imports in Connect Tests

2023-04-19 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new cac6f58318b [SPARK-43176][CONNECT][PYTHON][TESTS] Deduplicate imports 
in Connect Tests
cac6f58318b is described below

commit cac6f58318bb84d532f02d245a50d3c66daa3e4b
Author: Ruifeng Zheng 
AuthorDate: Wed Apr 19 19:33:49 2023 +0900

[SPARK-43176][CONNECT][PYTHON][TESTS] Deduplicate imports in Connect Tests

### What changes were proposed in this pull request?
Deduplicate imports in Connect Tests

### Why are the changes needed?
for simplicity

### Does this PR introduce _any_ user-facing change?
No, test-only

### How was this patch tested?
updated unittests

Closes #40839 from zhengruifeng/connect_test_import.

Authored-by: Ruifeng Zheng 
Signed-off-by: Hyukjin Kwon 
---
 .../sql/tests/connect/test_connect_basic.py| 10 ---
 .../sql/tests/connect/test_connect_column.py   | 15 +---
 .../sql/tests/connect/test_connect_function.py | 96 +++---
 3 files changed, 11 insertions(+), 110 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_connect_basic.py 
b/python/pyspark/sql/tests/connect/test_connect_basic.py
index 2c1b6342924..9d12eb2b26e 100644
--- a/python/pyspark/sql/tests/connect/test_connect_basic.py
+++ b/python/pyspark/sql/tests/connect/test_connect_basic.py
@@ -466,9 +466,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
 )
 
 def test_collect_timestamp(self):
-from pyspark.sql import functions as SF
-from pyspark.sql.connect import functions as CF
-
 query = """
 SELECT * FROM VALUES
 (TIMESTAMP('2022-12-25 10:30:00'), 1),
@@ -652,10 +649,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
 
 def test_with_none_and_nan(self):
 # SPARK-41855: make createDataFrame support None and NaN
-
-from pyspark.sql import functions as SF
-from pyspark.sql.connect import functions as CF
-
 # SPARK-41814: test with eqNullSafe
 data1 = [Row(id=1, value=float("NaN")), Row(id=2, value=42.0), 
Row(id=3, value=None)]
 data2 = [Row(id=1, value=np.nan), Row(id=2, value=42.0), Row(id=3, 
value=None)]
@@ -1662,9 +1655,6 @@ class SparkConnectBasicTests(SparkConnectSQLTestCase):
 
 def test_observe(self):
 # SPARK-41527: test DataFrame.observe()
-from pyspark.sql import functions as SF
-from pyspark.sql.connect import functions as CF
-
 observation_name = "my_metric"
 
 self.assert_eq(
diff --git a/python/pyspark/sql/tests/connect/test_connect_column.py 
b/python/pyspark/sql/tests/connect/test_connect_column.py
index 2a22ca6ad8d..5703f8d2a3c 100644
--- a/python/pyspark/sql/tests/connect/test_connect_column.py
+++ b/python/pyspark/sql/tests/connect/test_connect_column.py
@@ -18,7 +18,6 @@
 import decimal
 import datetime
 
-from pyspark.sql import functions as SF
 from pyspark.sql.types import (
 Row,
 StructField,
@@ -48,6 +47,7 @@ from pyspark.sql.tests.connect.test_connect_basic import 
SparkConnectSQLTestCase
 
 if should_test_connect:
 import pandas as pd
+from pyspark.sql import functions as SF
 from pyspark.sql.connect import functions as CF
 from pyspark.sql.connect.column import Column
 from pyspark.sql.connect.expressions import DistributedSequenceID, 
LiteralExpression
@@ -482,9 +482,6 @@ class SparkConnectColumnTests(SparkConnectSQLTestCase):
 cdf = self.connect.range(0, 1)
 sdf = self.spark.range(0, 1)
 
-from pyspark.sql import functions as SF
-from pyspark.sql.connect import functions as CF
-
 cdf1 = cdf.select(
 CF.lit(0),
 CF.lit(1),
@@ -679,9 +676,6 @@ class SparkConnectColumnTests(SparkConnectSQLTestCase):
 
 def test_column_bitwise_ops(self):
 # SPARK-41751: test bitwiseAND, bitwiseOR, bitwiseXOR
-from pyspark.sql import functions as SF
-from pyspark.sql.connect import functions as CF
-
 query = """
 SELECT * FROM VALUES
 (1, 1, 0), (2, NULL, 1), (3, 3, 4)
@@ -718,9 +712,6 @@ class SparkConnectColumnTests(SparkConnectSQLTestCase):
 )
 
 def test_column_accessor(self):
-from pyspark.sql import functions as SF
-from pyspark.sql.connect import functions as CF
-
 query = """
 SELECT STRUCT(a, b, c) AS x, y, z, c FROM VALUES
 (float(1.0), double(1.0), '2022', MAP('b', '123', 'a', 'kk'), 
ARRAY(1, 2, 3)),
@@ -840,10 +831,6 @@ class SparkConnectColumnTests(SparkConnectSQLTestCase):
 
 def test_column_field_ops(self):
 # SPARK-41767: test withField, dropFields
-
-from pyspark.sql import functions as SF
-from pyspark.sql.connect import functions a

[spark] branch master updated: [SPARK-42552][SQL] Correct the two-stage parsing strategy of antlr parser

2023-04-19 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f8604ad14b2 [SPARK-42552][SQL] Correct the two-stage parsing strategy 
of antlr parser
f8604ad14b2 is described below

commit f8604ad14b24e8c657a0305b4fb8ad7efcb84060
Author: Cheng Pan 
AuthorDate: Wed Apr 19 16:37:09 2023 +0800

[SPARK-42552][SQL] Correct the two-stage parsing strategy of antlr parser

### What changes were proposed in this pull request?

This PR follows 
https://github.com/antlr/antlr4/issues/192#issuecomment-15238595 to correct the 
current implementation of the **two-stage parsing strategy** in 
`AbstractSqlParser`.

### Why are the changes needed?

This appears to be a long-standing issue. Before 
[SPARK-38385](https://issues.apache.org/jira/browse/SPARK-38385), Spark used 
`DefaultErrorStrategy`, and after 
[SPARK-38385](https://issues.apache.org/jira/browse/SPARK-38385) Spark uses 
class `SparkParserErrorStrategy() extends DefaultErrorStrategy`. This is not a 
correct implementation of the "two-stage parsing strategy".

As mentioned in 
https://github.com/antlr/antlr4/issues/192#issuecomment-15238595

> You can save a great deal of time on correct inputs by using a two-stage 
> parsing strategy.
>
> 1. Attempt to parse the input using BailErrorStrategy and 
>    PredictionMode.SLL.
>    If no exception is thrown, you know the answer is correct.
> 2. If a ParseCancellationException is thrown, retry the parse using the 
>    default settings (DefaultErrorStrategy and PredictionMode.LL).
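
The control flow of the quoted strategy can be sketched in plain Python. Everything here is a stand-in: `ParseCancellation` mimics ANTLR's `ParseCancellationException`, and `toy_parse` is a hypothetical parser that fails in "SLL" mode for some inputs. The sketch shows only the bail-then-retry structure, not the ANTLR or Spark APIs.

```python
from typing import Callable, List, Tuple


class ParseCancellation(Exception):
    """Stand-in for ANTLR's ParseCancellationException."""


def two_stage_parse(text: str, parse: Callable[[str, str, bool], str]) -> str:
    try:
        # Stage 1: fast SLL prediction with a bail-out error strategy.
        return parse(text, "SLL", True)
    except ParseCancellation:
        # Stage 2: retry with full LL prediction and normal error handling.
        return parse(text, "LL", False)


# Records each (mode, bail) attempt so the retry behaviour is observable.
calls: List[Tuple[str, bool]] = []


def toy_parse(text: str, mode: str, bail: bool) -> str:
    """Hypothetical parser: SLL mode gives up on inputs marked 'ambiguous'."""
    calls.append((mode, bail))
    if mode == "SLL" and "ambiguous" in text:
        raise ParseCancellation(text)
    return f"parsed({text})"
```

The key point is that the first stage must bail by raising rather than attempting recovery; otherwise the second stage is never reached.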

### Does this PR introduce _any_ user-facing change?

Yes, the Spark SQL parser becomes more powerful: SQL like `SELECT 1 UNION 
SELECT 2` parses successfully after this change.

### How was this patch tested?

New UT is added.

Closes #40835 from pan3793/SPARK-42552.

Authored-by: Cheng Pan 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/parser/ParseDriver.scala|   9 +-
 .../catalyst/parser/SparkParserErrorStrategy.scala |  44 ++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |   2 +-
 .../catalyst/parser/ExpressionParserSuite.scala|   2 +-
 .../sql/catalyst/parser/PlanParserSuite.scala  |   6 +
 .../catalyst/parser/TableSchemaParserSuite.scala   |   2 +-
 .../analyzer-results/postgreSQL/union.sql.out  | 175 +++--
 .../sql-tests/results/postgreSQL/union.sql.out | 123 +--
 .../command/AlterTableRenameParserSuite.scala  |   2 +-
 .../execution/command/PlanResolutionSuite.scala|   2 +-
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala |   2 +-
 11 files changed, 193 insertions(+), 176 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
index 727d35d5c91..1fb23c4a71e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala
@@ -114,25 +114,28 @@ abstract class AbstractSqlParser extends ParserInterface 
with SQLConfHelper with
 parser.addParseListener(UnclosedCommentProcessor(command, tokenStream))
 parser.removeErrorListeners()
 parser.addErrorListener(ParseErrorListener)
-parser.setErrorHandler(new SparkParserErrorStrategy())
 parser.legacy_setops_precedence_enabled = conf.setOpsPrecedenceEnforced
 parser.legacy_exponent_literal_as_decimal_enabled = 
conf.exponentLiteralAsDecimalEnabled
 parser.SQL_standard_keyword_behavior = conf.enforceReservedKeywords
 parser.double_quoted_identifiers = conf.doubleQuotedIdentifiers
 
+// https://github.com/antlr/antlr4/issues/192#issuecomment-15238595
+// Save a great deal of time on correct inputs by using a two-stage 
parsing strategy.
 try {
   try {
-// first, try parsing with potentially faster SLL mode
+// first, try parsing with potentially faster SLL mode w/ 
SparkParserBailErrorStrategy
+parser.setErrorHandler(new SparkParserBailErrorStrategy())
 parser.getInterpreter.setPredictionMode(PredictionMode.SLL)
 toResult(parser)
   }
   catch {
 case e: ParseCancellationException =>
-  // if we fail, parse with LL mode
+  // if we fail, parse with LL mode w/ SparkParserErrorStrategy
   tokenStream.seek(0) // rewind input stream
   parser.reset()
 
   // Try Again.
+  parser.setErrorHandler(new SparkParserErrorStrategy())
   parser.getInterpreter.setPredictionMode(PredictionMode.LL)
   toResult(parser)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/SparkParserEr