[spark] branch master updated: [SPARK-45220][PYTHON][DOCS] Refine docstring of DataFrame.join
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new db162369941 [SPARK-45220][PYTHON][DOCS] Refine docstring of DataFrame.join db162369941 is described below commit db162369941dacbbd7eceb98896d498666f7fac8 Author: allisonwang-db AuthorDate: Thu Oct 19 14:25:30 2023 +0900 [SPARK-45220][PYTHON][DOCS] Refine docstring of DataFrame.join ### What changes were proposed in this pull request? This PR refines the docstring of `DataFrame.join` by adding more examples and explanations. ### Why are the changes needed? To improve PySpark documentation. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? doctest ### Was this patch authored or co-authored using generative AI tooling? No Closes #43039 from allisonwang-db/spark-45220-refine-join. Authored-by: allisonwang-db Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/dataframe.py | 167 +--- 1 file changed, 124 insertions(+), 43 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 637787ceb66..34df2bafcf8 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -2646,7 +2646,8 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): on: Optional[Union[str, List[str], Column, List[Column]]] = None, how: Optional[str] = None, ) -> "DataFrame": -"""Joins with another :class:`DataFrame`, using the given join expression. +""" +Joins with another :class:`DataFrame`, using the given join expression. .. versionadded:: 1.3.0 @@ -2675,39 +2676,55 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin): Examples -The following performs a full outer join between ``df1`` and ``df2``. +The following examples demonstrate various join types among ``df1``, ``df2``, and ``df3``. +>>> import pyspark.sql.functions as sf >>> from pyspark.sql import Row ->>> from pyspark.sql.functions import desc ->>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")]).toDF("age", "name") ->>> df2 = spark.createDataFrame([Row(height=80, name="Tom"), Row(height=85, name="Bob")]) ->>> df3 = spark.createDataFrame([Row(age=2, name="Alice"), Row(age=5, name="Bob")]) ->>> df4 = spark.createDataFrame([ -... Row(age=10, height=80, name="Alice"), -... Row(age=5, height=None, name="Bob"), -... Row(age=None, height=None, name="Tom"), -... Row(age=None, height=None, name=None), +>>> df = spark.createDataFrame([Row(name="Alice", age=2), Row(name="Bob", age=5)]) +>>> df2 = spark.createDataFrame([Row(name="Tom", height=80), Row(name="Bob", height=85)]) +>>> df3 = spark.createDataFrame([ +... Row(name="Alice", age=10, height=80), +... Row(name="Bob", age=5, height=None), +... Row(name="Tom", age=None, height=None), +... Row(name=None, age=None, height=None), ... ]) Inner join on columns (default) ->>> df.join(df2, 'name').select(df.name, df2.height).show() -++--+ -|name|height| -++--+ -| Bob|85| -++--+ ->>> df.join(df4, ['name', 'age']).select(df.name, df.age).show() -++---+ -|name|age| -++---+ -| Bob| 5| -++---+ - -Outer join for both DataFrames on the 'name' column. - ->>> df.join(df2, df.name == df2.name, 'outer').select( -... 
df.name, df2.height).sort(desc("name")).show() +>>> df.join(df2, "name").show() +++---+--+ +|name|age|height| +++---+--+ +| Bob| 5|85| +++---+--+ + +>>> df.join(df3, ["name", "age"]).show() +++---+--+ +|name|age|height| +++---+--+ +| Bob| 5| NULL| +++---+--+ + +Outer join on a single column with an explicit join condition. + +When the join condition is explicited stated: `df.name == df2.name`, this will +produce all records where the names match, as well as those that don't (since +it's an outer join). If there are names in `df2` that are not present in `df`, +they will appear with `NULL` in the `name` column of `df`, and vice versa for `df2`. + +>>> joined = df.join(df2, df.name == df2.name, "outer").sort(sf.desc(df.name)) +>>> joined.show() # doctest: +SKIP ++-+++--+ +| name| age|name|height| +
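The refined docstring above walks through inner and outer joins. As a quick, self-contained sketch of the same patterns (assuming only an active SparkSession; the data mirrors, but is not copied verbatim from, the doctests above):

```python
from pyspark.sql import Row, SparkSession
import pyspark.sql.functions as sf

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([Row(name="Alice", age=2), Row(name="Bob", age=5)])
df2 = spark.createDataFrame([Row(name="Tom", height=80), Row(name="Bob", height=85)])

# Inner join on a column name (the default join type): only "Bob" matches.
df.join(df2, "name").show()

# Outer join with an explicit condition: unmatched names surface as NULLs
# on the side that has no match.
df.join(df2, df.name == df2.name, "outer").sort(sf.desc(df.name)).show()
```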
[spark] branch master updated: [SPARK-45589][PYTHON][DOCS] Supplementary exception class
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 47d3fa83e70 [SPARK-45589][PYTHON][DOCS] Supplementary exception class 47d3fa83e70 is described below commit 47d3fa83e70ec0b05ac72c6b7c47ee905543b6bd Author: panbingkun AuthorDate: Thu Oct 19 14:18:16 2023 +0900 [SPARK-45589][PYTHON][DOCS] Supplementary exception class ### What changes were proposed in this pull request? The pr aims to supplementary exception class for pyspark docs. ### Why are the changes needed? Currently, there are only the following classes in the document, https://github.com/apache/spark/assets/15246973/c297d00b-5534-4751-9d97-90c0b4d14a81;> Actually: https://github.com/apache/spark/assets/15246973/42a8a0e3-ba90-4c33-8b2f-1deef3753896;> ### Does this PR introduce _any_ user-facing change? Yes, more complete documentation on exception classes. ### How was this patch tested? - Pass GA. - Manually test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43427 from panbingkun/SPARK-45589. Authored-by: panbingkun Signed-off-by: Hyukjin Kwon --- python/docs/source/reference/pyspark.errors.rst | 25 +++-- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/python/docs/source/reference/pyspark.errors.rst b/python/docs/source/reference/pyspark.errors.rst index 13db9bd01fa..8723de8ebf2 100644 --- a/python/docs/source/reference/pyspark.errors.rst +++ b/python/docs/source/reference/pyspark.errors.rst @@ -28,16 +28,29 @@ Classes .. autosummary:: :toctree: api/ -PySparkException AnalysisException -TempTableAlreadyExistsException -ParseException +ArithmeticException +ArrayIndexOutOfBoundsException +DateTimeException IllegalArgumentException -StreamingQueryException -QueryExecutionException +NumberFormatException +ParseException +PySparkAssertionError +PySparkAttributeError +PySparkException +PySparkNotImplementedError +PySparkPicklingError +PySparkRuntimeError +PySparkTypeError +PySparkValueError PythonException -UnknownException +QueryExecutionException +SparkRuntimeException SparkUpgradeException +StreamingQueryException +TempTableAlreadyExistsException +UnknownException +UnsupportedOperationException Methods - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
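For readers of the refreshed `pyspark.errors` reference, here is a hedged sketch of how one of the newly listed client-side classes surfaces in practice. It assumes that passing a non-Column condition to `when()` raises `PySparkTypeError`, as it does in recent PySpark versions:

```python
from pyspark.errors import PySparkTypeError
import pyspark.sql.functions as sf

try:
    sf.when(True, 1)  # `condition` must be a Column, not a Python bool
except PySparkTypeError as e:
    print(e.getErrorClass())         # machine-readable error class name
    print(e.getMessageParameters())  # parameters substituted into the message
```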
[spark] branch master updated: [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new e359f210c45 [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper e359f210c45 is described below commit e359f210c45abc17f0bcd32c9a86faf678caff75 Author: Raghu Angadi AuthorDate: Thu Oct 19 14:15:43 2023 +0900 [SPARK-45588][PROTOBUF][CONNECT][MINOR] Scaladoc improvement for StreamingForeachBatchHelper ### What changes were proposed in this pull request? Couple of minor improvements to `StreamingForeachBatchHelper`: * Make `RunnerCleaner` private and add ScalaDoc. * Update contract for `pythonForeachBatchWrapper()` to inform that call should eventually should `close()` the `AutoClosable` returned. In addition, it also fixes a flake in Protobuf unit test. ### Why are the changes needed? - Code readability improvement. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Existing tests. - For protobuf suite, verified with seed set to '399'. It fails before this PR and passes after. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43424 from rangadi/feb-scaladoc. Authored-by: Raghu Angadi Signed-off-by: Jungtaek Lim --- .../apache/spark/sql/connect/planner/SparkConnectPlanner.scala | 2 +- .../spark/sql/connect/planner/StreamingForeachBatchHelper.scala | 9 ++--- .../sql/connect/service/SparkConnectSessionHodlerSuite.scala | 4 +++- .../spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index fa964c02a25..299f4f8830a 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -2927,7 +2927,7 @@ class SparkConnectPlanner( } // This is filled when a foreach batch runner started for Python. -var foreachBatchRunnerCleaner: Option[StreamingForeachBatchHelper.RunnerCleaner] = None +var foreachBatchRunnerCleaner: Option[AutoCloseable] = None if (writeOp.hasForeachBatch) { val foreachBatchFn = writeOp.getForeachBatch.getFunctionCase match { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala index b8097b23550..ce75ba3eb59 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingForeachBatchHelper.scala @@ -40,7 +40,9 @@ object StreamingForeachBatchHelper extends Logging { type ForeachBatchFnType = (DataFrame, Long) => Unit - case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable { + // Visible for testing. + /** An AutoClosable to clean up resources on query termination. Stops Python worker. 
*/ + private[connect] case class RunnerCleaner(runner: StreamingPythonRunner) extends AutoCloseable { override def close(): Unit = { try runner.stop() catch { @@ -98,11 +100,12 @@ object StreamingForeachBatchHelper extends Logging { /** * Starts up Python worker and initializes it with Python function. Returns a foreachBatch * function that sets up the session and Dataframe cache and and interacts with the Python - * worker to execute user's function. + * worker to execute user's function. In addition, it returns an AutoClosable. The caller must + * ensure it is closed so that worker process and related resources are released. */ def pythonForeachBatchWrapper( pythonFn: SimplePythonFunction, - sessionHolder: SessionHolder): (ForeachBatchFnType, RunnerCleaner) = { + sessionHolder: SessionHolder): (ForeachBatchFnType, AutoCloseable) = { val port = SparkConnectService.localPort val connectUrl = s"sc://localhost:$port/;user_id=${sessionHolder.userId}" diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala index a6451de8fc2..910c2a2650c 100644 ---
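The helper above wraps the user's `foreachBatch` function on the Spark Connect server side; from the PySpark user's point of view the API shape is unchanged. A minimal sketch of that user-side shape follows (the rate source, sink table, and checkpoint path are illustrative assumptions, not part of this commit):

```python
def process_batch(batch_df, batch_id):
    # Invoked once per micro-batch in the Python worker that the helper starts.
    batch_df.write.mode("append").saveAsTable("events_sink")

query = (
    spark.readStream.format("rate").load()
        .writeStream
        .foreachBatch(process_batch)
        .option("checkpointLocation", "/tmp/feb-ckpt")
        .start()
)
```

When the query terminates, the `AutoCloseable` returned by `pythonForeachBatchWrapper()` is what stops the Python worker, which is the contract the updated Scaladoc spells out.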
[spark] branch master updated: [MINOR][CONNECT] Fix a typo in org.apache.spark.sql.connect.client.ArtifactManager
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3042ffd08fb [MINOR][CONNECT] Fix a typo in org.apache.spark.sql.connect.client.ArtifactManager 3042ffd08fb is described below commit 3042ffd08fbd3c3e81c6f7f2ed14448ad45292d0 Author: zhaomin AuthorDate: Thu Oct 19 14:10:11 2023 +0900 [MINOR][CONNECT] Fix a typo in org.apache.spark.sql.connect.client.ArtifactManager ### What changes were proposed in this pull request? Change 'Unsuppoted' in the exception information to 'Unsupported' ### Why are the changes needed? this is a typo ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? pass ci ### Was this patch authored or co-authored using generative AI tooling? No Closes #43440 from zhaomin1423/typo. Authored-by: zhaomin Signed-off-by: Hyukjin Kwon --- .../scala/org/apache/spark/sql/connect/client/ArtifactManager.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala index 6b08737ed21..2f8eacb0690 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala @@ -87,7 +87,7 @@ class ArtifactManager( case cf if cf.endsWith(".class") => newClassArtifact(path.getFileName, new LocalFile(path)) case other => -throw new UnsupportedOperationException(s"Unsuppoted file format: $other") +throw new UnsupportedOperationException(s"Unsupported file format: $other") } Seq[Artifact](artifact) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch branch-3.5 updated: [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 17d283990b6 [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual` 17d283990b6 is described below commit 17d283990b64614828838afa718f48b855ab7842 Author: Haejoon Lee AuthorDate: Thu Oct 19 13:57:01 2023 +0900 [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual` ### What changes were proposed in this pull request? This PR proposes to deprecate `assertPandasOnSparkEqual`. ### Why are the changes needed? Now we have more pandas friendly testing utils such as `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` and `ps.testing.assert_index_equal`. ### Does this PR introduce _any_ user-facing change? Not for now, but `assertPandasOnSparkEqual` will be removed in the future version. ### How was this patch tested? The existing CI should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43426 from itholic/SPARK-45553. Authored-by: Haejoon Lee Signed-off-by: Hyukjin Kwon (cherry picked from commit f3e280b952da8b8ab6c78371f3715cc674a73bc1) Signed-off-by: Hyukjin Kwon --- python/pyspark/testing/pandasutils.py | 11 +++ 1 file changed, 11 insertions(+) diff --git a/python/pyspark/testing/pandasutils.py b/python/pyspark/testing/pandasutils.py index c80ffb7ee53..04a523bce76 100644 --- a/python/pyspark/testing/pandasutils.py +++ b/python/pyspark/testing/pandasutils.py @@ -365,6 +365,11 @@ def assertPandasOnSparkEqual( .. versionadded:: 3.5.0 +.. deprecated:: 3.5.1 +`assertPandasOnSparkEqual` will be removed in Spark 4.0.0. +Use `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` +and `ps.testing.assert_index_equal` instead. + Parameters -- actual: pandas-on-Spark DataFrame, Series, or Index @@ -417,6 +422,12 @@ def assertPandasOnSparkEqual( >>> s2 = ps.Index([212.3, 100.0001]) >>> assertPandasOnSparkEqual(s1, s2, almost=True) # pass, ps.Index obj are almost equal """ +warnings.warn( +"`assertPandasOnSparkEqual` will be removed in Spark 4.0.0. " +"Use `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` " +"and `ps.testing.assert_index_equal` instead.", +FutureWarning, +) if actual is None and expected is None: return True elif actual is None or expected is None: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual`
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new f3e280b952d [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual` f3e280b952d is described below commit f3e280b952da8b8ab6c78371f3715cc674a73bc1 Author: Haejoon Lee AuthorDate: Thu Oct 19 13:57:01 2023 +0900 [SPARK-45553][PS] Deprecate `assertPandasOnSparkEqual` ### What changes were proposed in this pull request? This PR proposes to deprecate `assertPandasOnSparkEqual`. ### Why are the changes needed? Now we have more pandas friendly testing utils such as `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` and `ps.testing.assert_index_equal`. ### Does this PR introduce _any_ user-facing change? Not for now, but `assertPandasOnSparkEqual` will be removed in the future version. ### How was this patch tested? The existing CI should pass. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43426 from itholic/SPARK-45553. Authored-by: Haejoon Lee Signed-off-by: Hyukjin Kwon --- python/pyspark/testing/pandasutils.py | 11 +++ 1 file changed, 11 insertions(+) diff --git a/python/pyspark/testing/pandasutils.py b/python/pyspark/testing/pandasutils.py index bb9ca0dc74e..9abffefdbe7 100644 --- a/python/pyspark/testing/pandasutils.py +++ b/python/pyspark/testing/pandasutils.py @@ -341,6 +341,11 @@ def assertPandasOnSparkEqual( .. versionadded:: 3.5.0 +.. deprecated:: 3.5.1 +`assertPandasOnSparkEqual` will be removed in Spark 4.0.0. +Use `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` +and `ps.testing.assert_index_equal` instead. + Parameters -- actual: pandas-on-Spark DataFrame, Series, or Index @@ -393,6 +398,12 @@ def assertPandasOnSparkEqual( >>> s2 = ps.Index([212.3, 100.0001]) >>> assertPandasOnSparkEqual(s1, s2, almost=True) # pass, ps.Index obj are almost equal """ +warnings.warn( +"`assertPandasOnSparkEqual` will be removed in Spark 4.0.0. " +"Use `ps.testing.assert_frame_equal`, `ps.testing.assert_series_equal` " +"and `ps.testing.assert_index_equal` instead.", +FutureWarning, +) if actual is None and expected is None: return True elif actual is None or expected is None: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
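A migration sketch based on the wording of the new deprecation warning (the `ps.testing` module path is taken from the warning text itself and not verified beyond it):

```python
import pyspark.pandas as ps

psdf1 = ps.DataFrame({"a": [1, 2, 3]})
psdf2 = ps.DataFrame({"a": [1, 2, 3]})

# Deprecated, to be removed in Spark 4.0.0:
#   assertPandasOnSparkEqual(psdf1, psdf2)

# Replacement named in the warning message:
ps.testing.assert_frame_equal(psdf1, psdf2)
```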
[spark] 02/02: Revert "[SPARK-45546][BUILD][INFRA] Make `publish-snapshot` support `package` first then `deploy`"
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit e37dd3ab8e0707eead2cb068bc19456349ccdd86 Author: Hyukjin Kwon AuthorDate: Thu Oct 19 13:50:31 2023 +0900 Revert "[SPARK-45546][BUILD][INFRA] Make `publish-snapshot` support `package` first then `deploy`" This reverts commit 3ef18e2d00f386196292f0c768816626bc903d47. --- .github/workflows/publish_snapshot.yml | 4 dev/create-release/release-build.sh| 14 ++ pom.xml| 7 --- 3 files changed, 2 insertions(+), 23 deletions(-) diff --git a/.github/workflows/publish_snapshot.yml b/.github/workflows/publish_snapshot.yml index 476d41d0cf1..7ed836f016b 100644 --- a/.github/workflows/publish_snapshot.yml +++ b/.github/workflows/publish_snapshot.yml @@ -66,8 +66,4 @@ jobs: GPG_KEY: "not_used" GPG_PASSPHRASE: "not_used" GIT_REF: ${{ matrix.branch }} -# SPARK-45546 adds this environment variable to split the publish snapshot process into two steps: -# first package, then deploy. This is intended to reduce the resource pressure of deploy. -# When PACKAGE_BEFORE_DEPLOY is not set to true, it will revert to the one-step deploy method. -PACKAGE_BEFORE_DEPLOY: true run: ./dev/create-release/release-build.sh publish-snapshot diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index 3776c64e31e..f3571c4e48c 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -432,24 +432,14 @@ if [[ "$1" == "publish-snapshot" ]]; then echo "" >> $tmp_settings if [[ $PUBLISH_SCALA_2_12 = 1 ]]; then -if [ "$PACKAGE_BEFORE_DEPLOY" = "true" ]; then - $MVN -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean package - $MVN --settings $tmp_settings -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES deploy -else - $MVN --settings $tmp_settings -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean deploy -fi +$MVN --settings $tmp_settings -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean deploy fi if [[ $PUBLISH_SCALA_2_13 = 1 ]]; then if [[ $SPARK_VERSION < "4.0" ]]; then ./dev/change-scala-version.sh 2.13 fi -if [ "$PACKAGE_BEFORE_DEPLOY" = "true" ]; then - $MVN -DskipTests $SCALA_2_13_PROFILES $PUBLISH_PROFILES clean package - $MVN --settings $tmp_settings -DskipTests $SCALA_2_13_PROFILES $PUBLISH_PROFILES deploy -else - $MVN --settings $tmp_settings -DskipTests $SCALA_2_13_PROFILES $PUBLISH_PROFILES clean deploy -fi +$MVN --settings $tmp_settings -DskipTests $SCALA_2_13_PROFILES $PUBLISH_PROFILES clean deploy fi rm $tmp_settings diff --git a/pom.xml b/pom.xml index ade6537c2a1..824ae49f6da 100644 --- a/pom.xml +++ b/pom.xml @@ -3840,11 +3840,4 @@ - - - internal.snapshot - Internal Snapshot Repository - http://localhost:8081/repository/maven-snapshots/ - - - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated (a3c17b2e229 -> e37dd3ab8e0)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from a3c17b2e229 [SPARK-45562][SQL][FOLLOW-UP] XML: Make 'rowTag' option check case insensitive new 706872d4de2 Revert "[SPARK-45546][BUILD][FOLLOW-UP] Remove snapshot repo mistakenly added" new e37dd3ab8e0 Revert "[SPARK-45546][BUILD][INFRA] Make `publish-snapshot` support `package` first then `deploy`" The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .github/workflows/publish_snapshot.yml | 4 dev/create-release/release-build.sh| 14 ++ 2 files changed, 2 insertions(+), 16 deletions(-) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] 01/02: Revert "[SPARK-45546][BUILD][FOLLOW-UP] Remove snapshot repo mistakenly added"
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git commit 706872d4de2374d1faf84d8706611a092c0b6e76 Author: Hyukjin Kwon AuthorDate: Thu Oct 19 13:50:22 2023 +0900 Revert "[SPARK-45546][BUILD][FOLLOW-UP] Remove snapshot repo mistakenly added" This reverts commit 74dc5a3d8c0ffe425dfadec44e41615a8f3f8367. --- pom.xml | 7 +++ 1 file changed, 7 insertions(+) diff --git a/pom.xml b/pom.xml index 824ae49f6da..ade6537c2a1 100644 --- a/pom.xml +++ b/pom.xml @@ -3840,4 +3840,11 @@ + + + internal.snapshot + Internal Snapshot Repository + http://localhost:8081/repository/maven-snapshots/ + + - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45562][SQL][FOLLOW-UP] XML: Make 'rowTag' option check case insensitive
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a3c17b2e229 [SPARK-45562][SQL][FOLLOW-UP] XML: Make 'rowTag' option check case insensitive a3c17b2e229 is described below commit a3c17b2e22969de3d225fc9890023456592f6158 Author: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> AuthorDate: Thu Oct 19 13:23:04 2023 +0900 [SPARK-45562][SQL][FOLLOW-UP] XML: Make 'rowTag' option check case insensitive ### What changes were proposed in this pull request? [PR 43389](https://github.com/apache/spark/pull/43389) made `rowTag` option required for XML read and write. However, the option check was done in a case sensitive manner. This PR makes the check case-insensitive. ### Why are the changes needed? Options are case-insensitive. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43416 from sandip-db/xml-rowTagCaseInsensitive. Authored-by: Sandip Agarwala <131817656+sandip...@users.noreply.github.com> Signed-off-by: Hyukjin Kwon --- .../org/apache/spark/sql/catalyst/xml/XmlOptions.scala | 17 +++-- .../sql/execution/datasources/xml/XmlFileFormat.scala | 5 ++--- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala index 0dedbec58e1..d2c7b435fe6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala @@ -34,7 +34,8 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} private[sql] class XmlOptions( @transient val parameters: CaseInsensitiveMap[String], defaultTimeZoneId: String, -defaultColumnNameOfCorruptRecord: String) +defaultColumnNameOfCorruptRecord: String, +rowTagRequired: Boolean) extends FileSourceOptions(parameters) with Logging { import XmlOptions._ @@ -42,11 +43,13 @@ private[sql] class XmlOptions( def this( parameters: Map[String, String] = Map.empty, defaultTimeZoneId: String = SQLConf.get.sessionLocalTimeZone, - defaultColumnNameOfCorruptRecord: String = SQLConf.get.columnNameOfCorruptRecord) = { + defaultColumnNameOfCorruptRecord: String = SQLConf.get.columnNameOfCorruptRecord, + rowTagRequired: Boolean = false) = { this( CaseInsensitiveMap(parameters), defaultTimeZoneId, - defaultColumnNameOfCorruptRecord) + defaultColumnNameOfCorruptRecord, + rowTagRequired) } private def getBool(paramName: String, default: Boolean = false): Boolean = { @@ -63,7 +66,9 @@ private[sql] class XmlOptions( } val compressionCodec = parameters.get(COMPRESSION).map(CompressionCodecs.getCodecClassName) - val rowTag = parameters.getOrElse(ROW_TAG, XmlOptions.DEFAULT_ROW_TAG).trim + val rowTagOpt = parameters.get(XmlOptions.ROW_TAG) + require(!rowTagRequired || rowTagOpt.isDefined, s"'${XmlOptions.ROW_TAG}' option is required.") + val rowTag = rowTagOpt.getOrElse(XmlOptions.DEFAULT_ROW_TAG).trim require(rowTag.nonEmpty, s"'$ROW_TAG' option should not be an empty string.") require(!rowTag.startsWith("<") && !rowTag.endsWith(">"), s"'$ROW_TAG' should not include angle brackets") @@ -223,8 +228,8 @@ private[sql] object XmlOptions extends DataSourceOptions { newOption(ENCODING, 
CHARSET) def apply(parameters: Map[String, String]): XmlOptions = -new XmlOptions(parameters, SQLConf.get.sessionLocalTimeZone) +new XmlOptions(parameters) def apply(): XmlOptions = -new XmlOptions(Map.empty, SQLConf.get.sessionLocalTimeZone) +new XmlOptions(Map.empty) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala index 4342711b00f..77619299278 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/xml/XmlFileFormat.scala @@ -42,11 +42,10 @@ class XmlFileFormat extends TextBasedFileFormat with DataSourceRegister { def getXmlOptions( sparkSession: SparkSession, parameters: Map[String, String]): XmlOptions = { -val rowTagOpt = parameters.get(XmlOptions.ROW_TAG) -require(rowTagOpt.isDefined, s"'${XmlOptions.ROW_TAG}' option is required.") new XmlOptions(parameters, sparkSession.sessionState.conf.sessionLocalTimeZone, -
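At the user level, the change means the required `rowTag` option is recognized regardless of the casing used when setting it. A small sketch (the file path is a made-up example):

```python
df = (
    spark.read.format("xml")
        .option("ROWTAG", "book")   # matched case-insensitively to 'rowTag'
        .load("/tmp/books.xml")
)

# Omitting the option entirely still fails with:
#   "'rowTag' option is required."
```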
[spark] branch master updated: [SPARK-45507][SQL] Correctness fix for nested correlated scalar subqueries with COUNT aggregates
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8a972c2fe87 [SPARK-45507][SQL] Correctness fix for nested correlated scalar subqueries with COUNT aggregates 8a972c2fe87 is described below commit 8a972c2fe8730e41193986a273aa92b234e5beb8 Author: Andy Lam AuthorDate: Thu Oct 19 10:36:32 2023 +0800 [SPARK-45507][SQL] Correctness fix for nested correlated scalar subqueries with COUNT aggregates ### What changes were proposed in this pull request? We want to use the count bug handling in `DecorrelateInnerQuery` to detect potential count bugs in scalar subqueries. it It is always safe to use `DecorrelateInnerQuery` to handle count bugs, but for efficiency reasons, like for the common case of COUNT on top of the scalar subquery, we would like to avoid an extra left outer join. This PR therefore introduces a simple check to detect such cases before `decorrelate()` - if true, then don't do count bug handling in `decorrelate()`, an [...] ### Why are the changes needed? This PR fixes correctness issues for correlated scalar subqueries pertaining to the COUNT bug. Examples can be found in the JIRA ticket. ### Does this PR introduce _any_ user-facing change? Yes, results will change. ### How was this patch tested? Added SQL end-to-end tests in `count.sql` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43341 from andylam-db/multiple-count-bug. Authored-by: Andy Lam Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/optimizer/subquery.scala| 44 +- .../org/apache/spark/sql/internal/SQLConf.scala| 9 ++ .../nested-scalar-subquery-count-bug.sql.out | 166 + .../nested-scalar-subquery-count-bug.sql | 34 + .../nested-scalar-subquery-count-bug.sql.out | 125 5 files changed, 372 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 5b95ee1df1b..1f1a16e9093 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -426,17 +426,49 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { case ScalarSubquery(sub, children, exprId, conditions, hint, mayHaveCountBugOld) if children.nonEmpty => -val (newPlan, newCond) = decorrelate(sub, plan) -val mayHaveCountBug = if (mayHaveCountBugOld.isEmpty) { + +def mayHaveCountBugAgg(a: Aggregate): Boolean = { + a.groupingExpressions.isEmpty && a.aggregateExpressions.exists(_.exists { +case a: AggregateExpression => a.aggregateFunction.defaultResult.isDefined +case _ => false + }) +} + +// The below logic controls handling count bug for scalar subqueries in +// [[DecorrelateInnerQuery]], and if we don't handle it here, we handle it in +// [[RewriteCorrelatedScalarSubquery#constructLeftJoins]]. Note that handling it in +// [[DecorrelateInnerQuery]] is always correct, and turning it off to handle it in +// constructLeftJoins is an optimization, so that additional, redundant left outer joins are +// not introduced. 
+val handleCountBugInDecorrelate = SQLConf.get.decorrelateInnerQueryEnabled && + !conf.getConf(SQLConf.LEGACY_SCALAR_SUBQUERY_COUNT_BUG_HANDLING) && !(sub match { + // Handle count bug only if there exists lower level Aggs with count bugs. It does not + // matter if the top level agg is count bug vulnerable or not, because: + // 1. If the top level agg is count bug vulnerable, it can be handled in + // constructLeftJoins, unless there are lower aggs that are count bug vulnerable. + // E.g. COUNT(COUNT + COUNT) + // 2. If the top level agg is not count bug vulnerable, it can be count bug vulnerable if + // there are lower aggs that are count bug vulnerable. E.g. SUM(COUNT) + case agg: Aggregate => !agg.child.exists { +case lowerAgg: Aggregate => mayHaveCountBugAgg(lowerAgg) +case _ => false + } + case _ => false +}) +val (newPlan, newCond) = decorrelate(sub, plan, handleCountBugInDecorrelate) +val mayHaveCountBug = if (mayHaveCountBugOld.isDefined) { + // For idempotency, we must save this variable the first time this rule is
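For context, a minimal illustration of the COUNT-bug shape this fix is concerned with; the actual regression tests live in `count.sql` and exercise the nested-aggregate cases described above, while the view names here are made up:

```python
spark.sql("CREATE OR REPLACE TEMP VIEW l AS SELECT * FROM VALUES (1), (2) AS t(a)")
spark.sql("CREATE OR REPLACE TEMP VIEW r AS SELECT * FROM VALUES (1) AS t(b)")

spark.sql("""
    SELECT a,
           (SELECT COUNT(*) FROM r WHERE r.b = l.a) AS cnt
    FROM l
""").show()
# Correct result: (1, 1) and (2, 0). Without count-bug handling, the second
# row's correlated subquery would incorrectly evaluate to NULL instead of 0.
```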
[spark] branch master updated: [SPARK-45586][SQL] Reduce compiler latency for plans with large expression trees
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 1e94415739c [SPARK-45586][SQL] Reduce compiler latency for plans with large expression trees 1e94415739c is described below commit 1e94415739ccfc4222a067459d3cb8be480530b4 Author: Kelvin Jiang AuthorDate: Thu Oct 19 10:24:58 2023 +0800 [SPARK-45586][SQL] Reduce compiler latency for plans with large expression trees ### What changes were proposed in this pull request? * Included rule ID pruning when traversing the expression trees in `TypeCoercionRule` (this avoids us from traversing the expression tree over and over again in future iterations of the rule) * Improved `EquivalentExpressions`: * Since `supportedExpression()` is checking for the existence of a pattern in the tree, changed to check the `TreePatternBits` instead of recursing using `.exists()` * When creating an `ExpressionEquals` object, calculating the height requires recursing through all of its children, which is O(n^2) when called upon each expression in the expression tree. This changes it so that this height is cached in the `TreeNode`, so that it is now O(n) when called upon each expression in the tree * More targeted TreePatternBits pruning in `ResolveTimeZone` and `ConstantPropagation` ### Why are the changes needed? This PR improves some analyzer and optimizer rules to address inefficiencies when handling extremely large expression trees. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? There should be no plan changes, so no unit tests were modified. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43420 from kelvinjian-db/SPARK-45586-large-expr-trees. 
Authored-by: Kelvin Jiang Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/analysis/TypeCoercion.scala | 4 +++- .../sql/catalyst/analysis/timeZoneAnalysis.scala | 8 .../expressions/EquivalentExpressions.scala| 23 +++--- .../spark/sql/catalyst/optimizer/expressions.scala | 6 -- .../apache/spark/sql/catalyst/trees/TreeNode.scala | 2 ++ 5 files changed, 20 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index c26569866e5..b34fd873621 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.AlwaysProcess import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.internal.SQLConf @@ -1215,7 +1216,8 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging { } else { beforeMapChildren } - withPropagatedTypes.transformExpressionsUp(typeCoercionFn) + withPropagatedTypes.transformExpressionsUpWithPruning( +AlwaysProcess.fn, ruleId)(typeCoercionFn) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala index 11a5bc99b6c..01d88f050ca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/timeZoneAnalysis.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ListQuery, TimeZoneAwareExpression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.TreePattern.{LIST_SUBQUERY, TIME_ZONE_AWARE_EXPRESSION} +import org.apache.spark.sql.catalyst.trees.TreePattern.TIME_ZONE_AWARE_EXPRESSION import org.apache.spark.sql.types.DataType /** @@ -40,10 +40,10 @@ object ResolveTimeZone extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressionsWithPruning( - _.containsAnyPattern(LIST_SUBQUERY, TIME_ZONE_AWARE_EXPRESSION), ruleId -)(transformTimeZoneExprs) + _.containsPattern(TIME_ZONE_AWARE_EXPRESSION),
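A rough way to reproduce the kind of workload these changes target is a single plan whose expression tree has hundreds or thousands of nodes; the size below is arbitrary and only meant to make analysis time noticeable:

```python
from functools import reduce
from pyspark.sql import functions as sf

df = spark.range(10)

# Build one deeply nested arithmetic expression: id + 0 + 1 + ... + 499.
wide_expr = reduce(lambda acc, i: acc + sf.lit(i), range(500), sf.col("id"))

# explain() forces analysis and optimization of the large expression tree.
df.select(wide_expr.alias("wide")).explain()
```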
[spark] branch master updated: [SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit tests
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a14f90941ca [SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit tests a14f90941ca is described below commit a14f90941caf06e2d77789a3952dd588e6900b90 Author: Kent Yao AuthorDate: Thu Oct 19 09:43:13 2023 +0800 [SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit tests ### What changes were proposed in this pull request? This PR fixes: - The deviation from `new Timestamp(new Date().getTime)` and log4j2 date format pattern from sub spark-submit progress ``` 2023-10-17 03:58:48.275 - stderr> 23/10/17 18:58:48 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20231017185848- 2023-10-17 03:58:48.278 - stderr> 23/10/17 18:58:48 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57637. ``` - The duplication of `new Timestamp(new Date().getTime)` when using logInfo instead of println ``` 23/10/17 19:02:34.392 Thread-5 INFO SparkShellSuite: 2023-10-17 04:02:34.392 - stderr> 23/10/17 19:02:34 WARN Utils: Your hostname, hulk.local resolves to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on interface en0) 23/10/17 19:02:34.393 Thread-5 INFO SparkShellSuite: 2023-10-17 04:02:34.393 - stderr> 23/10/17 19:02:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address ``` - Correctly redirects sub spark-submit progress logs to unit-tests.log ### Why are the changes needed? test fixes ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? - WholeStageCodegenSparkSubmitSuite - before ``` 18:58:53.882 shutdown-hook-0 INFO ShutdownHookManager: Shutdown hook called 18:58:53.882 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /Users/hzyaoqin/spark/target/tmp/spark-ecd53d47-d109-4ddc-80dd-2d829f34371e 11:58:18.892 pool-1-thread-1 WARN Utils: Your hostname, hulk.local resolves to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on interface en0) 11:58:18.893 pool-1-thread-1 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 11:58:18.932 pool-1-thread-1-ScalaTest-running-WholeStageCodegenSparkSubmitSuite INFO WholeStageCodegenSparkSubmitSuite: ``` - WholeStageCodegenSparkSubmitSuite - after ``` = TEST OUTPUT FOR o.a.s.sql.execution.WholeStageCodegenSparkSubmitSuite: 'Generated code on driver should not embed platform-specific constant' = 11:58:19.882 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:19 WARN Utils: Your hostname, hulk.local resolves to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on interface en0) 11:58:19.883 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO SparkContext: Running Spark version 4.0.0-SNAPSHOT 11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO SparkContext: OS info Mac OS X, 13.4, aarch64 11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO SparkContext: Java version 17.0.8 11:58:20.227 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 WARN NativeCodeLoader: Unable to load native-hadoop library for 
your platform... using builtin-java classes where applicable 11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO ResourceUtils: == 11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO ResourceUtils: No custom resources configured for spark.driver. 11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO ResourceUtils: == 11:58:20.254 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO SparkContext: Submitted application: org.apache.spark.sql.execution.WholeStageCodegenSparkSubmitSuite 11:58:20.266 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0) 11:58:20.268 Thread-6 INFO
[spark] branch branch-3.5 updated: [SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit tests
This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 2f66851972c [SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit tests 2f66851972c is described below commit 2f66851972c2fc66b053a8c78bc2814b7bb4257f Author: Kent Yao AuthorDate: Thu Oct 19 09:43:13 2023 +0800 [SPARK-45585][TEST] Fix time format and redirection issues in SparkSubmit tests ### What changes were proposed in this pull request? This PR fixes: - The deviation from `new Timestamp(new Date().getTime)` and log4j2 date format pattern from sub spark-submit progress ``` 2023-10-17 03:58:48.275 - stderr> 23/10/17 18:58:48 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20231017185848- 2023-10-17 03:58:48.278 - stderr> 23/10/17 18:58:48 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57637. ``` - The duplication of `new Timestamp(new Date().getTime)` when using logInfo instead of println ``` 23/10/17 19:02:34.392 Thread-5 INFO SparkShellSuite: 2023-10-17 04:02:34.392 - stderr> 23/10/17 19:02:34 WARN Utils: Your hostname, hulk.local resolves to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on interface en0) 23/10/17 19:02:34.393 Thread-5 INFO SparkShellSuite: 2023-10-17 04:02:34.393 - stderr> 23/10/17 19:02:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address ``` - Correctly redirects sub spark-submit progress logs to unit-tests.log ### Why are the changes needed? test fixes ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? - WholeStageCodegenSparkSubmitSuite - before ``` 18:58:53.882 shutdown-hook-0 INFO ShutdownHookManager: Shutdown hook called 18:58:53.882 shutdown-hook-0 INFO ShutdownHookManager: Deleting directory /Users/hzyaoqin/spark/target/tmp/spark-ecd53d47-d109-4ddc-80dd-2d829f34371e 11:58:18.892 pool-1-thread-1 WARN Utils: Your hostname, hulk.local resolves to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on interface en0) 11:58:18.893 pool-1-thread-1 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 11:58:18.932 pool-1-thread-1-ScalaTest-running-WholeStageCodegenSparkSubmitSuite INFO WholeStageCodegenSparkSubmitSuite: ``` - WholeStageCodegenSparkSubmitSuite - after ``` = TEST OUTPUT FOR o.a.s.sql.execution.WholeStageCodegenSparkSubmitSuite: 'Generated code on driver should not embed platform-specific constant' = 11:58:19.882 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:19 WARN Utils: Your hostname, hulk.local resolves to a loopback address: 127.0.0.1; using 10.221.103.23 instead (on interface en0) 11:58:19.883 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO SparkContext: Running Spark version 4.0.0-SNAPSHOT 11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO SparkContext: OS info Mac OS X, 13.4, aarch64 11:58:20.195 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO SparkContext: Java version 17.0.8 11:58:20.227 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 WARN NativeCodeLoader: Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable 11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO ResourceUtils: == 11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO ResourceUtils: No custom resources configured for spark.driver. 11:58:20.253 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO ResourceUtils: == 11:58:20.254 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO SparkContext: Submitted application: org.apache.spark.sql.execution.WholeStageCodegenSparkSubmitSuite 11:58:20.266 Thread-6 INFO WholeStageCodegenSparkSubmitSuite: stderr> 23/10/18 11:58:20 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0) 11:58:20.268 Thread-6 INFO
[spark] branch master updated: [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b42ac52e787 [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation b42ac52e787 is described below commit b42ac52e787c896f21452023da0bd6685a1b47fc Author: Anish Shrigondekar AuthorDate: Thu Oct 19 06:51:45 2023 +0900 [SPARK-45582][SS] Ensure that store instance is not used after calling commit within output mode streaming aggregation ### What changes were proposed in this pull request? Ensure that store instance is not used after calling commit within output mode streaming aggregation ### Why are the changes needed? Without these changes, we were accessing the store instance to retrieve the iterator even after the commit was called. When commit is called, we release the DB instance lock. So its possible task retries can acquire the instance lock and close the DB instance. So when the original thread tries to access the DB, it might run into a null pointer exception. This change fixes the issue ``` org.apache.spark.sql.streaming.StreamingQueryException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 492) (ip-10-110-25-116.us-west-2.compute.internal executor driver): java.lang.NullPointerException at org.apache.spark.sql.execution.streaming.state.RocksDB.iterator(RocksDB.scala:337) at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider$RocksDBStateStore.iterator(RocksDBStateStoreProvider.scala:79) at org.apache.spark.sql.execution.streaming.state.StreamingAggregationStateManagerImplV1.values(StreamingAggregationStateManager.scala:130) at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:543) at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:63) at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:131) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:407) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:404) at org.apache.spark.rdd.RDD.iterator(RDD.scala:371) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? RocksDBStreamingAggregationSuite ``` 18:12:00.242 WARN org.apache.spark.sql.streaming.RocksDBStateStoreStreamingAggregationSuite: = POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.RocksDBStateStoreStreamingAggregationSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), state-store-maintenance-task (daemon=true), rpc-boss-3-1 (daemon=true), ForkJoinPool.commonPool-worker-8 (daemon=true), shuffle-boss-6-1 (da [...] [info] Run completed in 5 minutes, 8 seconds. [info] Total number of tests run: 80 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. 
``` StreamingSessionWindowSuite ``` = POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.streaming.StreamingSessionWindowSuite, threads: ForkJoinPool.commonPool-worker-6 (daemon=true), state-store-maintenance-thread-0 (daemon=true), ForkJoinPool.commonPool-worker-4 (daemon=true), state-store-maintenance-thread-1 (daemon=true), ForkJoinPool.commonPool-worker-7 (daemon=true), ForkJoinPool.commonPool-worker-5 (daemon=true), ForkJoinPool.commonPool-worker-3 (daemon=true), state-store-maintenance-task (daemon=true), rpc-boss-3-1 (d [...] [info] Run completed in 3 minutes, 38 seconds. [info] Total number of tests run: 48 [info] Suites: completed 1, aborted 0 [info] Tests: succeeded 48, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. ``` ### Was this patch authored or co-authored using generative AI tooling? No Closes #43413 from anishshri-db/task/SPARK-45582. Authored-by: Anish Shrigondekar Signed-off-by: Jungtaek Lim --- .../execution/streaming/statefulOperators.scala| 64 -- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
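The operator being fixed backs standard streaming aggregations. A sketch of that query shape (the rate source, output mode, and checkpoint path are illustrative placeholders):

```python
from pyspark.sql import functions as sf

events = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
counts = events.groupBy(sf.window("timestamp", "10 seconds")).count()

query = (
    counts.writeStream
        .outputMode("update")
        .format("console")
        .option("checkpointLocation", "/tmp/agg-ckpt")
        .start()
)
```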
[spark] branch master updated: [SPARK-45581] Make SQLSTATE mandatory
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7e82e1bc43e [SPARK-45581] Make SQLSTATE mandatory 7e82e1bc43e is described below commit 7e82e1bc43e0297c3036d802b3a151d2b93db2f6 Author: srielau AuthorDate: Wed Oct 18 11:04:44 2023 -0700 [SPARK-45581] Make SQLSTATE mandatory ### What changes were proposed in this pull request? We propose to make SQLSTATEs mandatory field when using error classes in the new error framework. ### Why are the changes needed? Being able to rely on the existence of SQLSTATEs allows easier classification of errors as well as usage of tooling to intercept SQLSTATEs. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? A new test was added to SparkThrowableSuite to enforce SQLSTATE existence ### Was this patch authored or co-authored using generative AI tooling? No Closes #43412 from srielau/SPARK-45581-make-SQLSTATEs-mandatory. Authored-by: srielau Signed-off-by: Gengliang Wang --- common/utils/src/main/resources/error/README.md| 26 ++ .../src/main/resources/error/error-classes.json| 9 +--- .../org/apache/spark/ErrorClassesJSONReader.scala | 8 +++ .../org/apache/spark/SparkThrowableSuite.scala | 16 - docs/sql-error-conditions.md | 6 ++--- 5 files changed, 41 insertions(+), 24 deletions(-) diff --git a/common/utils/src/main/resources/error/README.md b/common/utils/src/main/resources/error/README.md index ac388c29250..8d8529bea56 100644 --- a/common/utils/src/main/resources/error/README.md +++ b/common/utils/src/main/resources/error/README.md @@ -1,6 +1,6 @@ # Guidelines -To throw a standardized user-facing error or exception, developers should specify the error class +To throw a standardized user-facing error or exception, developers should specify the error class, a SQLSTATE, and message parameters rather than an arbitrary error message. ## Usage @@ -10,7 +10,7 @@ and message parameters rather than an arbitrary error message. If true, use the error class `INTERNAL_ERROR` and skip to step 4. 2. Check if an appropriate error class already exists in `error-classes.json`. If true, use the error class and skip to step 4. -3. Add a new class to `error-classes.json`; keep in mind the invariants below. +3. Add a new class with a new or existing SQLSTATE to `error-classes.json`; keep in mind the invariants below. 4. Check if the exception type already extends `SparkThrowable`. If true, skip to step 6. 5. Mix `SparkThrowable` into the exception. @@ -26,9 +26,9 @@ Throw with arbitrary error message: `error-classes.json` -"PROBLEM_BECAUSE": { - "message": ["Problem because "], - "sqlState": "X" +"PROBLEM_BECAUSE" : { + "message" : ["Problem because "], + "sqlState" : "X" } `SparkException.scala` @@ -70,6 +70,8 @@ Error classes are a succinct, human-readable representation of the error categor An uncategorized errors can be assigned to a legacy error class with the prefix `_LEGACY_ERROR_TEMP_` and an unused sequential number, for instance `_LEGACY_ERROR_TEMP_0053`. +You should not introduce new uncategorized errors. Instead, convert them to proper errors whenever encountering them in new code. + Invariants - Unique @@ -79,7 +81,10 @@ An uncategorized errors can be assigned to a legacy error class with the prefix ### Message Error messages provide a descriptive, human-readable representation of the error. 
-The message format accepts string parameters via the C-style printf syntax. +The message format accepts string parameters via the HTML tag syntax: e.g. . + +The values passed to the message shoudl not themselves be messages. +They should be: runtime-values, keywords, identifiers, or other values that are not translated. The quality of the error message should match the [guidelines](https://spark.apache.org/error-message-guidelines.html). @@ -90,21 +95,24 @@ The quality of the error message should match the ### SQLSTATE -SQLSTATE is an optional portable error identifier across SQL engines. +SQLSTATE is an mandatory portable error identifier across SQL engines. SQLSTATE comprises a 2-character class value followed by a 3-character subclass value. Spark prefers to re-use existing SQLSTATEs, preferably used by multiple vendors. For extension Spark claims the 'K**' subclass range. If a new class is needed it will also claim the 'K0' class. +Internal errors should use the 'XX' class. You can subdivide internal errors by component. +For example: The existing 'XXKD0' is used for an internal analyzer error. + Invariants -- Consistent
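On the consumer side, making SQLSTATE mandatory means every error class in `error-classes.json` carries one. A hedged sketch of inspecting it from PySpark (`getErrorClass()` is part of the PySpark error API; `getSqlState()` is assumed here to mirror the JVM `SparkThrowable` accessor and may differ by version):

```python
from pyspark.errors import AnalysisException

try:
    spark.sql("SELECT * FROM this_table_does_not_exist")
except AnalysisException as e:
    print(e.getErrorClass())  # e.g. TABLE_OR_VIEW_NOT_FOUND
    print(e.getSqlState())    # assumed accessor; the SQLSTATE declared for that class
```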
[spark] branch master updated: [SPARK-45549][CORE] Remove unused `numExistingExecutors` in `CoarseGrainedSchedulerBackend`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b41ea9162f4c [SPARK-45549][CORE] Remove unused `numExistingExecutors` in `CoarseGrainedSchedulerBackend` b41ea9162f4c is described below commit b41ea9162f4c8fbc4d04d28d6ab5cc0342b88cb0 Author: huangxiaoping <1754789...@qq.com> AuthorDate: Wed Oct 18 08:49:46 2023 -0700 [SPARK-45549][CORE] Remove unused `numExistingExecutors` in `CoarseGrainedSchedulerBackend` ### What changes were proposed in this pull request? Remove unused `numExistingExecutors` in `CoarseGrainedSchedulerBackend` ### Why are the changes needed? Remove unused code, make spark code cleaner ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? No need test ### Was this patch authored or co-authored using generative AI tooling? No Closes #43383 from huangxiaopingRD/SPARK-45549. Authored-by: huangxiaoping <1754789...@qq.com> Signed-off-by: Dongjoon Hyun --- .../spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 5 - 1 file changed, 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index b55dfb39d445..c770e5c9950a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -731,11 +731,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp false } - /** - * Return the number of executors currently registered with this backend. - */ - private def numExistingExecutors: Int = synchronized { executorDataMap.size } - override def getExecutorIds(): Seq[String] = synchronized { executorDataMap.keySet.toSeq } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45370][PYTHON][TESTS] Fix python test when ansi mode enabled
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 4a35a31c038f [SPARK-45370][PYTHON][TESTS] Fix python test when ansi mode enabled 4a35a31c038f is described below commit 4a35a31c038f726f9329b4f28f3dde87286fb8d2 Author: panbingkun AuthorDate: Wed Oct 18 17:10:42 2023 +0900 [SPARK-45370][PYTHON][TESTS] Fix python test when ansi mode enabled ### What changes were proposed in this pull request? The pr aims to fix some UT in `pyspark.sql.functions` when SPARK_ANSI_SQL_MODE=true. ### Why are the changes needed? Make pyspark test happy. When Ansi workflow daily GA runs, the following error occurs, eg: https://github.com/apache/spark/actions/workflows/build_ansi.yml https://github.com/apache/spark/actions/runs/667232/job/17202251325 https://github.com/apache/spark/assets/15246973/c22fb8c4-8b87-46fd-85f2-51f4c1d8d13d;> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually test: ``` python/run-tests --testnames 'pyspark.sql.functions' Running PySpark tests. Output is in /Users/panbingkun/Developer/spark/spark-community/python/unit-tests.log Will test against the following Python executables: ['python3.9'] Will test the following Python tests: ['pyspark.sql.functions'] python3.9 python_implementation is CPython python3.9 version is: Python 3.9.13 Starting test(python3.9): pyspark.sql.functions (temp output: /Users/panbingkun/Developer/spark/spark-community/python/target/c8705d5c-d9f9-4bc5-babf-d3642736c70c/python3.9__pyspark.sql.functions__gcfwu3ik.log) Finished test(python3.9): pyspark.sql.functions (47s) Tests passed in 47 seconds ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43168 from panbingkun/SPARK-45370. Authored-by: panbingkun Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/functions.py | 114 +--- 1 file changed, 73 insertions(+), 41 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 31e5884e9ebd..7807919ce2c3 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -11740,15 +11740,29 @@ def create_map( >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame([("Alice", 2, "female"), -... ("Bob", 5, "male")], ("name", "age", "gender")) +... ("Bob", 5, "male")], ("name", "age", "gender")) >>> df.select(sf.create_map(sf.lit('name'), df['name'], -... sf.lit('age'), df['age'])).show(truncate=False) -+-+ -|map(name, name, age, age)| -+-+ -|{name -> Alice, age -> 2}| -|{name -> Bob, age -> 5} | -+-+ +... sf.lit('gender'), df['gender'])).show(truncate=False) ++-+ +|map(name, name, gender, gender) | ++-+ +|{name -> Alice, gender -> female}| +|{name -> Bob, gender -> male}| ++-+ + +Example 4: Usage of create_map function with values of different types. + +>>> from pyspark.sql import functions as sf +>>> df = spark.createDataFrame([("Alice", 2, 22.2), +... ("Bob", 5, 36.1)], ("name", "age", "weight")) +>>> df.select(sf.create_map(sf.lit('age'), df['age'], +... sf.lit('weight'), df['weight'])).show(truncate=False) ++-+ +|map(age, age, weight, weight)| ++-+ +|{age -> 2.0, weight -> 22.2} | +|{age -> 5.0, weight -> 36.1} | ++-+ """ if len(cols) == 1 and isinstance(cols[0], (list, set)): cols = cols[0] # type: ignore[assignment] @@ -11833,50 +11847,68 @@ def array( Example 1: Basic usage of array function with column names. 
>>> from pyspark.sql import functions as sf ->>> df = spark.createDataFrame([("Alice", 2), ("Bob", 5)], ("name", "age")) ->>> df.select(sf.array('name', 'age').alias("arr")).show() -+--+ -| arr| -+--+ -|[Alice, 2]| -| [Bob, 5]| -+--+ +>>> df = spark.createDataFrame([("Alice", "doctor"), ("Bob", "engineer")], +... ("name", "occupation")) +>>> df.select(sf.array('name', 'occupation').alias("arr")).show() ++---+ +|arr| ++---+ +|[Alice, doctor]| +|[Bob, engineer]| ++---+ Example 2: Usage of array function with Column objects. >>> from pyspark.sql import functions as sf ->>> df =
[spark] branch master updated: [SPARK-45410][INFRA][FOLLOW-UP] Rename "Python - PyPy3.8 (master)" to "Build / Python-only (master, PyPy 3.8)"
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 69bb68bc997c [SPARK-45410][INFRA][FOLLOW-UP] Rename "Python - PyPy3.8 (master)" to "Build / Python-only (master, PyPy 3.8)" 69bb68bc997c is described below commit 69bb68bc997c6aae7072ca3c3b395f602c847473 Author: Hyukjin Kwon AuthorDate: Wed Oct 18 00:23:20 2023 -0700 [SPARK-45410][INFRA][FOLLOW-UP] Rename "Python - PyPy3.8 (master)" to "Build / Python-only (master, PyPy 3.8)" ### What changes were proposed in this pull request? This PR is a simple followup of https://github.com/apache/spark/pull/43209 that changes the build name from "Python - PyPy3.8 (master)" to "Build / Python-only (master, PyPy 3.8)" ### Why are the changes needed? To move/group the build up when you click Actions (https://github.com/apache/spark/actions). For now, it looks as below: ![Screenshot 2023-10-18 at 4 16 23 PM](https://github.com/apache/spark/assets/6477701/617cb386-53b7-40e9-bfa2-3c4a72b0278f) and you have to click and extend to see Python build: ![Screenshot 2023-10-18 at 4 16 19 PM](https://github.com/apache/spark/assets/6477701/ec258fb6-2449-4c9f-bb2e-340fc1efe78e) ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? N/A ### Was this patch authored or co-authored using generative AI tooling? No Closes #43428 from HyukjinKwon/minor-rename-python. Authored-by: Hyukjin Kwon Signed-off-by: Dongjoon Hyun --- .github/workflows/build_python.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_python.yml b/.github/workflows/build_python.yml index 94d64a825d87..04b46ffca672 100644 --- a/.github/workflows/build_python.yml +++ b/.github/workflows/build_python.yml @@ -17,7 +17,7 @@ # under the License. # -name: "Python - PyPy3.8 (master)" +name: "Build / Python-only (master, PyPy 3.8)" on: schedule: - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45542][CORE] Replace `setSafeMode(HdfsConstants.SafeModeAction, boolean)` with `setSafeMode(SafeModeAction, boolean)`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5d5a9dbfba18 [SPARK-45542][CORE] Replace `setSafeMode(HdfsConstants.SafeModeAction, boolean)` with `setSafeMode(SafeModeAction, boolean)` 5d5a9dbfba18 is described below commit 5d5a9dbfba1831d1aafcc47a82a25df02e431749 Author: yangjie01 AuthorDate: Wed Oct 18 00:16:43 2023 -0700 [SPARK-45542][CORE] Replace `setSafeMode(HdfsConstants.SafeModeAction, boolean)` with `setSafeMode(SafeModeAction, boolean)` ### What changes were proposed in this pull request? The function `FsHistoryProvider#isFsInSafeMode` is using the deprecated API https://github.com/apache/hadoop/blob/1be78238728da9266a4f88195058f08fd012bf9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java#L1702-L1722 ``` /** * Enter, leave or get safe mode. * * param action * One of SafeModeAction.ENTER, SafeModeAction.LEAVE and * SafeModeAction.GET. * param isChecked * If true check only for Active NNs status, else check first NN's * status. * * see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction, * boolean) * * deprecated please instead use * {link DistributedFileSystem#setSafeMode(SafeModeAction, boolean)}. */ Deprecated public boolean setSafeMode(HdfsConstants.SafeModeAction action, boolean isChecked) throws IOException { return dfs.setSafeMode(action, isChecked); } ``` This pr change to use https://github.com/apache/hadoop/blob/1be78238728da9266a4f88195058f08fd012bf9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java#L1646-L1661 ``` /** * Enter, leave or get safe mode. * * param action * One of SafeModeAction.ENTER, SafeModeAction.LEAVE and * SafeModeAction.GET. * param isChecked * If true check only for Active NNs status, else check first NN's * status. */ Override SuppressWarnings("deprecation") public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException { return this.setSafeMode(convertToClientProtocolSafeModeAction(action), isChecked); } ``` instead of it. The conversion relationship from `HdfsConstants.SafeModeAction` to `SafeModeAction` refers to `convertToClientProtocolSafeModeAction` method. https://github.com/apache/hadoop/blob/1be78238728da9266a4f88195058f08fd012bf9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java#L1663-L1686 ``` /** * Translating the {link SafeModeAction} into {link HdfsConstants.SafeModeAction} * that is used by {link DFSClient#setSafeMode(HdfsConstants.SafeModeAction, boolean)}. * * param action any supported action listed in {link SafeModeAction}. * return the converted {link HdfsConstants.SafeModeAction}. * throws UnsupportedOperationException if the provided {link SafeModeAction} cannot be * translated. */ private static HdfsConstants.SafeModeAction convertToClientProtocolSafeModeAction( SafeModeAction action) { switch (action) { case ENTER: return HdfsConstants.SafeModeAction.SAFEMODE_ENTER; case LEAVE: return HdfsConstants.SafeModeAction.SAFEMODE_LEAVE; case FORCE_EXIT: return HdfsConstants.SafeModeAction.SAFEMODE_FORCE_EXIT; case GET: return HdfsConstants.SafeModeAction.SAFEMODE_GET; default: throw new UnsupportedOperationException("Unsupported safe mode action " + action); } } ``` ### Why are the changes needed? 
Clean up deprecated API usage ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43377 from LuciferYang/SPARK-45542. Authored-by: yangjie01 Signed-off-by: Dongjoon Hyun --- .../scala/org/apache/spark/deploy/history/FsHistoryProvider.scala| 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b5afa86180b3..99b3184ac918 100644 ---
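The call-site change this migration implies can be sketched in isolation. The snippet below is illustrative only (it does not reproduce `FsHistoryProvider#isFsInSafeMode`) and assumes the `org.apache.hadoop.fs.SafeModeAction` enum from the non-deprecated Hadoop overload quoted above.

```scala
// Sketch: replacing the deprecated HdfsConstants-based overload with the
// SafeModeAction overload described in the Hadoop excerpts above.
import org.apache.hadoop.fs.SafeModeAction
import org.apache.hadoop.hdfs.DistributedFileSystem

object SafeModeCheck {
  def isInSafeMode(dfs: DistributedFileSystem): Boolean = {
    // Before (deprecated): dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET, true)
    // After: the action is translated internally via convertToClientProtocolSafeModeAction.
    dfs.setSafeMode(SafeModeAction.GET, true)
  }
}
```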
[spark] branch master updated: [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 103de914a5f [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions 103de914a5f is described below commit 103de914a5f96fccbe722663ee69c8ee7d9c8135 Author: Jiaan Geng AuthorDate: Wed Oct 18 14:55:51 2023 +0800 [SPARK-44649][SQL] Runtime Filter supports passing equivalent creation side expressions ### What changes were proposed in this pull request? Currently, Spark runtime filter supports multi level shuffle join side as filter creation side. Please see: https://github.com/apache/spark/pull/39170. Although this feature adds the adaptive scene and improves the performance, there are still need to support other case. **Optimization of Expression Transitivity on the Creation Side of Spark Runtime Filter** **Principle** Association expressions are transitive in some Joins, such as: `Tab1.col1A = Tab2.col2B` and `Tab2.col2B = Tab3.col3C` Actually, it can be inferred that `Tab1.col1A = Tab3.col3C`. **Optimization points** Currently, the runtime filter's creation side expression only uses directly associated keys. If the transitivity of association conditions is utilized, runtime filters can be injected into many scenarios, such as: ``` SELECT * FROM ( SELECT * FROM tab1 JOIN tab2 ON tab1.c1 = tab2.c2 WHERE tab2.a2 = 5 ) AS a JOIN tab3 ON tab3.c3 = a.c1 ``` The `tab3.c3` here is only associated with `tab1.c1` and not with `tab2.c2`. Although there is selective filtering on tab2 (`tab2.a2 = 5`), Spark is currently unable to inject a Runtime Filter. As long as transitivity is considered, we can know that `tab3.c3` and `tab2.c2` are related, so we can still inject Runtime Filter and improve performance. For the current implementation, Spark only inject runtime filter into tab1 with bloom filter based on `bf2.a2 = 5`. Because there is no the join between tab3 and tab2, so Spark can't inject runtime filter into tab3 with the same bloom filter. But the above SQL have the join condition `tab3.c3 = a.c1(tab1.c1)` between tab3 and tab2, and also have the join condition `tab1.c1 = tab2.c2`. We can rely on the transitivity of the join condition to get the virtual join condition `tab3.c3 = tab2.c2`, then we can inject the bloom filter based on `bf2.a2 = 5` into tab3. ### Why are the changes needed? Enhance the Spark runtime filter and improve performance. ### Does this PR introduce _any_ user-facing change? 'No'. Just update the inner implementation. ### How was this patch tested? New tests. Micro benchmark for q75 in TPC-DS. **2TB TPC-DS** | TPC-DS Query | Before(Seconds) | After(Seconds) | Speedup(Percent) | | | | | | | q75 | 129.664 | 81.562 | 58.98% | Closes #42317 from beliefer/SPARK-44649. 
Authored-by: Jiaan Geng Signed-off-by: Wenchen Fan --- .../catalyst/optimizer/InjectRuntimeFilter.scala | 64 +++--- .../spark/sql/InjectRuntimeFilterSuite.scala | 38 +++-- 2 files changed, 75 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 8737082e571..30526bd8106 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -125,14 +125,14 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J */ private def extractSelectiveFilterOverScan( plan: LogicalPlan, - filterCreationSideKey: Expression): Option[LogicalPlan] = { -@tailrec + filterCreationSideKey: Expression): Option[(Expression, LogicalPlan)] = { def extract( p: LogicalPlan, predicateReference: AttributeSet, hasHitFilter: Boolean, hasHitSelectiveFilter: Boolean, -currentPlan: LogicalPlan): Option[LogicalPlan] = p match { +currentPlan: LogicalPlan, +targetKey: Expression): Option[(Expression, LogicalPlan)] = p match { case Project(projectList, child) if hasHitFilter => // We need to make sure all expressions referenced by filter predicates are simple // expressions. @@ -143,41 +143,62 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J referencedExprs.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _), hasHitFilter,
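To make the transitivity example concrete, here is a small self-contained sketch that sets up the three tables from the commit message and runs the same query. The config key is assumed to be the existing bloom-filter runtime-filter switch, and whether a filter is actually injected still depends on the optimizer's size and selectivity checks.

```scala
import org.apache.spark.sql.SparkSession

object RuntimeFilterTransitivityDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // Assumed config key for the bloom-filter based runtime filter; injection
      // also depends on size/selectivity thresholds, not just this flag.
      .config("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")
      .getOrCreate()
    import spark.implicits._

    Seq((1, 1), (2, 2)).toDF("c1", "a1").createOrReplaceTempView("tab1")
    Seq((1, 5), (2, 6)).toDF("c2", "a2").createOrReplaceTempView("tab2")
    Seq((1, 9), (3, 9)).toDF("c3", "a3").createOrReplaceTempView("tab3")

    // The query shape from the commit message: tab3 joins a.c1 (from tab1),
    // while the selective predicate sits on tab2.
    val df = spark.sql(
      """SELECT * FROM (
        |  SELECT * FROM tab1 JOIN tab2 ON tab1.c1 = tab2.c2 WHERE tab2.a2 = 5
        |) AS a
        |JOIN tab3 ON tab3.c3 = a.c1""".stripMargin)

    df.explain(true)  // look for injected runtime filters in the optimized plan
    df.show()
    spark.stop()
  }
}
```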
[spark] branch master updated: [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 3005dc89084 [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator 3005dc89084 is described below commit 3005dc8908486f63a3e471cd05189881b833daf1 Author: Chaoqin Li AuthorDate: Wed Oct 18 15:49:43 2023 +0900 [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator ### What changes were proposed in this pull request? Introduce a metadata file for streaming stateful operator, write metadata for stateful operator during planning. The information to store in the metadata file: - operator name (no need to be unique among stateful operators in the query) - state store name - numColumnsPrefixKey: > 0 if prefix scan is enabled, 0 otherwise The body of metadata file will be in json format. The metadata file will be versioned just as other streaming metadata file to be future proof. ### Why are the changes needed? The metadata file will improve expose more information about the state store, improves debugability and facilitate the development of state related feature such as reading and writing state and state repartitioning. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add unit test and integration tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #43393 from chaoqin-li1123/state_metadata. Authored-by: Chaoqin Li Signed-off-by: Jungtaek Lim --- .../spark/sql/execution/QueryExecution.scala | 2 +- .../execution/streaming/IncrementalExecution.scala | 22 ++- .../execution/streaming/MicroBatchExecution.scala | 4 +- .../streaming/StreamingSymmetricHashJoinExec.scala | 10 ++ .../streaming/continuous/ContinuousExecution.scala | 3 +- .../streaming/state/OperatorStateMetadata.scala| 136 .../execution/streaming/statefulOperators.scala| 21 ++- .../state/OperatorStateMetadataSuite.scala | 181 + 8 files changed, 374 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index b3c97a83970..3d35300773b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -272,7 +272,7 @@ class QueryExecution( new IncrementalExecution( sparkSession, logical, OutputMode.Append(), "", UUID.randomUUID, UUID.randomUUID, 0, None, OffsetSeqMetadata(0, 0), -WatermarkPropagator.noop()) +WatermarkPropagator.noop(), false) } else { this } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index ebdb9caf09e..a67097f6e96 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming import java.util.UUID import java.util.concurrent.atomic.AtomicInteger +import org.apache.hadoop.fs.Path + import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.QueryPlanningTracker @@ -32,6 +34,7 @@ import 
org.apache.spark.sql.execution.aggregate.{HashAggregateExec, MergingSessi import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike import org.apache.spark.sql.execution.python.FlatMapGroupsInPandasWithStateExec import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSourceV1 +import org.apache.spark.sql.execution.streaming.state.OperatorStateMetadataWriter import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.util.Utils @@ -50,7 +53,8 @@ class IncrementalExecution( val currentBatchId: Long, val prevOffsetSeqMetadata: Option[OffsetSeqMetadata], val offsetSeqMetadata: OffsetSeqMetadata, -val watermarkPropagator: WatermarkPropagator) +val watermarkPropagator: WatermarkPropagator, +val isFirstBatch: Boolean) extends QueryExecution(sparkSession, logicalPlan) with Logging { // Modified planner with stateful operations. @@ -71,6 +75,8 @@ class IncrementalExecution( StreamingGlobalLimitStrategy(outputMode) :: Nil } + private lazy val hadoopConf = sparkSession.sessionState.newHadoopConf() + private[sql] val
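As an illustration of the fields listed above (operator name, state store name, `numColumnsPrefixKey`, a JSON body, and a versioned file), here is a hypothetical sketch; the class names, store names, and file layout are assumptions and do not reproduce the `OperatorStateMetadata` classes added by this commit.

```scala
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.Serialization

// Hypothetical shapes mirroring the fields described in the commit message.
case class StateStoreInfo(storeName: String, numColumnsPrefixKey: Int)
case class StatefulOperatorInfo(operatorName: String, stateStores: Seq[StateStoreInfo])

object OperatorStateMetadataSketch {
  implicit val formats: Formats = DefaultFormats

  def main(args: Array[String]): Unit = {
    // numColumnsPrefixKey > 0 means prefix scan is enabled for that store.
    val meta = StatefulOperatorInfo(
      operatorName = "stateStoreSave",
      stateStores = Seq(StateStoreInfo(storeName = "default", numColumnsPrefixKey = 0)))

    // The file is versioned like other streaming metadata files; one simple layout
    // is a version header line followed by the JSON body.
    println("v1")
    println(Serialization.write(meta))
  }
}
```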
[spark] branch master updated (b1e57a2b359 -> d7d38fbc184)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from b1e57a2b359 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for RemoteBlockPushResolver add d7d38fbc184 [SPARK-45587][INFRA] Skip UNIDOC and MIMA in `build` GitHub Action job No new revisions were added by this update. Summary of changes: .github/workflows/build_and_test.yml | 4 1 file changed, 4 insertions(+) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for RemoteBlockPushResolver
This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b1e57a2b359 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for RemoteBlockPushResolver b1e57a2b359 is described below commit b1e57a2b359d7d9fbf07adfba10db97f38b99bde Author: zhaomin AuthorDate: Wed Oct 18 01:20:08 2023 -0500 [SPARK-45534][CORE] Use java.lang.ref.Cleaner instead of finalize for RemoteBlockPushResolver ### What changes were proposed in this pull request? use java.lang.ref.Cleaner instead of finalize() for RemoteBlockPushResolver ### Why are the changes needed? The finalize() method has been marked as deprecated since Java 9 and will be removed in the future, java.lang.ref.Cleaner is the more recommended solution. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43371 from zhaomin1423/45315. Authored-by: zhaomin Signed-off-by: Mridul Muralidharan gmail.com> --- .../network/shuffle/RemoteBlockPushResolver.java | 101 + .../network/shuffle/ShuffleTestAccessor.scala | 2 +- 2 files changed, 64 insertions(+), 39 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index a915d0eccb0..14fefebe089 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -21,6 +21,7 @@ import java.io.DataOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.lang.ref.Cleaner; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; @@ -94,6 +95,7 @@ import org.apache.spark.network.util.TransportConf; */ public class RemoteBlockPushResolver implements MergedShuffleFileManager { + private static final Cleaner CLEANER = Cleaner.create(); private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class); public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged"; @@ -481,7 +483,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { appShuffleInfo.shuffles.forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions .forEach((shuffleMergeId, partitionInfo) -> { synchronized (partitionInfo) { - partitionInfo.closeAllFilesAndDeleteIfNeeded(false); + partitionInfo.cleanable.clean(); } })); if (cleanupLocalDirs) { @@ -537,7 +539,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { partitions .forEach((partitionId, partitionInfo) -> { synchronized (partitionInfo) { - partitionInfo.closeAllFilesAndDeleteIfNeeded(true); + partitionInfo.cleanable.clean(); + partitionInfo.deleteAllFiles(); } }); } @@ -822,7 +825,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, partition.reduceId, ioe.getMessage()); } finally { -partition.closeAllFilesAndDeleteIfNeeded(false); +partition.cleanable.clean(); } } } @@ -1720,6 +1723,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { // The 
meta file for a particular merged shuffle contains all the map indices that belong to // every chunk. The entry per chunk is a serialized bitmap. private final MergeShuffleFile metaFile; +private final Cleaner.Cleanable cleanable; // Location offset of the last successfully merged block for this shuffle partition private long dataFilePos; // Track the map index whose block is being merged for this shuffle partition @@ -1756,6 +1760,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager { this.dataFilePos = 0; this.mapTracker = new RoaringBitmap(); this.chunkTracker = new RoaringBitmap(); + this.cleanable = CLEANER.register(this, new ResourceCleaner(dataChannel, indexFile, +metaFile, appAttemptShuffleMergeId, reduceId)); } public long getDataFilePos() { @@ -1864,36 +1870,13 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
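The idiom the diff follows — register a cleanup action with a shared `Cleaner` at construction time and call `cleanable.clean()` for deterministic cleanup — can be sketched on its own. The class and resource below are placeholders, not the `RemoteBlockPushResolver` internals.

```scala
// Sketch of the Cleaner idiom used in the diff: the cleanup action must not
// capture `this`, so the state to release is held in a separate Runnable.
import java.lang.ref.Cleaner
import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

object TrackedChannel {
  private val cleaner: Cleaner = Cleaner.create()

  // The cleanup action holds only the resources, never the owning object;
  // otherwise the owner could never become phantom-reachable.
  private final class CloseChannel(channel: FileChannel) extends Runnable {
    override def run(): Unit = if (channel.isOpen) channel.close()
  }
}

final class TrackedChannel(path: String) extends AutoCloseable {
  private val channel =
    FileChannel.open(Paths.get(path), StandardOpenOption.CREATE, StandardOpenOption.WRITE)
  private val cleanable =
    TrackedChannel.cleaner.register(this, new TrackedChannel.CloseChannel(channel))

  // Explicit close (like partitionInfo.cleanable.clean() above) runs the action
  // at most once; the Cleaner is only a safety net if close() is never called.
  override def close(): Unit = cleanable.clean()
}
```

Keeping the resources in a separate `Runnable` while registering against `this` is what lets the owner become unreachable, and because `clean()` runs the action at most once, the explicit calls in the diff and the Cleaner safety net cannot double-close.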
[spark] branch master updated: [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error
This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 11e7ea4f11d [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error 11e7ea4f11d is described below commit 11e7ea4f11df71e2942322b01fcaab57dac20c83 Author: Jia Fan AuthorDate: Wed Oct 18 11:06:43 2023 +0500 [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error ### What changes were proposed in this pull request? Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error, it would be like: ```log org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4940.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4940.0 (TID 4031) (10.68.177.106 executor 0): com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input Parser Configuration: CsvParserSettings: Auto configuration enabled=true Auto-closing enabled=true Autodetect column delimiter=false Autodetect quotes=false Column reordering enabled=true Delimiters for detection=null Empty value= Escape unquoted values=false Header extraction enabled=null Headers=null Ignore leading whitespaces=false Ignore leading whitespaces in quotes=false Ignore trailing whitespaces=false Ignore trailing whitespaces in quotes=false Input buffer size=1048576 Input reading on separate thread=false Keep escape sequences=false Keep quotes=false Length of content displayed on error=1000 Line separator detection enabled=true Maximum number of characters per column=-1 Maximum number of columns=20480 Normalize escaped line separators=true Null value= Number of records to read=all Processor=none Restricting data in exceptions=false RowProcessor error handler=null Selected fields=none Skip bits as whitespace=true Skip empty lines=true Unescaped quote handling=STOP_AT_DELIMITERFormat configuration: CsvFormat: Comment character=# Field delimiter=, Line separator (normalized)=\n Line separator sequence=\n Quote character=" Quote escape character=\ Quote escape escape character=null Internal state when error was thrown: line=0, column=0, record=0 at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402) at com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:277) at com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:843) at org.apache.spark.sql.catalyst.csv.UnivocityParser$$anon$1.(UnivocityParser.scala:463) at org.apache.spark.sql.catalyst.csv.UnivocityParser$.convertStream(UnivocityParser.scala:46... ``` Because multiline CSV/JSON use `BinaryFileRDD` not `FileScanRDD`. Unlike `FileScanRDD`, when met corrupt files will check `ignoreCorruptFiles` config to avoid report IOException, `BinaryFileRDD` will not report error because it return normal `PortableDataStream`. So we should catch it when infer schema in lambda function. Also do same thing for `ignoreMissingFiles`. ### Why are the changes needed? Fix the bug when use mulitline mode with ignoreCorruptFiles/ignoreMissingFiles config. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? add new test. ### Was this patch authored or co-authored using generative AI tooling? No Closes #42979 from Hisoka-X/SPARK-45035_csv_multi_line. 
Authored-by: Jia Fan Signed-off-by: Max Gekk --- .../spark/sql/catalyst/json/JsonInferSchema.scala | 18 +-- .../execution/datasources/csv/CSVDataSource.scala | 28 --- .../datasources/CommonFileDataSourceSuite.scala| 28 +++ .../sql/execution/datasources/csv/CSVSuite.scala | 58 +- .../sql/execution/datasources/json/JsonSuite.scala | 46 - 5 files changed, 142 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 4123c5290b6..4d04b34876c 100644 ---
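A minimal sketch of the scenario this fix targets: reading multiline CSV from a directory containing a corrupt file with `ignoreCorruptFiles` enabled. The paths and the stand-in corrupt file are illustrative; before this fix, this pattern surfaced the `TextParsingException` shown in the commit message.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

object MultilineIgnoreCorruptFiles {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // Real SQL conf; the multiLine CSV/JSON path is what previously bypassed it.
    spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

    val dir = Files.createTempDirectory("csv-input")
    Files.write(dir.resolve("good.csv"), "id,name\n1,\"a\nmultiline\"\n".getBytes("UTF-8"))
    // A .gz file containing garbage bytes stands in for a corrupt input file.
    Files.write(dir.resolve("corrupt.csv.gz"), Array[Byte](1, 2, 3))

    val df = spark.read
      .option("header", "true")
      .option("multiLine", "true") // multiline reading goes through BinaryFileRDD
      .csv(dir.toString)

    df.show(truncate = false) // expected: only the row from good.csv
    spark.stop()
  }
}
```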