(spark) branch master updated: [SPARK-46713][PYTHON][DOCS] Refine docstring of `map_keys/map_values/map_entries`
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 02573741605d [SPARK-46713][PYTHON][DOCS] Refine docstring of `map_keys/map_values/map_entries`

02573741605d is described below

commit 02573741605dc240de0af6c8d23f983cfd303cc6
Author: yangjie01
AuthorDate: Mon Jan 15 15:15:01 2024 +0800

[SPARK-46713][PYTHON][DOCS] Refine docstring of `map_keys/map_values/map_entries`

### What changes were proposed in this pull request?
This PR refines the docstrings of `map_keys/map_values/map_entries` and adds some new examples.

### Why are the changes needed?
To improve PySpark documentation.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44724 from LuciferYang/SPARK-46713.

Lead-authored-by: yangjie01
Co-authored-by: YangJie
Signed-off-by: yangjie01
---
 python/pyspark/sql/functions/builtin.py | 192 +---
 1 file changed, 154 insertions(+), 38 deletions(-)

diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index a05ce7b04368..f1422d17b071 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -15800,7 +15800,7 @@ def map_contains_key(col: "ColumnOrName", value: Any) -> Column:
 @_try_remote_functions
 def map_keys(col: "ColumnOrName") -> Column:
 """
-Collection function: Returns an unordered array containing the keys of the map.
+Map function: Returns an unordered array containing the keys of the map.
 ..
versionadded:: 2.3.0 @@ -15810,23 +15810,61 @@ def map_keys(col: "ColumnOrName") -> Column: Parameters -- col : :class:`~pyspark.sql.Column` or str -name of column or expression +Name of column or expression Returns --- :class:`~pyspark.sql.Column` -keys of the map as an array. +Keys of the map as an array. Examples ->>> from pyspark.sql.functions import map_keys +Example 1: Extracting keys from a simple map + +>>> from pyspark.sql import functions as sf >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as data") ->>> df.select(map_keys("data").alias("keys")).show() -+--+ -| keys| -+--+ -|[1, 2]| -+--+ +>>> df.select(sf.sort_array(sf.map_keys("data"))).show() +++ +|sort_array(map_keys(data), true)| +++ +| [1, 2]| +++ + +Example 2: Extracting keys from a map with complex keys + +>>> from pyspark.sql import functions as sf +>>> df = spark.sql("SELECT map(array(1, 2), 'a', array(3, 4), 'b') as data") +>>> df.select(sf.sort_array(sf.map_keys("data"))).show() +++ +|sort_array(map_keys(data), true)| +++ +|[[1, 2], [3, 4]]| +++ + +Example 3: Extracting keys from a map with duplicate keys + +>>> from pyspark.sql import functions as sf +>>> originalmapKeyDedupPolicy = spark.conf.get("spark.sql.mapKeyDedupPolicy") +>>> spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN") +>>> df = spark.sql("SELECT map(1, 'a', 1, 'b') as data") +>>> df.select(sf.map_keys("data")).show() ++--+ +|map_keys(data)| ++--+ +| [1]| ++--+ +>>> spark.conf.set("spark.sql.mapKeyDedupPolicy", originalmapKeyDedupPolicy) + +Example 4: Extracting keys from an empty map + +>>> from pyspark.sql import functions as sf +>>> df = spark.sql("SELECT map() as data") +>>> df.select(sf.map_keys("data")).show() ++--+ +|map_keys(data)| ++--+ +|[]| ++--+ """ return _invoke_function_over_columns("map_keys", col) @@ -15834,7 +15872,7 @@ def map_keys(col: "ColumnOrName") -> Column: @_try_remote_functions def map_values(col: "ColumnOrName") -> Column: """ -Collection function: Returns an unordered array containing the 
values of the map. +Map function: Returns an unordered array containing the values of the map. .. versionadded:: 2.3.0 @@ -15844,23 +15882,69 @@ def map_values(col: "ColumnOrName") -> Column: Parameters -- col : :class:`~pyspark.sql.Column` or str -name of column or expression +Name of column or expression Returns --- :class:`~pyspark.sql.Column` -values of the map as an
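The `LAST_WIN` behavior shown in Example 3 of the refined docstring (the `spark.sql.mapKeyDedupPolicy` config) has a direct analogue in plain Python: building a `dict` from key/value pairs keeps the last value seen for a duplicate key. A minimal, Spark-free sketch of that semantics (illustrative only; this is not the PySpark API):

```python
# Plain-Python analogue of spark.sql.mapKeyDedupPolicy = LAST_WIN:
# dict construction keeps the LAST value for a duplicate key.
pairs = [(1, "a"), (1, "b")]  # same duplicate keys as map(1, 'a', 1, 'b')
deduped = dict(pairs)

# map_keys / map_values then operate on the already-deduplicated map.
keys = sorted(deduped.keys())
values = list(deduped.values())

print(keys)    # [1]
print(values)  # ['b']
```

This mirrors why Example 3 in the docstring shows `map_keys` returning `[1]` for `map(1, 'a', 1, 'b')` once `LAST_WIN` is set.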
(spark) branch master updated: [SPARK-46719][PS][TESTS] Rebalance `pyspark_pandas` and `pyspark_pandas_slow`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new ee0d243f0141 [SPARK-46719][PS][TESTS] Rebalance `pyspark_pandas` and `pyspark_pandas_slow`

ee0d243f0141 is described below

commit ee0d243f014176ede9d8cf8296f9a2df1798920b
Author: Ruifeng Zheng
AuthorDate: Mon Jan 15 15:10:55 2024 +0800

[SPARK-46719][PS][TESTS] Rebalance `pyspark_pandas` and `pyspark_pandas_slow`

### What changes were proposed in this pull request?
Rebalance `pyspark_pandas` and `pyspark_pandas_slow`

### Why are the changes needed?
before:
- `pyspark_pandas`: `Tests passed in 1849 seconds`
- `pyspark_pandas-slow`: `Tests passed in 3538 seconds`

after:
- `pyspark_pandas`: `Tests passed in 2733 seconds`
- `pyspark_pandas-slow`: `Tests passed in 2804 seconds`

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci, https://github.com/zhengruifeng/spark/actions/runs/7524159324/job/20478674209

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44731 from zhengruifeng/infra_rebalance_ps_test.
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- dev/sparktestsupport/modules.py | 158 1 file changed, 79 insertions(+), 79 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 9cdbe4d250ca..202263febc93 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -699,6 +699,40 @@ pyspark_pandas = Module( "pyspark.pandas.spark.utils", "pyspark.pandas.typedef.typehints", # unittests +"pyspark.pandas.tests.test_categorical", +"pyspark.pandas.tests.test_config", +"pyspark.pandas.tests.test_extension", +"pyspark.pandas.tests.test_frame_spark", +"pyspark.pandas.tests.test_generic_functions", +"pyspark.pandas.tests.test_indexops_spark", +"pyspark.pandas.tests.test_internal", +"pyspark.pandas.tests.test_namespace", +"pyspark.pandas.tests.test_numpy_compat", +"pyspark.pandas.tests.test_repr", +"pyspark.pandas.tests.test_spark_functions", +"pyspark.pandas.tests.test_scalars", +"pyspark.pandas.tests.test_sql", +"pyspark.pandas.tests.test_typedef", +"pyspark.pandas.tests.test_utils", +"pyspark.pandas.tests.computation.test_any_all", +"pyspark.pandas.tests.computation.test_apply_func", +"pyspark.pandas.tests.computation.test_binary_ops", +"pyspark.pandas.tests.computation.test_combine", +"pyspark.pandas.tests.computation.test_compute", +"pyspark.pandas.tests.computation.test_corr", +"pyspark.pandas.tests.computation.test_corrwith", +"pyspark.pandas.tests.computation.test_cov", +"pyspark.pandas.tests.computation.test_cumulative", +"pyspark.pandas.tests.computation.test_describe", +"pyspark.pandas.tests.computation.test_eval", +"pyspark.pandas.tests.computation.test_melt", +"pyspark.pandas.tests.computation.test_missing_data", +"pyspark.pandas.tests.computation.test_pivot", +"pyspark.pandas.tests.computation.test_pivot_table", +"pyspark.pandas.tests.computation.test_pivot_table_adv", +"pyspark.pandas.tests.computation.test_pivot_table_multi_idx", 
+"pyspark.pandas.tests.computation.test_pivot_table_multi_idx_adv", +"pyspark.pandas.tests.computation.test_stats", "pyspark.pandas.tests.data_type_ops.test_as_type", "pyspark.pandas.tests.data_type_ops.test_base", "pyspark.pandas.tests.data_type_ops.test_binary_ops", @@ -717,42 +751,26 @@ pyspark_pandas = Module( "pyspark.pandas.tests.data_type_ops.test_string_ops", "pyspark.pandas.tests.data_type_ops.test_udt_ops", "pyspark.pandas.tests.data_type_ops.test_timedelta_ops", -"pyspark.pandas.tests.indexes.test_category", -"pyspark.pandas.tests.indexes.test_timedelta", "pyspark.pandas.tests.plot.test_frame_plot", "pyspark.pandas.tests.plot.test_frame_plot_matplotlib", "pyspark.pandas.tests.plot.test_frame_plot_plotly", "pyspark.pandas.tests.plot.test_series_plot", "pyspark.pandas.tests.plot.test_series_plot_matplotlib", "pyspark.pandas.tests.plot.test_series_plot_plotly", -"pyspark.pandas.tests.test_categorical", -"pyspark.pandas.tests.test_config", -"pyspark.pandas.tests.indexes.test_default", -"pyspark.pandas.tests.window.test_expanding", -"pyspark.pandas.tests.window.test_expanding_adv", -"pyspark.pandas.tests.window.test_expanding_error", -
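The PR above moves test modules between the two groups by hand, guided by measured wall-clock times. The underlying idea — evening out two test buckets by duration — can be sketched as a greedy partition. The module names and timings below are made up for illustration; they are not the real CI numbers:

```python
def rebalance(durations):
    """Greedily assign each module (longest first) to the currently lighter bucket."""
    buckets = {"pyspark_pandas": [], "pyspark_pandas_slow": []}
    totals = {"pyspark_pandas": 0, "pyspark_pandas_slow": 0}
    for module, secs in sorted(durations.items(), key=lambda kv: -kv[1]):
        target = min(totals, key=totals.get)  # lighter bucket so far
        buckets[target].append(module)
        totals[target] += secs
    return buckets, totals

# Hypothetical per-module timings in seconds (illustrative only).
timings = {
    "pyspark.pandas.tests.computation.test_stats": 900,
    "pyspark.pandas.tests.frame.test_reindexing": 700,
    "pyspark.pandas.tests.window.test_expanding": 500,
    "pyspark.pandas.tests.test_categorical": 300,
}
buckets, totals = rebalance(timings)
print(sorted(totals.values()))  # [1200, 1200]
```

Greedy longest-first assignment is a simple heuristic for this kind of two-way balancing; the actual PR encodes the resulting assignment statically in `dev/sparktestsupport/modules.py`.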
(spark) branch master updated: [SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent snapshot
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 65d822c44d93 [SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent snapshot

65d822c44d93 is described below

commit 65d822c44d93077b68d8738c261fcaf9288cc960
Author: Juliusz Sompolski
AuthorDate: Mon Jan 15 14:55:39 2024 +0800

[SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent snapshot

### What changes were proposed in this pull request?
Some may want to use checkpoint to get a consistent snapshot of the Dataset / RDD. Warn that this is not the case with lazy checkpoint, because the checkpoint is computed only at the end of the first action, and the data used during the first action may be different because of non-determinism and retries. `doCheckpoint` is only called at the end of [SparkContext.runJob](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/SparkContext.scala#L2426). This may cause recomputation of both the [local checkpoint data](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala#L54) and the [reliable checkpoint data](https://github.com/apache/sp [...]

### Why are the changes needed?
Document a gnarly edge case.

### Does this PR introduce _any_ user-facing change?
Yes, changes to the documentation of public APIs.

### How was this patch tested?
Doc-only change.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43247 from juliuszsompolski/SPARK-45435-doc.
Authored-by: Juliusz Sompolski Signed-off-by: Wenchen Fan --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 14 ++ .../src/main/scala/org/apache/spark/sql/Dataset.scala| 16 2 files changed, 30 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 9518433a7f69..d73fb1b9bc3b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1649,6 +1649,13 @@ abstract class RDD[T: ClassTag]( * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. + * + * The data is only checkpointed when `doCheckpoint()` is called, and this only happens at the + * end of the first action execution on this RDD. The final data that is checkpointed after the + * first action may be different from the data that was used during the action, due to + * non-determinism of the underlying operation and retries. If the purpose of the checkpoint is + * to achieve saving a deterministic snapshot of the data, an eager action may need to be called + * first on the RDD to trigger the checkpoint. */ def checkpoint(): Unit = RDDCheckpointData.synchronized { // NOTE: we use a global lock here due to complexities downstream with ensuring @@ -1678,6 +1685,13 @@ abstract class RDD[T: ClassTag]( * `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value. * * The checkpoint directory set through `SparkContext#setCheckpointDir` is not used. + * + * The data is only checkpointed when `doCheckpoint()` is called, and this only happens at the + * end of the first action execution on this RDD. The final data that is checkpointed after the + * first action may be different from the data that was used during the action, due to + * non-determinism of the underlying operation and retries. 
If the purpose of the checkpoint is + * to achieve saving a deterministic snapshot of the data, an eager action may need to be called + * first on the RDD to trigger the checkpoint. */ def localCheckpoint(): this.type = RDDCheckpointData.synchronized { if (Utils.isDynamicAllocationEnabled(conf) && diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index d792cdbcf865..0038f1a510b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -688,6 +688,14 @@ class Dataset[T] private[sql]( * plan may grow exponentially. It will be saved to files inside the checkpoint * directory set with `SparkContext#setCheckpointDir`. * + * @param eager Whether to checkpoint this dataframe immediately + * + * @note When checkpoint is used with eager = false, the final data that is
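The edge case the new doc text warns about — a lazily checkpointed dataset can differ from what the first action saw, because the checkpoint is only written at the end of that action and non-deterministic data may be recomputed — has a plain-Python analogue. Below, a "lazy" non-deterministic pipeline produces different data each time it is consumed, while materializing it once (the analogue of running an eager action before relying on the snapshot) pins the values. This is a sketch of the concept only; the function names are illustrative, not Spark APIs:

```python
import random

def nondeterministic_pipeline():
    # A generator expression is re-executed from scratch each time it is
    # consumed -- analogous to recomputing an unpersisted, non-deterministic
    # RDD lineage on a retry.
    return (random.random() for _ in range(3))

# Two "actions" over the same lazy definition see different data.
run1 = list(nondeterministic_pipeline())
run2 = list(nondeterministic_pipeline())
assert run1 != run2  # different with overwhelming probability

# The "eager" alternative: materialize once, then every later consumer
# reads the same snapshot.
snapshot = list(nondeterministic_pipeline())
assert snapshot == list(snapshot)  # stable on re-reads
```

This is why the doc change suggests calling an eager action first when the goal of the checkpoint is a deterministic snapshot.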
(spark) branch master updated: [SPARK-46716][SS][TESTS] Add a test regarding backward compatibility check for Scala StreamingQueryListener
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new b18b6cca28ef [SPARK-46716][SS][TESTS] Add a test regarding backward compatibility check for Scala StreamingQueryListener

b18b6cca28ef is described below

commit b18b6cca28ef7ed1199ae591d00e945ae4ae611a
Author: Jungtaek Lim
AuthorDate: Mon Jan 15 15:32:32 2024 +0900

[SPARK-46716][SS][TESTS] Add a test regarding backward compatibility check for Scala StreamingQueryListener

### What changes were proposed in this pull request?
This PR proposes to add a backward compatibility check for StreamingQueryListener in Scala, specifically covering listeners that do and do not implement `onQueryIdle`.

### Why are the changes needed?
We missed adding a backward compatibility test when introducing onQueryIdle, and it led to an issue in PySpark (https://issues.apache.org/jira/browse/SPARK-45631). We added the compatibility test in PySpark but didn't add it in Scala.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Modified UT.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44730 from HeartSaVioR/SPARK-46716.
Authored-by: Jungtaek Lim Signed-off-by: Hyukjin Kwon --- .../streaming/StreamingQueryListenerSuite.scala| 63 -- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 2504cd17acfc..d9ce8002d285 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -57,10 +57,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } testQuietly("single listener, check trigger events are generated correctly") { +testSingleListenerBasic(new EventCollectorV1) +testSingleListenerBasic(new EventCollectorV2) + } + + private def testSingleListenerBasic(listener: EventCollector): Unit = { val clock = new StreamManualClock val inputData = new MemoryStream[Int](0, sqlContext) val df = inputData.toDS().as[Long].map { 10 / _ } -val listener = new EventCollector case class AssertStreamExecThreadToWaitForClock() extends AssertOnQuery(q => { @@ -155,7 +159,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") { val df = MemoryStream[Int].toDS().as[Long] -val listeners = (1 to 5).map(_ => new EventCollector) +val listeners = (1 to 5).map(_ => new EventCollectorV2) try { listeners.foreach(listener => spark.streams.addListener(listener)) testStream(df, OutputMode.Append)( @@ -182,7 +186,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("continuous processing listeners should receive QueryTerminatedEvent") { val df = spark.readStream.format("rate").load() -val listeners = (1 to 5).map(_ => new EventCollector) +val listeners = (1 to 5).map(_ => new EventCollectorV2) try { listeners.foreach(listener => 
spark.streams.addListener(listener)) testStream(df, OutputMode.Append)( @@ -218,8 +222,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } try { - val listener1 = new EventCollector - val listener2 = new EventCollector + val listener1 = new EventCollectorV1 + val listener2 = new EventCollectorV2 spark.streams.addListener(listener1) assert(isListenerActive(listener1)) @@ -236,7 +240,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("event ordering") { -val listener = new EventCollector +val listener = new EventCollectorV2 withListenerAdded(listener) { for (i <- 1 to 50) { listener.reset() @@ -348,8 +352,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { test("listener only posts events from queries started in the related sessions") { val session1 = spark.newSession() val session2 = spark.newSession() -val collector1 = new EventCollector -val collector2 = new EventCollector +val collector1 = new EventCollectorV2 +val collector2 = new EventCollectorV2 def runQuery(session: SparkSession): Unit = { collector1.reset() @@ -434,7 +438,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { .observe( name
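The compatibility concern behind `EventCollectorV1` vs `EventCollectorV2` — an "old" listener written before `onQueryIdle` existed must keep working when the engine fires the new callback — comes down to the base class providing a no-op default for the later-added method. A plain-Python sketch of that pattern (illustrative only; these are not the real PySpark/Scala listener signatures):

```python
class StreamingQueryListenerSketch:
    # Original (V1-era) callback surface.
    def on_query_started(self, event): pass
    def on_query_progress(self, event): pass
    def on_query_terminated(self, event): pass
    # Later-added callback with a no-op default, so subclasses written
    # before it existed stay source- and call-compatible.
    def on_query_idle(self, event): pass

class EventCollectorV1(StreamingQueryListenerSketch):
    """Old-style listener: unaware of on_query_idle."""
    def __init__(self):
        self.events = []
    def on_query_progress(self, event):
        self.events.append(("progress", event))

listener = EventCollectorV1()
listener.on_query_progress("p1")
listener.on_query_idle("i1")  # falls back to the base no-op: no crash
print(listener.events)  # [('progress', 'p1')]
```

The suite added here runs the same basic-event assertions against both a V1-style and a V2-style collector, which is exactly the behavior the default method guarantees.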
(spark) branch master updated: [SPARK-46715][INFRA] Pin `sphinxcontrib-*`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 416b7b1cd5a6 [SPARK-46715][INFRA] Pin `sphinxcontrib-*`

416b7b1cd5a6 is described below

commit 416b7b1cd5a6555a2d545d2f8f3cbd6cadff130e
Author: Ruifeng Zheng
AuthorDate: Mon Jan 15 12:00:26 2024 +0800

[SPARK-46715][INFRA] Pin `sphinxcontrib-*`

### What changes were proposed in this pull request?
Pin
- `sphinxcontrib-applehelp==1.0.4`
- `sphinxcontrib-devhelp==1.0.2`
- `sphinxcontrib-htmlhelp==2.0.1`
- `sphinxcontrib-qthelp==1.0.3`
- `sphinxcontrib-serializinghtml==1.1.5`

Previously, `Install Python linter dependencies` installed `sphinxcontrib-applehelp-1.0.7`, and then `Install dependencies for documentation generation` reinstalled it with `sphinxcontrib-applehelp-1.0.4`; now, `Install Python linter dependencies` installs `sphinxcontrib-applehelp-1.0.8`, and `Install dependencies for documentation generation` keeps this installation: `Requirement already satisfied: sphinxcontrib-applehelp in /usr/local/lib/python3.9/dist-packages (from sphinx==4.5.0) (1.0.8)`

### Why are the changes needed?
The doc build is failing with:
```
Sphinx version error: The sphinxcontrib.applehelp extension used by this project needs at least Sphinx v5.0; it therefore cannot be built with this version.
make: *** [Makefile:35: html] Error 2
Jekyll 4.3.3 Please append `--trace` to the `build` command for any additional information or backtrace.
/__w/spark/spark/docs/_plugins/copy_api_dirs.rb:131:in `': Python doc generation failed (RuntimeError)
from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.3.3/lib/jekyll/external.rb:57:in `require'
from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.3.3/lib/jekyll/external.rb:57:in `block in require_with_graceful_fail'
```
```
Sphinx version error: The sphinxcontrib.devhelp extension used by this project needs at least Sphinx v5.0; it therefore cannot be built with this version.
make: *** [Makefile:35: html] Error 2
Jekyll 4.3.3 Please append `--trace` to the `build` command for any additional information or backtrace.
/__w/spark/spark/docs/_plugins/copy_api_dirs.rb:131:in `': Python doc generation failed (RuntimeError)
```

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44727 from zhengruifeng/infra_pin_sphinxcontrib-applehelp.
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .github/workflows/build_and_test.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 089a9296bcc4..40bcf734c6af 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -753,7 +753,8 @@ jobs: Rscript -e "devtools::install_version('preferably', version='0.4', repos='https://cloud.r-project.org')" - name: Install dependencies for documentation generation run: | -python3.9 -m pip install 'sphinx==4.5.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' +# Should unpin 'sphinxcontrib-*' after upgrading sphinx>5 +python3.9 -m pip install 'sphinx==4.5.0' mkdocs 'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 markupsafe 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' python3.9 -m pip install ipython_genutils # See SPARK-38517 python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' pyarrow pandas 'plotly>=4.8' python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated (86bac8911a31 -> 4f72ef843c4e)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

 from 86bac8911a31 [SPARK-46711][SS] Fix RocksDB state provider race condition during rollback
  add 4f72ef843c4e [MINOR][DOCS] Add missing part of RDD mapPartitions* docstrings

No new revisions were added by this update.

Summary of changes:
 python/pyspark/rdd.py | 7 +++
 1 file changed, 7 insertions(+)
(spark) branch master updated: [SPARK-46711][SS] Fix RocksDB state provider race condition during rollback
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 86bac8911a31 [SPARK-46711][SS] Fix RocksDB state provider race condition during rollback

86bac8911a31 is described below

commit 86bac8911a31a98eb6ad1f5365554f4f095c0376
Author: Anish Shrigondekar
AuthorDate: Mon Jan 15 11:09:34 2024 +0900

[SPARK-46711][SS] Fix RocksDB state provider race condition during rollback

### What changes were proposed in this pull request?
Fix RocksDB state provider race condition during rollback

### Why are the changes needed?
The rollback() method in RocksDB is not properly synchronized, thus a race condition can be introduced during rollback when there are tasks trying to commit. The symptom of the race condition is the following exception being thrown:
```
Caused by: java.io.FileNotFoundException: No such file or directory: ...state/0/54/10369.changelog
at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:4069)
at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3907)
at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3801)
at com.databricks.common.filesystem.LokiS3FS.getFileStatusNoCache(LokiS3FS.scala:91)
at com.databricks.common.filesystem.LokiS3FS.getFileStatus(LokiS3FS.scala:86)
at shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1525)
```
This race condition can happen for the following sequence of events:
1. task A gets cancelled after releasing lock for rocksdb
2. task B starts and loads 10368
3. task A performs rocksdb rollback to -1
4. task B reads data from rocksdb and commits

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #44722 from anishshri-db/task/SPARK-46711. Authored-by: Anish Shrigondekar Signed-off-by: Jungtaek Lim --- .../scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index 101a9e6b9199..0284d4c9d303 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -452,6 +452,7 @@ class RocksDB( * Drop uncommitted changes, and roll back to previous version. */ def rollback(): Unit = { +acquire() numKeysOnWritingVersion = numKeysOnLoadedVersion loadedVersion = -1L changelogWriter.foreach(_.abort()) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
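The fix itself is a one-line `acquire()` at the top of `rollback()`, so rollback contends for the same instance lock as load/commit instead of mutating the shared version state unsynchronized. A minimal Python sketch of the pattern, with `threading.RLock` standing in for RocksDB's acquire/release and the state names simplified (this is not the real `RocksDB.scala` API):

```python
import threading

class SketchRocksDB:
    """Toy stand-in for the RocksDB state provider's version bookkeeping."""

    def __init__(self):
        self._lock = threading.RLock()  # models acquire()/release()
        self.loaded_version = -1

    def load(self, version):
        with self._lock:
            self.loaded_version = version
            return self

    def commit(self):
        with self._lock:
            self.loaded_version += 1
            return self.loaded_version

    def rollback(self):
        # The spirit of the SPARK-46711 fix: take the lock *before* mutating
        # shared version state, so a rollback issued by a cancelled task
        # cannot interleave with another task's load/commit.
        with self._lock:
            self.loaded_version = -1

db = SketchRocksDB()
db.load(10368)
print(db.commit())        # 10369
db.rollback()
print(db.loaded_version)  # -1
```

Without the lock in `rollback()`, the four-step interleaving listed above lets task A reset `loadedVersion` between task B's load and commit, which is what produced the missing-changelog `FileNotFoundException`.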
(spark) branch master updated: [MINOR][SS] Fix indent for streaming aggregation operator
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 173ba2c6327f [MINOR][SS] Fix indent for streaming aggregation operator 173ba2c6327f is described below commit 173ba2c6327ff74bab74c1660c80c0c8b43707c9 Author: Anish Shrigondekar AuthorDate: Mon Jan 15 09:26:24 2024 +0900 [MINOR][SS] Fix indent for streaming aggregation operator ### What changes were proposed in this pull request? Fix indent for streaming aggregation operator ### Why are the changes needed? Indent/style change ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #44723 from anishshri-db/task/SPARK-46712. Authored-by: Anish Shrigondekar Signed-off-by: Jungtaek Lim --- .../execution/streaming/statefulOperators.scala| 30 +++--- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 80f5b3532c5e..8cb99a162ab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -434,22 +434,22 @@ case class StateStoreRestoreExec( numColsPrefixKey = 0, session.sessionState, Some(session.streams.stateStoreCoordinator)) { case (store, iter) => -val hasInput = iter.hasNext -if (!hasInput && keyExpressions.isEmpty) { - // If our `keyExpressions` are empty, we're getting a global aggregation. In that case - // the `HashAggregateExec` will output a 0 value for the partial merge. 
We need to - // restore the value, so that we don't overwrite our state with a 0 value, but rather - // merge the 0 with existing state. - store.iterator().map(_.value) -} else { - iter.flatMap { row => -val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) -val restoredRow = stateManager.get(store, key) -val outputRows = Option(restoredRow).toSeq :+ row -numOutputRows += outputRows.size -outputRows - } + val hasInput = iter.hasNext + if (!hasInput && keyExpressions.isEmpty) { +// If our `keyExpressions` are empty, we're getting a global aggregation. In that case +// the `HashAggregateExec` will output a 0 value for the partial merge. We need to +// restore the value, so that we don't overwrite our state with a 0 value, but rather +// merge the 0 with existing state. +store.iterator().map(_.value) + } else { +iter.flatMap { row => + val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) + val restoredRow = stateManager.get(store, key) + val outputRows = Option(restoredRow).toSeq :+ row + numOutputRows += outputRows.size + outputRows } + } } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-46709][SS] Expose partition_id column for state data source
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1c3b94150b44 [SPARK-46709][SS] Expose partition_id column for state data source

1c3b94150b44 is described below

commit 1c3b94150b44f51af4e23601fb6e7e51c4605712
Author: Chaoqin Li
AuthorDate: Mon Jan 15 08:21:19 2024 +0900

[SPARK-46709][SS] Expose partition_id column for state data source

### What changes were proposed in this pull request?
Expose the partition_id column of the state data source, which was previously hidden by default.

### Why are the changes needed?
The partition_id column is useful to users.

### Does this PR introduce _any_ user-facing change?
Yes. The partition_id column of the state data source, previously hidden by default, is now exposed, and the doc is modified accordingly.

### How was this patch tested?
Modified existing integration test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44717 from chaoqin-li1123/unhide_partition_id.
Authored-by: Chaoqin Li Signed-off-by: Hyukjin Kwon --- docs/structured-streaming-state-data-source.md | 4 ++-- .../datasources/v2/state/StateDataSource.scala | 3 ++- .../v2/state/StatePartitionReader.scala| 18 .../datasources/v2/state/StateTable.scala | 22 +-- .../StreamStreamJoinStatePartitionReader.scala | 18 .../v2/state/StateDataSourceReadSuite.scala| 25 ++ 6 files changed, 24 insertions(+), 66 deletions(-) diff --git a/docs/structured-streaming-state-data-source.md b/docs/structured-streaming-state-data-source.md index ae323f6b0c14..986699130669 100644 --- a/docs/structured-streaming-state-data-source.md +++ b/docs/structured-streaming-state-data-source.md @@ -96,9 +96,9 @@ Each row in the source has the following schema: - _partition_id + partition_id int - metadata column (hidden unless specified with SELECT) + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala index 1192accaabef..1a8f444042c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DI import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide, RightSide} import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId} import org.apache.spark.sql.sources.DataSourceRegister -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{IntegerType, StructType} import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -83,6 +83,7 @@ class StateDataSource extends TableProvider with DataSourceRegister { new StructType() .add("key", keySchema) 
.add("value", valueSchema) +.add("partition_id", IntegerType) } catch { case NonFatal(e) => throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala index 1e5f7216e8bf..ef8d7bf628bf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.state import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StateStore, StateStoreConf, StateStoreId, StateStoreProviderId} @@ -99,18 +99,7 @@ class StatePartitionReader( } } - private val joinedRow = new JoinedRow() - - private def addMetadata(row: InternalRow): InternalRow = { -val metadataRow = new GenericInternalRow( - StateTable.METADATA_COLUMNS.map(_.name()).map { -