[ 
https://issues.apache.org/jira/browse/SPARK-25158?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Jie updated SPARK-25158:
-----------------------------
    External issue URL: https://github.com/apache/spark/pull/22149

> Executor accidentally exit because ScriptTransformationWriterThread throws 
> TaskKilledException.
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25158
>                 URL: https://issues.apache.org/jira/browse/SPARK-25158
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0
>            Reporter: Yang Jie
>            Priority: Major
>
> In our production environment, a user ran Spark SQL queries that use the 
> transform feature with 'spark.speculation = true'. The job sometimes failed, 
> and the `Executors` tab of the Spark UI showed many dead executors. Here are 
> some relevant sample logs:
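>  
> For context, a minimal sketch of the kind of job setup described above; the 
> query, table name, and app name are hypothetical, not taken from the 
> original job:
> {code:java}
> import org.apache.spark.sql.SparkSession
> 
> // Speculative execution launches duplicate attempts of slow tasks and kills
> // the losers, which is what triggers the failure analyzed below.
> val spark = SparkSession.builder()
>   .appName("transform-speculation-repro")   // hypothetical app name
>   .config("spark.speculation", "true")      // enable speculation
>   .enableHiveSupport()                      // TRANSFORM requires Hive support
>   .getOrCreate()
> 
> // A TRANSFORM query pipes rows through an external script; on the executor
> // side the rows are fed to the script by ScriptTransformationWriterThread.
> spark.sql(
>   """SELECT TRANSFORM (key, value)
>     |USING 'cat'
>     |AS (key, value)
>     |FROM some_table""".stripMargin).collect()  // some_table is hypothetical
> {code}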
> Driver Side Log:
> {code:java}
> 18/08/14 16:17:52 INFO TaskSetManager: Starting task 2909.1 in stage 2.0 (TID 
> 3929, executor.330, executor 7, partition 2909, PROCESS_LOCAL, 6791 bytes)
> 18/08/14 16:17:53 INFO TaskSetManager: Killing attempt 1 for task 2909.1 in 
> stage 2.0 (TID 3929) on executor.330 as the attempt 0 succeeded on executor.58
> 18/08/14 16:17:53 WARN TaskSetManager: Lost task 2909.1 in stage 2.0 (TID 
> 3929, executor.330, executor 7): TaskKilled (killed intentionally)
> 18/08/14 16:17:53 INFO TaskSetManager: Task 2909.1 in stage 2.0 (TID 3929) 
> failed, but another instance of the task has already succeeded, so not 
> re-queuing the task to be re-executed.
> {code}
>  
> Executor Side Log: 
> {code:java}
> 18/08/14 16:17:52 INFO Executor: Running task 2909.1 in stage 2.0 (TID 3929)
> 18/08/14 16:17:53 INFO Executor: Executor is trying to kill task 2909.1 in 
> stage 2.0 (TID 3929)
> 18/08/14 16:17:53 ERROR ScriptTransformationWriterThread:
> 18/08/14 16:17:53 ERROR Utils: Uncaught exception in thread 
> Thread-ScriptTransformation-Feed
> org.apache.spark.TaskKilledException
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:295)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:380)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at 
> org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply$mcV$sp(ScriptTransformation.scala:289)
> at 
> org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:278)
> at 
> org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:278)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
> at 
> org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformation.scala:278)
> 18/08/14 16:17:53 INFO Executor: Executor killed task 2909.1 in stage 2.0 
> (TID 3929)
> 18/08/14 16:17:53 ERROR SparkUncaughtExceptionHandler: Uncaught exception in 
> thread Thread[Thread-ScriptTransformation-Feed,5,main]
> org.apache.spark.TaskKilledException
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:295)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:380)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at 
> org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply$mcV$sp(ScriptTransformation.scala:289)
> at 
> org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:278)
> at 
> org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:278)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
> at 
> org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformation.scala:278)
> 18/08/14 16:17:53 INFO DiskBlockManager: Shutdown hook called
> {code}
>  
> Analyzing the logs for task TID 3929, we found that the task completed the 
> task-kill process normally on both the driver side and the executor side. The 
> discrepancy is that SparkUncaughtExceptionHandler also captured and handled 
> the TaskKilledException, which sent the executor into its shutdown process. 
> The relevant code and an analysis of the failure sequence follow:
>  
> {code:java}
> override def run(): Unit = Utils.logUncaughtExceptions {
>   ...
>   // We can't use Utils.tryWithSafeFinally here because we also need a `catch`
>   // block, so let's use a variable to record whether the `finally` block was
>   // hit due to an exception
>   var threwException: Boolean = true
>   val len = inputSchema.length
>   try {
>     iter.map(outputProjection).foreach { row => // line 289
>       ...
>     }
>     threwException = false
>   } catch {
>     case t: Throwable =>
>       // An error occurred while writing input, so kill the child process.
>       // According to the Javadoc this call will not throw an exception:
>       _exception = t
>       proc.destroy()
>       throw t
>   } finally {
>     ...
>   }
> }
> {code}
>  
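> For reference, a paraphrased sketch of what Utils.logUncaughtExceptions 
> (referenced in step 4 below) does; this is a simplification of the Spark 
> source, where logError comes from Spark's internal Logging trait:
> {code:java}
> // Simplified paraphrase of org.apache.spark.util.Utils.logUncaughtExceptions:
> // log any Throwable thrown by the wrapped block, then rethrow it, so the
> // exception still escapes run() of ScriptTransformationWriterThread.
> def logUncaughtExceptions[T](f: => T): T = {
>   try {
>     f
>   } catch {
>     case t: Throwable =>
>       logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
>       throw t  // rethrown: it will reach the thread's uncaught-exception handler
>   }
> }
> {code}
>  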
>  # The task-kill command marks `interrupted=true` on the TaskContext, so the 
> foreach call in ScriptTransformationWriterThread throws a 
> TaskKilledException, which is caught inside ScriptTransformationWriterThread.
>  # ScriptTransformationWriterThread catches the TaskKilledException, assigns 
> it to `_exception`, and re-throws it.
>  # ScriptTransformation, running in the TaskRunner thread, finds that 
> `ScriptTransformationWriterThread.exception.isDefined` is true and throws a 
> TaskKilledException, which is handled by the `catch` block in TaskRunner to 
> complete the task kill. The logs above confirm this.
>  # The TaskKilledException re-thrown by ScriptTransformationWriterThread is 
> caught by Utils.logUncaughtExceptions (sketched above), which logs it and 
> re-throws it once more.
>  # ScriptTransformationWriterThread is a child thread of TaskRunner and 
> belongs to the main ThreadGroup, so the TaskKilledException it throws is 
> captured by the SparkUncaughtExceptionHandler registered during executor 
> startup rather than by TaskRunner; SparkUncaughtExceptionHandler logs the 
> TaskKilledException and then calls 
> System.exit(SparkExitCode.UNCAUGHT_EXCEPTION), shutting down the executor. 
> The sketch below illustrates why the exception bypasses TaskRunner.
>  
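> To illustrate step 5, a minimal standalone sketch (class and thread names 
> are hypothetical) showing that an exception thrown in a child thread 
> bypasses the parent thread's try/catch and lands in the JVM-wide default 
> uncaught-exception handler:
> {code:java}
> object ChildThreadEscapeDemo {
>   def main(args: Array[String]): Unit = {
>     // Stand-in for SparkUncaughtExceptionHandler; the real handler calls
>     // System.exit(SparkExitCode.UNCAUGHT_EXCEPTION), killing the executor.
>     Thread.setDefaultUncaughtExceptionHandler((t, e) =>
>       println(s"default handler caught ${e.getClass.getSimpleName} from ${t.getName}"))
> 
>     try {
>       val writer = new Thread(() => {
>         // Mirrors ScriptTransformationWriterThread re-throwing the exception.
>         throw new RuntimeException("simulated TaskKilledException")
>       }, "Thread-ScriptTransformation-Feed")
>       writer.start()
>       writer.join()
>     } catch {
>       // Never reached: a parent's catch block (like TaskRunner's) cannot see
>       // exceptions thrown on another thread's stack.
>       case e: Throwable => println(s"parent caught: $e")
>     }
>   }
> }
> {code}
>  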
> From the above analysis we can conclude that any Throwable thrown while 
> ScriptTransformationWriterThread is running will send the executor into the 
> shutdown process, which is not appropriate. The current solution is to add a 
> case match for TaskKilledException in the ScriptTransformationWriterThread 
> catch block that only logs the exception and assigns it to `_exception`, 
> without rethrowing it. We re-ran the user's job with this change and 
> `spark.speculation = true`, and the problem no longer reappears. A sketch of 
> this change follows.
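>  
> A minimal sketch of the proposed catch block, illustrating the approach 
> described above (this is an illustration, not necessarily the exact patch in 
> the linked pull request):
> {code:java}
> } catch {
>   // The task was killed intentionally (e.g. a speculative duplicate finished
>   // first): record the exception for the parent TaskRunner to observe, but
>   // do NOT rethrow it, so it never reaches SparkUncaughtExceptionHandler.
>   case e: TaskKilledException =>
>     logWarning("Feed thread interrupted because the task was killed", e)
>     _exception = e
>     proc.destroy()  // assumed: still stop the child process, as below
>   case t: Throwable =>
>     // An error occurred while writing input, so kill the child process.
>     _exception = t
>     proc.destroy()
>     throw t
> }
> {code}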
>  
>  


