[ https://issues.apache.org/jira/browse/SPARK-33121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dmitry Tverdokhleb updated SPARK-33121:
---------------------------------------
    Description:

Hi. I am trying to migrate from Spark 2.4.5 to 3.0.1, and graceful shutdown no longer works. The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true".

Here is the code:

{code:java}
inputStream.foreachRDD { rdd =>
  rdd
    .mapPartitions {
      // Some operations in mapPartitions
    }
    .filter {
      // Some operations in filter
    }
    .groupBy {
      // Some operations in groupBy
    }
}
{code}

I send a SIGTERM signal to stop the streaming application, but the following exception arises:

{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{noformat}

A piece of the logs:

{noformat}
...
Calling rdd.mapPartitions
...
2020-10-12 14:12:22 INFO MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO JobGenerator - Waiting for all received blocks to be consumed for job generation
2020-10-12 14:12:22 INFO JobGenerator - Waited for all received blocks to be consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
{noformat}

The exception arises only in RDD operations (map, filter, etc.), never in the business logic itself. Graceful shutdown works without any problem in Spark 2.4.5.
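To make the scenario easier to reproduce, here is a minimal, self-contained sketch of the kind of job described above. The master URL, batch interval, socket source, object name, the bodies of the transformations, and the trailing count() action are all assumptions added for illustration; only the config key and the foreachRDD / mapPartitions / filter / groupBy structure come from the report.

{code:java}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GracefulShutdownRepro {
  def main(args: Array[String]): Unit = {
    // Local scheduler, matching the LocalSchedulerBackend frames in the stack trace.
    val conf = new SparkConf()
      .setAppName("GracefulShutdownRepro")
      .setMaster("local[2]")
      .set("spark.streaming.stopGracefullyOnShutdown", "true")

    val ssc = new StreamingContext(conf, Seconds(5))

    // Placeholder input; the actual source is not specified in the report.
    val inputStream = ssc.socketTextStream("localhost", 9999)

    inputStream.foreachRDD { rdd =>
      rdd
        .mapPartitions { iter =>
          // Some operations in mapPartitions
          iter.map(_.trim)
        }
        .filter { line =>
          // Some operations in filter
          line.nonEmpty
        }
        .groupBy { line =>
          // Some operations in groupBy
          line.headOption
        }
        .count() // an action, so the transformations above actually execute
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

With a setup like this, sending SIGTERM to the driver while a batch is being processed corresponds to the scenario described above: on 2.4.5 the in-flight batch finishes, while on 3.0.1 the task is rejected because the executor's thread pool has already been terminated (as shown in the exception above).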
> Spark does not shutdown gracefully
> ----------------------------------
>
>                 Key: SPARK-33121
>                 URL: https://issues.apache.org/jira/browse/SPARK-33121
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 3.0.1
>            Reporter: Dmitry Tverdokhleb
>            Priority: Major
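As a closing note for anyone reproducing this: "spark.streaming.stopGracefullyOnShutdown" asks Spark to stop the StreamingContext gracefully when the JVM shuts down; the same kind of stop can also be requested explicitly. The sketch below is only an illustration of that call (the shutdown-hook registration and the parameter choices are assumptions, not part of the reported job, and it has not been verified to behave any differently on 3.0.1).

{code:java}
import org.apache.spark.streaming.StreamingContext

object GracefulStop {
  // Illustrative only: ask a running StreamingContext to stop gracefully when
  // the JVM shuts down (e.g. on SIGTERM). stopGracefully = true waits for
  // already-received data to be processed before stopping.
  def install(ssc: StreamingContext): Unit = {
    sys.addShutdownHook {
      ssc.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}
{code}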