[ 
https://issues.apache.org/jira/browse/SPARK-33121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Tverdokhleb updated SPARK-33121:
---------------------------------------
    Description: 
Hi. I am trying to migrate from spark 2.4.5 to 3.0.1 and there is a problem in 
graceful shutdown.

Config parameter "spark.streaming.stopGracefullyOnShutdown" is set as "true".

Here is the code:
{code:java}
inputStream.foreachRDD {
  rdd =>
    rdd
      .mapPartitions {
        // Some operations mapPartitions
      }
      .filter {
        // Some operations filter
      }
      .groupBy {
        // Some operatons groupBy
      }
}
{code}
I send a SIGTERM signal to stop the spark streaming, but exception arrises:
{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task 
org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from 
java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, 
active threads = 0, queued tasks = 0, completed tasks = 6]
        at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
        at 
org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
        at 
org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
        at 
org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
        at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at 
org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
        at 
org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748){noformat}
A piece of logs:
{noformat}
...
Calling rdd.mapPartitions
...
2020-10-12 14:12:22 INFO  MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO  ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO  JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO  JobGenerator - Waiting for all received blocks to be 
consumed for job generation
2020-10-12 14:12:22 INFO  JobGenerator - Waited for all received blocks to be 
consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task 
org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from 
java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, 
active threads = 0, queued tasks = 0, completed tasks = 6]{noformat}
This exception arises only for RDD operations (Like map, filter, etc.), not 
business logic.

Besides, there is no problem with graceful shutdown in spark 2.4.5.
  
  
  
  

  was:
Hi. I have a spark streaming code, like:
{code:java}
inputStream.foreachRDD {
  rdd =>
    rdd
      .mapPartitions {
        // Some operations mapPartitions
      }
      .filter {
        // Some operations filter
      }
      .groupBy {
        // Some operatons groupBy
      }
}
{code}
I set the config parameter "spark.streaming.stopGracefullyOnShutdown" as 
"*true*" and send a SIGTERM signal to stop the spark streaming, but an 
exception arrises:
{noformat}
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task 
org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from 
java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, 
active threads = 0, queued tasks = 0, completed tasks = 6]
        at 
java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at 
java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
        at 
org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
        at 
org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at 
org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
        at 
org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
        at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
        at 
org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
        at 
org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748){noformat}
Logs:
{noformat}
...
Calling rdd.mapPartitions
...
2020-10-12 14:12:22 INFO  MyProject - Shutdown hook called
2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
2020-10-12 14:12:22 INFO  ReceiverTracker - ReceiverTracker stopped
2020-10-12 14:12:22 INFO  JobGenerator - Stopping JobGenerator gracefully
2020-10-12 14:12:22 INFO  JobGenerator - Waiting for all received blocks to be 
consumed for job generation
2020-10-12 14:12:22 INFO  JobGenerator - Waited for all received blocks to be 
consumed for job generation
...
Calling rdd.filter
2020-10-12 14:12:29 ERROR Inbox - Ignoring error
java.util.concurrent.RejectedExecutionException: Task 
org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from 
java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, 
active threads = 0, queued tasks = 0, completed tasks = 6]{noformat}
This exception arises only for RDD operations (Like map, filter, etc.), not 
business logic.

Besides, there is no problem with graceful shutdown in spark 2.4.5.
  
  
  
 


> Spark does not shutdown gracefully
> ----------------------------------
>
>                 Key: SPARK-33121
>                 URL: https://issues.apache.org/jira/browse/SPARK-33121
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 3.0.1
>            Reporter: Dmitry Tverdokhleb
>            Priority: Major
>
> Hi. I am trying to migrate from spark 2.4.5 to 3.0.1 and there is a problem 
> in graceful shutdown.
> Config parameter "spark.streaming.stopGracefullyOnShutdown" is set as "true".
> Here is the code:
> {code:java}
> inputStream.foreachRDD {
>   rdd =>
>     rdd
>       .mapPartitions {
>         // Some operations mapPartitions
>       }
>       .filter {
>         // Some operations filter
>       }
>       .groupBy {
>         // Some operatons groupBy
>       }
> }
> {code}
> I send a SIGTERM signal to stop the spark streaming, but exception arrises:
> {noformat}
> 2020-10-12 14:12:29 ERROR Inbox - Ignoring error
> java.util.concurrent.RejectedExecutionException: Task 
> org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from 
> java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 6]
>         at 
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
>         at 
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
>         at 
> java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
>         at org.apache.spark.executor.Executor.launchTask(Executor.scala:230)
>         at 
> org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1(LocalSchedulerBackend.scala:93)
>         at 
> org.apache.spark.scheduler.local.LocalEndpoint.$anonfun$reviveOffers$1$adapted(LocalSchedulerBackend.scala:91)
>         at scala.collection.Iterator.foreach(Iterator.scala:941)
>         at scala.collection.Iterator.foreach$(Iterator.scala:941)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
>         at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>         at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>         at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>         at 
> org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:91)
>         at 
> org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:68)
>         at 
> org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
>         at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:203)
>         at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
>         at 
> org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
>         at 
> org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748){noformat}
> A piece of logs:
> {noformat}
> ...
> Calling rdd.mapPartitions
> ...
> 2020-10-12 14:12:22 INFO  MyProject - Shutdown hook called
> 2020-10-12 14:12:22 DEBUG JobScheduler - Stopping JobScheduler
> 2020-10-12 14:12:22 INFO  ReceiverTracker - ReceiverTracker stopped
> 2020-10-12 14:12:22 INFO  JobGenerator - Stopping JobGenerator gracefully
> 2020-10-12 14:12:22 INFO  JobGenerator - Waiting for all received blocks to 
> be consumed for job generation
> 2020-10-12 14:12:22 INFO  JobGenerator - Waited for all received blocks to be 
> consumed for job generation
> ...
> Calling rdd.filter
> 2020-10-12 14:12:29 ERROR Inbox - Ignoring error
> java.util.concurrent.RejectedExecutionException: Task 
> org.apache.spark.executor.Executor$TaskRunner@68a46d91 rejected from 
> java.util.concurrent.ThreadPoolExecutor@2b13eeda[Terminated, pool size = 0, 
> active threads = 0, queued tasks = 0, completed tasks = 6]{noformat}
> This exception arises only for RDD operations (Like map, filter, etc.), not 
> business logic.
> Besides, there is no problem with graceful shutdown in spark 2.4.5.
>   
>   
>   
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to