(spark) branch master updated: [SPARK-46341][PS][TESTS] Reorganize `SeriesInterpolateTests`

2023-12-09 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bac3492980a3 [SPARK-46341][PS][TESTS] Reorganize `SeriesInterpolateTests`
bac3492980a3 is described below

commit bac3492980a3e793065a9e9d511ddf0fb66357b3
Author: Ruifeng Zheng 
AuthorDate: Sun Dec 10 09:35:14 2023 +0800

[SPARK-46341][PS][TESTS] Reorganize `SeriesInterpolateTests`

### What changes were proposed in this pull request?
1. Move `SeriesInterpolateTests` to `pyspark.pandas.tests.series.*`
2. Also optimize the test by controlling the combinations

### Why are the changes needed?
move it to the right place

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44274 from zhengruifeng/ps_test_series_interpolate.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py|  4 +--
 .../test_parity_interpolate.py}| 12 ---
 .../test_interpolate.py}   | 37 +-
 3 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index ca35fdabc0c4..e1193d02ec51 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -735,7 +735,7 @@ pyspark_pandas = Module(
 "pyspark.pandas.tests.test_frame_spark",
 "pyspark.pandas.tests.test_generic_functions",
 "pyspark.pandas.tests.test_frame_interpolate",
-"pyspark.pandas.tests.test_series_interpolate",
+"pyspark.pandas.tests.series.test_interpolate",
 "pyspark.pandas.tests.test_indexops_spark",
 "pyspark.pandas.tests.test_internal",
 "pyspark.pandas.tests.test_namespace",
@@ -1112,7 +1112,7 @@ pyspark_pandas_connect_part2 = Module(
 "pyspark.pandas.tests.connect.indexes.test_parity_base_slow",
 "pyspark.pandas.tests.connect.indexes.test_parity_datetime_property",
 "pyspark.pandas.tests.connect.test_parity_frame_interpolate",
-"pyspark.pandas.tests.connect.test_parity_series_interpolate",
+"pyspark.pandas.tests.connect.series.test_parity_interpolate",
 "pyspark.pandas.tests.connect.resample.test_parity_frame",
 "pyspark.pandas.tests.connect.resample.test_parity_series",
 "pyspark.pandas.tests.connect.window.test_parity_ewm_error",
diff --git a/python/pyspark/pandas/tests/connect/test_parity_series_interpolate.py b/python/pyspark/pandas/tests/connect/series/test_parity_interpolate.py
similarity index 77%
rename from python/pyspark/pandas/tests/connect/test_parity_series_interpolate.py
rename to python/pyspark/pandas/tests/connect/series/test_parity_interpolate.py
index 55a804b280d3..d3f753d0dbf2 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_series_interpolate.py
+++ b/python/pyspark/pandas/tests/connect/series/test_parity_interpolate.py
@@ -16,19 +16,21 @@
 #
 import unittest
 
-from pyspark.pandas.tests.test_series_interpolate import SeriesInterpolateTestsMixin
+from pyspark.pandas.tests.series.test_interpolate import SeriesInterpolateMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
-from pyspark.testing.pandasutils import PandasOnSparkTestUtils, TestUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
 
 
-class SeriesInterpolateParityTests(
-SeriesInterpolateTestsMixin, TestUtils, PandasOnSparkTestUtils, ReusedConnectTestCase
+class SeriesParityInterpolateTests(
+SeriesInterpolateMixin,
+PandasOnSparkTestUtils,
+ReusedConnectTestCase,
 ):
 pass
 
 
 if __name__ == "__main__":
-from pyspark.pandas.tests.connect.test_parity_series_interpolate import *  # noqa: F401
+from pyspark.pandas.tests.connect.series.test_parity_interpolate import *  # noqa: F401
 
 try:
 import xmlrunner  # type: ignore[import]
diff --git a/python/pyspark/pandas/tests/test_series_interpolate.py b/python/pyspark/pandas/tests/series/test_interpolate.py
similarity index 72%
rename from python/pyspark/pandas/tests/test_series_interpolate.py
rename to python/pyspark/pandas/tests/series/test_interpolate.py
index 0f15bf59dede..0008dd2ee916 100644
--- a/python/pyspark/pandas/tests/test_series_interpolate.py
+++ b/python/pyspark/pandas/tests/series/test_interpolate.py
@@ -18,24 +18,28 @@ import numpy as np
 import pandas as pd
 
 import pyspark.pandas as ps
-from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestCase
 
 
-class SeriesInterpolateTestsMixin:
+class SeriesInterpolateMixin:
 def _test_interpolate(self, pobj):
  

(spark) branch master updated: [SPARK-46340][PS][CONNECT][TESTS] Reorganize `EWMTests`

2023-12-09 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 55d6b51af7cd [SPARK-46340][PS][CONNECT][TESTS] Reorganize `EWMTests`
55d6b51af7cd is described below

commit 55d6b51af7cd9108752eea65e7eef13da01118e8
Author: Ruifeng Zheng 
AuthorDate: Sun Dec 10 08:53:19 2023 +0800

[SPARK-46340][PS][CONNECT][TESTS] Reorganize `EWMTests`

### What changes were proposed in this pull request?
Reorganize `EWMTests`

### Why are the changes needed?
break it into smaller files to be consistent with pandas tests (see 
https://github.com/pandas-dev/pandas/tree/main/pandas/tests/window )

### Does this PR introduce _any_ user-facing change?
no, test-only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44273 from zhengruifeng/ps_test_ewm.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 dev/sparktestsupport/modules.py|   8 +-
 .../{test_parity_ewm.py => window/__init__.py} |  21 --
 .../test_parity_ewm_error.py}  |  11 +-
 .../test_parity_ewm_mean.py}   |  11 +-
 .../test_parity_groupby_ewm_mean.py}   |  11 +-
 .../test_parity_ewm.py => window/__init__.py}  |  21 --
 .../pyspark/pandas/tests/window/test_ewm_error.py  |  97 ++
 .../pyspark/pandas/tests/window/test_ewm_mean.py   | 194 +++
 .../test_groupby_ewm_mean.py}  | 215 +
 9 files changed, 328 insertions(+), 261 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index e67cfce0f5c0..ca35fdabc0c4 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -729,7 +729,9 @@ pyspark_pandas = Module(
 "pyspark.pandas.tests.test_default_index",
 "pyspark.pandas.tests.test_expanding",
 "pyspark.pandas.tests.test_extension",
-"pyspark.pandas.tests.test_ewm",
+"pyspark.pandas.tests.window.test_ewm_error",
+"pyspark.pandas.tests.window.test_ewm_mean",
+"pyspark.pandas.tests.window.test_groupby_ewm_mean",
 "pyspark.pandas.tests.test_frame_spark",
 "pyspark.pandas.tests.test_generic_functions",
 "pyspark.pandas.tests.test_frame_interpolate",
@@ -1113,7 +1115,9 @@ pyspark_pandas_connect_part2 = Module(
 "pyspark.pandas.tests.connect.test_parity_series_interpolate",
 "pyspark.pandas.tests.connect.resample.test_parity_frame",
 "pyspark.pandas.tests.connect.resample.test_parity_series",
-"pyspark.pandas.tests.connect.test_parity_ewm",
+"pyspark.pandas.tests.connect.window.test_parity_ewm_error",
+"pyspark.pandas.tests.connect.window.test_parity_ewm_mean",
+"pyspark.pandas.tests.connect.window.test_parity_groupby_ewm_mean",
 "pyspark.pandas.tests.connect.test_parity_rolling",
 "pyspark.pandas.tests.connect.test_parity_expanding",
 
"pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames_groupby_rolling",
diff --git a/python/pyspark/pandas/tests/connect/test_parity_ewm.py b/python/pyspark/pandas/tests/connect/window/__init__.py
similarity index 53%
copy from python/pyspark/pandas/tests/connect/test_parity_ewm.py
copy to python/pyspark/pandas/tests/connect/window/__init__.py
index 748728203337..cce3acad34a4 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_ewm.py
+++ b/python/pyspark/pandas/tests/connect/window/__init__.py
@@ -14,24 +14,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import unittest
-
-from pyspark.pandas.tests.test_ewm import EWMTestsMixin
-from pyspark.testing.connectutils import ReusedConnectTestCase
-from pyspark.testing.pandasutils import PandasOnSparkTestUtils, TestUtils
-
-
-class EWMParityTests(EWMTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase, TestUtils):
-pass
-
-
-if __name__ == "__main__":
-from pyspark.pandas.tests.connect.test_parity_ewm import *  # noqa: F401
-
-try:
-import xmlrunner  # type: ignore[import]
-
-testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
-except ImportError:
-testRunner = None
-unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/connect/test_parity_ewm.py b/python/pyspark/pandas/tests/connect/window/test_parity_ewm_error.py
similarity index 81%
copy from python/pyspark/pandas/tests/connect/test_parity_ewm.py
copy to python/pyspark/pandas/tests/connect/window/test_parity_ewm_error.py
index 748728203337..7f6b0e8494cf 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_ewm.py
+++ b/python/pyspark/pand

(spark) branch master updated: [SPARK-46344][CORE] Warn properly when a driver exits successfully but master is disconnected

2023-12-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 166df8b1df59 [SPARK-46344][CORE] Warn properly when a driver exits successfully but master is disconnected
166df8b1df59 is described below

commit 166df8b1df5965fe3f15fecd5574545746b0b18f
Author: Dongjoon Hyun 
AuthorDate: Sat Dec 9 15:33:48 2023 -0800

[SPARK-46344][CORE] Warn properly when a driver exits successfully but master is disconnected

### What changes were proposed in this pull request?

This PR aims to warn properly when a driver exits successfully but the master is disconnected.

### Why are the changes needed?

In this case, `Master` eventually marks such drivers as `Error`.

![Screenshot 2023-12-09 at 3 05 27 PM](https://github.com/apache/spark/assets/9700541/1323819b-4a0c-466d-afaa-845f507a905e)

**Worker Log**
```
23/12/09 15:13:21 INFO Worker: Driver driver-20231209151301-0003 exited successfully
=== Master is disconnected here ===
23/12/09 15:13:53 WARN Worker: Driver driver-20231209151332-0004 exited successfully while master is disconnected.
=== A new master starts and is connected here ===
23/12/09 15:17:10 INFO Worker: Driver driver-20231209151707- exited successfully
```
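
The sketch below restates the pattern from the `Worker.scala` hunk further down as a small, self-contained Scala program: the presence of a pending `registrationRetryTimer` is used as a proxy for "the master is currently disconnected", and an otherwise-normal driver exit is then reported at WARN instead of INFO. The object name, the helper, and the plain `println` output are illustrative only, not the actual Worker internals.

```
// Minimal sketch (not the real Worker class): pick the log level for a
// successfully finished driver based on whether a registration-retry timer
// is pending, i.e. whether the master is currently disconnected.
object DriverExitLogSketch {
  def messageFor(driverId: String, registrationRetryTimer: Option[AnyRef]): (String, String) =
    registrationRetryTimer match {
      case Some(_) =>
        // Master is disconnected; it will eventually record this driver as Error.
        ("WARN", s"Driver $driverId exited successfully while master is disconnected.")
      case None =>
        ("INFO", s"Driver $driverId exited successfully")
    }

  def main(args: Array[String]): Unit = {
    println(messageFor("driver-20231209151301-0003", None))
    println(messageFor("driver-20231209151332-0004", Some(new Object)))
  }
}
```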

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44278 from dongjoon-hyun/SPARK-46344.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1422a1484f8d..785129e1d818 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -878,7 +878,12 @@ private[deploy] class Worker(
   case DriverState.FAILED =>
 logWarning(s"Driver $driverId exited with failure")
   case DriverState.FINISHED =>
-logInfo(s"Driver $driverId exited successfully")
+registrationRetryTimer match {
+  case Some(_) =>
+logWarning(s"Driver $driverId exited successfully while master is disconnected.")
+  case _ =>
+logInfo(s"Driver $driverId exited successfully")
+}
   case DriverState.KILLED =>
 logInfo(s"Driver $driverId was killed by user")
   case _ =>





(spark) branch branch-3.4 updated: [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log

2023-12-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 4e80b3a09407 [SPARK-46339][SS] Directory with batch number name should 
not be treated as metadata log
4e80b3a09407 is described below

commit 4e80b3a09407042f7c596963dcb4fc59e68755ab
Author: Liang-Chi Hsieh 
AuthorDate: Sat Dec 9 15:20:55 2023 -0800

[SPARK-46339][SS] Directory with batch number name should not be treated as 
metadata log

### What changes were proposed in this pull request?

This patch updates the documentation of the `CheckpointFileManager.list` method to reflect the fact that it returns both files and directories, to reduce confusion.

For usages like `HDFSMetadataLog`, which assume the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error.

### Why are the changes needed?

`HDFSMetadataLog` takes a metadata path as a parameter. When it retrieves all batch metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, all current implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory is returned too and causes trouble when `HDFSMetadataLog` goes to read it.

Actually, the `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, the current implementations don't follow the doc. We tried to make the `list` implementations return only files, but some usages (state metadata) of the `list` method already break that assumption and rely on the directories it returns. So we simply update the `list` method documentation to explicitly state that it returns both files and directories. We add a filter in `HDFSMetad [...]
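
To make the failure mode and the fix concrete, here is a small, self-contained Scala sketch of the filtering idea used in the `HDFSMetadataLog` hunk below: only regular files under the metadata path are turned into batch ids, so a directory whose name happens to be a long value is ignored. The `Status` case class and `pathToBatchId` helper are stand-ins for Hadoop's `FileStatus` and Spark's internal parsing, not the real APIs.

```
// Self-contained sketch of "batches must be files": drop directories before
// mapping listed entries to batch ids.
object ListBatchesSketch {
  // Stand-in for Hadoop's FileStatus: just a name plus a file/directory flag.
  final case class Status(name: String, isFile: Boolean)

  // Stand-in for the batch-id parsing; non-numeric names are simply skipped here.
  private def pathToBatchId(name: String): Option[Long] =
    scala.util.Try(name.toLong).toOption

  def listBatches(listed: Seq[Status]): Seq[Long] =
    listed
      .filter(_.isFile)                     // batches must be files
      .flatMap(s => pathToBatchId(s.name))

  def main(args: Array[String]): Unit = {
    val entries = Seq(
      Status("0", isFile = true),
      Status("1", isFile = true),
      Status("5", isFile = false))          // directory named like a batch id: ignored
    println(listBatches(entries))           // List(0, 1)
  }
}
```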

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44272 from viirya/fix_metadatalog.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 75805f07f5caeb01104a7352b02790d03a043ded)
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 28a8b181e96d4ce71e2f9888910214d14a859b7d)
Signed-off-by: Dongjoon Hyun 
---
 .../sql/execution/streaming/CheckpointFileManager.scala  |  4 ++--
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala  |  2 ++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12 
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 013efd3c7bae..b2a3b8d73d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
   /** Open a file for reading, or throw exception if it does not exist. */
   def open(path: Path): FSDataInputStream
 
-  /** List the files in a path that match a filter. */
+  /** List the files/directories in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]
 
-  /** List all the files in a path. */
+  /** List all the files/directories in a path. */
   def list(path: Path): Array[FileStatus] = {
 list(path, (_: Path) => true)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b0172bb9555..9a811db679d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -325,6 +325,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /** List the available batches on file system. */
   protected def listBatches: Array[Long] = {
 val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+  // Batches must be files
+  .filter(f => f.isFile)
   .map(f => pathToBatchId(f.getPath)) ++
   // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
   // elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
ind

(spark) branch branch-3.5 updated: [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log

2023-12-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 28a8b181e96d [SPARK-46339][SS] Directory with batch number name should 
not be treated as metadata log
28a8b181e96d is described below

commit 28a8b181e96d4ce71e2f9888910214d14a859b7d
Author: Liang-Chi Hsieh 
AuthorDate: Sat Dec 9 15:20:55 2023 -0800

[SPARK-46339][SS] Directory with batch number name should not be treated as 
metadata log

### What changes were proposed in this pull request?

This patch updates the documentation of the `CheckpointFileManager.list` method to reflect the fact that it returns both files and directories, to reduce confusion.

For usages like `HDFSMetadataLog`, which assume the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error.

### Why are the changes needed?

`HDFSMetadataLog` takes a metadata path as a parameter. When it retrieves all batch metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, all current implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory is returned too and causes trouble when `HDFSMetadataLog` goes to read it.

Actually, the `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, the current implementations don't follow the doc. We tried to make the `list` implementations return only files, but some usages (state metadata) of the `list` method already break that assumption and rely on the directories it returns. So we simply update the `list` method documentation to explicitly state that it returns both files and directories. We add a filter in `HDFSMetad [...]

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44272 from viirya/fix_metadatalog.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Dongjoon Hyun 
(cherry picked from commit 75805f07f5caeb01104a7352b02790d03a043ded)
Signed-off-by: Dongjoon Hyun 
---
 .../sql/execution/streaming/CheckpointFileManager.scala  |  4 ++--
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala  |  2 ++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12 
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index ad3212871fc9..677e2fccb6b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
   /** Open a file for reading, or throw exception if it does not exist. */
   def open(path: Path): FSDataInputStream
 
-  /** List the files in a path that match a filter. */
+  /** List the files/directories in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]
 
-  /** List all the files in a path. */
+  /** List all the files/directories in a path. */
   def list(path: Path): Array[FileStatus] = {
 list(path, (_: Path) => true)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b0172bb9555..9a811db679d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -325,6 +325,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /** List the available batches on file system. */
   protected def listBatches: Array[Long] = {
 val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+  // Batches must be files
+  .filter(f => f.isFile)
   .map(f => pathToBatchId(f.getPath)) ++
   // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
   // elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 980d532dd477..08f245135f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streamin

(spark) branch master updated: [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log

2023-12-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 75805f07f5ca [SPARK-46339][SS] Directory with batch number name should 
not be treated as metadata log
75805f07f5ca is described below

commit 75805f07f5caeb01104a7352b02790d03a043ded
Author: Liang-Chi Hsieh 
AuthorDate: Sat Dec 9 15:20:55 2023 -0800

[SPARK-46339][SS] Directory with batch number name should not be treated as 
metadata log

### What changes were proposed in this pull request?

This patch updates the documentation of the `CheckpointFileManager.list` method to reflect the fact that it returns both files and directories, to reduce confusion.

For usages like `HDFSMetadataLog`, which assume the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error.

### Why are the changes needed?

`HDFSMetadataLog` takes a metadata path as a parameter. When it retrieves all batch metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, all current implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory is returned too and causes trouble when `HDFSMetadataLog` goes to read it.

Actually, the `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, the current implementations don't follow the doc. We tried to make the `list` implementations return only files, but some usages (state metadata) of the `list` method already break that assumption and rely on the directories it returns. So we simply update the `list` method documentation to explicitly state that it returns both files and directories. We add a filter in `HDFSMetad [...]

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44272 from viirya/fix_metadatalog.

Authored-by: Liang-Chi Hsieh 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/execution/streaming/CheckpointFileManager.scala  |  4 ++--
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala  |  2 ++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12 
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index af2c97b21138..34c5dee0997b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
   /** Open a file for reading, or throw exception if it does not exist. */
   def open(path: Path): FSDataInputStream
 
-  /** List the files in a path that match a filter. */
+  /** List the files/directories in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]
 
-  /** List all the files in a path. */
+  /** List all the files/directories in a path. */
   def list(path: Path): Array[FileStatus] = {
 list(path, (_: Path) => true)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 79627030e1eb..b3eedbf93f04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -327,6 +327,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /** List the available batches on file system. */
   protected def listBatches: Array[Long] = {
 val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+  // Batches must be files
+  .filter(f => f.isFile)
   .map(f => pathToBatchId(f.getPath)) ++
   // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
   // elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 980d532dd477..08f245135f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLo

(spark) branch master updated (1af4b658064d -> cae4bdc88161)

2023-12-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 1af4b658064d [SPARK-46338][PS][TESTS] Re-enable the `get_item` test 
for `BasicIndexingTests`
 add cae4bdc88161 [SPARK-46322][PYTHON][DOCS] Replace external link with 
internal link for error documentation

No new revisions were added by this update.

Summary of changes:
 python/pyspark/errors_doc_gen.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)





(spark) branch master updated: [SPARK-46338][PS][TESTS] Re-enable the `get_item` test for `BasicIndexingTests`

2023-12-09 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 1af4b658064d [SPARK-46338][PS][TESTS] Re-enable the `get_item` test 
for `BasicIndexingTests`
1af4b658064d is described below

commit 1af4b658064d05b15c4a8409a8aa13df63b64ca4
Author: Haejoon Lee 
AuthorDate: Sat Dec 9 13:46:13 2023 -0800

[SPARK-46338][PS][TESTS] Re-enable the `get_item` test for 
`BasicIndexingTests`

### What changes were proposed in this pull request?

This PR proposes to re-enable the `get_item` test for `BasicIndexingTests`.

### Why are the changes needed?

To improve the test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Enable the test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44271 from itholic/enable_test_getitem.

Authored-by: Haejoon Lee 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/pandas/tests/test_indexing.py | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pandas/tests/test_indexing.py b/python/pyspark/pandas/tests/test_indexing.py
index a4ca03005b33..eb86c9ffabc4 100644
--- a/python/pyspark/pandas/tests/test_indexing.py
+++ b/python/pyspark/pandas/tests/test_indexing.py
@@ -742,13 +742,13 @@ class IndexingTest(ComparisonTestBase):
 self.assertRaises(AttributeError, lambda: psdf.X)
 
 # not str/unicode
-# TODO?: pdf = pd.DataFrame(np.random.randn(10, 5))
-# TODO?: psdf = ps.from_pandas(pdf)
-# TODO?: self.assert_eq(psdf[0], pdf[0])
-# TODO?: self.assert_eq(psdf[[1, 2]], pdf[[1, 2]])
+pdf = pd.DataFrame(np.random.randn(10, 5))
+psdf = ps.from_pandas(pdf)
+self.assert_eq(psdf[0], pdf[0])
+self.assert_eq(psdf[[1, 2]], pdf[[1, 2]])
 
-# TODO?: self.assertRaises(KeyError, lambda: pdf[8])
-# TODO?: self.assertRaises(KeyError, lambda: pdf[[1, 8]])
+self.assertRaises(KeyError, lambda: pdf[8])
+self.assertRaises(KeyError, lambda: pdf[[1, 8]])
 
 # non-string column names
 pdf = pd.DataFrame(





(spark) branch master updated: [SPARK-46333][SQL] Replace `IllegalStateException` by `SparkException.internalError` in catalyst

2023-12-09 Thread maxgekk
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8e95929ac423 [SPARK-46333][SQL] Replace `IllegalStateException` by 
`SparkException.internalError` in catalyst
8e95929ac423 is described below

commit 8e95929ac4238d02dca379837ccf2fbc1cd1926d
Author: Max Gekk 
AuthorDate: Sat Dec 9 12:32:21 2023 +0300

[SPARK-46333][SQL] Replace `IllegalStateException` by 
`SparkException.internalError` in catalyst

### What changes were proposed in this pull request?
In the PR, I propose to replace all `IllegalStateException` exceptions in `catalyst` by `SparkException.internalError`.

### Why are the changes needed?
This is a part of the migration onto the new error framework and error classes.
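
For illustration, the migration pattern is mechanical: an internal invariant violation that previously surfaced as a bare `IllegalStateException` is now raised via `SparkException.internalError`, so it is reported under the error-class framework. A minimal, hedged Scala sketch follows (it compiles against `spark-core`); the message is taken from the `Analyzer.scala` hunk below, while the wrapper function and its `Any`-typed parameter are simplifications, not Spark's actual signatures.

```
import org.apache.spark.SparkException

object InternalErrorSketch {
  // Before (outside the error framework):
  //   throw new IllegalStateException(
  //     s"UpCast only supports DecimalType as AbstractDataType yet, but got: $target")

  // After: the same invariant violation routed through SparkException.internalError,
  // which attaches the INTERNAL_ERROR error class and consistent formatting.
  def failOnUnexpectedUpCastTarget(target: Any): Nothing =
    throw SparkException.internalError(
      s"UpCast only supports DecimalType as AbstractDataType yet, but got: $target")
}
```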

### Does this PR introduce _any_ user-facing change?
No, users shouldn't face `IllegalStateException` in regular cases.

### How was this patch tested?
Using existing GAs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #44263 from MaxGekk/bind-ref-internal-error.

Authored-by: Max Gekk 
Signed-off-by: Max Gekk 
---
 .../spark/sql/catalyst/analysis/Analyzer.scala |  3 ++-
 .../sql/catalyst/analysis/CheckAnalysis.scala  |  6 ++---
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 13 +-
 .../sql/catalyst/catalog/SessionCatalog.scala  |  3 ++-
 .../spark/sql/catalyst/catalog/interface.scala |  3 ++-
 .../sql/catalyst/expressions/BoundAttribute.scala  |  3 ++-
 .../expressions/EquivalentExpressions.scala|  5 ++--
 .../expressions/InterpretedUnsafeProjection.scala  |  4 ++-
 .../expressions/ProjectionOverSchema.scala |  5 ++--
 .../sql/catalyst/expressions/arithmetic.scala  |  4 +--
 .../expressions/codegen/CodeGenerator.scala|  4 +--
 .../catalyst/expressions/codegen/javaCode.scala|  3 ++-
 .../expressions/collectionOperations.scala |  6 ++---
 .../catalyst/expressions/complexTypeCreator.scala  |  5 ++--
 .../sql/catalyst/expressions/csvExpressions.scala  |  3 ++-
 .../sql/catalyst/expressions/jsonExpressions.scala |  5 ++--
 .../catalyst/expressions/namedExpressions.scala|  3 ++-
 .../catalyst/optimizer/DecorrelateInnerQuery.scala |  4 +--
 .../catalyst/optimizer/NestedColumnAliasing.scala  |  3 ++-
 .../optimizer/NormalizeFloatingNumbers.scala   |  5 ++--
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  5 ++--
 .../optimizer/PushExtraPredicateThroughJoin.scala  |  3 ++-
 .../optimizer/ReplaceExceptWithFilter.scala|  3 ++-
 .../spark/sql/catalyst/optimizer/objects.scala |  7 ++---
 .../spark/sql/catalyst/optimizer/subquery.scala|  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala |  6 ++---
 .../sql/catalyst/plans/physical/partitioning.scala |  6 +++--
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  3 ++-
 .../sql/catalyst/util/ArrayBasedMapBuilder.scala   |  3 ++-
 .../spark/sql/catalyst/util/DateTimeUtils.scala|  8 +++---
 .../sql/catalyst/analysis/AnalysisSuite.scala  | 22 ++--
 .../optimizer/ReassignLambdaVariableIDSuite.scala  |  8 --
 .../sql/catalyst/util/DateTimeUtilsSuite.scala | 30 --
 .../sql/execution/WholeStageCodegenSuite.scala | 14 ++
 34 files changed, 126 insertions(+), 84 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e5961b46e743..ec91f9b21a76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Random, Success, Try}
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
@@ -3706,7 +3707,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 case u @ UpCast(child, _, _) if !child.resolved => u
 
 case UpCast(_, target, _) if target != DecimalType && !target.isInstanceOf[DataType] =>
-  throw new IllegalStateException(
+  throw SparkException.internalError(
 s"UpCast only supports DecimalType as AbstractDataType yet, but got: $target")
 
 case UpCast(child, target, walkedTypePath) if target == DecimalType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index ea1af1d3c8cd..1ce984a39b27 10