zsxwing commented on code in PR #36589:
URL: https://github.com/apache/spark/pull/36589#discussion_r877642301


##########
python/pyspark/sql/tests/test_streaming.py:
##########
@@ -592,6 +592,18 @@ def collectBatch(df, id):
             if q:
                 q.stop()
 
+    def test_streaming_foreachBatch_graceful_stop(self):
+        # SPARK-39218: Make foreachBatch streaming query stop gracefully
+        def func(batch_df, _):
+            time.sleep(10)
+            batch_df.count()
+
+        q = self.spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
+        time.sleep(5)
+        q.stop()
+        time.sleep(15)  # Wait enough for the exception to be propagated if exists.

Review Comment:
   This sleep is not needed. `q.stop()` will wait until the streaming thread is dead.
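
   To illustrate the point, here is a minimal plain-Python analogy, not Spark's implementation. `Worker` is a hypothetical stand-in for a streaming query whose `stop()` signals the worker thread and then joins it, so the thread is already dead when `stop()` returns and a trailing sleep adds nothing.

```python
import threading


class Worker:
    """Hypothetical stand-in for a streaming query (assumption, not Spark code)."""

    def __init__(self):
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run)

    def start(self):
        self._thread.start()
        return self

    def _run(self):
        # Keep "processing" until a stop is requested.
        while not self._stop_event.is_set():
            self._stop_event.wait(0.01)

    def stop(self):
        # Signal the worker, then block until its thread has exited.
        self._stop_event.set()
        self._thread.join()

    def is_alive(self):
        return self._thread.is_alive()


w = Worker().start()
w.stop()
# No extra sleep here: stop() returned only after the worker thread exited.
assert not w.is_alive()
```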



##########
python/pyspark/sql/tests/test_streaming.py:
##########
@@ -592,6 +592,18 @@ def collectBatch(df, id):
             if q:
                 q.stop()
 
+    def test_streaming_foreachBatch_graceful_stop(self):
+        # SPARK-39218: Make foreachBatch streaming query stop gracefully
+        def func(batch_df, _):
+            time.sleep(10)

Review Comment:
   How does this func trigger InterruptedException? I would expect code like `self.spark._jvm.java.lang.Thread.sleep(10000)` instead.
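
   A sketch of what the suggested callback might look like, assuming the goal is for the blocking call to go through the JVM (via Py4J) rather than Python's `time.sleep`. The helper name `make_jvm_sleep_func` and its `millis` parameter are hypothetical; in the test, `spark` would be `self.spark`.

```python
def make_jvm_sleep_func(spark, millis=10000):
    """Build a foreachBatch callback that sleeps through the JVM (hypothetical helper)."""

    def func(batch_df, _):
        # Sleep via java.lang.Thread.sleep, called through Py4J, instead of time.sleep.
        spark._jvm.java.lang.Thread.sleep(millis)
        batch_df.count()

    return func
```

   It would then be passed to the query as `writeStream.foreachBatch(make_jvm_sleep_func(self.spark))`.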



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

