Yang Jie created SPARK-25158:
--------------------------------

             Summary: Executor exits unexpectedly because ScriptTransformationWriterThread throws TaskKilledException.
                 Key: SPARK-25158
                 URL: https://issues.apache.org/jira/browse/SPARK-25158
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.0, 2.2.0, 2.1.0
            Reporter: Yang Jie


In our Baidu production environment, a user runs Spark SQL with the transform feature and 'spark.speculation = true'. The job sometimes fails, and we found many dead executors on the `Executors` tab of the Spark UI. Here are some relevant sample logs:

Driver-side log:

 
{code:java}
18/08/14 16:17:52 INFO TaskSetManager: Starting task 2909.1 in stage 2.0 (TID 3929, executor.330, executor 7, partition 2909, PROCESS_LOCAL, 6791 bytes)
18/08/14 16:17:53 INFO TaskSetManager: Killing attempt 1 for task 2909.1 in stage 2.0 (TID 3929) on executor.330 as the attempt 0 succeeded on yq01-inf-nmg01-spark58.yq01.baidu.com
18/08/14 16:17:53 WARN TaskSetManager: Lost task 2909.1 in stage 2.0 (TID 3929, executor.330, executor 7): TaskKilled (killed intentionally)
18/08/14 16:17:53 INFO TaskSetManager: Task 2909.1 in stage 2.0 (TID 3929) failed, but another instance of the task has already succeeded, so not re-queuing the task to be re-executed.
{code}
 

 

Executor-side log:

 
{code:java}
18/08/14 16:17:52 INFO Executor: Running task 2909.1 in stage 2.0 (TID 3929)
18/08/14 16:17:53 INFO Executor: Executor is trying to kill task 2909.1 in stage 2.0 (TID 3929)
18/08/14 16:17:53 ERROR ScriptTransformationWriterThread:
18/08/14 16:17:53 ERROR Utils: Uncaught exception in thread Thread-ScriptTransformation-Feed
org.apache.spark.TaskKilledException
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:295)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:380)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply$mcV$sp(ScriptTransformation.scala:289)
at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:278)
at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:278)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformation.scala:278)
18/08/14 16:17:53 INFO Executor: Executor killed task 2909.1 in stage 2.0 (TID 3929)
18/08/14 16:17:53 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Thread-ScriptTransformation-Feed,5,main]
org.apache.spark.TaskKilledException
at org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter$SortedIterator.loadNext(UnsafeInMemorySorter.java:295)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:573)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:161)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:148)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:380)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply$mcV$sp(ScriptTransformation.scala:289)
at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:278)
at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread$$anonfun$run$1.apply(ScriptTransformation.scala:278)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.sql.hive.execution.ScriptTransformationWriterThread.run(ScriptTransformation.scala:278)
18/08/14 16:17:53 INFO DiskBlockManager: Shutdown hook called
{code}
 

 

Analyzing the logs for task TID 3929, we found that the task completed the kill process normally on both the driver side and the executor side. The anomaly is that SparkUncaughtExceptionHandler also captured and handled the TaskKilledException, after which the executor entered its shutdown process. The relevant code (from ScriptTransformation.scala) and an analysis of the failure sequence follow:

 

 
{code:java}
override def run(): Unit = Utils.logUncaughtExceptions {
  ...

  // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so
  // let's use a variable to record whether the `finally` block was hit due to an exception
  var threwException: Boolean = true
  val len = inputSchema.length
  try {
    iter.map(outputProjection).foreach { row => // line 289
      ...
    }
    threwException = false
  } catch {
    case t: Throwable =>
      // An error occurred while writing input, so kill the child process. According to the
      // Javadoc this call will not throw an exception:
      _exception = t
      proc.destroy()
      throw t
  } finally {
    ...
  }
}
{code}
 
 # The TaskKill command marks the TaskContext as `interrupted=true`; the foreach call then throws a TaskKilledException inside ScriptTransformationWriterThread.
 # ScriptTransformationWriterThread catches the TaskKilledException, assigns it to `_exception`, and re-throws it.
 # ScriptTransformation in the TaskRunner finds that `ScriptTransformationWriterThread.exception.isDefined` is true and throws a TaskKilledException, which the `catch` block in TaskRunner handles to complete the task kill; the logs above confirm this.
 # The TaskKilledException re-thrown by ScriptTransformationWriterThread is caught by the Utils.logUncaughtExceptions method, which logs it and re-throws it once more.
 # ScriptTransformationWriterThread is a child thread of the TaskRunner and belongs to the main ThreadGroup, so the TaskKilledException escaping ScriptTransformationWriterThread is captured not by TaskRunner but by the SparkUncaughtExceptionHandler registered during executor startup, which logs the TaskKilledException and then calls System.exit(SparkExitCode.UNCAUGHT_EXCEPTION), shutting down the executor (a minimal illustration of this propagation follows the list).
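To make step 5 concrete, here is a minimal, self-contained sketch in plain JVM Scala (hypothetical demo code with illustrative names, not Spark's actual classes): an exception escaping a child thread never propagates to the thread that spawned it, only to the registered default uncaught-exception handler, which is exactly how the re-thrown TaskKilledException bypasses TaskRunner's catch block and reaches SparkUncaughtExceptionHandler.

{code:java}
// Hypothetical stand-alone demo; names are illustrative, not Spark's.
object UncaughtExceptionDemo {
  def main(args: Array[String]): Unit = {
    // Analogue of SparkUncaughtExceptionHandler, registered once per JVM at
    // executor startup; Spark's handler would call
    // System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) at this point.
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit =
        println(s"default handler got ${e.getClass.getSimpleName} from ${t.getName}")
    })

    // Analogue of ScriptTransformationWriterThread: its run() re-throws,
    // just as Utils.logUncaughtExceptions logs and re-throws in step 4.
    val feedThread = new Thread(new Runnable {
      override def run(): Unit =
        throw new RuntimeException("simulated TaskKilledException")
    }, "Thread-ScriptTransformation-Feed")

    try {
      feedThread.start()
      feedThread.join()
    } catch {
      // Analogue of TaskRunner's catch block: never reached, because a child
      // thread's exception does not propagate to the thread that started it.
      case e: Throwable => println(s"parent caught: ${e.getMessage}")
    }
  }
}
{code}

Running this prints only the default-handler line; the parent's catch never fires.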

 

From the above analysis, we can conclude that any Throwable thrown while ScriptTransformationWriterThread is running will drive the executor into the shutdown process, which is not appropriate. Our current solution is to add a case match for TaskKilledException to the catch block in ScriptTransformationWriterThread that only logs the exception and assigns it to `_exception`, and no longer re-throws it. We re-ran the user's job with this change and `spark.speculation = true`, and the problem no longer reproduces. A sketch of the change is below.

 

 


