(spark) branch master updated: [SPARK-46208][PS][DOCS] Adding a link for latest Pandas API specifications

2023-12-02 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a3d70195dd5 [SPARK-46208][PS][DOCS] Adding a link for latest Pandas 
API specifications
a3d70195dd5 is described below

commit a3d70195dd520a33a1cab4507d4b792e0f089640
Author: Haejoon Lee 
AuthorDate: Sun Dec 3 15:47:59 2023 +0900

[SPARK-46208][PS][DOCS] Adding a link for latest Pandas API specifications

### What changes were proposed in this pull request?

This PR proposes to adding a link for latest Pandas API specifications.

### Why are the changes needed?

To increase the usability of documents

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The existing CI should pass

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44115 from itholic/use_exact_pandas_version.

Lead-authored-by: Haejoon Lee 
Co-authored-by: Dongjoon Hyun 
Signed-off-by: Hyukjin Kwon 
---
 python/docs/source/reference/pyspark.pandas/index.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/docs/source/reference/pyspark.pandas/index.rst 
b/python/docs/source/reference/pyspark.pandas/index.rst
index 6021be707e5..a7c3bc1dee8 100644
--- a/python/docs/source/reference/pyspark.pandas/index.rst
+++ b/python/docs/source/reference/pyspark.pandas/index.rst
@@ -23,7 +23,7 @@ Pandas API on Spark
 This page gives an overview of all public pandas API on Spark.
 
 .. note::
-   pandas API on Spark follows the API specifications of latest pandas release.
+   pandas API on Spark follows the API specifications of `latest pandas 
release `_.
 
 .. toctree::
:maxdepth: 2


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46206][PS] Use a narrower scope exception for SQL processor

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 73a09ed7bd7 [SPARK-46206][PS] Use a narrower scope exception for SQL 
processor
73a09ed7bd7 is described below

commit 73a09ed7bd7372779e25d65498c4ab6b8496f0a8
Author: Haejoon Lee 
AuthorDate: Sat Dec 2 21:42:09 2023 -0800

[SPARK-46206][PS] Use a narrower scope exception for SQL processor

### What changes were proposed in this pull request?

This PR proposes to refine the exception handling in SQL processor 
functions by replacing the general `Exception` class with more specific 
exception types.

### Why are the changes needed?

The current exception handling uses the broad `Exception` type, which can 
obscure the root cause of issues. By specifying more accurate exceptions, the 
code becomes clearer:

- In `_get_local_scope()`, an `IndexError` is more appropriate as it 
explicitly handles the case where the index is out of range when accessing the 
call stack using `inspect.stack()`.
- In `_get_ipython_scope()`, `AttributeError` and `ModuleNotFoundError` 
could occur if the IPython environment is not available or the expected 
attributes in the IPython shell object are missing.

Using these specific exceptions enhances the maintainability and 
readability of the code, making it easier for developers to understand and 
handle errors more effectively.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

The existing test suite `pyspark.pandas.tests.test_sql::SQLTests` should 
pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44114 from itholic/refine_sql_error.

Authored-by: Haejoon Lee 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/pandas/sql_processor.py | 8 ++--
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pandas/sql_processor.py 
b/python/pyspark/pandas/sql_processor.py
index 1bd1cb9823c..b047417b763 100644
--- a/python/pyspark/pandas/sql_processor.py
+++ b/python/pyspark/pandas/sql_processor.py
@@ -206,9 +206,7 @@ def _get_local_scope() -> Dict[str, Any]:
 # Get 2 scopes above (_get_local_scope -> sql -> ...) to capture the vars 
there.
 try:
 return inspect.stack()[_CAPTURE_SCOPES][0].f_locals
-except Exception:
-# TODO (rxin, thunterdb): use a narrower scope exception.
-# See https://github.com/databricks/koalas/pull/448
+except IndexError:
 return {}
 
 
@@ -222,9 +220,7 @@ def _get_ipython_scope() -> Dict[str, Any]:
 
 shell = get_ipython()
 return shell.user_ns
-except Exception:
-# TODO (rxin, thunterdb): use a narrower scope exception.
-# See https://github.com/databricks/koalas/pull/448
+except (AttributeError, ModuleNotFoundError):
 return None
 
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46212][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Use other functions to simplify the code pattern of `s.c.MapOps#view.mapValues`

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 938b7f58051 
[SPARK-46212][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES]
 Use other functions to simplify the code pattern of `s.c.MapOps#view.mapValues`
938b7f58051 is described below

commit 938b7f580519e3da64004185f7083ae63cf99bc0
Author: yangjie01 
AuthorDate: Sat Dec 2 21:39:14 2023 -0800


[SPARK-46212][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES]
 Use other functions to simplify the code pattern of `s.c.MapOps#view.mapValues`

### What changes were proposed in this pull request?
This pr simplifies `s.c.MapOps.view.mapValues` using the following approach:

- For the `s.c.immutable.MapOps` type, replace it with the 
`s.c.immutable.MapOps#transform` function.

```scala
def transform[W](f: (K, V) => W): CC[K, W] = map { case (k, v) => (k, f(k, 
v)) }
```

Like the case in `CountMinSketchSuite`:


https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala#L59

- For the `s.c.MapOps` type, since the `transform` function does not exist 
for this type, replace it directly with the `map` function.

```scala
def map[K2, V2](f: ((K, V)) => (K2, V2)): CC[K2, V2] = mapFactory.from(new 
View.Map(this, f))
```

Like the case in `KafkaTestUtils`:


https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala#L381

- For the `s.c.mutable.MapOps` type, the `transform` function has also been 
deprecated. At the same time, the signature of `transform` and its replacement 
function `mapValuesInPlace` is as follows:

```scala

  deprecated("Use mapValuesInPlace instead", "2.13.0")
  inline final def transform(f: (K, V) => V): this.type = 
mapValuesInPlace(f)

  def mapValuesInPlace(f: (K, V) => V): this.type = {...}
```

The target type of the value in the function is `V`, which is different 
from the target type of the value in `s.c.immutable.MapOps#transform`, which is 
`W`. This does not meet the desired requirement. So in this scenario, it can be 
divided into two sub-scenarios for handling:

1. If the `mutable.Map` are using needs to be eventually converted to an 
`immutable.Map`, first convert it to an `immutable.Map` and then use the 
`transform` function for replacement. Like the case in `SparkConnectPlanner`:


https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L292

2. If the `mutable.Map` are using does not need to be converted to an 
`immutable.Map` in the end, directly use the `map` function from 
`scala.collection.MapOps` for replacement. Like the case in `SparkSession`:


https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala#L313

In addition, there is a special case in `PythonWorkerFactory`:


https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L381

For this case, it only needs to `destroy` each `Process` in `values` 
without returning any value. Therefore, it has been rewritten using 
`.values.foreach`.

### Why are the changes needed?
The coding pattern of `s.c.MapOps.view.mapValues` seems verbose, it can be 
simplified using other functions.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44122 from LuciferYang/SPARK-46212.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/util/sketch/CountMinSketchSuite.scala|  2 +-
 .../org/apache/spark/sql/avro/AvroUtils.scala  |  3 +--
 .../scala/org/apache/spark/sql/SparkSession.scala  |  2 +-
 .../spark/sql/ClientDataFrameStatSuite.scala   |  2 +-
 .../org/apache/spark/sql/connect/dsl/package.scala |  2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  | 15 ++-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  2 +-
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  3 ++-
 .../streaming/kafka010/ConsumerStrategy.scala  |  9 ---
 .../kafka010/DirectKafkaInputDStream.scala |  2 +-
 .../kafka010/DirectKafkaStreamSuite.scala  |  4 +--
 .../spark/streaming

(spark) branch master updated: [SPARK-46069][SQL] Support unwrap timestamp type to date type

2023-12-02 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0e8e303fb4b [SPARK-46069][SQL] Support unwrap timestamp type to date 
type
0e8e303fb4b is described below

commit 0e8e303fb4b25b2254791abbf900d115232eb966
Author: Kun Wan 
AuthorDate: Sun Dec 3 12:26:41 2023 +0800

[SPARK-46069][SQL] Support unwrap timestamp type to date type

### What changes were proposed in this pull request?

Just like [[SPARK-42597][SQL] Support unwrap date type to timestamp 
type](https://github.com/apache/spark/pull/40190), this PR enhance 
`UnwrapCastInBinaryComparison` to support unwrap timestamp type to date type.

Add two new expressions:
1. floorDate: the largest date that is less than or equal to the input 
timestamp.
2. dateAddOne: floorDate + one day.
3. isStartOfDay: return true if ts == floorDate

The way to unwrap timestamp type to date type are:

1. CAST(date AS timestamp) > ts ===> date > floorDate
2. CAST(date AS timestamp) >= ts ===> if(isStartOfDay) date >= floorDate 
else date >= dateAddOne
3. CAST(date AS timestamp) === ts ===>
if (isStartOfDay) {
  fromExp === floorDate
} else if (!fromExp.nullable) {
  FalseLiteral
} else {
  fromExp === floorDate AND fromExp === dateAddOne
}
4. CAST(date AS timestamp) <=> ts ===> if (isStartOfDay) date <=> floorDate 
else FalseLiteral
5. CAST(date AS timestamp) < ts ===> if(isStartOfDay) date < floorDate else 
date < dateAddOne
6. CAST(date AS timestamp) <= ts ===> date <= floorDate

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #43982 from wankunde/date_ts.

Lead-authored-by: Kun Wan 
Co-authored-by: wankun 
Signed-off-by: Yuming Wang 
---
 .../optimizer/UnwrapCastInBinaryComparison.scala   | 47 
 .../UnwrapCastInBinaryComparisonSuite.scala| 51 +-
 .../sql/UnwrapCastInComparisonEndToEndSuite.scala  | 25 +++
 3 files changed, 113 insertions(+), 10 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala
index 54b1dd419fb..34b98e43038 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/UnwrapCastInBinaryComparison.scala
@@ -138,6 +138,11 @@ object UnwrapCastInBinaryComparison extends 
Rule[LogicalPlan] {
 if AnyTimestampType.acceptsType(fromExp.dataType) && value != null =>
   Some(unwrapDateToTimestamp(be, fromExp, date, timeZoneId, evalMode))
 
+case be @ BinaryComparison(
+  Cast(fromExp, _, timeZoneId, evalMode), ts @ Literal(value, _))
+if AnyTimestampType.acceptsType(ts.dataType) && value != null =>
+  Some(unwrapTimeStampToDate(be, fromExp, ts, timeZoneId, evalMode))
+
 // As the analyzer makes sure that the list of In is already of the same 
data type, then the
 // rule can simply check the first literal in `in.list` can implicitly 
cast to `toType` or not,
 // and note that:
@@ -329,6 +334,48 @@ object UnwrapCastInBinaryComparison extends 
Rule[LogicalPlan] {
 }
   }
 
+  private def unwrapTimeStampToDate(
+  exp: BinaryComparison,
+  fromExp: Expression,
+  ts: Literal,
+  tz: Option[String],
+  evalMode: EvalMode.Value): Expression = {
+val floorDate = Cast(ts, fromExp.dataType, tz, evalMode)
+val dateAddOne = DateAdd(floorDate, Literal(1, IntegerType))
+val isStartOfDay =
+  EqualTo(ts, Cast(floorDate, ts.dataType, tz, 
evalMode)).eval(EmptyRow).asInstanceOf[Boolean]
+
+exp match {
+  case _: GreaterThan =>
+GreaterThan(fromExp, floorDate)
+  case _: GreaterThanOrEqual =>
+if (isStartOfDay) {
+  GreaterThanOrEqual(fromExp, floorDate)
+} else {
+  GreaterThanOrEqual(fromExp, dateAddOne)
+}
+  case _: EqualTo =>
+if (isStartOfDay) {
+  EqualTo(fromExp, floorDate)
+} else if (!fromExp.nullable) {
+  FalseLiteral
+} else {
+  And(EqualTo(fromExp, floorDate), EqualTo(fromExp, dateAddOne))
+}
+  case _: EqualNullSafe =>
+if (isStartOfDay) EqualNullSafe(fromExp, floorDate) else FalseLiteral
+  case _: LessThan =>
+if (isStartOfDay) {
+  LessThan(fromExp, floorDate)
+} else {
+  LessThan(fromExp, dateAddOne)
+}
+  

(spark) branch master updated: [SPARK-46215][CORE] Improve `FileSystemPersistenceEngine` to allow nonexistent parents

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d1311f75dea [SPARK-46215][CORE] Improve `FileSystemPersistenceEngine` 
to allow nonexistent parents
d1311f75dea is described below

commit d1311f75dea21201f2c37b8f92b111c91ebc9aad
Author: Dongjoon Hyun 
AuthorDate: Sat Dec 2 20:24:11 2023 -0800

[SPARK-46215][CORE] Improve `FileSystemPersistenceEngine` to allow 
nonexistent parents

### What changes were proposed in this pull request?

This PR aims to improve `FileSystemPersistenceEngine` to allow non-exist 
parents

### Why are the changes needed?

To prevent the following error,
```
java.io.IOException: No such file or directory
  at java.base/java.io.UnixFileSystem.createFileExclusively(Native Method)
  at java.base/java.io.File.createNewFile(File.java:1043)
  at 
org.apache.spark.deploy.master.FileSystemPersistenceEngine.serializeIntoFile(FileSystemPersistenceEngine.scala:62)
  at 
org.apache.spark.deploy.master.FileSystemPersistenceEngine.persist(FileSystemPersistenceEngine.scala:45)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with the newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44127 from dongjoon-hyun/SPARK-46215.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/deploy/master/FileSystemPersistenceEngine.scala | 5 +++--
 .../org/apache/spark/deploy/master/PersistenceEngineSuite.scala  | 9 +
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
 
b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 7f624816f25..094136ea274 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.deploy.master
 
 import java.io._
+import java.nio.file.{Files, Paths}
 
 import scala.reflect.ClassTag
 
@@ -31,7 +32,7 @@ import org.apache.spark.util.Utils
  * Stores data in a single on-disk directory with one file per application and 
worker.
  * Files are deleted when applications and workers are removed.
  *
- * @param dir Directory to store files. Created if non-existent (but not 
recursively).
+ * @param dir Directory to store files. Created if non-existent.
  * @param serializer Used to serialize our objects.
  */
 private[master] class FileSystemPersistenceEngine(
@@ -39,7 +40,7 @@ private[master] class FileSystemPersistenceEngine(
 val serializer: Serializer)
   extends PersistenceEngine with Logging {
 
-  new File(dir).mkdir()
+  Files.createDirectories(Paths.get(dir))
 
   override def persist(name: String, obj: Object): Unit = {
 serializeIntoFile(new File(dir + File.separator + name), obj)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 66a61d80d2a..8546f4e01f3 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -53,6 +53,15 @@ class PersistenceEngineSuite extends SparkFunSuite {
 }
   }
 
+  test("SPARK-46215: FileSystemPersistenceEngine with a non-existent parent 
dir") {
+withTempDir { dir =>
+  val conf = new SparkConf()
+  testPersistenceEngine(conf, serializer =>
+new FileSystemPersistenceEngine(dir.getAbsolutePath + "/a/b/c/dir", 
serializer)
+  )
+}
+  }
+
   test("SPARK-46205: Support KryoSerializer in FileSystemPersistenceEngine") {
 withTempDir { dir =>
   val conf = new SparkConf()


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-43228][SQL] Join keys also match PartitioningCollection in CoalesceBucketsInJoin

2023-12-02 Thread yumwang
This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0ddf4fdb99cb [SPARK-43228][SQL] Join keys also match 
PartitioningCollection in CoalesceBucketsInJoin
0ddf4fdb99cb is described below

commit 0ddf4fdb99cbff5c40653c646096ffa845744a83
Author: wankunde 
AuthorDate: Sun Dec 3 11:55:59 2023 +0800

[SPARK-43228][SQL] Join keys also match PartitioningCollection in 
CoalesceBucketsInJoin

### What changes were proposed in this pull request?
This PR updates `CoalesceBucketsInJoin.satisfiesOutputPartitioning` to 
support matching `PartitioningCollection`. A common case is that we add an 
alias on the join key. For example:

```sql
SELECT *
FROM   (SELECT /*+ BROADCAST(t3) */ t1.i AS t1i, t1.j AS t1j, t3.*
FROM   t1 JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t
   JOIN t2 ON t.t1i = t2.i AND t.t1j = t2.j
```

The left side outputPartitioning is:

```
(hashpartitioning(t1i#41, t1j#42, 8) or hashpartitioning(i#46, t1j#42, 8) 
or hashpartitioning(t1i#41, j#47, 8) or hashpartitioning(i#46, j#47, 8))
```

### Why are the changes needed?
Enhance `CoalesceBucketsInJoin` to support more cases.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #44128 from wankunde/coalesce_partitions.

Authored-by: wankunde 
Signed-off-by: Yuming Wang 
---
 .../spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala | 4 +++-
 .../scala/org/apache/spark/sql/sources/BucketedReadSuite.scala| 8 
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
index bd79aca8e647..d1464b4ac4ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
@@ -22,7 +22,7 @@ import scala.annotation.tailrec
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, PartitioningCollection}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, ShuffledHashJoinExec, ShuffledJoin, 
SortMergeJoinExec}
@@ -141,6 +141,8 @@ object ExtractJoinWithBuckets {
 partitioning match {
   case HashPartitioning(exprs, _) if exprs.length == keys.length =>
 exprs.forall(e => keys.exists(_.semanticEquals(e)))
+  case PartitioningCollection(partitionings) =>
+partitionings.exists(satisfiesOutputPartitioning(keys, _))
   case _ => false
 }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 898e80df0207..3573bafe482c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -1077,6 +1077,14 @@ abstract class BucketedReadSuite extends QueryTest with 
SQLTestUtils with Adapti
 |FROM   t1 LEFT JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t
 |   LEFT JOIN t2 ON t.i = t2.i AND t.j = t2.j
 |""".stripMargin, 2, None)
+// join keys also match PartitioningCollection
+verify(
+  """
+|SELECT *
+|FROM   (SELECT /*+ BROADCAST(t3) */ t1.i AS t1i, t1.j AS t1j, t3.*
+|FROM   t1 JOIN t3 ON t1.i = t3.i AND t1.j = t3.j) t
+|   JOIN t2 ON t.t1i = t2.i AND t.t1j = t2.j
+|""".stripMargin, 0, Some(4))
   }
 }
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46213][PYTHON] Introduce `PySparkImportError` for error framework

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 75b0eb2d6017 [SPARK-46213][PYTHON] Introduce `PySparkImportError` for 
error framework
75b0eb2d6017 is described below

commit 75b0eb2d601763847507a5e715b3732db004544a
Author: Haejoon Lee 
AuthorDate: Sat Dec 2 13:15:35 2023 -0800

[SPARK-46213][PYTHON] Introduce `PySparkImportError` for error framework

### What changes were proposed in this pull request?

This PR proposes to introduce `PySparkImportError` for error framework.

### Why are the changes needed?

For better error handling.

### Does this PR introduce _any_ user-facing change?

No API changes, but it's improve the user-facing error messages.

### How was this patch tested?

The existing CI should pass

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44123 from itholic/pyspark_import_error.

Authored-by: Haejoon Lee 
Signed-off-by: Dongjoon Hyun 
---
 python/docs/source/reference/pyspark.errors.rst |  1 +
 python/pyspark/errors/__init__.py   |  2 ++
 python/pyspark/errors/error_classes.py  | 10 +++
 python/pyspark/errors/exceptions/base.py|  6 
 python/pyspark/sql/connect/utils.py | 36 ---
 python/pyspark/sql/pandas/utils.py  | 38 ++---
 6 files changed, 72 insertions(+), 21 deletions(-)

diff --git a/python/docs/source/reference/pyspark.errors.rst 
b/python/docs/source/reference/pyspark.errors.rst
index d659657afc56..88cbd405b83d 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -43,6 +43,7 @@ Classes
 PySparkRuntimeError
 PySparkTypeError
 PySparkValueError
+PySparkImportError
 PySparkIndexError
 PythonException
 QueryExecutionException
diff --git a/python/pyspark/errors/__init__.py 
b/python/pyspark/errors/__init__.py
index d0a62537d347..923cb665d112 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -39,6 +39,7 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
 SparkNoSuchElementException,
 PySparkTypeError,
 PySparkValueError,
+PySparkImportError,
 PySparkIndexError,
 PySparkAttributeError,
 PySparkRuntimeError,
@@ -69,6 +70,7 @@ __all__ = [
 "SparkNoSuchElementException",
 "PySparkTypeError",
 "PySparkValueError",
+"PySparkImportError",
 "PySparkIndexError",
 "PySparkAttributeError",
 "PySparkRuntimeError",
diff --git a/python/pyspark/errors/error_classes.py 
b/python/pyspark/errors/error_classes.py
index 289b16c9b606..e1a93aa6be1a 100644
--- a/python/pyspark/errors/error_classes.py
+++ b/python/pyspark/errors/error_classes.py
@@ -667,6 +667,11 @@ ERROR_CLASSES_JSON = """
   "Only a single trigger is allowed."
 ]
   },
+  "PACKAGE_NOT_INSTALLED" : {
+"message" : [
+  " >=  must be installed; however, it was 
not found."
+]
+  },
   "PIPE_FUNCTION_EXITED" : {
 "message" : [
   "Pipe function `` exited with error code ."
@@ -908,6 +913,11 @@ ERROR_CLASSES_JSON = """
   " is not supported."
 ]
   },
+  "UNSUPPORTED_PACKAGE_VERSION" : {
+"message" : [
+  " >=  must be installed; however, your 
version is ."
+]
+  },
   "UNSUPPORTED_PARAM_TYPE_FOR_HIGHER_ORDER_FUNCTION" : {
 "message" : [
   "Function `` should use only POSITIONAL or POSITIONAL OR 
KEYWORD arguments."
diff --git a/python/pyspark/errors/exceptions/base.py 
b/python/pyspark/errors/exceptions/base.py
index c84ca17c3dbd..4a2b31418e29 100644
--- a/python/pyspark/errors/exceptions/base.py
+++ b/python/pyspark/errors/exceptions/base.py
@@ -258,3 +258,9 @@ class PySparkPicklingError(PySparkException, PicklingError):
 """
 Wrapper class for pickle.PicklingError to support error classes.
 """
+
+
+class PySparkImportError(PySparkException, ImportError):
+"""
+Wrapper class for ImportError to support error classes.
+"""
diff --git a/python/pyspark/sql/connect/utils.py 
b/python/pyspark/sql/connect/utils.py
index fd85d75060b5..88f26202b0b2 100644
--- a/python/pyspark/sql/connect/utils.py
+++ b/python/pyspark/sql/connect/utils.py
@@ -18,6 +18,7 @@ import sys
 
 from pyspark.loose_version import LooseVersion
 from pyspark.sql.pandas.utils import require_minimum_pandas_version, 
require_minimum_pyarrow_version
+from pyspark.errors import PySparkImportError
 
 
 def check_dependencies(mod_name: str) -> None:
@@ -45,13 +46,21 @@ def require_minimum_grpc_version() -> None:
 try:
 import grpc
 except ImportError as error:
-raise ImportError(
-f"grpcio >= {minimum_grpc_version} must be installed; h

(spark) branch master updated: [MINOR][DOCS] Add `since` tag for `Scan.reportDriverMetrics`

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new aa1eca8d26cc [MINOR][DOCS] Add `since` tag for 
`Scan.reportDriverMetrics`
aa1eca8d26cc is described below

commit aa1eca8d26cc69cc0e85403ea65ca07f577248ca
Author: zouxxyy 
AuthorDate: Sat Dec 2 11:43:50 2023 -0800

[MINOR][DOCS] Add `since` tag for `Scan.reportDriverMetrics`

### What changes were proposed in this pull request?

Add `since` tag for `reportDriverMetrics`

### Why are the changes needed?

The `reportDriverMetrics` api in `Scan`was introduced in spark3.4.0, we 
need to the `since` tag to facilitate developers

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

No need test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44116 from Zouxxyy/dev/add-tag.

Authored-by: zouxxyy 
Signed-off-by: Dongjoon Hyun 
---
 .../src/main/java/org/apache/spark/sql/connector/read/Scan.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index 969a47be7070..edb6e631c30f 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -121,6 +121,8 @@ public interface Scan {
* Returns an array of custom metrics which are collected with values at the 
driver side only.
* Note that these metrics must be included in the supported custom metrics 
reported by
* `supportedCustomMetrics`.
+   *
+   * @since 3.4.0
*/
   default CustomTaskMetric[] reportDriverMetrics() {
 return new CustomTaskMetric[]{};


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-41532][CONNECT][PYTHON][FOLLOWUP] Expose `SessionNotSameException` as PySpark exceptions

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ccedc1988fa5 [SPARK-41532][CONNECT][PYTHON][FOLLOWUP] Expose 
`SessionNotSameException` as PySpark exceptions
ccedc1988fa5 is described below

commit ccedc1988fa5a3968b606b943e32760058c7115f
Author: Haejoon Lee 
AuthorDate: Sat Dec 2 11:40:47 2023 -0800

[SPARK-41532][CONNECT][PYTHON][FOLLOWUP] Expose `SessionNotSameException` 
as PySpark exceptions

### What changes were proposed in this pull request?

This PR proposes to expose `SessionNotSameException` as PySpark exceptions.

### Why are the changes needed?

`SessionNotSameException` was defined from 
https://github.com/apache/spark/pull/40684, but was not added into all 
exception API list.

All PySpark-specific exceptions defined in 
`python/pyspark/errors/exceptions/base.py` also should be exposed from 
`python/pyspark/errors/__init__.py`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44121 from itholic/41532-followup.

Authored-by: Haejoon Lee 
Signed-off-by: Dongjoon Hyun 
---
 python/docs/source/reference/pyspark.errors.rst | 1 +
 python/pyspark/errors/__init__.py   | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/python/docs/source/reference/pyspark.errors.rst 
b/python/docs/source/reference/pyspark.errors.rst
index 9f28d134cf2d..d659657afc56 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -46,6 +46,7 @@ Classes
 PySparkIndexError
 PythonException
 QueryExecutionException
+SessionNotSameException
 SparkRuntimeException
 SparkUpgradeException
 SparkNoSuchElementException
diff --git a/python/pyspark/errors/__init__.py 
b/python/pyspark/errors/__init__.py
index a8b7191c166a..d0a62537d347 100644
--- a/python/pyspark/errors/__init__.py
+++ b/python/pyspark/errors/__init__.py
@@ -21,6 +21,7 @@ PySpark exceptions.
 from pyspark.errors.exceptions.base import (  # noqa: F401
 PySparkException,
 AnalysisException,
+SessionNotSameException,
 TempTableAlreadyExistsException,
 ParseException,
 IllegalArgumentException,
@@ -50,6 +51,7 @@ from pyspark.errors.exceptions.base import (  # noqa: F401
 __all__ = [
 "PySparkException",
 "AnalysisException",
+"SessionNotSameException",
 "TempTableAlreadyExistsException",
 "ParseException",
 "IllegalArgumentException",


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46210][K8S][DOCS] Update `YuniKorn` docs with v1.4

2023-12-02 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 37deb1c1 [SPARK-46210][K8S][DOCS] Update `YuniKorn` docs with v1.4
37deb1c1 is described below

commit 37deb1c1962ca742e5adf9fe902a4aa792a0
Author: Dongjoon Hyun 
AuthorDate: Sun Dec 3 03:28:42 2023 +0800

[SPARK-46210][K8S][DOCS] Update `YuniKorn` docs with v1.4

### What changes were proposed in this pull request?

This PR aims to update `YuniKorn` docs with v1.4 for Apache Spark 4.0.0.

### Why are the changes needed?

Apache YuniKorn v1.4.0 was released on 2023-11-20 with 270 resolved JIRAs.

- https://yunikorn.apache.org/release-announce/1.4.0
- `PreEnqueue-Plugin` for `SchedulingGate` feature based on [KEP-3521: 
Pod Scheduling 
Readiness](https://github.com/kubernetes/enhancements/blob/master/keps/sig-scheduling/3521-pod-scheduling-readiness/README.md)

I installed YuniKorn v1.4.0 on K8s 1.28 and tested manually.

**K8s v1.28**
```
$ kubectl version
Client Version: v1.28.4
Kustomize Version: v5.0.4-0.20230601165947-6ce0bf390ce3
Server Version: v1.28.2
```

**YuniKorn v1.4**
```
$ helm list -n yunikorn
NAMENAMESPACE   REVISIONUPDATED 
STATUS  CHART   APP VERSION
yunikornyunikorn1   2023-12-01 19:02:54.63097 
-0800 PST deployedyunikorn-1.4.0
```

```
$ build/sbt -Pkubernetes -Pkubernetes-integration-tests 
-Dspark.kubernetes.test.deployMode=docker-desktop 
"kubernetes-integration-tests/testOnly *.YuniKornSuite" 
-Dtest.exclude.tags=minikube,local,decom,r -Dtest.default.exclude.tags=
...
[info] YuniKornSuite:
[info] - SPARK-42190: Run SparkPi with local[*] (11 seconds, 592 
milliseconds)
[info] - Run SparkPi with no resources (15 seconds, 437 milliseconds)
[info] - Run SparkPi with no resources & statefulset allocation (20 
seconds, 243 milliseconds)
[info] - Run SparkPi with a very long application name. (15 seconds, 231 
milliseconds
[info] - Use SparkLauncher.NO_RESOURCE (20 seconds, 240 milliseconds)
[info] - Run SparkPi with a master URL without a scheme. (15 seconds, 233 
milliseconds)
[info] - Run SparkPi with an argument. (21 seconds, 378 milliseconds)
[info] - Run SparkPi with custom labels, annotations, and environment 
variables. (15 seconds, 231 milliseconds)
[info] - All pods have the same service account by default (20 seconds, 295 
milliseconds)
[info] - Run extraJVMOptions check on driver (11 seconds, 300 milliseconds)
[info] - SPARK-42474: Run extraJVMOptions JVM GC option check - G1GC (12 
seconds, 183 milliseconds)
[info] - SPARK-42474: Run extraJVMOptions JVM GC option check - Other GC 
(11 seconds, 837 milliseconds)
[info] - SPARK-42769: All executor pods have SPARK_DRIVER_POD_IP env 
variable (15 seconds, 499 milliseconds)
[info] - Verify logging configuration is picked from the provided 
SPARK_CONF_DIR/log4j2.properties (19 seconds, 881 milliseconds)
[info] - Run SparkPi with env and mount secrets. (22 seconds, 842 
milliseconds)
[info] - Run PySpark on simple pi.py example (16 seconds, 319 milliseconds)
[info] - Run PySpark to test a pyfiles example (21 seconds, 599 
milliseconds)
[info] - Run PySpark with memory customization (15 seconds, 355 
milliseconds)
[info] - Run in client mode. (5 seconds, 120 milliseconds)
[info] - Start pod creation from template (15 seconds, 484 milliseconds)
[info] - SPARK-38398: Schedule pod creation from template (15 seconds, 427 
milliseconds)
[info] Run completed in 7 minutes, 48 seconds.
[info] Total number of tests run: 21
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 21, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 485 s (08:05), completed Dec 1, 2023, 7:36:56 PM
```

```
$ k describe pod -l spark-role=driver -n 
spark-ce267a8f046f4785a4f42763c5b3851c
...
Events:
  TypeReason Age   From  Message
  --     ---
  Normal  Scheduling 7syunikorn  
spark-ce267a8f046f4785a4f42763c5b3851c/spark-test-app-c94236f72b6b438c80b00226cbc0e95c-driver
 is queued and waiting for allocation
  Normal  Scheduled  7syunikorn  Successfully assigned 
spark-ce267a8f046f4785a4f42763c5b3851c/spark-test-app-c94236f72b6b438c80b00226cbc0e95c-driver
 to node docker-desktop
  Normal  PodBindSuccessful  7syunikorn  Pod 
spark-ce267a8f046f4785a4f42763c5b3851c/spark-test-app-c94236f72b6b438c80b00226cbc0e95c-driver
 is successfully bound to node docker-desktop
  Normal  Pu

(spark) branch master updated: [SPARK-46205][CORE][TESTS][FOLLOWUP] Simplify PersistenceEngineBenchmark

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a6afd09f7f2f [SPARK-46205][CORE][TESTS][FOLLOWUP] Simplify 
PersistenceEngineBenchmark
a6afd09f7f2f is described below

commit a6afd09f7f2fca38b693eac243d5410708c1ef70
Author: Dongjoon Hyun 
AuthorDate: Sat Dec 2 11:27:34 2023 -0800

[SPARK-46205][CORE][TESTS][FOLLOWUP] Simplify PersistenceEngineBenchmark

### What changes were proposed in this pull request?

This is a follow-up of #44113 to address a comment about simplifying the 
benchmark.

### Why are the changes needed?

To simplify.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.
```
$ build/sbt "core/Test/runMain 
org.apache.spark.deploy.master.PersistenceEngineBenchmark"
...
[info] OpenJDK 64-Bit Server VM 17.0.9+9-LTS on Mac OS X 14.2
[info] Apple M1 Max
[info] 1000 Workers:Best Time(ms)   Avg 
Time(ms)   Stdev(ms)Rate(M/s)   Per Row(ns)   Relative
[info] 
---
[info] ZooKeeperPersistenceEngine with JavaSerializer   12602   
   12847 253  0.012601717.5   1.0X
[info] ZooKeeperPersistenceEngine with KryoSerializer   12116   
   12130  13  0.012116373.8   1.0X
[info] FileSystemPersistenceEngine with JavaSerializer429   
 435   6  0.0  429374.9  29.3X
[info] FileSystemPersistenceEngine with KryoSerializer179   
 180   2  0.0  178795.7  70.5X
[info] BlackHolePersistenceEngine   0   
   0   0 46.6  21.5  587273.6X
[success] Total time: 126 s (02:06), completed Dec 1, 2023, 7:59:25 PM
```

### Was this patch authored or co-authored using generative AI tooling?

Closes #44118 from dongjoon-hyun/SPARK-46205-2.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../deploy/master/PersistenceEngineBenchmark.scala | 52 +-
 1 file changed, 20 insertions(+), 32 deletions(-)

diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
index 730ae05fa146..2f8e9a8eff2d 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineBenchmark.scala
@@ -46,8 +46,7 @@ import org.apache.spark.util.Utils
 object PersistenceEngineBenchmark extends BenchmarkBase {
 
   val conf = new SparkConf()
-  val serializerJava = new JavaSerializer(conf)
-  val serializerKryo = new KryoSerializer(conf)
+  val serializers = Seq(new JavaSerializer(conf), new KryoSerializer(conf))
   val zkTestServer = new TestingServer(findFreePort(conf))
 
   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
@@ -61,38 +60,27 @@ object PersistenceEngineBenchmark extends BenchmarkBase {
 runBenchmark("PersistenceEngineBenchmark") {
   val benchmark = new Benchmark(s"$numWorkers Workers", numWorkers, output 
= output)
 
-  benchmark.addCase("ZooKeeperPersistenceEngine with JavaSerializer", 
numIters) { _ =>
-val engine = new ZooKeeperPersistenceEngine(conf, serializerJava)
-workers.foreach(engine.addWorker)
-engine.read[WorkerInfo]("worker_")
-workers.foreach(engine.removeWorker)
-engine.close()
-  }
-
-  benchmark.addCase("ZooKeeperPersistenceEngine with KryoSerializer", 
numIters) { _ =>
-val engine = new ZooKeeperPersistenceEngine(conf, serializerKryo)
-workers.foreach(engine.addWorker)
-engine.read[WorkerInfo]("worker_")
-workers.foreach(engine.removeWorker)
-engine.close()
+  serializers.foreach { serializer =>
+val serializerName = serializer.getClass.getSimpleName
+benchmark.addCase(s"ZooKeeperPersistenceEngine with $serializerName", 
numIters) { _ =>
+  val engine = new ZooKeeperPersistenceEngine(conf, serializer)
+  workers.foreach(engine.addWorker)
+  engine.read[WorkerInfo]("worker_")
+  workers.foreach(engine.removeWorker)
+  engine.close()
+}
   }
 
-  benchmark.addCase("FileSystemPersistenceEngine with JavaSerializer", 
numIters) { _ =>
-val dir = Utils.createTempDir().getAbsolutePath
-val engine = new FileSystemPersistenceEngine(dir, serializerJava)
-workers.foreach(engine.addWorker)
-

(spark) branch branch-3.5 updated: [SPARK-45975][SQL][TESTS][3.5] Reset storeAssignmentPolicy to original

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new fde0fe676358 [SPARK-45975][SQL][TESTS][3.5] Reset 
storeAssignmentPolicy to original
fde0fe676358 is described below

commit fde0fe676358fb3e9142d6895e4c7fc2e6604d5e
Author: wforget <643348...@qq.com>
AuthorDate: Sat Dec 2 11:25:54 2023 -0800

[SPARK-45975][SQL][TESTS][3.5] Reset storeAssignmentPolicy to original

### What changes were proposed in this pull request?
Reset storeAssignmentPolicy to original in HiveCompatibilitySuite.

### Why are the changes needed?
STORE_ASSIGNMENT_POLICY was not reset in HiveCompatibilitySuite, causing 
subsequent test cases to fail.

Details: 
https://github.com/wForget/spark/actions/runs/6902668865/job/18779862759

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44126 from LuciferYang/SPARK-45943-FOLLOWUP.

Authored-by: wforget <643348...@qq.com>
Signed-off-by: Dongjoon Hyun 
---
 .../org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala   | 3 +++
 1 file changed, 3 insertions(+)

diff --git 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index bd323dc4b24e..0467603c01cd 100644
--- 
a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ 
b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -41,6 +41,8 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
   private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
   private val originalSessionLocalTimeZone = TestHive.conf.sessionLocalTimeZone
   private val originalAnsiMode = TestHive.conf.getConf(SQLConf.ANSI_ENABLED)
+  private val originalStoreAssignmentPolicy =
+TestHive.conf.getConf(SQLConf.STORE_ASSIGNMENT_POLICY)
   private val originalCreateHiveTable =
 TestHive.conf.getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)
 
@@ -76,6 +78,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with 
BeforeAndAfter {
   TestHive.setConf(SQLConf.CROSS_JOINS_ENABLED, originalCrossJoinEnabled)
   TestHive.setConf(SQLConf.SESSION_LOCAL_TIMEZONE, 
originalSessionLocalTimeZone)
   TestHive.setConf(SQLConf.ANSI_ENABLED, originalAnsiMode)
+  TestHive.setConf(SQLConf.STORE_ASSIGNMENT_POLICY, 
originalStoreAssignmentPolicy)
   TestHive.setConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT, 
originalCreateHiveTable)
 
   // For debugging dump some statistics about how much time was spent in 
various optimizer rules


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46092][SQL] Don't push down Parquet row group filters that overflow

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8f8e41ea5203 [SPARK-46092][SQL] Don't push down Parquet row group 
filters that overflow
8f8e41ea5203 is described below

commit 8f8e41ea5203a7aa43f860da68320a6e1de24834
Author: Johan Lasperas 
AuthorDate: Sat Dec 2 11:14:47 2023 -0800

[SPARK-46092][SQL] Don't push down Parquet row group filters that overflow

### What changes were proposed in this pull request?
This change adds a check for overflows when creating Parquet row group 
filters on an INT32 (byte/short/int) parquet type to avoid incorrectly skipping 
row groups if the predicate value doesn't fit in an INT. This can happen if the 
read schema is specified as LONG, e.g via `.schema("col LONG")`
While the Parquet readers don't support reading INT32 into a LONG, the 
overflow can lead to row groups being incorrectly skipped, bypassing the reader 
altogether and producing incorrect results instead of failing.

### Why are the changes needed?
Reading a parquet file containing INT32 values with a read schema specified 
as LONG can produce incorrect results today:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < 
${Long.MaxValue}").collect()
```
will return an empty result. The correct result is either:
- Failing the query if the parquet reader doesn't support upcasting 
integers to longs (all parquet readers in Spark today)
- Return result `[0]` if the parquet reader supports that upcast (no 
readers in Spark as of now, but I'm looking into adding this capability).

### Does this PR introduce _any_ user-facing change?
The following:
```
Seq(0).toDF("a").write.parquet(path)
spark.read.schema("a LONG").parquet(path).where(s"a < 
${Long.MaxValue}").collect()
```
produces an (incorrect) empty result before this change. After this change, 
the read will fail, raising an error about the unsupported conversion from INT 
to LONG in the parquet reader.

### How was this patch tested?
- Added tests to `ParquetFilterSuite` to ensure that no row group filter is 
created when the predicate value overflows or when the value type isn't 
compatible with the parquet type
- Added test to `ParquetQuerySuite` covering the correctness issue 
described above.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44006 from johanl-db/SPARK-46092-row-group-skipping-overflow.

Authored-by: Johan Lasperas 
Signed-off-by: Dongjoon Hyun 
---
 .../datasources/parquet/ParquetFilters.scala   | 10 ++-
 .../datasources/parquet/ParquetFilterSuite.scala   | 71 ++
 .../datasources/parquet/ParquetQuerySuite.scala| 20 ++
 3 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 1cab5eace2be..02cf34aee3ec 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, 
Long => JLong}
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float 
=> JFloat, Long => JLong, Short => JShort}
 import java.math.{BigDecimal => JBigDecimal}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.sql.{Date, Timestamp}
@@ -614,7 +614,13 @@ class ParquetFilters(
 value == null || (nameToParquetField(name).fieldType match {
   case ParquetBooleanType => value.isInstanceOf[JBoolean]
   case ParquetIntegerType if value.isInstanceOf[Period] => true
-  case ParquetByteType | ParquetShortType | ParquetIntegerType => 
value.isInstanceOf[Number]
+  case ParquetByteType | ParquetShortType | ParquetIntegerType => value 
match {
+// Byte/Short/Int are all stored as INT32 in Parquet so filters are 
built using type Int.
+// We don't create a filter if the value would overflow.
+case _: JByte | _: JShort | _: Integer => true
+case v: JLong => v.longValue() >= Int.MinValue && v.longValue() <= 
Int.MaxValue
+case _ => false
+  }
   case ParquetLongType => value.isInstanceOf[JLong] || 
value.isInstanceOf[Duration]
   case ParquetFloatType => value.isInstanceOf[JFloat]
   case ParquetDoubleType => value.isInstanceOf[JDouble]
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetF

(spark) branch master updated: [SPARK-46214][BUILD] Upgrade commons-io to 2.15.1

2023-12-02 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 341b7d7a6e3a [SPARK-46214][BUILD] Upgrade commons-io to 2.15.1
341b7d7a6e3a is described below

commit 341b7d7a6e3a0b24d6cacbd11251f3beb44d12fa
Author: panbingkun 
AuthorDate: Sat Dec 2 11:08:32 2023 -0800

[SPARK-46214][BUILD] Upgrade commons-io to 2.15.1

### What changes were proposed in this pull request?
The pr aims to upgrade `commons-io` from 2.15.0 to 2.15.1.

### Why are the changes needed?
- The full release notes: 
https://commons.apache.org/proper/commons-io/changes-report.html#a2.15.1
- The version mainly focus on fixing bugs:
Fix FileChannels.contentEquals().
Avoid NullPointerException in RegexFileFilter.RegexFileFilter(Pattern).
Avoid NullPointerException in RegexFileFilter.accept(Path, 
BasicFileAttributes)

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44125 from panbingkun/SPARK-46214.

Authored-by: panbingkun 
Signed-off-by: Dongjoon Hyun 
---
 dev/deps/spark-deps-hadoop-3-hive-2.3 | 2 +-
 pom.xml   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 
b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 25245d0b74f5..f1d675d92b6d 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -43,7 +43,7 @@ commons-compiler/3.1.9//commons-compiler-3.1.9.jar
 commons-compress/1.25.0//commons-compress-1.25.0.jar
 commons-crypto/1.1.0//commons-crypto-1.1.0.jar
 commons-dbcp/1.4//commons-dbcp-1.4.jar
-commons-io/2.15.0//commons-io-2.15.0.jar
+commons-io/2.15.1//commons-io-2.15.1.jar
 commons-lang/2.6//commons-lang-2.6.jar
 commons-lang3/3.14.0//commons-lang3-3.14.0.jar
 commons-logging/1.1.3//commons-logging-1.1.3.jar
diff --git a/pom.xml b/pom.xml
index 1e1dbbf59ed9..2a259cfd322b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -193,7 +193,7 @@
 3.0.3
 1.16.0
 1.25.0
-2.15.0
+2.15.1
 
 2.6
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark-docker) branch master updated: [SPARK-46209] Add java 11 only yml for version before 3.5

2023-12-02 Thread yikun
This is an automated email from the ASF dual-hosted git repository.

yikun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-docker.git


The following commit(s) were added to refs/heads/master by this push:
 new 431aa51  [SPARK-46209] Add java 11 only yml for version before 3.5
431aa51 is described below

commit 431aa516ba58985c902bf2d2a07bf0eaa1df6740
Author: Yikun Jiang 
AuthorDate: Sat Dec 2 20:36:29 2023 +0800

[SPARK-46209] Add java 11 only yml for version before 3.5

### What changes were proposed in this pull request?
Add Java11 only workflow for version before 3.5.0.

### Why are the changes needed?
otherwise, the publish will failed due to no java 17 file founded in 
version before v 3.5.0.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Test on my repo: 
https://github.com/Yikun/spark-docker/actions/workflows/publish-java11.yml

Closes #58 from Yikun/java11-publish.

Authored-by: Yikun Jiang 
Signed-off-by: Yikun Jiang 
---
 .github/workflows/{publish.yml => publish-java11.yml} | 9 -
 .github/workflows/publish.yml | 7 ---
 2 files changed, 4 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/publish.yml 
b/.github/workflows/publish-java11.yml
similarity index 96%
copy from .github/workflows/publish.yml
copy to .github/workflows/publish-java11.yml
index ec0d66c..caa3702 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish-java11.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: "Publish"
+name: "Publish (Java 11 only)"
 
 on:
   workflow_dispatch:
@@ -25,10 +25,9 @@ on:
   spark:
 description: 'The Spark version of Spark image.'
 required: true
-default: '3.5.0'
+default: '3.4.2'
 type: choice
 options:
-- 3.5.0
 - 3.4.2
 - 3.4.1
 - 3.4.0
@@ -59,7 +58,7 @@ jobs:
 strategy:
   matrix:
 scala: [2.12]
-java: [11, 17]
+java: [11]
 image-type: ["scala"]
 permissions:
   packages: write
@@ -81,7 +80,7 @@ jobs:
 strategy:
   matrix:
 scala: [2.12]
-java: [11, 17]
+java: [11]
 image-type: ["all", "python", "r"]
 permissions:
   packages: write
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index ec0d66c..2f828a4 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -29,13 +29,6 @@ on:
 type: choice
 options:
 - 3.5.0
-- 3.4.2
-- 3.4.1
-- 3.4.0
-- 3.3.3
-- 3.3.2
-- 3.3.1
-- 3.3.0
   publish:
 description: 'Publish the image or not.'
 default: false


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org