HyukjinKwon commented on code in PR #36589:
URL: https://github.com/apache/spark/pull/36589#discussion_r876494195
##########
python/pyspark/sql/tests/test_streaming.py:
##########
@@ -592,6 +592,18 @@ def collectBatch(df, id):
             if q:
                 q.stop()
 
+    def test_streaming_foreachBatch_graceful_stop(self):
+        # SPARK-39218: Make foreachBatch streaming query stop gracefully
+        def func(batch_df, _):
+            time.sleep(10)
+            batch_df.count()
+
+        q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
+        time.sleep(5)
+        q.stop()
+        time.sleep(15)  # Wait enough for the exception to be propagated if exists.
+        self.assertIsNone(q.exception(), "No exception has to be propagated.")

Review Comment:
I just took a simpler path. I could use a test-source approach like `ThrowingExceptionInCreateSource`, but that would require more complicated test cases, such as https://github.com/apache/spark/blob/master/python/pyspark/sql/tests/test_dataframe.py#L1245-L1266. The test case here can produce false positives, but at least it won't be flaky.
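The test exercises one scenario: it calls `stop()` while a slow batch is still running, then checks that no exception was recorded. The sketch below models that scenario without Spark. `ToyQuery` and `slow_batch` are hypothetical names, not PySpark APIs. The sketch assumes that "graceful" means `stop()` signals the loop and then waits for the in-flight batch to finish rather than interrupting it. It illustrates the shape of the check, not how Spark implements SPARK-39218.

```python
import threading
import time


class ToyQuery:
    """Hypothetical stand-in for a streaming query (not a PySpark API).

    Runs `batch_func` repeatedly on a background thread and records the
    first exception it raises, loosely mirroring what `q.exception()` reports.
    """

    def __init__(self, batch_func):
        self._batch_func = batch_func
        self._stop_requested = threading.Event()
        self._exception = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()
        return self

    def _run(self):
        batch_id = 0
        while not self._stop_requested.is_set():
            try:
                self._batch_func(batch_id)
            except Exception as e:  # record the failure instead of crashing the thread
                self._exception = e
                return
            batch_id += 1

    def stop(self):
        # Graceful stop (assumed semantics): signal the loop, then wait for
        # the in-flight batch to finish instead of interrupting it.
        self._stop_requested.set()
        self._thread.join()

    def exception(self):
        return self._exception


def slow_batch(batch_id):
    time.sleep(0.2)  # stands in for the slow `batch_df.count()` in the test


q = ToyQuery(slow_batch).start()
time.sleep(0.1)  # stop while a batch is still running
q.stop()
assert q.exception() is None
```

In the toy, `stop()` joins the worker thread, so no fixed wait is needed afterwards. In the real test, any exception could surface asynchronously after `q.stop()` returns. That is why the test sleeps for 15 seconds before checking `q.exception()`.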