[ https://issues.apache.org/jira/browse/SPARK-33409?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-33409:
------------------------------------

    Assignee:     (was: Apache Spark)

> Spark job can not be killed in BroadcastNestedLoopJoin
> -------------------------------------------------------
>
>                 Key: SPARK-33409
>                 URL: https://issues.apache.org/jira/browse/SPARK-33409
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.1
>            Reporter: zhou xiang
>            Priority: Major
>
> If we kill a Spark job in the Spark web UI, the task context is marked as interrupted, as the code below shows:
> {code:java}
>   /**
>    * Kills a task by setting the interrupted flag to true. This relies on the upper level Spark
>    * code and user code to properly handle the flag. This function should be idempotent so it can
>    * be called multiple times.
>    * If interruptThread is true, we will also call Thread.interrupt() on the Task's executor thread.
>    */
>   def kill(interruptThread: Boolean, reason: String): Unit = {
>     require(reason != null)
>     _reasonIfKilled = reason
>     if (context != null) {
>       context.markInterrupted(reason)
>     }
>     if (interruptThread && taskThread != null) {
>       taskThread.interrupt()
>     }
>   }
> {code}
> Spark then checks the interrupt flag while iterating, in order to stop the task. Like this:
> {code:java}
> /**
>  * :: DeveloperApi ::
>  * An iterator that wraps around an existing iterator to provide task killing functionality.
>  * It works by checking the interrupted flag in [[TaskContext]].
>  */
> @DeveloperApi
> class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
>   extends Iterator[T] {
>
>   def hasNext: Boolean = {
>     // TODO(aarondav/rxin): Check Thread.interrupted instead of context.interrupted if interrupt
>     // is allowed. The assumption is that Thread.interrupted does not have a memory fence in read
>     // (just a volatile field in C), while context.interrupted is a volatile in the JVM, which
>     // introduces an expensive read fence.
>     context.killTaskIfInterrupted()
>     delegate.hasNext
>   }
>
>   def next(): T = delegate.next()
> }
> {code}
> In my case, there is a "not in" in my Spark SQL query, which leads to a BroadcastNestedLoopJoin.
> The related code is below:
> {code:java}
>   private def leftExistenceJoin(
>       relation: Broadcast[Array[InternalRow]],
>       exists: Boolean): RDD[InternalRow] = {
>     assert(buildSide == BuildRight)
>     streamed.execute().mapPartitionsInternal { streamedIter =>
>       val buildRows = relation.value
>       val joinedRow = new JoinedRow
>
>       if (condition.isDefined) {
>         streamedIter.filter(l =>
>           buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
>         )
>       } else if (buildRows.nonEmpty == exists) {
>         streamedIter
>       } else {
>         Iterator.empty
>       }
>     }
>   }
> {code}
> The "streamedIter" and "buildRows" both have millions of records, so the executor gets stuck in the join loop. I found something wrong in my SQL and tried to kill the job, but the executor thread was not interrupted. I had to restart the executor to stop it.
> I think we should also do the check "context.killTaskIfInterrupted()" in BroadcastNestedLoopJoin to support real cancellation.
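> As a rough illustration of what I mean (not the actual fix, just a minimal sketch reusing the fields from the snippet above, i.e. streamed, buildSide, condition, boundCondition), the streamed side could re-check the kill flag on every row, for example via TaskContext.get().killTaskIfInterrupted() in the filter (or equivalently by wrapping streamedIter in an InterruptibleIterator):
> {code:java}
>   private def leftExistenceJoin(
>       relation: Broadcast[Array[InternalRow]],
>       exists: Boolean): RDD[InternalRow] = {
>     assert(buildSide == BuildRight)
>     streamed.execute().mapPartitionsInternal { streamedIter =>
>       val buildRows = relation.value
>       val joinedRow = new JoinedRow
>       // Sketch: re-check the kill flag for every streamed row, so a killed
>       // task breaks out of the nested loop instead of running to completion.
>       val taskContext = TaskContext.get()
>
>       if (condition.isDefined) {
>         streamedIter.filter { l =>
>           taskContext.killTaskIfInterrupted()
>           buildRows.exists(r => boundCondition(joinedRow(l, r))) == exists
>         }
>       } else if (buildRows.nonEmpty == exists) {
>         streamedIter
>       } else {
>         Iterator.empty
>       }
>     }
>   }
> {code}
> With something like this, killing the job from the web UI would make killTaskIfInterrupted() throw on the next streamed row, ending the task promptly even when both sides have millions of rows.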