[ https://issues.apache.org/jira/browse/SPARK-39218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-39218.
----------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 36589
[https://github.com/apache/spark/pull/36589]

> Python foreachBatch streaming query cannot be stopped gracefully after pin
> thread mode is enabled
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39218
>                 URL: https://issues.apache.org/jira/browse/SPARK-39218
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Structured Streaming
>    Affects Versions: 3.0.3, 3.1.2, 3.2.1, 3.3.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Major
>             Fix For: 3.3.0
>
> For example,
> {code}
> import time
>
> def func(batch_df, batch_id):
>     time.sleep(10)
>     print(batch_df.count())
>
> q = spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
> time.sleep(5)
> q.stop()
> {code}
> works fine when pinned thread mode is disabled. When pinned thread mode is
> enabled:
> {code}
> 22/05/18 15:23:24 ERROR MicroBatchExecution: Query [id = 2538f8a2-c6e4-44c9-bf38-e6dab555267e, runId = 1d500478-1d77-46aa-b35a-585264a809b9] terminated with error
> py4j.Py4JException: An exception was raised by the Python Proxy.
> Return Message: Traceback (most recent call last):
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/.../spark/python/pyspark/sql/utils.py", line 272, in call
>     raise e
>   File "/.../spark/python/pyspark/sql/utils.py", line 269, in call
>     self.func(DataFrame(jdf, self.session), batch_id)
>   File "<stdin>", line 3, in func
>   File "/.../spark/python/pyspark/sql/dataframe.py", line 804, in count
>     return int(self._jdf.count())
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
>     return_value = get_return_value(
>   File "/.../spark/python/pyspark/sql/utils.py", line 190, in deco
>     return f(*a, **kw)
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o44.count.
> : java.lang.InterruptedException
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
>     at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:334)
>     at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:943)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:2227)
> {code}

--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
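[Editor's note] The race the ticket describes — `q.stop()` interrupting the micro-batch thread while the user's `foreachBatch` callback is still blocked inside a JVM call, which surfaces as `java.lang.InterruptedException` — can be illustrated with a plain-Python analogue. The sketch below is hypothetical and uses no Spark APIs: `MicroBatchLoop` is an invented stand-in for a streaming query, showing the graceful-stop pattern (signal, then wait for the in-flight batch to finish) rather than Spark's actual fix in PR 36589.

```python
import threading
import time


class MicroBatchLoop:
    """Toy stand-in for a streaming query; NOT a Spark API."""

    def __init__(self, func):
        self.func = func
        self._stop = threading.Event()
        self.batches_done = 0
        self._thread = threading.Thread(target=self._run)

    def start(self):
        self._thread.start()
        return self

    def _run(self):
        batch_id = 0
        # Keep running batches until a stop is requested.
        while not self._stop.is_set():
            self.func(batch_id)  # user callback, like foreachBatch
            self.batches_done += 1
            batch_id += 1

    def stop(self):
        # Graceful stop: signal the loop and wait for the in-flight
        # batch to complete, instead of interrupting it mid-call.
        self._stop.set()
        self._thread.join()


def func(batch_id):
    time.sleep(0.05)  # simulate the work of batch_df.count()


q = MicroBatchLoop(func).start()
time.sleep(0.2)
q.stop()
print(q.batches_done)  # at least one batch completed before stop returned
```

In the buggy pinned-thread-mode behavior the ticket reports, stopping corresponded to interrupting the worker mid-`func` rather than letting the in-flight batch drain as above.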