[ 
https://issues.apache.org/jira/browse/SPARK-33409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-33409:
------------------------------------

    Assignee:     (was: Apache Spark)

> Spark job cannot be killed in BroadcastNestedLoopJoin
> -----------------------------------------------------
>
>                 Key: SPARK-33409
>                 URL: https://issues.apache.org/jira/browse/SPARK-33409
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.1
>            Reporter: zhou xiang
>            Priority: Major
>
>  
> If we kill a Spark job in the Spark web UI, the task context will be marked 
> as interrupted, as the code below shows:
> {code:java}
> /**
>  * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
>  * code and user code to properly handle the flag. This function should be idempotent so it can
>  * be called multiple times.
>  * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
>  */
> def kill(interruptThread: Boolean, reason: String): Unit = {
>   require(reason != null)
>   _reasonIfKilled = reason
>   if (context != null) {
>     context.markInterrupted(reason)
>   }
>   if (interruptThread && taskThread != null) {
>     taskThread.interrupt()
>   }
> }
> {code}
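> Upper-level or user code honors this contract by polling the flag. A minimal 
> sketch of such polling, where "moreWorkToDo" and "doSomeWork()" are 
> hypothetical placeholders:
> {code:java}
> import org.apache.spark.TaskContext
>
> // Minimal sketch: long-running user code polling the kill flag.
> // killTaskIfInterrupted() throws TaskKilledException once kill() has
> // marked the context as interrupted. moreWorkToDo and doSomeWork()
> // are hypothetical placeholders for the actual loop body.
> val context = TaskContext.get()
> while (moreWorkToDo) {
>   context.killTaskIfInterrupted()
>   doSomeWork()
> }
> {code}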
>  
> Spark then checks this interrupted flag during iteration to stop the task, like this:
> {code:java}
> /**
>  * :: DeveloperApi ::
>  * An iterator that wraps around an existing iterator to provide task killing functionality.
>  * It works by checking the interrupted flag in [[TaskContext]].
>  */
> @DeveloperApi
> class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
>   extends Iterator[T] {
>
>   def hasNext: Boolean = {
>     // TODO(aarondav/rxin): Check Thread.interrupted instead of context.interrupted if interrupt
>     // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
>     // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
>     // introduces an expensive read fence.
>     context.killTaskIfInterrupted()
>     delegate.hasNext
>   }
>
>   def next(): T = delegate.next()
> }
> {code}
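> For reference, any partition iterator becomes cancellable simply by wrapping 
> it. A minimal sketch, where "makeCancellable" is a hypothetical helper name:
> {code:java}
> import org.apache.spark.{InterruptibleIterator, TaskContext}
>
> // Minimal sketch: wrapping an existing iterator so that every hasNext
> // call re-checks the task's kill flag. "makeCancellable" is a
> // hypothetical helper, not an existing Spark API.
> def makeCancellable[T](rows: Iterator[T]): Iterator[T] =
>   new InterruptibleIterator(TaskContext.get(), rows)
> {code}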
> In my case, my Spark SQL query contains a "not in" clause, which is planned 
> as a BroadcastNestedLoopJoin.
> The related code is below:
> {code:java}
> private def leftExistenceJoin(
>     relation: Broadcast[Array[InternalRow]],
>     exists: Boolean): RDD[InternalRow] = {
>   assert(buildSide == BuildRight)
>   streamed.execute().mapPartitionsInternal { streamedIter =>
>     val buildRows = relation.value
>     val joinedRow = new JoinedRow
>     if (condition.isDefined) {
>       streamedIter.filter(l =>
>         buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
>       )
>     } else if (buildRows.nonEmpty == exists) {
>       streamedIter
>     } else {
>       Iterator.empty
>     }
>   }
> }{code}
> The "streamedIter" and "buildRows" both have millions of records,  the 
> executor get stuck in the join loop, I found something wrong in my sql and 
> try to kill the job, but the executor thread is not interrupted. I have to 
> restart the executor to stop it.
> I think we should also do this check: " context.killTaskIfInterrupted() "  in 
> BoradcastNestedLoopJoin to support real cancel.
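> A minimal sketch of the idea (the exact placement of the check is my 
> assumption, not a final patch):
> {code:java}
> // Minimal sketch of the proposed fix (placement of the check is an
> // assumption): re-check the kill flag for every streamed row, so the
> // O(|streamed| * |build|) loop can actually be cancelled.
> streamed.execute().mapPartitionsInternal { streamedIter =>
>   val buildRows = relation.value
>   val joinedRow = new JoinedRow
>   val context = TaskContext.get()
>   if (condition.isDefined) {
>     streamedIter.filter { l =>
>       context.killTaskIfInterrupted()
>       buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
>     }
>   } else if (buildRows.nonEmpty == exists) {
>     streamedIter
>   } else {
>     Iterator.empty
>   }
> }
> {code}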



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
