[ https://issues.apache.org/jira/browse/SPARK-33121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dmitry Tverdokhleb updated SPARK-33121:
---------------------------------------
Description:

Hi. I am trying to migrate from Spark 2.4.5 to 3.0.1, and there is a problem with graceful shutdown. The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true". Here is the code:

{code:java}
inputStream.foreachRDD { rdd =>
  rdd.foreachPartition { _ =>
    Thread.sleep(5000)
  }
}
{code}

I send a SIGTERM signal to stop the Spark streaming application, but an exception arises:

{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
	at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
	at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
	at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{noformat}

A piece of the logs:

{noformat}
...
Calling rdd.mapPartitions
...
Sending SIGTERM signal
...
2020-10-12 14:12:22 INFO MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO JobGenerator - Waiting for all received blocks to be consumed for job generation
2020-10-12 14:12:22 INFO JobGenerator - Waited for all received blocks to be consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
{noformat}

After this exception, streaming freezes and halts only when the timeout configured by "hadoop.service.shutdown.timeout" expires. Note that this exception arises only for RDD operations (map, filter, etc.); the business logic itself runs normally without any errors. Also, there is no such problem with graceful shutdown in Spark 2.4.5.

was:
Hi. I am trying to migrate from Spark 2.4.5 to 3.0.1, and there is a problem with graceful shutdown. The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true".
Here is the code:

{code:java}
inputStream.foreachRDD { rdd =>
  rdd
    .mapPartitions {
      // Some operations mapPartitions
    }
    .filter {
      // Some operations filter
    }
    .groupBy {
      // Some operations groupBy
    }
}
{code}

I send a SIGTERM signal to stop the Spark streaming application, but an exception arises:

{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
	at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
	at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
	at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{noformat}

A piece of the logs:

{noformat}
...
Calling rdd.mapPartitions
...
Sending SIGTERM signal
...
2020-10-12 14:12:22 INFO MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO JobGenerator - Waiting for all received blocks to be consumed for job generation
2020-10-12 14:12:22 INFO JobGenerator - Waited for all received blocks to be consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
{noformat}

After this exception, streaming freezes and halts only when the timeout configured by "hadoop.service.shutdown.timeout" expires. Note that this exception arises only for RDD operations (map, filter, etc.); the business logic itself runs normally without any errors. Also, there is no such problem with graceful shutdown in Spark 2.4.5.

> Spark does not shutdown gracefully
> ----------------------------------
>
>                 Key: SPARK-33121
>                 URL: https://issues.apache.org/jira/browse/SPARK-33121
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 3.0.1
>            Reporter: Dmitry Tverdokhleb
>            Priority: Major
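The RejectedExecutionException in the trace is standard java.util.concurrent behavior: once a ThreadPoolExecutor has been shut down, its default AbortPolicy rejects any task submitted afterwards. That appears to be what happens here when Executor.launchTask runs after the executor's task pool was already terminated by the shutdown hook. A minimal standalone sketch of that mechanism (plain Java, no Spark involved; class and method names are illustrative only):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectedDemo {
    // Returns true if the task was accepted, false if the pool rejected it.
    static boolean submitAfterShutdown() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        // The shutdown hook stops the pool first...
        pool.shutdown();

        try {
            // ...so a task launched afterwards hits the default AbortPolicy,
            // just like the TaskRunner in the stack trace above.
            pool.execute(() -> { });
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAfterShutdown() ? "accepted" : "rejected");
    }
}
```

Running this prints "rejected": the pool is in a terminating state, has no live workers, and refuses new work, which matches the "[Terminated, pool size = 0, active threads = 0, ...]" state in the exception message.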
--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org