[ https://issues.apache.org/jira/browse/SPARK-33121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dmitry Tverdokhleb updated SPARK-33121:
---------------------------------------
    Description:
Hi. I am trying to migrate from Spark 2.4.5 to 3.1.1, and there is a problem with graceful shutdown.
The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true".

Here is the code:
{code:scala}
inputStream.foreachRDD { rdd =>
  rdd.foreachPartition { _ =>
    Thread.sleep(5000)
  }
}
{code}

I send a SIGTERM signal to stop the Spark Streaming application, and after the sleep completes, an exception is thrown:
{noformat}
streaming-agg-tds-data_1 | java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@7ca7f0b8 rejected from java.util.concurrent.ThreadPoolExecutor@2474219c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
streaming-agg-tds-data_1 |     at org.apache.spark.executor.Executor.launchTask(Executor.scala:270)
streaming-agg-tds-data_1 |     at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
streaming-agg-tds-data_1 |     at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
streaming-agg-tds-data_1 |     at scala.collection.Iterator.foreach(Iterator.scala:941)
streaming-agg-tds-data_1 |     at scala.collection.Iterator.foreach$(Iterator.scala:941)
streaming-agg-tds-data_1 |     at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
streaming-agg-tds-data_1 |     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
streaming-agg-tds-data_1 |     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
streaming-agg-tds-data_1 |     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
streaming-agg-tds-data_1 |     at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
streaming-agg-tds-data_1 |     at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
streaming-agg-tds-data_1 |     at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
streaming-agg-tds-data_1 |     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
streaming-agg-tds-data_1 |     at java.lang.Thread.run(Thread.java:748)
streaming-agg-tds-data_1 | 2021-04-22 13:33:41 WARN JobGenerator - Timed out while stopping the job generator (timeout = 10000)
streaming-agg-tds-data_1 | 2021-04-22 13:33:41 INFO JobGenerator - Waited for jobs to be processed and checkpoints to be written
streaming-agg-tds-data_1 | 2021-04-22 13:33:41 INFO JobGenerator - Stopped JobGenerator
{noformat}

After this exception and the "JobGenerator - Stopped JobGenerator" message, the streaming application freezes and is only halted when the shutdown timeout expires (config parameter "hadoop.service.shutdown.timeout").

By contrast, graceful shutdown works without any problem in Spark 2.4.5.

was:
Hi. I am trying to migrate from Spark 2.4.5 to 3.0.1, and there is a problem with graceful shutdown.
The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true".
Here is the code:
{code:scala}
inputStream.foreachRDD { rdd =>
  rdd
    .foreachPartition { _ =>
      Thread.sleep(5000)
    }
}
{code}

I send a SIGTERM signal to stop the Spark Streaming application, but an exception arises:
{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
    at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
{noformat}

A fragment of the logs:
{noformat}
...
Calling rdd.mapPartitions
...
Sending SIGTERM signal
...
2020-10-12 14:12:22 INFO MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO JobGenerator - Waiting for all received blocks to be consumed for job generation
2020-10-12 14:12:22 INFO JobGenerator - Waited for all received blocks to be consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
{noformat}

After this exception, the streaming application freezes and is only halted when the shutdown timeout expires (config parameter "hadoop.service.shutdown.timeout").

Note that this exception arises only for RDD operations (like map, filter, etc.); the business logic is processed normally without any errors.

By contrast, graceful shutdown works without any problem in Spark 2.4.5.

> Spark does not shutdown gracefully
> ----------------------------------
>
>                 Key: SPARK-33121
>                 URL: https://issues.apache.org/jira/browse/SPARK-33121
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 3.0.1
>            Reporter: Dmitry Tverdokhleb
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
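For context, the exception at the top of both traces is standard java.util.concurrent behavior: a ThreadPoolExecutor with the default AbortPolicy lets work that is already running finish after shutdown(), but rejects any later execute() call, and once the pool has terminated the rejection message reports its state as "Terminated". The stdlib-only Scala sketch below reproduces just that mechanism; it is not Spark code, and the object name RejectedAfterShutdown and the 200 ms sleep (a shortened stand-in for the Thread.sleep(5000) partition task) are hypothetical. Read this way, the traces suggest that the executor's task pool had already been shut down when LocalEndpoint.reviveOffers tried to launch another task, although that is an interpretation of the stack trace rather than a confirmed diagnosis.

```scala
import java.util.concurrent.{LinkedBlockingQueue, RejectedExecutionException, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical demo object (not Spark code): shows how a shut-down
// ThreadPoolExecutor treats in-flight work versus newly submitted work.
object RejectedAfterShutdown {

  /** Returns whether the in-flight task completed, plus the rejection message (if any). */
  def run(): (Boolean, Option[String]) = {
    // One-thread pool; with no handler argument it uses the default AbortPolicy,
    // the same handler that appears at the top of the stack traces.
    val pool = new ThreadPoolExecutor(
      1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable]())
    val finished = new AtomicBoolean(false)

    // Stand-in for the Thread.sleep(5000) partition task, shortened to 200 ms.
    pool.execute(() => { Thread.sleep(200); finished.set(true) })

    // shutdown() stops accepting new tasks but lets the running one complete.
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)

    // Any later submission is refused: AbortPolicy throws a RejectedExecutionException
    // whose message includes the pool's toString(), e.g. "[Terminated, ...]".
    val rejection =
      try {
        pool.execute(() => ())
        None
      } catch {
        case e: RejectedExecutionException => Some(e.getMessage)
      }

    (finished.get(), rejection)
  }

  def main(args: Array[String]): Unit = {
    val (finished, rejection) = run()
    println(s"in-flight task finished: $finished")
    println(s"second submission: ${rejection.getOrElse("accepted")}")
  }
}
```

Running main should report that the in-flight task finished and that the second submission was rejected with a message of the same shape as in the logs above ("Task ... rejected from java.util.concurrent.ThreadPoolExecutor@...[Terminated, ...]").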