zsxwing commented on code in PR #36589:
URL: https://github.com/apache/spark/pull/36589#discussion_r877642301
########## python/pyspark/sql/tests/test_streaming.py: ##########
@@ -592,6 +592,18 @@ def collectBatch(df, id):
             if q:
                 q.stop()

+    def test_streaming_foreachBatch_graceful_stop(self):
+        # SPARK-39218: Make foreachBatch streaming query stop gracefully
+        def func(batch_df, _):
+            time.sleep(10)
+            batch_df.count()
+
+        q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
+        time.sleep(5)
+        q.stop()
+        time.sleep(15)  # Wait enough for the exception to be propagated if exists.

Review Comment:
This is not needed. `q.stop()` already waits until the streaming thread is dead.

########## python/pyspark/sql/tests/test_streaming.py: ##########
@@ -592,6 +592,18 @@ def collectBatch(df, id):
             if q:
                 q.stop()

+    def test_streaming_foreachBatch_graceful_stop(self):
+        # SPARK-39218: Make foreachBatch streaming query stop gracefully
+        def func(batch_df, _):
+            time.sleep(10)

Review Comment:
How does this func trigger InterruptedException? A Python-side `time.sleep` is not what gets interrupted when the query stops. I would expect code like `self.spark._jvm.java.lang.Thread.sleep(10000)` instead.