[spark] branch master updated: [SPARK-45220][PYTHON][DOCS] Refine docstring of DataFrame.join

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new db162369941 [SPARK-45220][PYTHON][DOCS] Refine docstring of 
DataFrame.join
db162369941 is described below

commit db162369941dacbbd7eceb98896d498666f7fac8
Author: allisonwang-db 
AuthorDate: Thu Oct 19 14:25:30 2023 +0900

[SPARK-45220][PYTHON][DOCS] Refine docstring of DataFrame.join

### What changes were proposed in this pull request?

This PR refines the docstring of `DataFrame.join` by adding more examples 
and explanations.

### Why are the changes needed?

To improve PySpark documentation.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

doctest

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43039 from allisonwang-db/spark-45220-refine-join.

Authored-by: allisonwang-db 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/dataframe.py | 167 +---
 1 file changed, 124 insertions(+), 43 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 637787ceb66..34df2bafcf8 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2646,7 +2646,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 on: Optional[Union[str, List[str], Column, List[Column]]] = None,
 how: Optional[str] = None,
 ) -> "DataFrame":
-"""Joins with another :class:`DataFrame`, using the given join 
expression.
+"""
+Joins with another :class:`DataFrame`, using the given join expression.
 
 .. versionadded:: 1.3.0
 
@@ -2675,39 +2676,55 @@ class DataFrame(PandasMapOpsMixin, 
PandasConversionMixin):
 
 Examples
 
-The following performs a full outer join between ``df1`` and ``df2``.
+The following examples demonstrate various join types among ``df1``, 
``df2``, and ``df3``.
 
+>>> import pyspark.sql.functions as sf
 >>> from pyspark.sql import Row
->>> from pyspark.sql.functions import desc
->>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")]).toDF("age", 
"name")
->>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), 
Row(height=85, name="Bob")])
->>> df3 = spark.createDataFrame([Row(age=2, name="Alice"), Row(age=5, 
name="Bob")])
->>> df4 = spark.createDataFrame([
-... Row(age=10, height=80, name="Alice"),
-... Row(age=5, height=None, name="Bob"),
-... Row(age=None, height=None, name="Tom"),
-... Row(age=None, height=None, name=None),
+>>> df = spark.createDataFrame([Row(name="Alice", age=2), 
Row(name="Bob", age=5)])
+>>> df2 = spark.createDataFrame([Row(name="Tom", height=80), 
Row(name="Bob", height=85)])
+>>> df3 = spark.createDataFrame([
+... Row(name="Alice", age=10, height=80),
+... Row(name="Bob", age=5, height=None),
+... Row(name="Tom", age=None, height=None),
+... Row(name=None, age=None, height=None),
 ... ])
 
 Inner join on columns (default)
 
->>> df.join(df2, 'name').select(df.name, df2.height).show()
-++--+
-|name|height|
-++--+
-| Bob|85|
-++--+
->>> df.join(df4, ['name', 'age']).select(df.name, df.age).show()
-++---+
-|name|age|
-++---+
-| Bob|  5|
-++---+
-
-Outer join for both DataFrames on the 'name' column.
-
->>> df.join(df2, df.name == df2.name, 'outer').select(
-... df.name, df2.height).sort(desc("name")).show()
+>>> df.join(df2, "name").show()
+++---+--+
+|name|age|height|
+++---+--+
+| Bob|  5|85|
+++---+--+
+
+>>> df.join(df3, ["name", "age"]).show()
+++---+--+
+|name|age|height|
+++---+--+
+| Bob|  5|  NULL|
+++---+--+
+
+Outer join on a single column with an explicit join condition.
+
+When the join condition is explicitly stated: `df.name == df2.name`, 
this will
+produce all records where the names match, as well as those that don't 
(since
+it's an outer join). If there are names in `df2` that are not present 
in `df`,
+they will appear with `NULL` in the `name` column of `df`, and vice 
versa for `df2`.
+
+>>> joined = df.join(df2, df.name == df2.name, 
"outer").sort(sf.desc(df.name))
+>>> joined.show() # doctest: +SKIP
++-+++--+
+| name| age|name|height|
+

[spark] branch master updated: [SPARK-45589][PYTHON][DOCS] Supplementary exception class

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 47d3fa83e70 [SPARK-45589][PYTHON][DOCS] Supplementary exception class
47d3fa83e70 is described below

commit 47d3fa83e70ec0b05ac72c6b7c47ee905543b6bd
Author: panbingkun 
AuthorDate: Thu Oct 19 14:18:16 2023 +0900

[SPARK-45589][PYTHON][DOCS] Supplementary exception class

### What changes were proposed in this pull request?
This PR aims to supplement the exception classes in the PySpark docs.

### Why are the changes needed?
Currently, only the following classes are listed in the documentation (screenshot):
https://github.com/apache/spark/assets/15246973/c297d00b-5534-4751-9d97-90c0b4d14a81

The classes that actually exist (screenshot):
https://github.com/apache/spark/assets/15246973/42a8a0e3-ba90-4c33-8b2f-1deef3753896

### Does this PR introduce _any_ user-facing change?
Yes, more complete documentation on exception classes.

### How was this patch tested?
- Pass GA.
- Manually test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43427 from panbingkun/SPARK-45589.

Authored-by: panbingkun 
Signed-off-by: Hyukjin Kwon 
---
 python/docs/source/reference/pyspark.errors.rst | 25 +++--
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/python/docs/source/reference/pyspark.errors.rst 
b/python/docs/source/reference/pyspark.errors.rst
index 13db9bd01fa..8723de8ebf2 100644
--- a/python/docs/source/reference/pyspark.errors.rst
+++ b/python/docs/source/reference/pyspark.errors.rst
@@ -28,16 +28,29 @@ Classes
 .. autosummary::
 :toctree: api/
 
-PySparkException
 AnalysisException
-TempTableAlreadyExistsException
-ParseException
+ArithmeticException
+ArrayIndexOutOfBoundsException
+DateTimeException
 IllegalArgumentException
-StreamingQueryException
-QueryExecutionException
+NumberFormatException
+ParseException
+PySparkAssertionError
+PySparkAttributeError
+PySparkException
+PySparkNotImplementedError
+PySparkPicklingError
+PySparkRuntimeError
+PySparkTypeError
+PySparkValueError
 PythonException
-UnknownException
+QueryExecutionException
+SparkRuntimeException
 SparkUpgradeException
+StreamingQueryException
+TempTableAlreadyExistsException
+UnknownException
+UnsupportedOperationException
 
 
 Methods
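For context, a short hedged sketch of how the newly documented classes are used from user code (an active `spark` session is assumed; the failing query is only an illustration):

```python
from pyspark.errors import AnalysisException

try:
    # Referencing a non-existent column raises AnalysisException during analysis.
    spark.sql("SELECT no_such_column FROM range(1)")
except AnalysisException as e:
    print("Analysis failed:", e)
```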





[spark] branch master updated: [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper

2023-10-18 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new e359f210c45 [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc 
improvement for StreamingForeachBatchHelper
e359f210c45 is described below

commit e359f210c45abc17f0bcd32c9a86faf678caff75
Author: Raghu Angadi 
AuthorDate: Thu Oct 19 14:15:43 2023 +0900

[SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for 
StreamingForeachBatchHelper

### What changes were proposed in this pull request?

Couple of minor improvements to `StreamingForeachBatchHelper`:

  * Make `RunnerCleaner` private and add ScalaDoc.
  * Update the contract for `pythonForeachBatchWrapper()` to note that the caller 
should eventually `close()` the returned `AutoCloseable`.

In addition, it fixes a flaky Protobuf unit test.

### Why are the changes needed?
  - Code readability improvement.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
 - Existing tests.
 - For the Protobuf suite, verified with the seed set to '399'; it fails before 
this PR and passes after.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43424 from rangadi/feb-scaladoc.

Authored-by: Raghu Angadi 
Signed-off-by: Jungtaek Lim 
---
 .../apache/spark/sql/connect/planner/SparkConnectPlanner.scala   | 2 +-
 .../spark/sql/connect/planner/StreamingForeachBatchHelper.scala  | 9 ++---
 .../sql/connect/service/SparkConnectSessionHodlerSuite.scala | 4 +++-
 .../spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala | 2 +-
 4 files changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index fa964c02a25..299f4f8830a 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -2927,7 +2927,7 @@ class SparkConnectPlanner(
 }
 
 // This is filled when a foreach batch runner started for Python.
-var foreachBatchRunnerCleaner: 
Option[StreamingForeachBatchHelper.RunnerCleaner] = None
+var foreachBatchRunnerCleaner: Option[AutoCloseable] = None
 
 if (writeOp.hasForeachBatch) {
   val foreachBatchFn = writeOp.getForeachBatch.getFunctionCase match {
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
index b8097b23550..ce75ba3eb59 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala
@@ -40,7 +40,9 @@ object StreamingForeachBatchHelper extends Logging {
 
   type ForeachBatchFnType = (DataFrame, Long) => Unit
 
-  case class RunnerCleaner(runner: StreamingPythonRunner) extends 
AutoCloseable {
+  // Visible for testing.
+  /** An AutoCloseable to clean up resources on query termination. Stops Python 
worker. */
+  private[connect] case class RunnerCleaner(runner: StreamingPythonRunner) 
extends AutoCloseable {
 override def close(): Unit = {
   try runner.stop()
   catch {
@@ -98,11 +100,12 @@ object StreamingForeachBatchHelper extends Logging {
   /**
* Starts up Python worker and initializes it with Python function. Returns 
a foreachBatch
 * function that sets up the session and Dataframe cache and interacts 
with the Python
-   * worker to execute user's function.
+   * worker to execute user's function. In addition, it returns an 
AutoCloseable. The caller must
+   * ensure it is closed so that the worker process and related resources are 
released.
*/
   def pythonForeachBatchWrapper(
   pythonFn: SimplePythonFunction,
-  sessionHolder: SessionHolder): (ForeachBatchFnType, RunnerCleaner) = {
+  sessionHolder: SessionHolder): (ForeachBatchFnType, AutoCloseable) = {
 
 val port = SparkConnectService.localPort
 val connectUrl = s"sc://localhost:$port/;user_id=${sessionHolder.userId}"
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala
index a6451de8fc2..910c2a2650c 100644
--- 
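The documented contract (the caller must eventually `close()` the returned handle) can be illustrated with a small hypothetical Python sketch; `start_worker` and `run_streaming_query` are made-up stand-ins for illustration only, not Spark APIs:

```python
def python_foreach_batch_wrapper(fn, start_worker):
    # Hypothetical analogue: start a worker and return (callback, cleanup handle).
    worker = start_worker()
    def callback(batch_df, batch_id):
        worker.run(fn, batch_df, batch_id)  # forward each micro-batch to the worker
    return callback, worker                 # the worker doubles as the close()-able handle

def run_query(fn, start_worker, run_streaming_query):
    callback, cleaner = python_foreach_batch_wrapper(fn, start_worker)
    try:
        run_streaming_query(callback)       # drive the query until termination
    finally:
        cleaner.close()                     # release the worker process and related resources
```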

[spark] branch master updated: [MINOR][CONNECT] Fix a typo in org.apache.spark.sql.connect.client.ArtifactManager

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3042ffd08fb [MINOR][CONNECT] Fix a typo in 
org.apache.spark.sql.connect.client.ArtifactManager
3042ffd08fb is described below

commit 3042ffd08fbd3c3e81c6f7f2ed14448ad45292d0
Author: zhaomin 
AuthorDate: Thu Oct 19 14:10:11 2023 +0900

[MINOR][CONNECT] Fix a typo in 
org.apache.spark.sql.connect.client.ArtifactManager

### What changes were proposed in this pull request?

Change 'Unsuppoted' in the exception information to 'Unsupported'

### Why are the changes needed?

This is a typo fix.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass CI.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43440 from zhaomin1423/typo.

Authored-by: zhaomin 
Signed-off-by: Hyukjin Kwon 
---
 .../scala/org/apache/spark/sql/connect/client/ArtifactManager.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
index 6b08737ed21..2f8eacb0690 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala
@@ -87,7 +87,7 @@ class ArtifactManager(
   case cf if cf.endsWith(".class") =>
 newClassArtifact(path.getFileName, new LocalFile(path))
   case other =>
-throw new UnsupportedOperationException(s"Unsuppoted file format: 
$other")
+throw new UnsupportedOperationException(s"Unsupported file format: 
$other")
 }
 Seq[Artifact](artifact)
 





[spark] branch branch-3.5 updated: [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual`

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 17d283990b6 [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual`
17d283990b6 is described below

commit 17d283990b64614828838afa718f48b855ab7842
Author: Haejoon Lee 
AuthorDate: Thu Oct 19 13:57:01 2023 +0900

[SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual`

### What changes were proposed in this pull request?

This PR proposes to deprecate `assertPandasOnSparkEqual`.

### Why are the changes needed?

We now have more pandas-friendly testing utils such as 
`ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` and 
`ps.testing.assert_index_equal`.

### Does this PR introduce _any_ user-facing change?

Not for now, but `assertPandasOnSparkEqual` will be removed in a future 
version.

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43426 from itholic/SPARK-45553.

Authored-by: Haejoon Lee 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit f3e280b952da8b8ab6c78371f3715cc674a73bc1)
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/testing/pandasutils.py | 11 +++
 1 file changed, 11 insertions(+)

diff --git a/python/pyspark/testing/pandasutils.py 
b/python/pyspark/testing/pandasutils.py
index c80ffb7ee53..04a523bce76 100644
--- a/python/pyspark/testing/pandasutils.py
+++ b/python/pyspark/testing/pandasutils.py
@@ -365,6 +365,11 @@ def assertPandasOnSparkEqual(
 
 .. versionadded:: 3.5.0
 
+.. deprecated:: 3.5.1
+`assertPandasOnSparkEqual` will be removed in Spark 4.0.0.
+Use `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal`
+and `ps.testing.assert_index_equal` instead.
+
 Parameters
 --
 actual: pandas-on-Spark DataFrame, Series, or Index
@@ -417,6 +422,12 @@ def assertPandasOnSparkEqual(
 >>> s2 = ps.Index([212.3, 100.0001])
 >>> assertPandasOnSparkEqual(s1, s2, almost=True)  # pass, ps.Index obj 
are almost equal
 """
+warnings.warn(
+"`assertPandasOnSparkEqual` will be removed in Spark 4.0.0. "
+"Use `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` 
"
+"and `ps.testing.assert_index_equal` instead.",
+FutureWarning,
+)
 if actual is None and expected is None:
 return True
 elif actual is None or expected is None:
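A hedged sketch of the migration the deprecation note suggests, assuming `ps.testing` resolves as written in the commit message and that the new helpers mirror the pandas `testing.assert_*` signatures:

```python
import pyspark.pandas as ps

actual = ps.DataFrame({"a": [1, 2, 3]})
expected = ps.DataFrame({"a": [1, 2, 3]})

# Deprecated as of 3.5.1 (now emits a FutureWarning):
#   from pyspark.testing.pandasutils import assertPandasOnSparkEqual
#   assertPandasOnSparkEqual(actual, expected)

# Suggested replacement per the deprecation message:
ps.testing.assert_frame_equal(actual, expected)
```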





[spark] branch master updated: [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual`

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f3e280b952d [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual`
f3e280b952d is described below

commit f3e280b952da8b8ab6c78371f3715cc674a73bc1
Author: Haejoon Lee 
AuthorDate: Thu Oct 19 13:57:01 2023 +0900

[SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual`

### What changes were proposed in this pull request?

This PR proposes to deprecate `assertPandasOnSparkEqual`.

### Why are the changes needed?

We now have more pandas-friendly testing utils such as 
`ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` and 
`ps.testing.assert_index_equal`.

### Does this PR introduce _any_ user-facing change?

Not for now, but `assertPandasOnSparkEqual` will be removed in a future 
version.

### How was this patch tested?

The existing CI should pass.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43426 from itholic/SPARK-45553.

Authored-by: Haejoon Lee 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/testing/pandasutils.py | 11 +++
 1 file changed, 11 insertions(+)

diff --git a/python/pyspark/testing/pandasutils.py 
b/python/pyspark/testing/pandasutils.py
index bb9ca0dc74e..9abffefdbe7 100644
--- a/python/pyspark/testing/pandasutils.py
+++ b/python/pyspark/testing/pandasutils.py
@@ -341,6 +341,11 @@ def assertPandasOnSparkEqual(
 
 .. versionadded:: 3.5.0
 
+.. deprecated:: 3.5.1
+`assertPandasOnSparkEqual` will be removed in Spark 4.0.0.
+Use `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal`
+and `ps.testing.assert_index_equal` instead.
+
 Parameters
 --
 actual: pandas-on-Spark DataFrame, Series, or Index
@@ -393,6 +398,12 @@ def assertPandasOnSparkEqual(
 >>> s2 = ps.Index([212.3, 100.0001])
 >>> assertPandasOnSparkEqual(s1, s2, almost=True)  # pass, ps.Index obj 
are almost equal
 """
+warnings.warn(
+"`assertPandasOnSparkEqual` will be removed in Spark 4.0.0. "
+"Use `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` 
"
+"and `ps.testing.assert_index_equal` instead.",
+FutureWarning,
+)
 if actual is None and expected is None:
 return True
 elif actual is None or expected is None:





[spark] 02/02: Revert "[SPARK-45546][BUILD][INFRA] Make `publish-snapshot` support `package` first then `deploy`"

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit e37dd3ab8e0707eead2cb068bc19456349ccdd86
Author: Hyukjin Kwon 
AuthorDate: Thu Oct 19 13:50:31 2023 +0900

Revert "[SPARK-45546][BUILD][INFRA] Make `publish-snapshot` support 
`package` first then `deploy`"

This reverts commit 3ef18e2d00f386196292f0c768816626bc903d47.
---
 .github/workflows/publish_snapshot.yml |  4 
 dev/create-release/release-build.sh| 14 ++
 pom.xml|  7 ---
 3 files changed, 2 insertions(+), 23 deletions(-)

diff --git a/.github/workflows/publish_snapshot.yml 
b/.github/workflows/publish_snapshot.yml
index 476d41d0cf1..7ed836f016b 100644
--- a/.github/workflows/publish_snapshot.yml
+++ b/.github/workflows/publish_snapshot.yml
@@ -66,8 +66,4 @@ jobs:
 GPG_KEY: "not_used"
 GPG_PASSPHRASE: "not_used"
 GIT_REF: ${{ matrix.branch }}
-# SPARK-45546 adds this environment variable to split the publish 
snapshot process into two steps:
-# first package, then deploy. This is intended to reduce the resource 
pressure of deploy.
-# When PACKAGE_BEFORE_DEPLOY is not set to true, it will revert to the 
one-step deploy method.
-PACKAGE_BEFORE_DEPLOY: true
   run: ./dev/create-release/release-build.sh publish-snapshot
diff --git a/dev/create-release/release-build.sh 
b/dev/create-release/release-build.sh
index 3776c64e31e..f3571c4e48c 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -432,24 +432,14 @@ if [[ "$1" == "publish-snapshot" ]]; then
   echo "" >> $tmp_settings
 
   if [[ $PUBLISH_SCALA_2_12 = 1 ]]; then
-if [ "$PACKAGE_BEFORE_DEPLOY" = "true" ]; then
-  $MVN -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean package
-  $MVN --settings $tmp_settings -DskipTests $SCALA_2_12_PROFILES 
$PUBLISH_PROFILES deploy
-else
-  $MVN --settings $tmp_settings -DskipTests $SCALA_2_12_PROFILES 
$PUBLISH_PROFILES clean deploy
-fi
+$MVN --settings $tmp_settings -DskipTests $SCALA_2_12_PROFILES 
$PUBLISH_PROFILES clean deploy
   fi
 
   if [[ $PUBLISH_SCALA_2_13 = 1 ]]; then
 if [[ $SPARK_VERSION < "4.0" ]]; then
   ./dev/change-scala-version.sh 2.13
 fi
-if [ "$PACKAGE_BEFORE_DEPLOY" = "true" ]; then
-  $MVN -DskipTests $SCALA_2_13_PROFILES $PUBLISH_PROFILES clean package
-  $MVN --settings $tmp_settings -DskipTests $SCALA_2_13_PROFILES 
$PUBLISH_PROFILES deploy
-else
-  $MVN --settings $tmp_settings -DskipTests $SCALA_2_13_PROFILES 
$PUBLISH_PROFILES clean deploy
-fi
+$MVN --settings $tmp_settings -DskipTests $SCALA_2_13_PROFILES 
$PUBLISH_PROFILES clean deploy
   fi
 
   rm $tmp_settings
diff --git a/pom.xml b/pom.xml
index ade6537c2a1..824ae49f6da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3840,11 +3840,4 @@
   
 
   
-  
-
-  internal.snapshot
-  Internal Snapshot Repository
-  http://localhost:8081/repository/maven-snapshots/
-
-  
 





[spark] branch master updated (a3c17b2e229 -> e37dd3ab8e0)

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from a3c17b2e229 [SPARK-45562][SQL][FOLLOW-UP] XML: Make 'rowTag' option 
check case insensitive
 new 706872d4de2 Revert "[SPARK-45546][BUILD][FOLLOW-UP] Remove snapshot 
repo mistakenly added"
 new e37dd3ab8e0 Revert "[SPARK-45546][BUILD][INFRA] Make 
`publish-snapshot` support `package` first then `deploy`"

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/publish_snapshot.yml |  4 
 dev/create-release/release-build.sh| 14 ++
 2 files changed, 2 insertions(+), 16 deletions(-)





[spark] 01/02: Revert "[SPARK-45546][BUILD][FOLLOW-UP] Remove snapshot repo mistakenly added"

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 706872d4de2374d1faf84d8706611a092c0b6e76
Author: Hyukjin Kwon 
AuthorDate: Thu Oct 19 13:50:22 2023 +0900

Revert "[SPARK-45546][BUILD][FOLLOW-UP] Remove snapshot repo mistakenly 
added"

This reverts commit 74dc5a3d8c0ffe425dfadec44e41615a8f3f8367.
---
 pom.xml | 7 +++
 1 file changed, 7 insertions(+)

diff --git a/pom.xml b/pom.xml
index 824ae49f6da..ade6537c2a1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3840,4 +3840,11 @@
   
 
   
+  
+
+  internal.snapshot
+  Internal Snapshot Repository
+  http://localhost:8081/repository/maven-snapshots/
+
+  
 





[spark] branch master updated: [SPARK-45562][SQL][FOLLOW-UP] XML: Make 'rowTag' option check case insensitive

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a3c17b2e229 [SPARK-45562][SQL][FOLLOW-UP] XML: Make 'rowTag' option 
check case insensitive
a3c17b2e229 is described below

commit a3c17b2e22969de3d225fc9890023456592f6158
Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
AuthorDate: Thu Oct 19 13:23:04 2023 +0900

[SPARK-45562][SQL][FOLLOW-UP] XML: Make 'rowTag' option check case 
insensitive

### What changes were proposed in this pull request?
[PR 43389](https://github.com/apache/spark/pull/43389) made the `rowTag` option 
required for XML read and write. However, the option check was done in a 
case-sensitive manner. This PR makes the check case-insensitive.

### Why are the changes needed?
Options are case-insensitive.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43416 from sandip-db/xml-rowTagCaseInsensitive.

Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com>
Signed-off-by: Hyukjin Kwon 
---
 .../org/apache/spark/sql/catalyst/xml/XmlOptions.scala  | 17 +++--
 .../sql/execution/datasources/xml/XmlFileFormat.scala   |  5 ++---
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
index 0dedbec58e1..d2c7b435fe6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
@@ -34,7 +34,8 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, 
SQLConf}
 private[sql] class XmlOptions(
 @transient val parameters: CaseInsensitiveMap[String],
 defaultTimeZoneId: String,
-defaultColumnNameOfCorruptRecord: String)
+defaultColumnNameOfCorruptRecord: String,
+rowTagRequired: Boolean)
   extends FileSourceOptions(parameters) with Logging {
 
   import XmlOptions._
@@ -42,11 +43,13 @@ private[sql] class XmlOptions(
   def this(
   parameters: Map[String, String] = Map.empty,
   defaultTimeZoneId: String = SQLConf.get.sessionLocalTimeZone,
-  defaultColumnNameOfCorruptRecord: String = 
SQLConf.get.columnNameOfCorruptRecord) = {
+  defaultColumnNameOfCorruptRecord: String = 
SQLConf.get.columnNameOfCorruptRecord,
+  rowTagRequired: Boolean = false) = {
 this(
   CaseInsensitiveMap(parameters),
   defaultTimeZoneId,
-  defaultColumnNameOfCorruptRecord)
+  defaultColumnNameOfCorruptRecord,
+  rowTagRequired)
   }
 
   private def getBool(paramName: String, default: Boolean = false): Boolean = {
@@ -63,7 +66,9 @@ private[sql] class XmlOptions(
   }
 
   val compressionCodec = 
parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName)
-  val rowTag = parameters.getOrElse(ROW_TAG, XmlOptions.DEFAULT_ROW_TAG).trim
+  val rowTagOpt = parameters.get(XmlOptions.ROW_TAG)
+  require(!rowTagRequired || rowTagOpt.isDefined, s"'${XmlOptions.ROW_TAG}' 
option is required.")
+  val rowTag = rowTagOpt.getOrElse(XmlOptions.DEFAULT_ROW_TAG).trim
   require(rowTag.nonEmpty, s"'$ROW_TAG' option should not be an empty string.")
   require(!rowTag.startsWith("<") && !rowTag.endsWith(">"),
   s"'$ROW_TAG' should not include angle brackets")
@@ -223,8 +228,8 @@ private[sql] object XmlOptions extends DataSourceOptions {
   newOption(ENCODING, CHARSET)
 
   def apply(parameters: Map[String, String]): XmlOptions =
-new XmlOptions(parameters, SQLConf.get.sessionLocalTimeZone)
+new XmlOptions(parameters)
 
   def apply(): XmlOptions =
-new XmlOptions(Map.empty, SQLConf.get.sessionLocalTimeZone)
+new XmlOptions(Map.empty)
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala
index 4342711b00f..77619299278 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala
@@ -42,11 +42,10 @@ class XmlFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
   def getXmlOptions(
   sparkSession: SparkSession,
   parameters: Map[String, String]): XmlOptions = {
-val rowTagOpt = parameters.get(XmlOptions.ROW_TAG)
-require(rowTagOpt.isDefined, s"'${XmlOptions.ROW_TAG}' option is 
required.")
 new XmlOptions(parameters,
   sparkSession.sessionState.conf.sessionLocalTimeZone,
-   
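From the user's side, the now-required (and case-insensitively matched) `rowTag` option is supplied through the DataFrame reader. A minimal sketch, with a hypothetical input path and an active `spark` session assumed:

```python
# 'rowTag' is required for the XML source; option names are case-insensitive.
df = (spark.read.format("xml")
      .option("rowTag", "book")    # canonical spelling
      .load("/tmp/books.xml"))     # hypothetical path

df2 = (spark.read.format("xml")
       .option("ROWTAG", "book")   # also accepted, since options are case-insensitive
       .load("/tmp/books.xml"))
```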

[spark] branch master updated: [SPARK-45507][SQL] Correctness fix for nested correlated scalar subqueries with COUNT aggregates

2023-10-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8a972c2fe87 [SPARK-45507][SQL] Correctness fix for nested correlated 
scalar subqueries with COUNT aggregates
8a972c2fe87 is described below

commit 8a972c2fe8730e41193986a273aa92b234e5beb8
Author: Andy Lam 
AuthorDate: Thu Oct 19 10:36:32 2023 +0800

[SPARK-45507][SQL] Correctness fix for nested correlated scalar subqueries 
with COUNT aggregates

### What changes were proposed in this pull request?

We want to use the count bug handling in `DecorrelateInnerQuery` to detect 
potential count bugs in scalar subqueries. It is always safe to use 
`DecorrelateInnerQuery` to handle count bugs, but for efficiency reasons, such as 
the common case of a COUNT on top of the scalar subquery, we would like to 
avoid an extra left outer join. This PR therefore introduces a simple check to 
detect such cases before `decorrelate()` - if true, then don't do count bug 
handling in `decorrelate()`, an [...]

### Why are the changes needed?

This PR fixes correctness issues for correlated scalar subqueries 
pertaining to the COUNT bug. Examples can be found in the JIRA ticket.

### Does this PR introduce _any_ user-facing change?

Yes, results will change.

### How was this patch tested?

Added SQL end-to-end tests in `count.sql`

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43341 from andylam-db/multiple-count-bug.

Authored-by: Andy Lam 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/optimizer/subquery.scala|  44 +-
 .../org/apache/spark/sql/internal/SQLConf.scala|   9 ++
 .../nested-scalar-subquery-count-bug.sql.out   | 166 +
 .../nested-scalar-subquery-count-bug.sql   |  34 +
 .../nested-scalar-subquery-count-bug.sql.out   | 125 
 5 files changed, 372 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 5b95ee1df1b..1f1a16e9093 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -426,17 +426,49 @@ object PullupCorrelatedPredicates extends 
Rule[LogicalPlan] with PredicateHelper
 plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
   case ScalarSubquery(sub, children, exprId, conditions, hint, 
mayHaveCountBugOld)
 if children.nonEmpty =>
-val (newPlan, newCond) = decorrelate(sub, plan)
-val mayHaveCountBug = if (mayHaveCountBugOld.isEmpty) {
+
+def mayHaveCountBugAgg(a: Aggregate): Boolean = {
+  a.groupingExpressions.isEmpty && 
a.aggregateExpressions.exists(_.exists {
+case a: AggregateExpression => 
a.aggregateFunction.defaultResult.isDefined
+case _ => false
+  })
+}
+
+// The below logic controls handling count bug for scalar subqueries in
+// [[DecorrelateInnerQuery]], and if we don't handle it here, we 
handle it in
+// [[RewriteCorrelatedScalarSubquery#constructLeftJoins]]. Note that 
handling it in
+// [[DecorrelateInnerQuery]] is always correct, and turning it off to 
handle it in
+// constructLeftJoins is an optimization, so that additional, 
redundant left outer joins are
+// not introduced.
+val handleCountBugInDecorrelate = 
SQLConf.get.decorrelateInnerQueryEnabled &&
+  !conf.getConf(SQLConf.LEGACY_SCALAR_SUBQUERY_COUNT_BUG_HANDLING) && 
!(sub match {
+  // Handle count bug only if there exists lower level Aggs with count 
bugs. It does not
+  // matter if the top level agg is count bug vulnerable or not, 
because:
+  // 1. If the top level agg is count bug vulnerable, it can be 
handled in
+  // constructLeftJoins, unless there are lower aggs that are count 
bug vulnerable.
+  // E.g. COUNT(COUNT + COUNT)
+  // 2. If the top level agg is not count bug vulnerable, it can be 
count bug vulnerable if
+  // there are lower aggs that are count bug vulnerable. E.g. 
SUM(COUNT)
+  case agg: Aggregate => !agg.child.exists {
+case lowerAgg: Aggregate => mayHaveCountBugAgg(lowerAgg)
+case _ => false
+  }
+  case _ => false
+})
+val (newPlan, newCond) = decorrelate(sub, plan, 
handleCountBugInDecorrelate)
+val mayHaveCountBug = if (mayHaveCountBugOld.isDefined) {
+  // For idempotency, we must save this variable the first time this 
rule is 

[spark] branch master updated: [SPARK-45586][SQL] Reduce compiler latency for plans with large expression trees

2023-10-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1e94415739c [SPARK-45586][SQL] Reduce compiler latency for plans with 
large expression trees
1e94415739c is described below

commit 1e94415739ccfc4222a067459d3cb8be480530b4
Author: Kelvin Jiang 
AuthorDate: Thu Oct 19 10:24:58 2023 +0800

[SPARK-45586][SQL] Reduce compiler latency for plans with large expression 
trees

### What changes were proposed in this pull request?

* Included rule ID pruning when traversing the expression trees in 
`TypeCoercionRule` (this avoids traversing the expression tree over and 
over again in future iterations of the rule)
* Improved `EquivalentExpressions`:
  * Since `supportedExpression()` is checking for the existence of a 
pattern in the tree, changed to check the `TreePatternBits` instead of 
recursing using `.exists()`
  * When creating an `ExpressionEquals` object, calculating the height 
requires recursing through all of its children, which is O(n^2) when called 
upon each expression in the expression tree. This changes it so that this 
height is cached in the `TreeNode`, so that it is now O(n) when called upon 
each expression in the tree
* More targeted TreePatternBits pruning in `ResolveTimeZone` and 
`ConstantPropagation`

### Why are the changes needed?

This PR improves some analyzer and optimizer rules to address 
inefficiencies when handling extremely large expression trees.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

There should be no plan changes, so no unit tests were modified.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43420 from kelvinjian-db/SPARK-45586-large-expr-trees.

Authored-by: Kelvin Jiang 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/analysis/TypeCoercion.scala |  4 +++-
 .../sql/catalyst/analysis/timeZoneAnalysis.scala   |  8 
 .../expressions/EquivalentExpressions.scala| 23 +++---
 .../spark/sql/catalyst/optimizer/expressions.scala |  6 --
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  2 ++
 5 files changed, 20 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index c26569866e5..b34fd873621 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -1215,7 +1216,8 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with 
Logging {
   } else {
 beforeMapChildren
   }
-  withPropagatedTypes.transformExpressionsUp(typeCoercionFn)
+  withPropagatedTypes.transformExpressionsUpWithPruning(
+AlwaysProcess.fn, ruleId)(typeCoercionFn)
 }
 }
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
index 11a5bc99b6c..01d88f050ca 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ListQuery, 
TimeZoneAwareExpression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreePattern.{LIST_SUBQUERY, 
TIME_ZONE_AWARE_EXPRESSION}
+import 
org.apache.spark.sql.catalyst.trees.TreePattern.TIME_ZONE_AWARE_EXPRESSION
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -40,10 +40,10 @@ object ResolveTimeZone extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan =
 plan.resolveExpressionsWithPruning(
-  _.containsAnyPattern(LIST_SUBQUERY, TIME_ZONE_AWARE_EXPRESSION), ruleId
-)(transformTimeZoneExprs)
+  _.containsPattern(TIME_ZONE_AWARE_EXPRESSION), 

[spark] branch master updated: [SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit tests

2023-10-18 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a14f90941ca [SPARK-45585][TEST] Fix time format and redirection issues 
in SparkSubmit tests
a14f90941ca is described below

commit a14f90941caf06e2d77789a3952dd588e6900b90
Author: Kent Yao 
AuthorDate: Thu Oct 19 09:43:13 2023 +0800

[SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit 
tests

### What changes were proposed in this pull request?

This PR fixes:

- The deviation between `new Timestamp(new Date().getTime)` and the log4j2 date 
format pattern in output from the sub spark-submit process
```
2023-10-17 03:58:48.275 - stderr> 23/10/17 18:58:48 INFO 
StandaloneSchedulerBackend: Connected to Spark cluster with app ID 
app-20231017185848-
2023-10-17 03:58:48.278 - stderr> 23/10/17 18:58:48 INFO Utils: 
Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 57637.
```
- The duplication of `new Timestamp(new Date().getTime)` when using logInfo 
instead of println
```
23/10/17 19:02:34.392 Thread-5 INFO SparkShellSuite: 2023-10-17 
04:02:34.392 - stderr> 23/10/17 19:02:34 WARN Utils: Your hostname, hulk.local 
resolves to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on 
interface en0)
23/10/17 19:02:34.393 Thread-5 INFO SparkShellSuite: 2023-10-17 
04:02:34.393 - stderr> 23/10/17 19:02:34 WARN Utils: Set SPARK_LOCAL_IP if you 
need to bind to another address

```
- Correctly redirects sub spark-submit process logs to unit-tests.log

### Why are the changes needed?

test fixes

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

- WholeStageCodegenSparkSubmitSuite - before

```
18:58:53.882 shutdown-hook-0 INFO ShutdownHookManager: Shutdown hook called
18:58:53.882 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory 
/Users/hzyaoqin/spark/target/tmp/spark-ecd53d47-d109-4ddc-80dd-2d829f34371e
11:58:18.892 pool-1-thread-1 WARN Utils: Your hostname, hulk.local resolves 
to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on interface en0)
11:58:18.893 pool-1-thread-1 WARN Utils: Set SPARK_LOCAL_IP if you need to 
bind to another address
11:58:18.932 
pool-1-thread-1-ScalaTest-running-WholeStageCodegenSparkSubmitSuite INFO 
WholeStageCodegenSparkSubmitSuite:
```

- WholeStageCodegenSparkSubmitSuite - after
```
= TEST OUTPUT FOR 
o.a.s.sql.execution.WholeStageCodegenSparkSubmitSuite: 'Generated code on 
driver should not embed platform-specific constant' =

11:58:19.882 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:19 WARN Utils: Your hostname, hulk.local resolves to a loopback 
address: 127.0.0.1; using 10.221.103.23 instead (on interface en0)
11:58:19.883 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO SparkContext: Running Spark version 4.0.0-SNAPSHOT
11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO SparkContext: OS info Mac OS X, 13.4, aarch64
11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO SparkContext: Java version 17.0.8
11:58:20.227 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO ResourceUtils: 
==
11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO ResourceUtils: No custom resources configured for 
spark.driver.
11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO ResourceUtils: 
==
11:58:20.254 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO SparkContext: Submitted application: 
org.apache.spark.sql.execution.WholeStageCodegenSparkSubmitSuite
11:58:20.266 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO ResourceProfile: Default ResourceProfile created, 
executor resources: Map(memory -> name: memory, amount: 1024, script: , vendor: 
, offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: 
Map(cpus -> name: cpus, amount: 1.0)
11:58:20.268 Thread-6 INFO 

[spark] branch branch-3.5 updated: [SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit tests

2023-10-18 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 2f66851972c [SPARK-45585][TEST] Fix time format and redirection issues 
in SparkSubmit tests
2f66851972c is described below

commit 2f66851972c2fc66b053a8c78bc2814b7bb4257f
Author: Kent Yao 
AuthorDate: Thu Oct 19 09:43:13 2023 +0800

[SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit 
tests

### What changes were proposed in this pull request?

This PR fixes:

- The deviation between `new Timestamp(new Date().getTime)` and the log4j2 date 
format pattern in output from the sub spark-submit process
```
2023-10-17 03:58:48.275 - stderr> 23/10/17 18:58:48 INFO 
StandaloneSchedulerBackend: Connected to Spark cluster with app ID 
app-20231017185848-
2023-10-17 03:58:48.278 - stderr> 23/10/17 18:58:48 INFO Utils: 
Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 57637.
```
- The duplication of `new Timestamp(new Date().getTime)` when using logInfo 
instead of println
```
23/10/17 19:02:34.392 Thread-5 INFO SparkShellSuite: 2023-10-17 
04:02:34.392 - stderr> 23/10/17 19:02:34 WARN Utils: Your hostname, hulk.local 
resolves to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on 
interface en0)
23/10/17 19:02:34.393 Thread-5 INFO SparkShellSuite: 2023-10-17 
04:02:34.393 - stderr> 23/10/17 19:02:34 WARN Utils: Set SPARK_LOCAL_IP if you 
need to bind to another address

```
- Correctly redirects sub spark-submit process logs to unit-tests.log

### Why are the changes needed?

test fixes

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

- WholeStageCodegenSparkSubmitSuite - before

```
18:58:53.882 shutdown-hook-0 INFO ShutdownHookManager: Shutdown hook called
18:58:53.882 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory 
/Users/hzyaoqin/spark/target/tmp/spark-ecd53d47-d109-4ddc-80dd-2d829f34371e
11:58:18.892 pool-1-thread-1 WARN Utils: Your hostname, hulk.local resolves 
to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on interface en0)
11:58:18.893 pool-1-thread-1 WARN Utils: Set SPARK_LOCAL_IP if you need to 
bind to another address
11:58:18.932 
pool-1-thread-1-ScalaTest-running-WholeStageCodegenSparkSubmitSuite INFO 
WholeStageCodegenSparkSubmitSuite:
```

- WholeStageCodegenSparkSubmitSuite - after
```
= TEST OUTPUT FOR 
o.a.s.sql.execution.WholeStageCodegenSparkSubmitSuite: 'Generated code on 
driver should not embed platform-specific constant' =

11:58:19.882 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:19 WARN Utils: Your hostname, hulk.local resolves to a loopback 
address: 127.0.0.1; using 10.221.103.23 instead (on interface en0)
11:58:19.883 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO SparkContext: Running Spark version 4.0.0-SNAPSHOT
11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO SparkContext: OS info Mac OS X, 13.4, aarch64
11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO SparkContext: Java version 17.0.8
11:58:20.227 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO ResourceUtils: 
==
11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO ResourceUtils: No custom resources configured for 
spark.driver.
11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO ResourceUtils: 
==
11:58:20.254 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO SparkContext: Submitted application: 
org.apache.spark.sql.execution.WholeStageCodegenSparkSubmitSuite
11:58:20.266 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 
23/10/18 11:58:20 INFO ResourceProfile: Default ResourceProfile created, 
executor resources: Map(memory -> name: memory, amount: 1024, script: , vendor: 
, offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: 
Map(cpus -> name: cpus, amount: 1.0)
11:58:20.268 Thread-6 INFO 

[spark] branch master updated: [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation

2023-10-18 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b42ac52e787 [SPARK-45582][SS] Ensure that store instance is not used 
after calling commit within output mode streaming aggregation
b42ac52e787 is described below

commit b42ac52e787c896f21452023da0bd6685a1b47fc
Author: Anish Shrigondekar 
AuthorDate: Thu Oct 19 06:51:45 2023 +0900

[SPARK-45582][SS] Ensure that store instance is not used after calling 
commit within output mode streaming aggregation

### What changes were proposed in this pull request?
Ensure that store instance is not used after calling commit within output 
mode streaming aggregation

### Why are the changes needed?
Without these changes, we were accessing the store instance to retrieve the 
iterator even after the commit was called. When commit is called, we release 
the DB instance lock, so it is possible that task retries acquire the instance 
lock and close the DB instance. When the original thread then tries to access the 
DB, it might run into a null pointer exception. This change fixes the issue.

```
org.apache.spark.sql.streaming.StreamingQueryException: Job aborted 
due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: 
Lost task 0.0 in stage 5.0 (TID 492) 
(ip-10-110-25-116.us-west-2.compute.internal executor driver): 
java.lang.NullPointerException
at 
org.apache.spark.sql.execution.streaming.state.RocksDB.iterator(RocksDB.scala:337)
at 
org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.iterator(RocksDBStateStoreProvider.scala:79)
at 
org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV1.values(StreamingAggregationStateManager.scala:130)
at 
org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:543)
at 
org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:63)
at 
org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:131)
at 
org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407)
at 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:371)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
RocksDBStreamingAggregationSuite
```
18:12:00.242 WARN 
org.apache.spark.sql.streaming.RocksDBStateStoreStreamingAggregationSuite:

= POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.streaming.RocksDBStateStoreStreamingAggregationSuite, threads: 
ForkJoinPool.commonPool-worker-6 (daemon=true), 
ForkJoinPool.commonPool-worker-4 (daemon=true), 
ForkJoinPool.commonPool-worker-7 (daemon=true), 
ForkJoinPool.commonPool-worker-5 (daemon=true), 
ForkJoinPool.commonPool-worker-3 (daemon=true), state-store-maintenance-task 
(daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 
(daemon=true), shuffle-boss-6-1 (da [...]
[info] Run completed in 5 minutes, 8 seconds.
[info] Total number of tests run: 80
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

StreamingSessionWindowSuite
```
= POSSIBLE THREAD LEAK IN SUITE 
o.a.s.sql.streaming.StreamingSessionWindowSuite, threads: 
ForkJoinPool.commonPool-worker-6 (daemon=true), 
state-store-maintenance-thread-0 (daemon=true), 
ForkJoinPool.commonPool-worker-4 (daemon=true), 
state-store-maintenance-thread-1 (daemon=true), 
ForkJoinPool.commonPool-worker-7 (daemon=true), 
ForkJoinPool.commonPool-worker-5 (daemon=true), 
ForkJoinPool.commonPool-worker-3 (daemon=true), state-store-maintenance-task 
(daemon=true), rpc-boss-3-1 (d [...]
[info] Run completed in 3 minutes, 38 seconds.
[info] Total number of tests run: 48
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 48, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43413 from anishshri-db/task/SPARK-45582.

Authored-by: Anish Shrigondekar 
Signed-off-by: Jungtaek Lim 
---
 .../execution/streaming/statefulOperators.scala| 64 --
 1 file changed, 48 insertions(+), 16 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
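The ordering constraint the fix enforces can be sketched in hedged Python pseudocode (names are illustrative only, not the actual Scala state store API): materialize everything needed from the store before `commit()` releases the instance lock.

```python
def drain_then_commit(store):
    # Read while this thread still owns the store instance.
    rows = list(store.iterator())
    # commit() releases the instance lock; a retried task may then close the
    # underlying DB, so the store must not be touched after this point.
    store.commit()
    return rows
```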
 

[spark] branch master updated: [SPARK-45581] Make SQLSTATE mandatory

2023-10-18 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7e82e1bc43e [SPARK-45581] Make SQLSTATE mandatory
7e82e1bc43e is described below

commit 7e82e1bc43e0297c3036d802b3a151d2b93db2f6
Author: srielau 
AuthorDate: Wed Oct 18 11:04:44 2023 -0700

[SPARK-45581] Make SQLSTATE mandatory

### What changes were proposed in this pull request?

We propose to make SQLSTATE a mandatory field when using error classes in 
the new error framework.

### Why are the changes needed?

Being able to rely on the existence of SQLSTATEs allows easier 
classification of errors as well as usage of tooling to intercept SQLSTATEs.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new test was added to SparkThrowableSuite to enforce SQLSTATE existence

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43412 from srielau/SPARK-45581-make-SQLSTATEs-mandatory.

Authored-by: srielau 
Signed-off-by: Gengliang Wang 
---
 common/utils/src/main/resources/error/README.md| 26 ++
 .../src/main/resources/error/error-classes.json|  9 +---
 .../org/apache/spark/ErrorClassesJSONReader.scala  |  8 +++
 .../org/apache/spark/SparkThrowableSuite.scala | 16 -
 docs/sql-error-conditions.md   |  6 ++---
 5 files changed, 41 insertions(+), 24 deletions(-)

diff --git a/common/utils/src/main/resources/error/README.md 
b/common/utils/src/main/resources/error/README.md
index ac388c29250..8d8529bea56 100644
--- a/common/utils/src/main/resources/error/README.md
+++ b/common/utils/src/main/resources/error/README.md
@@ -1,6 +1,6 @@
 # Guidelines
 
-To throw a standardized user-facing error or exception, developers should 
specify the error class
+To throw a standardized user-facing error or exception, developers should 
specify the error class, a SQLSTATE,
 and message parameters rather than an arbitrary error message.
 
 ## Usage
@@ -10,7 +10,7 @@ and message parameters rather than an arbitrary error message.
If true, use the error class `INTERNAL_ERROR` and skip to step 4.
 2. Check if an appropriate error class already exists in `error-classes.json`.
If true, use the error class and skip to step 4.
-3. Add a new class to `error-classes.json`; keep in mind the invariants below.
+3. Add a new class with a new or existing SQLSTATE to `error-classes.json`; 
keep in mind the invariants below.
 4. Check if the exception type already extends `SparkThrowable`.
If true, skip to step 6.
 5. Mix `SparkThrowable` into the exception.
@@ -26,9 +26,9 @@ Throw with arbitrary error message:
 
 `error-classes.json`
 
-"PROBLEM_BECAUSE": {
-  "message": ["Problem  because "],
-  "sqlState": "X"
+"PROBLEM_BECAUSE" : {
+  "message" : ["Problem  because "],
+  "sqlState" : "X"
 }
 
 `SparkException.scala`
@@ -70,6 +70,8 @@ Error classes are a succinct, human-readable representation 
of the error categor
 
 An uncategorized errors can be assigned to a legacy error class with the 
prefix `_LEGACY_ERROR_TEMP_` and an unused sequential number, for instance 
`_LEGACY_ERROR_TEMP_0053`.
 
+You should not introduce new uncategorized errors. Instead, convert them to 
proper errors whenever encountering them in new code.
+
  Invariants
 
 - Unique
@@ -79,7 +81,10 @@ An uncategorized errors can be assigned to a legacy error 
class with the prefix
 ### Message
 
 Error messages provide a descriptive, human-readable representation of the 
error.
-The message format accepts string parameters via the C-style printf syntax.
+The message format accepts string parameters via the HTML tag syntax: e.g. <relationName>.
+
+The values passed to the message should not themselves be messages.
+They should be: runtime-values, keywords, identifiers, or other values that are not translated.
 
 The quality of the error message should match the
 [guidelines](https://spark.apache.org/error-message-guidelines.html).
@@ -90,21 +95,24 @@ The quality of the error message should match the
 
 ### SQLSTATE
 
-SQLSTATE is an optional portable error identifier across SQL engines.
+SQLSTATE is a mandatory portable error identifier across SQL engines.
 SQLSTATE comprises a 2-character class value followed by a 3-character 
subclass value.
 Spark prefers to re-use existing SQLSTATEs, preferably used by multiple 
vendors.
 For extension Spark claims the 'K**' subclass range.
 If a new class is needed it will also claim the 'K0' class.
 
+Internal errors should use the 'XX' class. You can subdivide internal errors 
by component. 
+For example: The existing 'XXKD0' is used for an internal analyzer error.
+
  Invariants
 
-- Consistent 
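
To make the guidelines above concrete, here is a minimal sketch of raising an error through the error framework so that the SQLSTATE attached to the error class travels with it. The error class and parameters are the illustrative `PROBLEM_BECAUSE` entry from the README excerpt, and the `SparkException` constructor shown is assumed to accept an error class, message parameters, and a cause:

```
// Illustrative only: throw via an error class instead of an arbitrary message,
// so the SQLSTATE declared in error-classes.json is attached automatically.
import org.apache.spark.SparkException

object ErrorClassSketch {
  def fail(): Nothing = {
    throw new SparkException(
      errorClass = "PROBLEM_BECAUSE",
      messageParameters = Map(
        "problem" -> "too many files",                 // fills <problem>
        "cause" -> "the directory listing timed out"), // fills <cause>
      cause = null)
  }
}
```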

[spark] branch master updated: [SPARK-45549][CORE] Remove unused `numExistingExecutors` in `CoarseGrainedSchedulerBackend`

2023-10-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b41ea9162f4c [SPARK-45549][CORE] Remove unused `numExistingExecutors` 
in `CoarseGrainedSchedulerBackend`
b41ea9162f4c is described below

commit b41ea9162f4c8fbc4d04d28d6ab5cc0342b88cb0
Author: huangxiaoping <1754789...@qq.com>
AuthorDate: Wed Oct 18 08:49:46 2023 -0700

[SPARK-45549][CORE] Remove unused `numExistingExecutors` in 
`CoarseGrainedSchedulerBackend`

### What changes were proposed in this pull request?

Remove unused `numExistingExecutors` in `CoarseGrainedSchedulerBackend`

### Why are the changes needed?
Remove unused code to make the Spark code cleaner.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
No tests needed.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43383 from huangxiaopingRD/SPARK-45549.

Authored-by: huangxiaoping <1754789...@qq.com>
Signed-off-by: Dongjoon Hyun 
---
 .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala  | 5 -
 1 file changed, 5 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index b55dfb39d445..c770e5c9950a 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -731,11 +731,6 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 false
   }
 
-  /**
-   * Return the number of executors currently registered with this backend.
-   */
-  private def numExistingExecutors: Int = synchronized { executorDataMap.size }
-
   override def getExecutorIds(): Seq[String] = synchronized {
 executorDataMap.keySet.toSeq
   }





[spark] branch master updated: [SPARK-45370][PYTHON][TESTS] Fix python test when ansi mode enabled

2023-10-18 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 4a35a31c038f [SPARK-45370][PYTHON][TESTS] Fix python test when ansi 
mode enabled
4a35a31c038f is described below

commit 4a35a31c038f726f9329b4f28f3dde87286fb8d2
Author: panbingkun 
AuthorDate: Wed Oct 18 17:10:42 2023 +0900

[SPARK-45370][PYTHON][TESTS] Fix python test when ansi mode enabled

### What changes were proposed in this pull request?
The PR aims to fix some unit tests in `pyspark.sql.functions` when 
SPARK_ANSI_SQL_MODE=true.

### Why are the changes needed?
Make the PySpark tests pass.
When the ANSI workflow daily GA runs, the following error occurs, e.g.:
https://github.com/apache/spark/actions/workflows/build_ansi.yml
https://github.com/apache/spark/actions/runs/667232/job/17202251325
Screenshot: https://github.com/apache/spark/assets/15246973/c22fb8c4-8b87-46fd-85f2-51f4c1d8d13d
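
For context, ANSI mode makes Spark stricter about implicit casts and invalid conversions, which is the kind of behavior difference these doctests run into. A minimal sketch, assuming a Scala shell with an active `spark` session:

```
// Compare the behavior of an invalid cast with ANSI mode off and on:
// the lenient mode returns NULL, while ANSI mode raises a runtime error.
spark.conf.set("spark.sql.ansi.enabled", false)
spark.sql("SELECT CAST('Alice' AS INT)").show()  // prints a NULL value

spark.conf.set("spark.sql.ansi.enabled", true)
spark.sql("SELECT CAST('Alice' AS INT)").show()  // fails with a cast error
```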

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manually test:
```
python/run-tests --testnames 'pyspark.sql.functions'
Running PySpark tests. Output is in 
/Users/panbingkun/Developer/spark/spark-community/python/unit-tests.log
Will test against the following Python executables: ['python3.9']
Will test the following Python tests: ['pyspark.sql.functions']
python3.9 python_implementation is CPython
python3.9 version is: Python 3.9.13
Starting test(python3.9): pyspark.sql.functions (temp output: 
/Users/panbingkun/Developer/spark/spark-community/python/target/c8705d5c-d9f9-4bc5-babf-d3642736c70c/python3.9__pyspark.sql.functions__gcfwu3ik.log)
Finished test(python3.9): pyspark.sql.functions (47s)
Tests passed in 47 seconds

```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43168 from panbingkun/SPARK-45370.

Authored-by: panbingkun 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/functions.py | 114 +---
 1 file changed, 73 insertions(+), 41 deletions(-)

diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 31e5884e9ebd..7807919ce2c3 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -11740,15 +11740,29 @@ def create_map(
 
 >>> from pyspark.sql import functions as sf
 >>> df = spark.createDataFrame([("Alice", 2, "female"),
-... ("Bob", 5, "male")], ("name", "age", 
"gender"))
+... ("Bob", 5, "male")], ("name", "age", "gender"))
 >>> df.select(sf.create_map(sf.lit('name'), df['name'],
-... sf.lit('age'), df['age'])).show(truncate=False)
-+-------------------------+
-|map(name, name, age, age)|
-+-------------------------+
-|{name -> Alice, age -> 2}|
-|{name -> Bob, age -> 5}  |
-+-------------------------+
+... sf.lit('gender'), df['gender'])).show(truncate=False)
++---------------------------------+
+|map(name, name, gender, gender)  |
++---------------------------------+
+|{name -> Alice, gender -> female}|
+|{name -> Bob, gender -> male}    |
++---------------------------------+
+
+Example 4: Usage of create_map function with values of different types.
+
+>>> from pyspark.sql import functions as sf
+>>> df = spark.createDataFrame([("Alice", 2, 22.2),
+... ("Bob", 5, 36.1)], ("name", "age", "weight"))
+>>> df.select(sf.create_map(sf.lit('age'), df['age'],
+... sf.lit('weight'), df['weight'])).show(truncate=False)
++-----------------------------+
+|map(age, age, weight, weight)|
++-----------------------------+
+|{age -> 2.0, weight -> 22.2} |
+|{age -> 5.0, weight -> 36.1} |
++-----------------------------+
 """
 if len(cols) == 1 and isinstance(cols[0], (list, set)):
 cols = cols[0]  # type: ignore[assignment]
@@ -11833,50 +11847,68 @@ def array(
 Example 1: Basic usage of array function with column names.
 
 >>> from pyspark.sql import functions as sf
->>> df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age"))
->>> df.select(sf.array('name', 'age').alias("arr")).show()
-+----------+
-|       arr|
-+----------+
-|[Alice, 2]|
-|  [Bob, 5]|
-+----------+
+>>> df = spark.createDataFrame([("Alice", "doctor"), ("Bob", "engineer")],
+... ("name", "occupation"))
+>>> df.select(sf.array('name', 'occupation').alias("arr")).show()
++---------------+
+|            arr|
++---------------+
+|[Alice, doctor]|
+|[Bob, engineer]|
++---------------+
 
 Example 2: Usage of array function with Column objects.
 
 >>> from pyspark.sql import functions as sf
->>> df = 

[spark] branch master updated: [SPARK-45410][INFRA][FOLLOW-UP] Rename "Python - PyPy3.8 (master)" to "Build / Python-only (master, PyPy 3.8)"

2023-10-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 69bb68bc997c [SPARK-45410][INFRA][FOLLOW-UP] Rename "Python - PyPy3.8 
(master)" to "Build / Python-only (master, PyPy 3.8)"
69bb68bc997c is described below

commit 69bb68bc997c6aae7072ca3c3b395f602c847473
Author: Hyukjin Kwon 
AuthorDate: Wed Oct 18 00:23:20 2023 -0700

[SPARK-45410][INFRA][FOLLOW-UP] Rename "Python - PyPy3.8 (master)" to 
"Build / Python-only (master, PyPy 3.8)"

### What changes were proposed in this pull request?

This PR is a simple followup of https://github.com/apache/spark/pull/43209 
that changes the build name from "Python - PyPy3.8 (master)" to "Build / 
Python-only (master, PyPy 3.8)"

### Why are the changes needed?

To move/group the build up when you click Actions 
(https://github.com/apache/spark/actions). For now, it looks as below:
![Screenshot 2023-10-18 at 4 16 23 
PM](https://github.com/apache/spark/assets/6477701/617cb386-53b7-40e9-bfa2-3c4a72b0278f)

and you have to click and extend to see Python build:
![Screenshot 2023-10-18 at 4 16 19 
PM](https://github.com/apache/spark/assets/6477701/ec258fb6-2449-4c9f-bb2e-340fc1efe78e)

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43428 from HyukjinKwon/minor-rename-python.

Authored-by: Hyukjin Kwon 
Signed-off-by: Dongjoon Hyun 
---
 .github/workflows/build_python.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/build_python.yml 
b/.github/workflows/build_python.yml
index 94d64a825d87..04b46ffca672 100644
--- a/.github/workflows/build_python.yml
+++ b/.github/workflows/build_python.yml
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-name: "Python - PyPy3.8 (master)"
+name: "Build / Python-only (master, PyPy 3.8)"
 
 on:
   schedule:





[spark] branch master updated: [SPARK-45542][CORE] Replace `setSafeMode(HdfsConstants.SafeModeAction, boolean)` with `setSafeMode(SafeModeAction, boolean)`

2023-10-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5d5a9dbfba18 [SPARK-45542][CORE] Replace 
`setSafeMode(HdfsConstants.SafeModeAction, boolean)` with 
`setSafeMode(SafeModeAction, boolean)`
5d5a9dbfba18 is described below

commit 5d5a9dbfba1831d1aafcc47a82a25df02e431749
Author: yangjie01 
AuthorDate: Wed Oct 18 00:16:43 2023 -0700

[SPARK-45542][CORE] Replace `setSafeMode(HdfsConstants.SafeModeAction, 
boolean)` with `setSafeMode(SafeModeAction, boolean)`

### What changes were proposed in this pull request?
The function `FsHistoryProvider#isFsInSafeMode` is using the deprecated API


https://github.com/apache/hadoop/blob/1be78238728da9266a4f88195058f08fd012bf9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java#L1702-L1722

```
/**
   * Enter, leave or get safe mode.
   *
   * @param action
   *          One of SafeModeAction.ENTER, SafeModeAction.LEAVE and
   *          SafeModeAction.GET.
   * @param isChecked
   *          If true check only for Active NNs status, else check first NN's
   *          status.
   *
   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,
   *      boolean)
   *
   * @deprecated please instead use
   *             {@link DistributedFileSystem#setSafeMode(SafeModeAction, boolean)}.
   */
  @Deprecated
  public boolean setSafeMode(HdfsConstants.SafeModeAction action,
      boolean isChecked) throws IOException {
    return dfs.setSafeMode(action, isChecked);
  }
```

This PR changes it to use


https://github.com/apache/hadoop/blob/1be78238728da9266a4f88195058f08fd012bf9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java#L1646-L1661

```
 /**
   * Enter, leave or get safe mode.
   *
   * @param action
   *          One of SafeModeAction.ENTER, SafeModeAction.LEAVE and
   *          SafeModeAction.GET.
   * @param isChecked
   *          If true check only for Active NNs status, else check first NN's
   *          status.
   */
  @Override
  @SuppressWarnings("deprecation")
  public boolean setSafeMode(SafeModeAction action, boolean isChecked)
      throws IOException {
    return this.setSafeMode(convertToClientProtocolSafeModeAction(action), isChecked);
  }
```

instead of the deprecated one.

The conversion between `HdfsConstants.SafeModeAction` and `SafeModeAction` 
follows the `convertToClientProtocolSafeModeAction` method:


https://github.com/apache/hadoop/blob/1be78238728da9266a4f88195058f08fd012bf9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java#L1663-L1686

```
/**
   * Translating the {@link SafeModeAction} into {@link HdfsConstants.SafeModeAction}
   * that is used by {@link DFSClient#setSafeMode(HdfsConstants.SafeModeAction, boolean)}.
   *
   * @param action any supported action listed in {@link SafeModeAction}.
   * @return the converted {@link HdfsConstants.SafeModeAction}.
   * @throws UnsupportedOperationException if the provided {@link SafeModeAction} cannot be
   *           translated.
   */
  private static HdfsConstants.SafeModeAction convertToClientProtocolSafeModeAction(
      SafeModeAction action) {
    switch (action) {
    case ENTER:
      return HdfsConstants.SafeModeAction.SAFEMODE_ENTER;
    case LEAVE:
      return HdfsConstants.SafeModeAction.SAFEMODE_LEAVE;
    case FORCE_EXIT:
      return HdfsConstants.SafeModeAction.SAFEMODE_FORCE_EXIT;
    case GET:
      return HdfsConstants.SafeModeAction.SAFEMODE_GET;
    default:
      throw new UnsupportedOperationException("Unsupported safe mode action " + action);
    }
  }
```
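
For illustration, a short sketch (not the exact Spark patch) of what the call looks like from the caller's side once the deprecated overload is replaced, assuming the `org.apache.hadoop.fs.SafeModeAction` enum shown above:

```
// Query HDFS safe mode through the non-deprecated overload that takes the
// FileSystem-level SafeModeAction instead of HdfsConstants.SafeModeAction.
import org.apache.hadoop.fs.SafeModeAction
import org.apache.hadoop.hdfs.DistributedFileSystem

object SafeModeSketch {
  def isFsInSafeMode(dfs: DistributedFileSystem): Boolean =
    dfs.setSafeMode(SafeModeAction.GET, true)
}
```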

### Why are the changes needed?
Clean up deprecated API usage

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43377 from LuciferYang/SPARK-45542.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 .../scala/org/apache/spark/deploy/history/FsHistoryProvider.scala| 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index b5afa86180b3..99b3184ac918 100644
--- 

[spark] branch master updated: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions

2023-10-18 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 103de914a5f [SPARK-44649][SQL] Runtime Filter supports passing 
equivalent creation side expressions
103de914a5f is described below

commit 103de914a5f96fccbe722663ee69c8ee7d9c8135
Author: Jiaan Geng 
AuthorDate: Wed Oct 18 14:55:51 2023 +0800

[SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side 
expressions

### What changes were proposed in this pull request?
Currently, the Spark runtime filter supports a multi-level shuffle join side as 
the filter creation side. Please see: https://github.com/apache/spark/pull/39170. 
Although this feature covers more adaptive scenarios and improves performance, 
there are still other cases that need to be supported.

**Optimization of Expression Transitivity on the Creation Side of Spark 
Runtime Filter**

**Principle**
Association expressions are transitive in some joins. For example, given
`Tab1.col1A = Tab2.col2B` and `Tab2.col2B = Tab3.col3C`,
it can be inferred that `Tab1.col1A = Tab3.col3C`.

**Optimization points**
Currently, the runtime filter's creation side expression only uses directly 
associated keys. If the transitivity of association conditions is utilized, 
runtime filters can be injected into many scenarios, such as:

```
SELECT *
FROM (
  SELECT *
  FROM tab1
JOIN tab2 ON tab1.c1 = tab2.c2
  WHERE tab2.a2 = 5
) AS a
  JOIN tab3 ON tab3.c3 = a.c1
```

Here, `tab3.c3` is only associated directly with `tab1.c1`, not with `tab2.c2`. 
Although there is a selective filter on tab2 (`tab2.a2 = 5`), Spark is currently 
unable to inject a runtime filter into tab3. Once transitivity is considered, we 
know that `tab3.c3` and `tab2.c2` are related, so we can still inject a runtime 
filter and improve performance.

With the current implementation, Spark only injects a runtime filter into tab1, 
using a bloom filter based on `tab2.a2 = 5`.
Because there is no direct join between tab3 and tab2, Spark cannot inject the 
same bloom filter into tab3.
However, the SQL above has the join condition `tab3.c3 = a.c1 (tab1.c1)` between 
tab3 and tab1, as well as the join condition `tab1.c1 = tab2.c2`. We can rely on 
the transitivity of the join conditions to derive the virtual join condition 
`tab3.c3 = tab2.c2`, and then inject the bloom filter based on `tab2.a2 = 5` into 
tab3 as well.
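
A small sketch of how to observe the effect, assuming the `spark.sql.optimizer.runtime.bloomFilter.enabled` flag and tables named as in the example query; with the transitivity improvement, the optimized plan for tab3's scan can include a bloom filter built from the selective filter on tab2:

```
// Enable runtime bloom filter injection and inspect the optimized plan:
// the filter derived from tab2.a2 = 5 can now reach tab3 through the
// inferred condition tab3.c3 = tab2.c2.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", true)
spark.sql("""
  SELECT *
  FROM (SELECT * FROM tab1 JOIN tab2 ON tab1.c1 = tab2.c2 WHERE tab2.a2 = 5) a
  JOIN tab3 ON tab3.c3 = a.c1
""").explain(true)
```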

### Why are the changes needed?
Enhance the Spark runtime filter and improve performance.

### Does this PR introduce _any_ user-facing change?
'No'.
Just update the inner implementation.

### How was this patch tested?
New tests.
Micro benchmark for q75 in TPC-DS.
**2TB TPC-DS**
| TPC-DS Query   | Before(Seconds)  | After(Seconds)  | Speedup(Percent)  |
| --- | --- | --- | --- |
| q75 | 129.664 | 81.562 | 58.98% |

Closes #42317 from beliefer/SPARK-44649.

Authored-by: Jiaan Geng 
Signed-off-by: Wenchen Fan 
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   | 64 +++---
 .../spark/sql/InjectRuntimeFilterSuite.scala   | 38 +++--
 2 files changed, 75 insertions(+), 27 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 8737082e571..30526bd8106 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -125,14 +125,14 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
*/
   private def extractSelectiveFilterOverScan(
   plan: LogicalPlan,
-  filterCreationSideKey: Expression): Option[LogicalPlan] = {
-@tailrec
+  filterCreationSideKey: Expression): Option[(Expression, LogicalPlan)] = {
 def extract(
 p: LogicalPlan,
 predicateReference: AttributeSet,
 hasHitFilter: Boolean,
 hasHitSelectiveFilter: Boolean,
-currentPlan: LogicalPlan): Option[LogicalPlan] = p match {
+currentPlan: LogicalPlan,
+targetKey: Expression): Option[(Expression, LogicalPlan)] = p match {
   case Project(projectList, child) if hasHitFilter =>
 // We need to make sure all expressions referenced by filter 
predicates are simple
 // expressions.
@@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
 referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ 
++ _),
 hasHitFilter,
 

[spark] branch master updated: [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator

2023-10-18 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3005dc89084 [SPARK-45558][SS] Introduce a metadata file for streaming 
stateful operator
3005dc89084 is described below

commit 3005dc8908486f63a3e471cd05189881b833daf1
Author: Chaoqin Li 
AuthorDate: Wed Oct 18 15:49:43 2023 +0900

[SPARK-45558][SS] Introduce a metadata file for streaming stateful operator

### What changes were proposed in this pull request?
Introduce a metadata file for streaming stateful operators, and write the 
metadata for each stateful operator during planning.
The information to store in the metadata file:
- operator name (no need to be unique among stateful operators in the query)
- state store name
- numColumnsPrefixKey: > 0 if prefix scan is enabled, 0 otherwise
The body of the metadata file will be in JSON format. The metadata file will be 
versioned, just like other streaming metadata files, to be future-proof.
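
A hypothetical illustration of the record shape described above; the class and field names here are illustrative only, not the actual `OperatorStateMetadata` classes added by this commit:

```
// Illustrative only: one entry per stateful operator, carrying its name and
// the state stores it uses, with the prefix-scan column count per store.
case class StateStoreInfoSketch(storeName: String, numColsPrefixKey: Int)
case class OperatorMetadataSketch(operatorName: String, stateStores: Seq[StateStoreInfoSketch])

val example = OperatorMetadataSketch(
  operatorName = "stateStoreSave",  // e.g. a streaming aggregation operator
  stateStores = Seq(StateStoreInfoSketch(storeName = "default", numColsPrefixKey = 0)))
```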

### Why are the changes needed?
The metadata file will expose more information about the state store, improve 
debuggability, and facilitate the development of state-related features such as 
reading and writing state, and state repartitioning.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add unit test and integration tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43393 from chaoqin-li1123/state_metadata.

Authored-by: Chaoqin Li 
Signed-off-by: Jungtaek Lim 
---
 .../spark/sql/execution/QueryExecution.scala   |   2 +-
 .../execution/streaming/IncrementalExecution.scala |  22 ++-
 .../execution/streaming/MicroBatchExecution.scala  |   4 +-
 .../streaming/StreamingSymmetricHashJoinExec.scala |  10 ++
 .../streaming/continuous/ContinuousExecution.scala |   3 +-
 .../streaming/state/OperatorStateMetadata.scala| 136 
 .../execution/streaming/statefulOperators.scala|  21 ++-
 .../state/OperatorStateMetadataSuite.scala | 181 +
 8 files changed, 374 insertions(+), 5 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index b3c97a83970..3d35300773b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -272,7 +272,7 @@ class QueryExecution(
   new IncrementalExecution(
 sparkSession, logical, OutputMode.Append(), "",
 UUID.randomUUID, UUID.randomUUID, 0, None, OffsetSeqMetadata(0, 0),
-WatermarkPropagator.noop())
+WatermarkPropagator.noop(), false)
 } else {
   this
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index ebdb9caf09e..a67097f6e96 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.QueryPlanningTracker
@@ -32,6 +34,7 @@ import 
org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessi
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
 import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec
 import 
org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1
+import 
org.apache.spark.sql.execution.streaming.state.OperatorStateMetadataWriter
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.util.Utils
@@ -50,7 +53,8 @@ class IncrementalExecution(
 val currentBatchId: Long,
 val prevOffsetSeqMetadata: Option[OffsetSeqMetadata],
 val offsetSeqMetadata: OffsetSeqMetadata,
-val watermarkPropagator: WatermarkPropagator)
+val watermarkPropagator: WatermarkPropagator,
+val isFirstBatch: Boolean)
   extends QueryExecution(sparkSession, logicalPlan) with Logging {
 
   // Modified planner with stateful operations.
@@ -71,6 +75,8 @@ class IncrementalExecution(
   StreamingGlobalLimitStrategy(outputMode) :: Nil
   }
 
+  private lazy val hadoopConf = sparkSession.sessionState.newHadoopConf()
+
   private[sql] val 

[spark] branch master updated (b1e57a2b359 -> d7d38fbc184)

2023-10-18 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from b1e57a2b359 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of 
finalize for RemoteBlockPushResolver
 add d7d38fbc184 [SPARK-45587][INFRA] Skip UNIDOC and MIMA in `build` 
GitHub Action job

No new revisions were added by this update.

Summary of changes:
 .github/workflows/build_and_test.yml | 4 
 1 file changed, 4 insertions(+)





[spark] branch master updated: [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for RemoteBlockPushResolver

2023-10-18 Thread mridulm80
This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b1e57a2b359 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of 
finalize for RemoteBlockPushResolver
b1e57a2b359 is described below

commit b1e57a2b359d7d9fbf07adfba10db97f38b99bde
Author: zhaomin 
AuthorDate: Wed Oct 18 01:20:08 2023 -0500

[SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for 
RemoteBlockPushResolver

### What changes were proposed in this pull request?

use java.lang.ref.Cleaner instead of finalize() for RemoteBlockPushResolver

### Why are the changes needed?

The finalize() method has been deprecated since Java 9 and will be removed in 
the future; java.lang.ref.Cleaner is the recommended replacement.
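
A minimal sketch (not the Spark patch itself) of the Cleaner pattern being adopted: register a cleanup action that does not capture the owning object, then either run it explicitly via `clean()` or let it run after the owner becomes unreachable:

```
// Register a cleanup action with java.lang.ref.Cleaner instead of overriding
// finalize(). clean() is idempotent: the action runs at most once.
import java.io.FileOutputStream
import java.lang.ref.Cleaner

object CleanerSketch {
  private val cleaner = Cleaner.create()

  final class PartitionWriter(path: String) {
    private val out = new FileOutputStream(path)
    private val cleanable: Cleaner.Cleanable = {
      val stream = out // capture only the stream, never `this`, in the action
      cleaner.register(this, () => stream.close())
    }
    def close(): Unit = cleanable.clean()
  }
}
```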

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43371 from zhaomin1423/45315.

Authored-by: zhaomin 
Signed-off-by: Mridul Muralidharan gmail.com>
---
 .../network/shuffle/RemoteBlockPushResolver.java   | 101 +
 .../network/shuffle/ShuffleTestAccessor.scala  |   2 +-
 2 files changed, 64 insertions(+), 39 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index a915d0eccb0..14fefebe089 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.lang.ref.Cleaner;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
@@ -94,6 +95,7 @@ import org.apache.spark.network.util.TransportConf;
  */
 public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
+  private static final Cleaner CLEANER = Cleaner.create();
   private static final Logger logger = 
LoggerFactory.getLogger(RemoteBlockPushResolver.class);
 
   public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";
@@ -481,7 +483,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 appShuffleInfo.shuffles.forEach((shuffleId, shuffleInfo) -> 
shuffleInfo.shuffleMergePartitions
   .forEach((shuffleMergeId, partitionInfo) -> {
 synchronized (partitionInfo) {
-  partitionInfo.closeAllFilesAndDeleteIfNeeded(false);
+  partitionInfo.cleanable.clean();
 }
   }));
 if (cleanupLocalDirs) {
@@ -537,7 +539,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 partitions
   .forEach((partitionId, partitionInfo) -> {
 synchronized (partitionInfo) {
-  partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+  partitionInfo.cleanable.clean();
+  partitionInfo.deleteAllFiles();
 }
   });
   }
@@ -822,7 +825,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, 
partition.reduceId,
 ioe.getMessage());
   } finally {
-partition.closeAllFilesAndDeleteIfNeeded(false);
+partition.cleanable.clean();
   }
 }
   }
@@ -1720,6 +1723,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 // The meta file for a particular merged shuffle contains all the map 
indices that belong to
 // every chunk. The entry per chunk is a serialized bitmap.
 private final MergeShuffleFile metaFile;
+private final Cleaner.Cleanable cleanable;
 // Location offset of the last successfully merged block for this shuffle 
partition
 private long dataFilePos;
 // Track the map index whose block is being merged for this shuffle 
partition
@@ -1756,6 +1760,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   this.dataFilePos = 0;
   this.mapTracker = new RoaringBitmap();
   this.chunkTracker = new RoaringBitmap();
+  this.cleanable = CLEANER.register(this, new ResourceCleaner(dataChannel, 
indexFile,
+metaFile, appAttemptShuffleMergeId, reduceId));
 }
 
 public long getDataFilePos() {
@@ -1864,36 +1870,13 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   

[spark] branch master updated: [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error

2023-10-18 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 11e7ea4f11d [SPARK-45035][SQL] Fix 
ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error
11e7ea4f11d is described below

commit 11e7ea4f11df71e2942322b01fcaab57dac20c83
Author: Jia Fan 
AuthorDate: Wed Oct 18 11:06:43 2023 +0500

[SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline 
CSV/JSON will report error

### What changes were proposed in this pull request?
Fix the error reported when ignoreCorruptFiles/ignoreMissingFiles is used with 
multiline CSV/JSON; the error looks like:
```log
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
in stage 4940.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
4940.0 (TID 4031) (10.68.177.106 executor 0): 
com.univocity.parsers.common.TextParsingException: 
java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
Auto configuration enabled=true
Auto-closing enabled=true
Autodetect column delimiter=false
Autodetect quotes=false
Column reordering enabled=true
Delimiters for detection=null
Empty value=
Escape unquoted values=false
Header extraction enabled=null
Headers=null
Ignore leading whitespaces=false
Ignore leading whitespaces in quotes=false
Ignore trailing whitespaces=false
Ignore trailing whitespaces in quotes=false
Input buffer size=1048576
Input reading on separate thread=false
Keep escape sequences=false
Keep quotes=false
Length of content displayed on error=1000
Line separator detection enabled=true
Maximum number of characters per column=-1
Maximum number of columns=20480
Normalize escaped line separators=true
Null value=
Number of records to read=all
Processor=none
Restricting data in exceptions=false
RowProcessor error handler=null
Selected fields=none
Skip bits as whitespace=true
Skip empty lines=true
Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
CsvFormat:
Comment character=#
Field delimiter=,
Line separator (normalized)=\n
Line separator sequence=\n
Quote character="
Quote escape character=\
Quote escape escape character=null
Internal state when error was thrown: line=0, column=0, record=0
at 
com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402)
at 
com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:277)
at 
com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:843)
at 
org.apache.spark.sql.catalyst.csv.UnivocityParser$$anon$1.<init>(UnivocityParser.scala:463)
at 
org.apache.spark.sql.catalyst.csv.UnivocityParser$.convertStream(UnivocityParser.scala:46...
```
This is because multiline CSV/JSON uses `BinaryFileRDD`, not `FileScanRDD`. 
Unlike `FileScanRDD`, which checks the `ignoreCorruptFiles` config when it 
encounters corrupt files so that it does not report an IOException, 
`BinaryFileRDD` does not report an error because it just returns a normal 
`PortableDataStream`. So we should catch the exception when inferring the schema 
in the lambda function, and do the same for `ignoreMissingFiles`.
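
A short usage sketch of the scenario being fixed; the path is a placeholder for a directory that mixes valid CSV files with a corrupt one:

```
// With the fix, schema inference for multiLine reads honors
// spark.sql.files.ignoreCorruptFiles and skips the corrupt file instead of
// failing with the TextParsingException shown above.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", true)
val df = spark.read
  .option("multiLine", true)
  .option("header", true)
  .csv("/path/to/dir-with-a-corrupt-file")  // placeholder path
df.show()
```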

### Why are the changes needed?
Fix the bug when multiline mode is used with the 
ignoreCorruptFiles/ignoreMissingFiles config.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #42979 from Hisoka-X/SPARK-45035_csv_multi_line.

Authored-by: Jia Fan 
Signed-off-by: Max Gekk 
---
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 18 +--
 .../execution/datasources/csv/CSVDataSource.scala  | 28 ---
 .../datasources/CommonFileDataSourceSuite.scala| 28 +++
 .../sql/execution/datasources/csv/CSVSuite.scala   | 58 +-
 .../sql/execution/datasources/json/JsonSuite.scala | 46 -
 5 files changed, 142 insertions(+), 36 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 4123c5290b6..4d04b34876c 100644
---