(spark) branch master updated: [SPARK-46341][PS][TESTS] Reorganize `SeriesInterpolateTests`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bac3492980a3 [SPARK-46341][PS][TESTS] Reorganize `SeriesInterpolateTests`
bac3492980a3 is described below

commit bac3492980a3e793065a9e9d511ddf0fb66357b3
Author: Ruifeng Zheng
AuthorDate: Sun Dec 10 09:35:14 2023 +0800

    [SPARK-46341][PS][TESTS] Reorganize `SeriesInterpolateTests`

    ### What changes were proposed in this pull request?
    1, Move `SeriesInterpolateTests` to `pyspark.pandas.tests.series.*`
    2, also optimize the test by controlling the combinations

    ### Why are the changes needed?
    move it to the right place

    ### Does this PR introduce _any_ user-facing change?
    no, test-only

    ### How was this patch tested?
    ci

    ### Was this patch authored or co-authored using generative AI tooling?
    no

    Closes #44274 from zhengruifeng/ps_test_series_interpolate.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Ruifeng Zheng
---
 dev/sparktestsupport/modules.py  |  4 +--
 .../test_parity_interpolate.py}  | 12 ---
 .../test_interpolate.py}         | 37 +-
 3 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index ca35fdabc0c4..e1193d02ec51 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -735,7 +735,7 @@ pyspark_pandas = Module(
         "pyspark.pandas.tests.test_frame_spark",
         "pyspark.pandas.tests.test_generic_functions",
         "pyspark.pandas.tests.test_frame_interpolate",
-        "pyspark.pandas.tests.test_series_interpolate",
+        "pyspark.pandas.tests.series.test_interpolate",
         "pyspark.pandas.tests.test_indexops_spark",
         "pyspark.pandas.tests.test_internal",
         "pyspark.pandas.tests.test_namespace",
@@ -1112,7 +1112,7 @@ pyspark_pandas_connect_part2 = Module(
         "pyspark.pandas.tests.connect.indexes.test_parity_base_slow",
         "pyspark.pandas.tests.connect.indexes.test_parity_datetime_property",
         "pyspark.pandas.tests.connect.test_parity_frame_interpolate",
-        "pyspark.pandas.tests.connect.test_parity_series_interpolate",
+        "pyspark.pandas.tests.connect.series.test_parity_interpolate",
         "pyspark.pandas.tests.connect.resample.test_parity_frame",
         "pyspark.pandas.tests.connect.resample.test_parity_series",
         "pyspark.pandas.tests.connect.window.test_parity_ewm_error",
diff --git a/python/pyspark/pandas/tests/connect/test_parity_series_interpolate.py b/python/pyspark/pandas/tests/connect/series/test_parity_interpolate.py
similarity index 77%
rename from python/pyspark/pandas/tests/connect/test_parity_series_interpolate.py
rename to python/pyspark/pandas/tests/connect/series/test_parity_interpolate.py
index 55a804b280d3..d3f753d0dbf2 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_series_interpolate.py
+++ b/python/pyspark/pandas/tests/connect/series/test_parity_interpolate.py
@@ -16,19 +16,21 @@
 #
 import unittest

-from pyspark.pandas.tests.test_series_interpolate import SeriesInterpolateTestsMixin
+from pyspark.pandas.tests.series.test_interpolate import SeriesInterpolateMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
-from pyspark.testing.pandasutils import PandasOnSparkTestUtils, TestUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils


-class SeriesInterpolateParityTests(
-    SeriesInterpolateTestsMixin, TestUtils, PandasOnSparkTestUtils, ReusedConnectTestCase
+class SeriesParityInterpolateTests(
+    SeriesInterpolateMixin,
+    PandasOnSparkTestUtils,
+    ReusedConnectTestCase,
 ):
     pass


 if __name__ == "__main__":
-    from pyspark.pandas.tests.connect.test_parity_series_interpolate import *  # noqa: F401
+    from pyspark.pandas.tests.connect.series.test_parity_interpolate import *  # noqa: F401

     try:
         import xmlrunner  # type: ignore[import]
diff --git a/python/pyspark/pandas/tests/test_series_interpolate.py b/python/pyspark/pandas/tests/series/test_interpolate.py
similarity index 72%
rename from python/pyspark/pandas/tests/test_series_interpolate.py
rename to python/pyspark/pandas/tests/series/test_interpolate.py
index 0f15bf59dede..0008dd2ee916 100644
--- a/python/pyspark/pandas/tests/test_series_interpolate.py
+++ b/python/pyspark/pandas/tests/series/test_interpolate.py
@@ -18,24 +18,28 @@
 import numpy as np
 import pandas as pd

 import pyspark.pandas as ps
-from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
+from pyspark.testing.pandasutils import PandasOnSparkTestCase


-class SeriesInterpolateTestsMixin:
+class SeriesInterpolateMixin:
     def _test_interpolate(self, pobj):
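[Editor's note] The diff above is cut off before the body of `_test_interpolate`, so here is a minimal standalone sketch of what "controlling the combinations" usually means in these test refactors: checking a curated list of parameter tuples against pandas instead of the full cross product. The data, the parameter grid, and the assumption that pandas-on-Spark `interpolate` mirrors the pandas `limit`/`limit_direction` parameters are illustrative; this is not the actual test body.

```python
import pandas as pd
import pyspark.pandas as ps

pser = pd.Series([1.0, None, None, 4.0, None])
psser = ps.from_pandas(pser)

# Hypothetical reduced grid; the real test selects its own combinations.
for limit, limit_direction in [(None, "forward"), (1, "backward"), (2, "both")]:
    result = psser.interpolate(limit=limit, limit_direction=limit_direction)
    expected = pser.interpolate(limit=limit, limit_direction=limit_direction)
    # Compare pandas-on-Spark against plain pandas, as the mixin's
    # assert_eq does.
    pd.testing.assert_series_equal(result.to_pandas(), expected)
```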
(spark) branch master updated: [SPARK-46340][PS][CONNECT][TESTS] Reorganize `EWMTests`
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 55d6b51af7cd [SPARK-46340][PS][CONNECT][TESTS] Reorganize `EWMTests`
55d6b51af7cd is described below

commit 55d6b51af7cd9108752eea65e7eef13da01118e8
Author: Ruifeng Zheng
AuthorDate: Sun Dec 10 08:53:19 2023 +0800

    [SPARK-46340][PS][CONNECT][TESTS] Reorganize `EWMTests`

    ### What changes were proposed in this pull request?
    Reorganize `EWMTests`

    ### Why are the changes needed?
    break it into smaller files to be consistent with pandas tests (see
    https://github.com/pandas-dev/pandas/tree/main/pandas/tests/window )

    ### Does this PR introduce _any_ user-facing change?
    no, test-only

    ### How was this patch tested?
    ci

    ### Was this patch authored or co-authored using generative AI tooling?
    no

    Closes #44273 from zhengruifeng/ps_test_ewm.

    Authored-by: Ruifeng Zheng
    Signed-off-by: Ruifeng Zheng
---
 dev/sparktestsupport/modules.py                       |   8 +-
 .../{test_parity_ewm.py => window/__init__.py}        |  21 --
 .../test_parity_ewm_error.py}                         |  11 +-
 .../test_parity_ewm_mean.py}                          |  11 +-
 .../test_parity_groupby_ewm_mean.py}                  |  11 +-
 .../test_parity_ewm.py => window/__init__.py}         |  21 --
 .../pyspark/pandas/tests/window/test_ewm_error.py     |  97 ++
 .../pyspark/pandas/tests/window/test_ewm_mean.py      | 194 +++
 .../test_groupby_ewm_mean.py}                         | 215 +
 9 files changed, 328 insertions(+), 261 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index e67cfce0f5c0..ca35fdabc0c4 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -729,7 +729,9 @@ pyspark_pandas = Module(
         "pyspark.pandas.tests.test_default_index",
         "pyspark.pandas.tests.test_expanding",
         "pyspark.pandas.tests.test_extension",
-        "pyspark.pandas.tests.test_ewm",
+        "pyspark.pandas.tests.window.test_ewm_error",
+        "pyspark.pandas.tests.window.test_ewm_mean",
+        "pyspark.pandas.tests.window.test_groupby_ewm_mean",
         "pyspark.pandas.tests.test_frame_spark",
         "pyspark.pandas.tests.test_generic_functions",
         "pyspark.pandas.tests.test_frame_interpolate",
@@ -1113,7 +1115,9 @@ pyspark_pandas_connect_part2 = Module(
         "pyspark.pandas.tests.connect.test_parity_series_interpolate",
         "pyspark.pandas.tests.connect.resample.test_parity_frame",
         "pyspark.pandas.tests.connect.resample.test_parity_series",
-        "pyspark.pandas.tests.connect.test_parity_ewm",
+        "pyspark.pandas.tests.connect.window.test_parity_ewm_error",
+        "pyspark.pandas.tests.connect.window.test_parity_ewm_mean",
+        "pyspark.pandas.tests.connect.window.test_parity_groupby_ewm_mean",
         "pyspark.pandas.tests.connect.test_parity_rolling",
         "pyspark.pandas.tests.connect.test_parity_expanding",
         "pyspark.pandas.tests.connect.test_parity_ops_on_diff_frames_groupby_rolling",
diff --git a/python/pyspark/pandas/tests/connect/test_parity_ewm.py b/python/pyspark/pandas/tests/connect/window/__init__.py
similarity index 53%
copy from python/pyspark/pandas/tests/connect/test_parity_ewm.py
copy to python/pyspark/pandas/tests/connect/window/__init__.py
index 748728203337..cce3acad34a4 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_ewm.py
+++ b/python/pyspark/pandas/tests/connect/window/__init__.py
@@ -14,24 +14,3 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-import unittest
-
-from pyspark.pandas.tests.test_ewm import EWMTestsMixin
-from pyspark.testing.connectutils import ReusedConnectTestCase
-from pyspark.testing.pandasutils import PandasOnSparkTestUtils, TestUtils
-
-
-class EWMParityTests(EWMTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase, TestUtils):
-    pass
-
-
-if __name__ == "__main__":
-    from pyspark.pandas.tests.connect.test_parity_ewm import *  # noqa: F401
-
-    try:
-        import xmlrunner  # type: ignore[import]
-
-        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2)
-    except ImportError:
-        testRunner = None
-    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/connect/test_parity_ewm.py b/python/pyspark/pandas/tests/connect/window/test_parity_ewm_error.py
similarity index 81%
copy from python/pyspark/pandas/tests/connect/test_parity_ewm.py
copy to python/pyspark/pandas/tests/connect/window/test_parity_ewm_error.py
index 748728203337..7f6b0e8494cf 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_ewm.py
+++ b/python/pyspark/pand
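[Editor's note] For context on what the three new test files cover, here is a small usage sketch of the pandas-on-Spark EWM surface they exercise, assuming the `ewm(...).mean()` API mirrored from pandas. The data, parameters, and the exact error raised for a missing decay argument are illustrative assumptions, not taken from the test files.

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"g": ["a", "a", "b", "b"], "x": [1.0, 2.0, 3.0, 4.0]})

# Covered by test_ewm_mean: exponentially weighted mean on a Series.
print(psdf["x"].ewm(com=0.5).mean())

# Covered by test_groupby_ewm_mean: the same computation per group.
print(psdf.groupby("g")["x"].ewm(com=0.5).mean())

# Covered by test_ewm_error: invalid arguments raise, as in pandas.
try:
    psdf["x"].ewm()  # no decay parameter (com/span/halflife/alpha) given
except ValueError as e:
    print(e)
```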
(spark) branch master updated: [SPARK-46344][CORE] Warn properly when a driver exits successfully but master is disconnected
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 166df8b1df59 [SPARK-46344][CORE] Warn properly when a driver exits successfully but master is disconnected
166df8b1df59 is described below

commit 166df8b1df5965fe3f15fecd5574545746b0b18f
Author: Dongjoon Hyun
AuthorDate: Sat Dec 9 15:33:48 2023 -0800

    [SPARK-46344][CORE] Warn properly when a driver exits successfully but master is disconnected

    ### What changes were proposed in this pull request?
    This PR aims to warn properly when a driver exits successfully but the master is disconnected.

    ### Why are the changes needed?
    In this case, `Master` considers them `Error` eventually.

    ![Screenshot 2023-12-09 at 3 05 27 PM](https://github.com/apache/spark/assets/9700541/1323819b-4a0c-466d-afaa-845f507a905e)

    **Worker Log**
    ```
    23/12/09 15:13:21 INFO Worker: Driver driver-20231209151301-0003 exited successfully

    === Master is disconnected here ===
    23/12/09 15:13:53 WARN Worker: Driver driver-20231209151332-0004 exited successfully while master is disconnected.

    === A new master starts and is connected here ===
    23/12/09 15:17:10 INFO Worker: Driver driver-20231209151707- exited successfully
    ```

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #44278 from dongjoon-hyun/SPARK-46344.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 7 ++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 1422a1484f8d..785129e1d818 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -878,7 +878,12 @@ private[deploy] class Worker(
       case DriverState.FAILED =>
         logWarning(s"Driver $driverId exited with failure")
       case DriverState.FINISHED =>
-        logInfo(s"Driver $driverId exited successfully")
+        registrationRetryTimer match {
+          case Some(_) =>
+            logWarning(s"Driver $driverId exited successfully while master is disconnected.")
+          case _ =>
+            logInfo(s"Driver $driverId exited successfully")
+        }
       case DriverState.KILLED =>
         logInfo(s"Driver $driverId was killed by user")
       case _ =>
(spark) branch branch-3.4 updated: [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 4e80b3a09407 [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
4e80b3a09407 is described below

commit 4e80b3a09407042f7c596963dcb4fc59e68755ab
Author: Liang-Chi Hsieh
AuthorDate: Sat Dec 9 15:20:55 2023 -0800

    [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log

    ### What changes were proposed in this pull request?
    This patch updates the documentation of the `CheckpointFileManager.list` method to reflect the fact that it returns both files and directories, to reduce confusion. For usages like `HDFSMetadataLog`, which assumes the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error.

    ### Why are the changes needed?
    `HDFSMetadataLog` takes a metadata path as a parameter. When it goes to retrieve all batch metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, currently all implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory will be returned too and cause trouble when `HDFSMetadataLog` goes to read it.

    Actually, the `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, current implementations don't follow the doc. We tried to make `list` implementations return only files, but some usages (state metadata) of the `list` method already break that assumption and use directories returned by `list`. So we simply update the `list` method documentation to explicitly define that it returns both files/directories. We add a filter in `HDFSMetad [...]

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added test

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #44272 from viirya/fix_metadatalog.

    Authored-by: Liang-Chi Hsieh
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 75805f07f5caeb01104a7352b02790d03a043ded)
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 28a8b181e96d4ce71e2f9888910214d14a859b7d)
    Signed-off-by: Dongjoon Hyun
---
 .../sql/execution/streaming/CheckpointFileManager.scala      |  4 ++--
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala      |  2 ++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index 013efd3c7bae..b2a3b8d73d4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
   /** Open a file for reading, or throw exception if it does not exist. */
   def open(path: Path): FSDataInputStream

-  /** List the files in a path that match a filter. */
+  /** List the files/directories in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]

-  /** List all the files in a path. */
+  /** List all the files/directories in a path. */
   def list(path: Path): Array[FileStatus] = {
     list(path, (_: Path) => true)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b0172bb9555..9a811db679d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -325,6 +325,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /** List the available batches on file system. */
   protected def listBatches: Array[Long] = {
     val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      // Batches must be files
+      .filter(f => f.isFile)
       .map(f => pathToBatchId(f.getPath)) ++
       // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
       // elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
ind
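[Editor's note] The same fix is cherry-picked to other branches below, so one illustration here. To make the failure mode concrete, the following is a hypothetical end-to-end repro sketch in PySpark; the added suite test exercises `HDFSMetadataLog` directly instead. The `rate` source and `noop` sink are stand-ins for any streaming job, and the planted directory name is arbitrary.

```python
import os
import tempfile

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
ckpt = tempfile.mkdtemp()

def run_batch():
    # Run one micro-batch against a throwaway sink, just to touch the
    # checkpoint's offsets/ metadata log.
    q = (
        spark.readStream.format("rate").load()
        .writeStream.format("noop")
        .option("checkpointLocation", ckpt)
        .start()
    )
    q.processAllAvailable()
    q.stop()

run_batch()

# Plant a directory whose name parses as a batch id (a long) next to the
# batch files. Before this fix, HDFSMetadataLog picked it up from list()
# and failed trying to read it as a batch file on restart; with the
# isFile filter it is simply ignored.
os.makedirs(os.path.join(ckpt, "offsets", "12345"), exist_ok=True)

run_batch()  # restarts from the checkpoint; succeeds with the fix
```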
(spark) branch branch-3.5 updated: [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 28a8b181e96d [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
28a8b181e96d is described below

commit 28a8b181e96d4ce71e2f9888910214d14a859b7d
Author: Liang-Chi Hsieh
AuthorDate: Sat Dec 9 15:20:55 2023 -0800

    [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log

    ### What changes were proposed in this pull request?
    This patch updates the documentation of the `CheckpointFileManager.list` method to reflect the fact that it returns both files and directories, to reduce confusion. For usages like `HDFSMetadataLog`, which assumes the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error.

    ### Why are the changes needed?
    `HDFSMetadataLog` takes a metadata path as a parameter. When it goes to retrieve all batch metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, currently all implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory will be returned too and cause trouble when `HDFSMetadataLog` goes to read it.

    Actually, the `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, current implementations don't follow the doc. We tried to make `list` implementations return only files, but some usages (state metadata) of the `list` method already break that assumption and use directories returned by `list`. So we simply update the `list` method documentation to explicitly define that it returns both files/directories. We add a filter in `HDFSMetad [...]

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added test

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #44272 from viirya/fix_metadatalog.

    Authored-by: Liang-Chi Hsieh
    Signed-off-by: Dongjoon Hyun
    (cherry picked from commit 75805f07f5caeb01104a7352b02790d03a043ded)
    Signed-off-by: Dongjoon Hyun
---
 .../sql/execution/streaming/CheckpointFileManager.scala      |  4 ++--
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala      |  2 ++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index ad3212871fc9..677e2fccb6b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
   /** Open a file for reading, or throw exception if it does not exist. */
   def open(path: Path): FSDataInputStream

-  /** List the files in a path that match a filter. */
+  /** List the files/directories in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]

-  /** List all the files in a path. */
+  /** List all the files/directories in a path. */
   def list(path: Path): Array[FileStatus] = {
     list(path, (_: Path) => true)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 2b0172bb9555..9a811db679d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -325,6 +325,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /** List the available batches on file system. */
   protected def listBatches: Array[Long] = {
     val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      // Batches must be files
+      .filter(f => f.isFile)
       .map(f => pathToBatchId(f.getPath)) ++
       // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
       // elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 980d532dd477..08f245135f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streamin
(spark) branch master updated: [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 75805f07f5ca [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log
75805f07f5ca is described below

commit 75805f07f5caeb01104a7352b02790d03a043ded
Author: Liang-Chi Hsieh
AuthorDate: Sat Dec 9 15:20:55 2023 -0800

    [SPARK-46339][SS] Directory with batch number name should not be treated as metadata log

    ### What changes were proposed in this pull request?
    This patch updates the documentation of the `CheckpointFileManager.list` method to reflect the fact that it returns both files and directories, to reduce confusion. For usages like `HDFSMetadataLog`, which assumes the file statuses returned by `list` are all files, we add a filter there to avoid a confusing error.

    ### Why are the changes needed?
    `HDFSMetadataLog` takes a metadata path as a parameter. When it goes to retrieve all batch metadata, it calls `CheckpointFileManager.list` to get all files under the metadata path. However, currently all implementations of `CheckpointFileManager.list` return all files/directories under the given path. So if there is a directory whose name is a batch number (a long value), the directory will be returned too and cause trouble when `HDFSMetadataLog` goes to read it.

    Actually, the `CheckpointFileManager.list` method clearly defines that it lists the "files" in a path. That being said, current implementations don't follow the doc. We tried to make `list` implementations return only files, but some usages (state metadata) of the `list` method already break that assumption and use directories returned by `list`. So we simply update the `list` method documentation to explicitly define that it returns both files/directories. We add a filter in `HDFSMetad [...]

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added test

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #44272 from viirya/fix_metadatalog.

    Authored-by: Liang-Chi Hsieh
    Signed-off-by: Dongjoon Hyun
---
 .../sql/execution/streaming/CheckpointFileManager.scala      |  4 ++--
 .../spark/sql/execution/streaming/HDFSMetadataLog.scala      |  2 ++
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala | 12
 3 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index af2c97b21138..34c5dee0997b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -65,10 +65,10 @@ trait CheckpointFileManager {
   /** Open a file for reading, or throw exception if it does not exist. */
   def open(path: Path): FSDataInputStream

-  /** List the files in a path that match a filter. */
+  /** List the files/directories in a path that match a filter. */
   def list(path: Path, filter: PathFilter): Array[FileStatus]

-  /** List all the files in a path. */
+  /** List all the files/directories in a path. */
   def list(path: Path): Array[FileStatus] = {
     list(path, (_: Path) => true)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 79627030e1eb..b3eedbf93f04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -327,6 +327,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
   /** List the available batches on file system. */
   protected def listBatches: Array[Long] = {
     val batchIds = fileManager.list(metadataPath, batchFilesFilter)
+      // Batches must be files
+      .filter(f => f.isFile)
       .map(f => pathToBatchId(f.getPath)) ++
       // Iterate over keySet is not thread safe. We call `toArray` to make a copy in the lock to
       // elimiate the race condition.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 980d532dd477..08f245135f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLo
(spark) branch master updated (1af4b658064d -> cae4bdc88161)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


    from 1af4b658064d [SPARK-46338][PS][TESTS] Re-enable the `get_item` test for `BasicIndexingTests`
     add cae4bdc88161 [SPARK-46322][PYTHON][DOCS] Replace external link with internal link for error documentation

No new revisions were added by this update.

Summary of changes:
 python/pyspark/errors_doc_gen.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
(spark) branch master updated: [SPARK-46338][PS][TESTS] Re-enable the `get_item` test for `BasicIndexingTests`
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1af4b658064d [SPARK-46338][PS][TESTS] Re-enable the `get_item` test for `BasicIndexingTests`
1af4b658064d is described below

commit 1af4b658064d05b15c4a8409a8aa13df63b64ca4
Author: Haejoon Lee
AuthorDate: Sat Dec 9 13:46:13 2023 -0800

    [SPARK-46338][PS][TESTS] Re-enable the `get_item` test for `BasicIndexingTests`

    ### What changes were proposed in this pull request?
    This PR proposes to re-enable the `get_item` test for `BasicIndexingTests`.

    ### Why are the changes needed?
    To improve the test coverage.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Enable the test.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #44271 from itholic/enable_test_getitem.

    Authored-by: Haejoon Lee
    Signed-off-by: Dongjoon Hyun
---
 python/pyspark/pandas/tests/test_indexing.py | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pandas/tests/test_indexing.py b/python/pyspark/pandas/tests/test_indexing.py
index a4ca03005b33..eb86c9ffabc4 100644
--- a/python/pyspark/pandas/tests/test_indexing.py
+++ b/python/pyspark/pandas/tests/test_indexing.py
@@ -742,13 +742,13 @@ class IndexingTest(ComparisonTestBase):
         self.assertRaises(AttributeError, lambda: psdf.X)

         # not str/unicode
-        # TODO?: pdf = pd.DataFrame(np.random.randn(10, 5))
-        # TODO?: psdf = ps.from_pandas(pdf)
-        # TODO?: self.assert_eq(psdf[0], pdf[0])
-        # TODO?: self.assert_eq(psdf[[1, 2]], pdf[[1, 2]])
+        pdf = pd.DataFrame(np.random.randn(10, 5))
+        psdf = ps.from_pandas(pdf)
+        self.assert_eq(psdf[0], pdf[0])
+        self.assert_eq(psdf[[1, 2]], pdf[[1, 2]])

-        # TODO?: self.assertRaises(KeyError, lambda: pdf[8])
-        # TODO?: self.assertRaises(KeyError, lambda: pdf[[1, 8]])
+        self.assertRaises(KeyError, lambda: pdf[8])
+        self.assertRaises(KeyError, lambda: pdf[[1, 8]])

         # non-string column names
         pdf = pd.DataFrame(
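[Editor's note] As a standalone illustration of what the re-enabled assertions cover, here is a small sketch of non-string (integer) column-label access in pandas-on-Spark; the data and labels are arbitrary, and this is not the test code itself.

```python
import numpy as np
import pandas as pd
import pyspark.pandas as ps

# Integer column labels 0..4, mirroring the test's setup.
pdf = pd.DataFrame(np.random.randn(10, 5))
psdf = ps.from_pandas(pdf)

print(psdf[0].head(3))       # a single integer label selects a Series
print(psdf[[1, 2]].head(3))  # a list of labels selects a sub-frame

try:
    pdf[8]  # a missing label raises KeyError, as the re-enabled test asserts
except KeyError as e:
    print("KeyError:", e)
```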
(spark) branch master updated: [SPARK-46333][SQL] Replace `IllegalStateException` by `SparkException.internalError` in catalyst
This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e95929ac423 [SPARK-46333][SQL] Replace `IllegalStateException` by `SparkException.internalError` in catalyst
8e95929ac423 is described below

commit 8e95929ac4238d02dca379837ccf2fbc1cd1926d
Author: Max Gekk
AuthorDate: Sat Dec 9 12:32:21 2023 +0300

    [SPARK-46333][SQL] Replace `IllegalStateException` by `SparkException.internalError` in catalyst

    ### What changes were proposed in this pull request?
    In the PR, I propose to replace all `IllegalStateException` exceptions in `catalyst` by `SparkException.internalError`.

    ### Why are the changes needed?
    This is a part of the migration onto the new error framework and error classes.

    ### Does this PR introduce _any_ user-facing change?
    No, users shouldn't face `IllegalStateException` in regular cases.

    ### How was this patch tested?
    Using existing GAs.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #44263 from MaxGekk/bind-ref-internal-error.

    Authored-by: Max Gekk
    Signed-off-by: Max Gekk
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  6 ++---
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 13 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  3 ++-
 .../spark/sql/catalyst/catalog/interface.scala     |  3 ++-
 .../sql/catalyst/expressions/BoundAttribute.scala  |  3 ++-
 .../expressions/EquivalentExpressions.scala        |  5 ++--
 .../expressions/InterpretedUnsafeProjection.scala  |  4 ++-
 .../expressions/ProjectionOverSchema.scala         |  5 ++--
 .../sql/catalyst/expressions/arithmetic.scala      |  4 +--
 .../expressions/codegen/CodeGenerator.scala        |  4 +--
 .../catalyst/expressions/codegen/javaCode.scala    |  3 ++-
 .../expressions/collectionOperations.scala         |  6 ++---
 .../catalyst/expressions/complexTypeCreator.scala  |  5 ++--
 .../sql/catalyst/expressions/csvExpressions.scala  |  3 ++-
 .../sql/catalyst/expressions/jsonExpressions.scala |  5 ++--
 .../catalyst/expressions/namedExpressions.scala    |  3 ++-
 .../catalyst/optimizer/DecorrelateInnerQuery.scala |  4 +--
 .../catalyst/optimizer/NestedColumnAliasing.scala  |  3 ++-
 .../optimizer/NormalizeFloatingNumbers.scala       |  5 ++--
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  5 ++--
 .../optimizer/PushExtraPredicateThroughJoin.scala  |  3 ++-
 .../optimizer/ReplaceExceptWithFilter.scala        |  3 ++-
 .../spark/sql/catalyst/optimizer/objects.scala     |  7 ++---
 .../spark/sql/catalyst/optimizer/subquery.scala    |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  6 ++---
 .../sql/catalyst/plans/physical/partitioning.scala |  6 +++--
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  3 ++-
 .../sql/catalyst/util/ArrayBasedMapBuilder.scala   |  3 ++-
 .../spark/sql/catalyst/util/DateTimeUtils.scala    |  8 +++---
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 22 ++--
 .../optimizer/ReassignLambdaVariableIDSuite.scala  |  8 --
 .../sql/catalyst/util/DateTimeUtilsSuite.scala     | 30 --
 .../sql/execution/WholeStageCodegenSuite.scala     | 14 ++
 34 files changed, 126 insertions(+), 84 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e5961b46e743..ec91f9b21a76 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.{Failure, Random, Success, Try}

+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.catalog._
@@ -3706,7 +3707,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       case u @ UpCast(child, _, _) if !child.resolved => u

       case UpCast(_, target, _) if target != DecimalType && !target.isInstanceOf[DataType] =>
-        throw new IllegalStateException(
+        throw SparkException.internalError(
           s"UpCast only supports DecimalType as AbstractDataType yet, but got: $target")

       case UpCast(child, target, walkedTypePath) if target == DecimalType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index ea1af1d3c8cd..1ce984a39b27 10
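[Editor's note] To see what this migration means from the user side, the sketch below inspects the error class on a captured exception in PySpark: after this change, catalyst-internal failures surface as a `SparkException` with the `INTERNAL_ERROR` error class rather than a bare `java.lang.IllegalStateException`. The `trigger_internal_error` call is a hypothetical stand-in; there is no supported way to provoke an internal error on purpose.

```python
from pyspark.errors.exceptions.captured import CapturedException

def describe_failure(fn):
    """Run fn() and return the error class, if any, of a captured JVM error."""
    try:
        fn()
    except CapturedException as e:
        # After SPARK-46333, catalyst bugs report a classed error here
        # ("INTERNAL_ERROR") instead of an unclassed IllegalStateException.
        return e.getErrorClass()
    return None

# Hypothetical usage; trigger_internal_error is a stand-in, not a real API.
# describe_failure(lambda: trigger_internal_error(spark))
```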