[ 
https://issues.apache.org/jira/browse/SPARK-33121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Tverdokhleb updated SPARK-33121:
---------------------------------------
    Description: 
Hi. I am trying to migrate from Spark 2.4.5 to 3.0.1 and have run into a problem 
with graceful shutdown.

The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true".
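
For reference, a minimal sketch of how this parameter could be set when building the streaming context. The object name, application name, master URL, and batch interval are assumptions for illustration and are not taken from the reporter's job (a local master is only suggested by the LocalSchedulerBackend frames in the stack trace).

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical helper; all names and values except the config key are placeholders.
object GracefulShutdownConfig {
  def buildContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("graceful-shutdown-repro") // assumed application name
      .setMaster("local[2]")                 // assumed local master
      // Ask Spark Streaming to stop gracefully when the JVM shuts down (e.g. on SIGTERM).
      .set("spark.streaming.stopGracefullyOnShutdown", "true")
    new StreamingContext(conf, Seconds(5))   // assumed batch interval
  }
}
```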

Here is the code:
{code:scala}
inputStream.foreachRDD {
  rdd =>
    rdd
      .foreachPartition { _ =>
        // Simulate slow per-partition work
        Thread.sleep(5000)
      }
}
{code}
I send a SIGTERM signal to stop the Spark Streaming application, but an exception arises:
{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
        at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
        at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
        at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
        at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{noformat}
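
The trace shows Executor.launchTask handing a TaskRunner to a thread pool that is already in the Terminated state. The following sketch uses only java.util.concurrent (no Spark) to show that JDK behavior in isolation: a pool with the default AbortPolicy rejects any task submitted after it has shut down. The object and method names are illustrative, and the sketch does not explain why Spark 3.0.1 still tries to launch tasks at that point.

```scala
import java.util.concurrent.{Executors, RejectedExecutionException, TimeUnit}

object RejectedAfterShutdownDemo {
  // Returns true if submitting work to an already-terminated pool is rejected.
  def submitAfterShutdownIsRejected(): Boolean = {
    val pool = Executors.newFixedThreadPool(2)
    pool.shutdown()                            // no new tasks are accepted from here on
    pool.awaitTermination(1, TimeUnit.SECONDS) // idle pool reaches the Terminated state
    try {
      pool.execute(new Runnable { def run(): Unit = println("never runs") })
      false
    } catch {
      case _: RejectedExecutionException => true // thrown by the default AbortPolicy
    }
  }

  def main(args: Array[String]): Unit =
    println(s"rejected: ${submitAfterShutdownIsRejected()}")
}
```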
An excerpt from the logs:
{noformat}
...
Calling rdd.mapPartitions
...
Sending SIGTERM signal
...
2020-10-12 14:12:22 INFO  MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO  ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO  JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO  JobGenerator - Waiting for all received blocks to be consumed for job generation
2020-10-12 14:12:22 INFO  JobGenerator - Waited for all received blocks to be consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
{noformat}
After this exception, the streaming application hangs and is only terminated when 
the shutdown timeout expires (config parameter "hadoop.service.shutdown.timeout").

Note that this exception arises only for RDD operations (such as map, filter, 
etc.); the business logic is processed normally without any errors.

There is no such problem with graceful shutdown in Spark 2.4.5.

  was:
Hi. I am trying to migrate from Spark 2.4.5 to 3.0.1 and have run into a problem 
with graceful shutdown.

The config parameter "spark.streaming.stopGracefullyOnShutdown" is set to "true".

Here is the code:
{code:scala}
inputStream.foreachRDD {
  rdd =>
    rdd
      .mapPartitions {
        // Some mapPartitions operations
      }
      .filter {
        // Some filter operations
      }
      .groupBy {
        // Some groupBy operations
      }
}
{code}
I send a SIGTERM signal to stop the Spark Streaming application, but an exception arises:
{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
        at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
        at org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
        at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
        at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{noformat}
An excerpt from the logs:
{noformat}
...
Calling rdd.mapPartitions
...
Sending SIGTERM signal
...
2020-10-12 14:12:22 INFO  MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO  ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO  JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO  JobGenerator - Waiting for all received blocks to be consumed for job generation
2020-10-12 14:12:22 INFO  JobGenerator - Waited for all received blocks to be consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
{noformat}
After this exception, the streaming application hangs and is only terminated when 
the shutdown timeout expires (config parameter "hadoop.service.shutdown.timeout").

Note that this exception arises only for RDD operations (such as map, filter, 
etc.); the business logic is processed normally without any errors.

There is no such problem with graceful shutdown in Spark 2.4.5.


> Spark does not shutdown gracefully
> ----------------------------------
>
>                 Key: SPARK-33121
>                 URL: https://issues.apache.org/jira/browse/SPARK-33121
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 3.0.1
>            Reporter: Dmitry Tverdokhleb
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
