HyukjinKwon commented on code in PR #36589:
URL: https://github.com/apache/spark/pull/36589#discussion_r876494195


##########
python/pyspark/sql/tests/test_streaming.py:
##########
@@ -592,6 +592,18 @@ def collectBatch(df, id):
             if q:
                 q.stop()
 
+    def test_streaming_foreachBatch_graceful_stop(self):
+        # SPARK-39218: Make foreachBatch streaming query stop gracefully
+        def func(batch_df, _):
+            time.sleep(10)
+            batch_df.count()
+
+        q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
+        time.sleep(5)
+        q.stop()
+        time.sleep(15)  # Wait enough for the exception to be propagated if exists.
+        self.assertIsNone(q.exception(), "No exception has to be propagated.")

Review Comment:
   I took the simpler path here. I could use a test source like
`ThrowingExceptionInCreateSource` instead, but that would require more
complicated test cases, like
https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/test_dataframe.py#L1245-L1266.
   
   This test can produce false positives, but at least it won't be flaky.
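   To illustrate what the test asserts without a Spark cluster, here is a
minimal pure-Python model. It assumes that "graceful stop" means an
interruption triggered by `stop()` is not surfaced through `exception()`,
while a genuine batch failure still is. `FakeStreamingQuery`, `slow_batch`,
and the use of `InterruptedError` to stand in for the interrupted batch are
all hypothetical; this is a sketch, not how PySpark implements `foreachBatch`.

```python
import threading
import time


class FakeStreamingQuery:
    """Hypothetical stand-in for a streaming query (NOT the PySpark API).

    Runs a batch function repeatedly in a background thread until stop() is
    called, and records a failure the way StreamingQuery.exception() reports one.
    """

    def __init__(self, func):
        self._func = func
        self._stop_event = threading.Event()
        self._exception = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def _run(self):
        batch_id = 0
        while not self._stop_event.is_set():
            try:
                self._func(batch_id, self._stop_event)
            except InterruptedError as e:
                # Graceful stop: an interruption caused by stop() is expected,
                # so only record it if nobody asked the query to stop.
                if not self._stop_event.is_set():
                    self._exception = e
                return
            except Exception as e:
                # Any other error is a real batch failure and is kept.
                self._exception = e
                return
            batch_id += 1

    def stop(self):
        self._stop_event.set()
        self._thread.join()

    def exception(self):
        return self._exception


def slow_batch(batch_id, stop_event):
    # Plays the role of time.sleep(10) in the test, but is interruptible:
    # Event.wait() returns True as soon as stop() sets the event.
    if stop_event.wait(timeout=1.0):
        raise InterruptedError(f"batch {batch_id} interrupted by stop()")


q = FakeStreamingQuery(slow_batch).start()
time.sleep(0.1)  # stop while the first batch is still "running"
q.stop()
print(q.exception())  # None
```

   Because `slow_batch` waits on the stop event instead of calling
`time.sleep`, the model is deterministic and needs no trailing wait before
checking `exception()`. Whether the real test could drop its
`time.sleep(15)` depends on Spark internals that this sketch does not model.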



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
