This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e69857195a6 Revert "[SPARK-47777][PYTHON][SS][TESTS] Add spark 
connect test for python streaming data source"
4e69857195a6 is described below

commit 4e69857195a6f95c22f962e3eed950876036c04f
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Mon May 6 09:38:09 2024 +0900

    Revert "[SPARK-47777][PYTHON][SS][TESTS] Add spark connect test for python 
streaming data source"
    
    This reverts commit 3d2b7fea7fe0a835166ba2b98973300f6844a29b.
---
 dev/sparktestsupport/modules.py                    |  1 -
 .../test_parity_python_streaming_datasource.py     | 39 ----------------------
 .../sql/tests/test_python_streaming_datasource.py  |  6 +++-
 3 files changed, 5 insertions(+), 41 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 8a94187bfced..e73cdd7b80c3 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -1047,7 +1047,6 @@ pyspark_connect = Module(
         "pyspark.sql.tests.connect.test_parity_arrow_grouped_map",
         "pyspark.sql.tests.connect.test_parity_arrow_cogrouped_map",
         "pyspark.sql.tests.connect.test_parity_python_datasource",
-        "pyspark.sql.tests.connect.test_parity_python_streaming_datasource",
         "pyspark.sql.tests.connect.test_utils",
         "pyspark.sql.tests.connect.client.test_artifact",
         "pyspark.sql.tests.connect.client.test_client",
diff --git 
a/python/pyspark/sql/tests/connect/test_parity_python_streaming_datasource.py 
b/python/pyspark/sql/tests/connect/test_parity_python_streaming_datasource.py
deleted file mode 100644
index 65bb4c021f4d..000000000000
--- 
a/python/pyspark/sql/tests/connect/test_parity_python_streaming_datasource.py
+++ /dev/null
@@ -1,39 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from pyspark.sql.tests.test_python_streaming_datasource import (
-    BasePythonStreamingDataSourceTestsMixin,
-)
-from pyspark.testing.connectutils import ReusedConnectTestCase
-
-
-class PythonStreamingDataSourceParityTests(
-    BasePythonStreamingDataSourceTestsMixin, ReusedConnectTestCase
-):
-    pass
-
-
-if __name__ == "__main__":
-    import unittest
-    from pyspark.sql.tests.connect.test_parity_python_streaming_datasource 
import *  # noqa: F401
-
-    try:
-        import xmlrunner  # type: ignore[import]
-
-        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
-    except ImportError:
-        testRunner = None
-    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/sql/tests/test_python_streaming_datasource.py 
b/python/pyspark/sql/tests/test_python_streaming_datasource.py
index 5125e9ad6dec..90f06223e009 100644
--- a/python/pyspark/sql/tests/test_python_streaming_datasource.py
+++ b/python/pyspark/sql/tests/test_python_streaming_datasource.py
@@ -141,11 +141,15 @@ class BasePythonStreamingDataSourceTestsMixin:
         self.spark.dataSource.register(self._get_test_data_source())
         df = self.spark.readStream.format("TestDataSource").load()
 
+        current_batch_id = -1
+
         def check_batch(df, batch_id):
+            nonlocal current_batch_id
+            current_batch_id = batch_id
             assertDataFrameEqual(df, [Row(batch_id * 2), Row(batch_id * 2 + 
1)])
 
         q = df.writeStream.foreachBatch(check_batch).start()
-        while len(q.recentProgress) < 10:
+        while current_batch_id < 10:
             time.sleep(0.2)
         q.stop()
         q.awaitTermination


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to