(spark) branch master updated: [SPARK-46713][PYTHON][DOCS] Refine docstring of `map_keys/map_values/map_entries`

2024-01-14 Thread yangjie01
This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 02573741605d [SPARK-46713][PYTHON][DOCS] Refine docstring of 
`map_keys/map_values/map_entries`
02573741605d is described below

commit 02573741605dc240de0af6c8d23f983cfd303cc6
Author: yangjie01 
AuthorDate: Mon Jan 15 15:15:01 2024 +0800

[SPARK-46713][PYTHON][DOCS] Refine docstring of 
`map_keys/map_values/map_entries`

### What changes were proposed in this pull request?
This PR refines the docstrings of `map_keys/map_values/map_entries` and adds some 
new examples.
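
For readers skimming the digest, a minimal, hedged sketch of the three functions covered by
this change; the session setup and output shapes are illustrative and not taken from the commit:

```
# Hedged illustration of map_keys / map_values / map_entries; not part of the
# commit itself. The ordering of map entries in the output arrays is unspecified.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.master("local[1]").appName("map-docs-demo").getOrCreate()
df = spark.sql("SELECT map(1, 'a', 2, 'b') AS data")

# map_keys / map_values return unordered arrays of the map's keys and values.
df.select(sf.map_keys("data").alias("keys"),
          sf.map_values("data").alias("values")).show()

# map_entries returns an array of key/value structs.
df.select(sf.map_entries("data").alias("entries")).show(truncate=False)

spark.stop()
```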

### Why are the changes needed?
To improve PySpark documentation

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44724 from LuciferYang/SPARK-46713.

Lead-authored-by: yangjie01 
Co-authored-by: YangJie 
Signed-off-by: yangjie01 
---
 python/pyspark/sql/functions/builtin.py | 192 +---
 1 file changed, 154 insertions(+), 38 deletions(-)

diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index a05ce7b04368..f1422d17b071 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -15800,7 +15800,7 @@ def map_contains_key(col: "ColumnOrName", value: Any) -> Column:
 @_try_remote_functions
 def map_keys(col: "ColumnOrName") -> Column:
 """
-Collection function: Returns an unordered array containing the keys of the map.
+Map function: Returns an unordered array containing the keys of the map.
 
 .. versionadded:: 2.3.0
 
@@ -15810,23 +15810,61 @@ def map_keys(col: "ColumnOrName") -> Column:
 Parameters
 ----------
 col : :class:`~pyspark.sql.Column` or str
-name of column or expression
+Name of column or expression

 Returns
 -------
 :class:`~pyspark.sql.Column`
-keys of the map as an array.
+Keys of the map as an array.

 Examples
 --------
->>> from pyspark.sql.functions import map_keys
+Example 1: Extracting keys from a simple map
+
+>>> from pyspark.sql import functions as sf
 >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as data")
->>> df.select(map_keys("data").alias("keys")).show()
-+------+
-|  keys|
-+------+
-|[1, 2]|
-+------+
+>>> df.select(sf.sort_array(sf.map_keys("data"))).show()
++--------------------------------+
+|sort_array(map_keys(data), true)|
++--------------------------------+
+|                          [1, 2]|
++--------------------------------+
+
+Example 2: Extracting keys from a map with complex keys
+
+>>> from pyspark.sql import functions as sf
+>>> df = spark.sql("SELECT map(array(1, 2), 'a', array(3, 4), 'b') as data")
+>>> df.select(sf.sort_array(sf.map_keys("data"))).show()
++--------------------------------+
+|sort_array(map_keys(data), true)|
++--------------------------------+
+|                [[1, 2], [3, 4]]|
++--------------------------------+
+
+Example 3: Extracting keys from a map with duplicate keys
+
+>>> from pyspark.sql import functions as sf
+>>> originalmapKeyDedupPolicy = spark.conf.get("spark.sql.mapKeyDedupPolicy")
+>>> spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
+>>> df = spark.sql("SELECT map(1, 'a', 1, 'b') as data")
+>>> df.select(sf.map_keys("data")).show()
++--------------+
+|map_keys(data)|
++--------------+
+|           [1]|
++--------------+
+>>> spark.conf.set("spark.sql.mapKeyDedupPolicy", originalmapKeyDedupPolicy)
+
+Example 4: Extracting keys from an empty map
+
+>>> from pyspark.sql import functions as sf
+>>> df = spark.sql("SELECT map() as data")
+>>> df.select(sf.map_keys("data")).show()
++--------------+
+|map_keys(data)|
++--------------+
+|            []|
++--------------+
 """
 return _invoke_function_over_columns("map_keys", col)
 
@@ -15834,7 +15872,7 @@ def map_keys(col: "ColumnOrName") -> Column:
 @_try_remote_functions
 def map_values(col: "ColumnOrName") -> Column:
 """
-Collection function: Returns an unordered array containing the values of the map.
+Map function: Returns an unordered array containing the values of the map.
 
 .. versionadded:: 2.3.0
 
@@ -15844,23 +15882,69 @@ def map_values(col: "ColumnOrName") -> Column:
 Parameters
 ----------
 col : :class:`~pyspark.sql.Column` or str
-name of column or expression
+Name of column or expression

 Returns
 -------
 :class:`~pyspark.sql.Column`
-values of the map as an array.

(spark) branch master updated: [SPARK-46719][PS][TESTS] Rebalance `pyspark_pandas` and `pyspark_pandas_slow`

2024-01-14 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ee0d243f0141 [SPARK-46719][PS][TESTS] Rebalance `pyspark_pandas` and 
`pyspark_pandas_slow`
ee0d243f0141 is described below

commit ee0d243f014176ede9d8cf8296f9a2df1798920b
Author: Ruifeng Zheng 
AuthorDate: Mon Jan 15 15:10:55 2024 +0800

[SPARK-46719][PS][TESTS] Rebalance `pyspark_pandas` and 
`pyspark_pandas_slow`

### What changes were proposed in this pull request?
Rebalance `pyspark_pandas` and `pyspark_pandas_slow`

### Why are the changes needed?
before:
`pyspark_pandas`: `Tests passed in 1849 seconds`
`pyspark_pandas-slow`: `Tests passed in 3538 seconds`

after:
`pyspark_pandas`: `Tests passed in 2733 seconds`
`pyspark_pandas-slow`: `Tests passed in 2804 seconds`

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci, 
https://github.com/zhengruifeng/spark/actions/runs/7524159324/job/20478674209

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44731 from zhengruifeng/infra_rebalance_ps_test.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py | 158 
 1 file changed, 79 insertions(+), 79 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 9cdbe4d250ca..202263febc93 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -699,6 +699,40 @@ pyspark_pandas = Module(
 "pyspark.pandas.spark.utils",
 "pyspark.pandas.typedef.typehints",
 # unittests
+"pyspark.pandas.tests.test_categorical",
+"pyspark.pandas.tests.test_config",
+"pyspark.pandas.tests.test_extension",
+"pyspark.pandas.tests.test_frame_spark",
+"pyspark.pandas.tests.test_generic_functions",
+"pyspark.pandas.tests.test_indexops_spark",
+"pyspark.pandas.tests.test_internal",
+"pyspark.pandas.tests.test_namespace",
+"pyspark.pandas.tests.test_numpy_compat",
+"pyspark.pandas.tests.test_repr",
+"pyspark.pandas.tests.test_spark_functions",
+"pyspark.pandas.tests.test_scalars",
+"pyspark.pandas.tests.test_sql",
+"pyspark.pandas.tests.test_typedef",
+"pyspark.pandas.tests.test_utils",
+"pyspark.pandas.tests.computation.test_any_all",
+"pyspark.pandas.tests.computation.test_apply_func",
+"pyspark.pandas.tests.computation.test_binary_ops",
+"pyspark.pandas.tests.computation.test_combine",
+"pyspark.pandas.tests.computation.test_compute",
+"pyspark.pandas.tests.computation.test_corr",
+"pyspark.pandas.tests.computation.test_corrwith",
+"pyspark.pandas.tests.computation.test_cov",
+"pyspark.pandas.tests.computation.test_cumulative",
+"pyspark.pandas.tests.computation.test_describe",
+"pyspark.pandas.tests.computation.test_eval",
+"pyspark.pandas.tests.computation.test_melt",
+"pyspark.pandas.tests.computation.test_missing_data",
+"pyspark.pandas.tests.computation.test_pivot",
+"pyspark.pandas.tests.computation.test_pivot_table",
+"pyspark.pandas.tests.computation.test_pivot_table_adv",
+"pyspark.pandas.tests.computation.test_pivot_table_multi_idx",
+"pyspark.pandas.tests.computation.test_pivot_table_multi_idx_adv",
+"pyspark.pandas.tests.computation.test_stats",
 "pyspark.pandas.tests.data_type_ops.test_as_type",
 "pyspark.pandas.tests.data_type_ops.test_base",
 "pyspark.pandas.tests.data_type_ops.test_binary_ops",
@@ -717,42 +751,26 @@ pyspark_pandas = Module(
 "pyspark.pandas.tests.data_type_ops.test_string_ops",
 "pyspark.pandas.tests.data_type_ops.test_udt_ops",
 "pyspark.pandas.tests.data_type_ops.test_timedelta_ops",
-"pyspark.pandas.tests.indexes.test_category",
-"pyspark.pandas.tests.indexes.test_timedelta",
 "pyspark.pandas.tests.plot.test_frame_plot",
 "pyspark.pandas.tests.plot.test_frame_plot_matplotlib",
 "pyspark.pandas.tests.plot.test_frame_plot_plotly",
 "pyspark.pandas.tests.plot.test_series_plot",
 "pyspark.pandas.tests.plot.test_series_plot_matplotlib",
 "pyspark.pandas.tests.plot.test_series_plot_plotly",
-"pyspark.pandas.tests.test_categorical",
-"pyspark.pandas.tests.test_config",
-"pyspark.pandas.tests.indexes.test_default",
-"pyspark.pandas.tests.window.test_expanding",
-"pyspark.pandas.tests.window.test_expanding_adv",
-"pyspark.pandas.tests.window.test_expanding_error",
-

(spark) branch master updated: [SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent snapshot

2024-01-14 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 65d822c44d93 [SPARK-45435][DOC] Document that lazy checkpoint may not 
be a consistent snapshot
65d822c44d93 is described below

commit 65d822c44d93077b68d8738c261fcaf9288cc960
Author: Juliusz Sompolski 
AuthorDate: Mon Jan 15 14:55:39 2024 +0800

[SPARK-45435][DOC] Document that lazy checkpoint may not be a consistent 
snapshot

### What changes were proposed in this pull request?

Some may want to use checkpoint to get a consistent snapshot of the Dataset / RDD.
This PR warns that this is not the case with lazy checkpointing, because the checkpoint
is computed only at the end of the first action, and the data used during that first
action may differ because of non-determinism and retries.

`doCheckpoint` is only called at the end of 
[SparkContext.runJob](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/SparkContext.scala#L2426).
 This may cause recomputation of both the [local checkpoint 
data](https://github.com/apache/spark/blob/5446f548bbc8a93414f1c773a8daf714b57b7d1a/core/src/main/scala/org/apache/spark/rdd/LocalRDDCheckpointData.scala#L54)
 and the [reliable checkpoint data](https://github.com/apache/sp [...]
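
To make the edge case concrete, a hedged PySpark sketch of the lazy-versus-eager distinction the
new documentation warns about; the checkpoint directory and the use of `rand()` as the
non-deterministic column are illustrative assumptions, not part of the patch:

```
# Hedged sketch: with eager=False the checkpoint is only materialized at the end
# of the first action, so for a non-deterministic query the checkpointed data can
# differ from what that first action observed. eager=True (the default) runs an
# action up front and avoids the issue.
from pyspark.sql import SparkSession, functions as sf

spark = SparkSession.builder.master("local[1]").appName("checkpoint-demo").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/checkpoint-demo")  # illustrative path

df = spark.range(1000).withColumn("r", sf.rand())  # non-deterministic column

lazy_cp = df.checkpoint(eager=False)   # nothing computed or saved yet
eager_cp = df.checkpoint(eager=True)   # computed and checkpointed right here

# The first action on lazy_cp both consumes the data and triggers the checkpoint,
# so a retry inside that action may observe different random values than the
# snapshot that ends up checkpointed.
lazy_cp.count()
eager_cp.count()

spark.stop()
```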

### Why are the changes needed?

Document a gnarly edge case.

### Does this PR introduce _any_ user-facing change?

Yes, change to documentation of public APIs.

### How was this patch tested?

Doc only change.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43247 from juliuszsompolski/SPARK-45435-doc.

Authored-by: Juliusz Sompolski 
Signed-off-by: Wenchen Fan 
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala   | 14 ++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala| 16 
 2 files changed, 30 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 9518433a7f69..d73fb1b9bc3b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1649,6 +1649,13 @@ abstract class RDD[T: ClassTag](
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is 
persisted in
* memory, otherwise saving it on a file will require recomputation.
+   *
+   * The data is only checkpointed when `doCheckpoint()` is called, and this 
only happens at the
+   * end of the first action execution on this RDD. The final data that is 
checkpointed after the
+   * first action may be different from the data that was used during the 
action, due to
+   * non-determinism of the underlying operation and retries. If the purpose 
of the checkpoint is
+   * to achieve saving a deterministic snapshot of the data, an eager action 
may need to be called
+   * first on the RDD to trigger the checkpoint.
*/
   def checkpoint(): Unit = RDDCheckpointData.synchronized {
 // NOTE: we use a global lock here due to complexities downstream with 
ensuring
@@ -1678,6 +1685,13 @@ abstract class RDD[T: ClassTag](
* `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
*
* The checkpoint directory set through `SparkContext#setCheckpointDir` is 
not used.
+   *
+   * The data is only checkpointed when `doCheckpoint()` is called, and this 
only happens at the
+   * end of the first action execution on this RDD. The final data that is 
checkpointed after the
+   * first action may be different from the data that was used during the 
action, due to
+   * non-determinism of the underlying operation and retries. If the purpose 
of the checkpoint is
+   * to achieve saving a deterministic snapshot of the data, an eager action 
may need to be called
+   * first on the RDD to trigger the checkpoint.
*/
   def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
 if (Utils.isDynamicAllocationEnabled(conf) &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index d792cdbcf865..0038f1a510b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -688,6 +688,14 @@ class Dataset[T] private[sql](
* plan may grow exponentially. It will be saved to files inside the 
checkpoint
* directory set with `SparkContext#setCheckpointDir`.
*
+   * @param eager Whether to checkpoint this dataframe immediately
+   *
+   * @note When checkpoint is used with eager = false, the final data that is 

(spark) branch master updated: [SPARK-46716][SS][TESTS] Add a test regarding to backward compatibility check for Scala StreamingQueryListener

2024-01-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b18b6cca28ef [SPARK-46716][SS][TESTS] Add a test regarding to backward 
compatibility check for Scala StreamingQueryListener
b18b6cca28ef is described below

commit b18b6cca28ef7ed1199ae591d00e945ae4ae611a
Author: Jungtaek Lim 
AuthorDate: Mon Jan 15 15:32:32 2024 +0900

[SPARK-46716][SS][TESTS] Add a test regarding to backward compatibility 
check for Scala StreamingQueryListener

### What changes were proposed in this pull request?

This PR proposes to add a backward-compatibility check for StreamingQueryListener in 
Scala, specifically covering listeners that do or do not implement `onQueryIdle`.

### Why are the changes needed?

We missed adding a backward compatibility test when introducing onQueryIdle, 
which led to an issue in PySpark 
(https://issues.apache.org/jira/browse/SPARK-45631). We added the compatibility 
test in PySpark but didn't add it in Scala.
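
For illustration only (the patch itself adds a Scala-side test), a hedged PySpark sketch of the
kind of pre-`onQueryIdle` listener that such compatibility checks are meant to keep working:

```
# Hedged sketch: a listener written before onQueryIdle existed implements only the
# three original callbacks. Newer Spark releases must still accept it, which is the
# backward-compatibility concern this test covers.
from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener

class LegacyListener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print(f"started: {event.id}")

    def onQueryProgress(self, event):
        print(f"progress: batch {event.progress.batchId}")

    def onQueryTerminated(self, event):
        print(f"terminated: {event.id}")
    # No onQueryIdle override here; newer versions must handle its absence gracefully.

spark = SparkSession.builder.master("local[1]").appName("listener-demo").getOrCreate()
spark.streams.addListener(LegacyListener())
```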

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44730 from HeartSaVioR/SPARK-46716.

Authored-by: Jungtaek Lim 
Signed-off-by: Hyukjin Kwon 
---
 .../streaming/StreamingQueryListenerSuite.scala| 63 --
 1 file changed, 48 insertions(+), 15 deletions(-)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 2504cd17acfc..d9ce8002d285 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -57,10 +57,14 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   testQuietly("single listener, check trigger events are generated correctly") 
{
+testSingleListenerBasic(new EventCollectorV1)
+testSingleListenerBasic(new EventCollectorV2)
+  }
+
+  private def testSingleListenerBasic(listener: EventCollector): Unit = {
 val clock = new StreamManualClock
 val inputData = new MemoryStream[Int](0, sqlContext)
 val df = inputData.toDS().as[Long].map { 10 / _ }
-val listener = new EventCollector
 
 case class AssertStreamExecThreadToWaitForClock()
   extends AssertOnQuery(q => {
@@ -155,7 +159,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
 
   test("SPARK-19594: all of listeners should receive QueryTerminatedEvent") {
 val df = MemoryStream[Int].toDS().as[Long]
-val listeners = (1 to 5).map(_ => new EventCollector)
+val listeners = (1 to 5).map(_ => new EventCollectorV2)
 try {
   listeners.foreach(listener => spark.streams.addListener(listener))
   testStream(df, OutputMode.Append)(
@@ -182,7 +186,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
 
   test("continuous processing listeners should receive QueryTerminatedEvent") {
 val df = spark.readStream.format("rate").load()
-val listeners = (1 to 5).map(_ => new EventCollector)
+val listeners = (1 to 5).map(_ => new EventCollectorV2)
 try {
   listeners.foreach(listener => spark.streams.addListener(listener))
   testStream(df, OutputMode.Append)(
@@ -218,8 +222,8 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
 }
 
 try {
-  val listener1 = new EventCollector
-  val listener2 = new EventCollector
+  val listener1 = new EventCollectorV1
+  val listener2 = new EventCollectorV2
 
   spark.streams.addListener(listener1)
   assert(isListenerActive(listener1))
@@ -236,7 +240,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   }
 
   test("event ordering") {
-val listener = new EventCollector
+val listener = new EventCollectorV2
 withListenerAdded(listener) {
   for (i <- 1 to 50) {
 listener.reset()
@@ -348,8 +352,8 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   test("listener only posts events from queries started in the related 
sessions") {
 val session1 = spark.newSession()
 val session2 = spark.newSession()
-val collector1 = new EventCollector
-val collector2 = new EventCollector
+val collector1 = new EventCollectorV2
+val collector2 = new EventCollectorV2
 
 def runQuery(session: SparkSession): Unit = {
   collector1.reset()
@@ -434,7 +438,7 @@ class StreamingQueryListenerSuite extends StreamTest with 
BeforeAndAfter {
   .observe(
 name 

(spark) branch master updated: [SPARK-46715][INFRA] Pin `sphinxcontrib-*`

2024-01-14 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 416b7b1cd5a6 [SPARK-46715][INFRA] Pin `sphinxcontrib-*`
416b7b1cd5a6 is described below

commit 416b7b1cd5a6555a2d545d2f8f3cbd6cadff130e
Author: Ruifeng Zheng 
AuthorDate: Mon Jan 15 12:00:26 2024 +0800

[SPARK-46715][INFRA] Pin `sphinxcontrib-*`

### What changes were proposed in this pull request?
Pin

- `sphinxcontrib-applehelp==1.0.4`
- `sphinxcontrib-devhelp==1.0.2`
- `sphinxcontrib-htmlhelp==2.0.1`
- `sphinxcontrib-qthelp==1.0.3`
- `sphinxcontrib-serializinghtml==1.1.5`

Previously,
`Install Python linter dependencies` installed 
`sphinxcontrib-applehelp-1.0.7`, and then `Install dependencies for 
documentation generation` reinstalled it with `sphinxcontrib-applehelp-1.0.4`;

now,
`Install Python linter dependencies` installs 
`sphinxcontrib-applehelp-1.0.8`, and `Install dependencies for documentation 
generation` keeps this installation:
`Requirement already satisfied: sphinxcontrib-applehelp in 
/usr/local/lib/python3.9/dist-packages (from sphinx==4.5.0) (1.0.8)`

### Why are the changes needed?
The doc build is failing with:
```
Sphinx version error:
The sphinxcontrib.applehelp extension used by this project needs at least 
Sphinx v5.0; it therefore cannot be built with this version.
make: *** [Makefile:35: html] Error 2

  Jekyll 4.3.3   Please append `--trace` to the `build` command
 for any additional information or backtrace.

/__w/spark/spark/docs/_plugins/copy_api_dirs.rb:131:in `<main>': 
Python doc generation failed (RuntimeError)
from 
/__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.3.3/lib/jekyll/external.rb:57:in
 `require'
from 
/__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.3.3/lib/jekyll/external.rb:57:in
 `block in require_with_graceful_fail'
```

```
Sphinx version error:
The sphinxcontrib.devhelp extension used by this project needs at least 
Sphinx v5.0; it therefore cannot be built with this version.
make: *** [Makefile:35: html] Error 2

  Jekyll 4.3.3   Please append `--trace` to the `build` command
 for any additional information or backtrace.

/__w/spark/spark/docs/_plugins/copy_api_dirs.rb:131:in `<main>': 
Python doc generation failed (RuntimeError)
```

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44727 from zhengruifeng/infra_pin_sphinxcontrib-applehelp.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .github/workflows/build_and_test.yml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 089a9296bcc4..40bcf734c6af 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -753,7 +753,8 @@ jobs:
 Rscript -e "devtools::install_version('preferably', version='0.4', 
repos='https://cloud.r-project.org')"
 - name: Install dependencies for documentation generation
   run: |
-python3.9 -m pip install 'sphinx==4.5.0' mkdocs 
'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 
markupsafe 'pyzmq<24.0.0'
+# Should unpin 'sphinxcontrib-*' after upgrading sphinx>5
+python3.9 -m pip install 'sphinx==4.5.0' mkdocs 
'pydata_sphinx_theme>=0.13' sphinx-copybutton nbsphinx numpydoc jinja2 
markupsafe 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 
'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 
'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5'
 python3.9 -m pip install ipython_genutils # See SPARK-38517
 python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 
pyarrow pandas 'plotly>=4.8'
 python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated (86bac8911a31 -> 4f72ef843c4e)

2024-01-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 86bac8911a31 [SPARK-46711][SS] Fix RocksDB state provider race 
condition during rollback
 add 4f72ef843c4e [MINOR][DOCS] Add missing part of RDD mapPartitions* 
docstrings

No new revisions were added by this update.

Summary of changes:
 python/pyspark/rdd.py | 7 +++
 1 file changed, 7 insertions(+)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46711][SS] Fix RocksDB state provider race condition during rollback

2024-01-14 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 86bac8911a31 [SPARK-46711][SS] Fix RocksDB state provider race 
condition during rollback
86bac8911a31 is described below

commit 86bac8911a31a98eb6ad1f5365554f4f095c0376
Author: Anish Shrigondekar 
AuthorDate: Mon Jan 15 11:09:34 2024 +0900

[SPARK-46711][SS] Fix RocksDB state provider race condition during rollback

### What changes were proposed in this pull request?
Fix RocksDB state provider race condition during rollback

### Why are the changes needed?
The rollback() method in RocksDB is not properly synchronized, so a race 
condition can occur during rollback when other tasks are trying to 
commit.

The symptom of the race condition is the following exception being thrown:
```
`Caused by: java.io.FileNotFoundException: No such file or directory: 
...state/0/54/10369.changelog
at 
shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:4069)
at 
shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3907)
at 
shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3801)
at 
com.databricks.common.filesystem.LokiS3FS.getFileStatusNoCache(LokiS3FS.scala:91)
at 
com.databricks.common.filesystem.LokiS3FS.getFileStatus(LokiS3FS.scala:86)
at 
shaded.databricks.org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:1525)`
```

This race condition can happen for the following sequence of events
1. task A gets cancelled after releasing lock for rocksdb
2. task B starts and loads 10368
3. task A performs rocksdb rollback to -1
4. task B reads data from rocksdb and commits
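
The one-line fix in the diff below makes `rollback()` acquire the instance lock before mutating
state. As a hedged, simplified illustration of that pattern in Python (not the actual Scala code):

```
# Hedged sketch of the synchronization pattern, not the real RocksDB provider:
# every state-mutating operation (load, commit, rollback) must hold the same
# instance lock, otherwise a rollback can interleave with a concurrent commit.
import threading

class StateStoreSketch:
    def __init__(self):
        self._lock = threading.RLock()
        self.loaded_version = -1

    def load(self, version):
        with self._lock:
            self.loaded_version = version

    def commit(self):
        with self._lock:
            return self.loaded_version + 1

    def rollback(self):
        with self._lock:   # the missing acquisition the patch adds in Scala
            self.loaded_version = -1
```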

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44722 from anishshri-db/task/SPARK-46711.

Authored-by: Anish Shrigondekar 
Signed-off-by: Jungtaek Lim 
---
 .../scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala   | 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 101a9e6b9199..0284d4c9d303 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -452,6 +452,7 @@ class RocksDB(
* Drop uncommitted changes, and roll back to previous version.
*/
   def rollback(): Unit = {
+acquire()
 numKeysOnWritingVersion = numKeysOnLoadedVersion
 loadedVersion = -1L
 changelogWriter.foreach(_.abort())


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [MINOR][SS] Fix indent for streaming aggregation operator

2024-01-14 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 173ba2c6327f [MINOR][SS] Fix indent for streaming aggregation operator
173ba2c6327f is described below

commit 173ba2c6327ff74bab74c1660c80c0c8b43707c9
Author: Anish Shrigondekar 
AuthorDate: Mon Jan 15 09:26:24 2024 +0900

[MINOR][SS] Fix indent for streaming aggregation operator

### What changes were proposed in this pull request?
Fix indent for streaming aggregation operator

### Why are the changes needed?
Indent/style change

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #44723 from anishshri-db/task/SPARK-46712.

Authored-by: Anish Shrigondekar 
Signed-off-by: Jungtaek Lim 
---
 .../execution/streaming/statefulOperators.scala| 30 +++---
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 80f5b3532c5e..8cb99a162ab2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -434,22 +434,22 @@ case class StateStoreRestoreExec(
   numColsPrefixKey = 0,
   session.sessionState,
   Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
-val hasInput = iter.hasNext
-if (!hasInput && keyExpressions.isEmpty) {
-  // If our `keyExpressions` are empty, we're getting a global 
aggregation. In that case
-  // the `HashAggregateExec` will output a 0 value for the partial 
merge. We need to
-  // restore the value, so that we don't overwrite our state with a 0 
value, but rather
-  // merge the 0 with existing state.
-  store.iterator().map(_.value)
-} else {
-  iter.flatMap { row =>
-val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
-val restoredRow = stateManager.get(store, key)
-val outputRows = Option(restoredRow).toSeq :+ row
-numOutputRows += outputRows.size
-outputRows
-  }
+  val hasInput = iter.hasNext
+  if (!hasInput && keyExpressions.isEmpty) {
+// If our `keyExpressions` are empty, we're getting a global 
aggregation. In that case
+// the `HashAggregateExec` will output a 0 value for the partial 
merge. We need to
+// restore the value, so that we don't overwrite our state with a 0 
value, but rather
+// merge the 0 with existing state.
+store.iterator().map(_.value)
+  } else {
+iter.flatMap { row =>
+  val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
+  val restoredRow = stateManager.get(store, key)
+  val outputRows = Option(restoredRow).toSeq :+ row
+  numOutputRows += outputRows.size
+  outputRows
 }
+  }
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-46709][SS] Expose partition_id column for state data source

2024-01-14 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1c3b94150b44 [SPARK-46709][SS] Expose partition_id column for state 
data source
1c3b94150b44 is described below

commit 1c3b94150b44f51af4e23601fb6e7e51c4605712
Author: Chaoqin Li 
AuthorDate: Mon Jan 15 08:21:19 2024 +0900

[SPARK-46709][SS] Expose partition_id column for state data source

### What changes were proposed in this pull request?
Expose the partition_id column of the state data source, which was hidden by default.

### Why are the changes needed?
The partition_id column is useful to users.

### Does this PR introduce _any_ user-facing change?
Yes. The partition_id column of the state data source, which was hidden by 
default, is now exposed, and the doc is modified accordingly.
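
A hedged PySpark sketch of how the exposed column can be used; the `statestore` format name and
the key/value/partition_id columns follow the state data source documentation touched by this
patch, while the checkpoint path is an illustrative placeholder:

```
# Hedged sketch: read a streaming query's state and group by the newly exposed
# partition_id column. Path and query details are illustrative.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("state-source-demo").getOrCreate()

state = (
    spark.read.format("statestore")
    .load("/tmp/streaming-query/checkpoint")  # checkpoint location of the query
)

# partition_id is now a regular column; no hidden metadata column to SELECT explicitly.
state.select("key", "value", "partition_id").show(truncate=False)
state.groupBy("partition_id").count().show()
```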

### How was this patch tested?
Modify existing integration test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44717 from chaoqin-li1123/unhide_partition_id.

Authored-by: Chaoqin Li 
Signed-off-by: Hyukjin Kwon 
---
 docs/structured-streaming-state-data-source.md |  4 ++--
 .../datasources/v2/state/StateDataSource.scala |  3 ++-
 .../v2/state/StatePartitionReader.scala| 18 
 .../datasources/v2/state/StateTable.scala  | 22 +--
 .../StreamStreamJoinStatePartitionReader.scala | 18 
 .../v2/state/StateDataSourceReadSuite.scala| 25 ++
 6 files changed, 24 insertions(+), 66 deletions(-)

diff --git a/docs/structured-streaming-state-data-source.md 
b/docs/structured-streaming-state-data-source.md
index ae323f6b0c14..986699130669 100644
--- a/docs/structured-streaming-state-data-source.md
+++ b/docs/structured-streaming-state-data-source.md
@@ -96,9 +96,9 @@ Each row in the source has the following schema:
   
 
 
-  _partition_id
+  partition_id
   int
-  metadata column (hidden unless specified with SELECT)
+  
 
 
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
index 1192accaabef..1a8f444042c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.execution.streaming.StreamingCheckpointConstants.{DI
 import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{LeftSide,
 RightSide}
 import 
org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker,
 StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{IntegerType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
@@ -83,6 +83,7 @@ class StateDataSource extends TableProvider with 
DataSourceRegister {
   new StructType()
 .add("key", keySchema)
 .add("value", valueSchema)
+.add("partition_id", IntegerType)
 } catch {
   case NonFatal(e) =>
 throw StateDataSourceErrors.failedToReadStateSchema(sourceOptions, e)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
index 1e5f7216e8bf..ef8d7bf628bf 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
JoinedRow, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeRow}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
 import org.apache.spark.sql.execution.datasources.v2.state.utils.SchemaUtil
 import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, 
StateStore, StateStoreConf, StateStoreId, StateStoreProviderId}
@@ -99,18 +99,7 @@ class StatePartitionReader(
 }
   }
 
-  private val joinedRow = new JoinedRow()
-
-  private def addMetadata(row: InternalRow): InternalRow = {
-val metadataRow = new GenericInternalRow(
-  StateTable.METADATA_COLUMNS.map(_.name()).map {
-