This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 1f66a40e3b85 [SPARK-47734][PYTHON][TESTS][3.4] Fix flaky DataFrame.writeStream doctest by stopping streaming query
1f66a40e3b85 is described below

commit 1f66a40e3b85ea2153c021d65be8124920091fa7
Author: Josh Rosen <joshro...@databricks.com>
AuthorDate: Mon Apr 8 07:05:45 2024 +0900

    [SPARK-47734][PYTHON][TESTS][3.4] Fix flaky DataFrame.writeStream doctest by stopping streaming query
    
    ### What changes were proposed in this pull request?
    
    Backport of https://github.com/apache/spark/pull/45885.
    
    This PR deflakes the `pyspark.sql.dataframe.DataFrame.writeStream` doctest.
    
    PR https://github.com/apache/spark/pull/45298 aimed to fix that test but
    misdiagnosed the root issue. The problem is not that concurrent tests were
    colliding on a temporary directory. Rather, the issue is specific to the
    `DataFrame.writeStream` test's logic: the test starts a streaming query that
    writes files to the temporary directory, then exits the temp directory
    context manager without first stopping the streaming query. That creates a
    race condition where the context manager [...]
    
    ```
    File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line ?, in pyspark.sql.dataframe.DataFrame.writeStream
    Failed example:
        with tempfile.TemporaryDirectory() as d:
            # Create a table with Rate source.
            df.writeStream.toTable(
                "my_table", checkpointLocation=d)
    Exception raised:
        Traceback (most recent call last):
          File "/usr/lib/python3.11/doctest.py", line 1353, in __run
            exec(compile(example.source, filename, "single",
          File "<doctest pyspark.sql.dataframe.DataFrame.writeStream[3]>", line 1, in <module>
            with tempfile.TemporaryDirectory() as d:
          File "/usr/lib/python3.11/tempfile.py", line 1043, in __exit__
            self.cleanup()
          File "/usr/lib/python3.11/tempfile.py", line 1047, in cleanup
            self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
          File "/usr/lib/python3.11/tempfile.py", line 1029, in _rmtree
            _rmtree(name, onerror=onerror)
          File "/usr/lib/python3.11/shutil.py", line 738, in rmtree
            onerror(os.rmdir, path, sys.exc_info())
          File "/usr/lib/python3.11/shutil.py", line 736, in rmtree
            os.rmdir(path, dir_fd=dir_fd)
        OSError: [Errno 39] Directory not empty: '/__w/spark/spark/python/target/4f062b09-213f-4ac2-a10a-2d704990141b/tmp29irqweq'
    ```
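
    A minimal sketch of the problematic pattern (paraphrasing the old doctest;
    it assumes an active `spark` session, as the doctest harness provides):

    ```
    import tempfile

    df = spark.readStream.format("rate").load()
    with tempfile.TemporaryDirectory() as d:
        # The streaming query continuously writes checkpoint files under d.
        df.writeStream.toTable("my_table", checkpointLocation=d)
        # Bug: the query is never stopped, so it is still writing when the
        # context manager exits and rmtree() runs, hence "Directory not empty".
    ```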
    
    In this PR, I update the doctest to properly stop the streaming query.
    
    ### Why are the changes needed?
    
    Fix flaky test.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is test-only. It does make a small user-facing doc change, but one
    that is consistent with other doctest examples.
    
    ### How was this patch tested?
    
    Manually ran the updated test.
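
    For reference, a standalone variant of the updated example (a sketch, not
    part of this patch: the `local[2]` master and the explicit session setup
    are assumptions for running outside the doctest harness):

    ```
    import tempfile
    import time

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    df = spark.readStream.format("rate").load()
    with tempfile.TemporaryDirectory() as d:
        query = df.writeStream.toTable("my_table", checkpointLocation=d)
        time.sleep(3)  # let the query make some progress
        # Stop the query before the context manager exits so that cleanup
        # does not race against in-flight writes.
        query.stop()
    spark.stop()
    ```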
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45908 from JoshRosen/fix-flaky-writestream-doctest-3.4.
    
    Authored-by: Josh Rosen <joshro...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 python/pyspark/sql/dataframe.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 14426c514392..f69d74ad5002 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -527,6 +527,7 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         Examples
         --------
+        >>> import time
         >>> import tempfile
         >>> df = spark.readStream.format("rate").load()
         >>> type(df.writeStream)
@@ -534,9 +535,10 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
 
         >>> with tempfile.TemporaryDirectory() as d:
         ...     # Create a table with Rate source.
-        ...     df.writeStream.toTable(
-        ...         "my_table", checkpointLocation=d) # doctest: +ELLIPSIS
-        <pyspark.sql.streaming.query.StreamingQuery object at 0x...>
+        ...     query = df.writeStream.toTable(
+        ...         "my_table", checkpointLocation=d)
+        ...     time.sleep(3)
+        ...     query.stop()
         """
         return DataStreamWriter(self)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
