Repository: spark
Updated Branches:
  refs/heads/branch-1.6 bb044ec22 -> e4abfe932
[SPARK-11480][CORE][WEBUI] Wrong callsite is displayed when using AsyncRDDActions#takeAsync

When we call AsyncRDDActions#takeAsync, DAGScheduler#runJob is actually called from a different thread, so we cannot get the proper callsite information.

The following screenshots show the web UI before and after this patch is applied.

Before:
<img width="1268" alt="2015-11-04 1 26 40" src="https://cloud.githubusercontent.com/assets/4736016/10914069/0ffc1306-8294-11e5-8e89-c4fadf58dd12.png">
<img width="1258" alt="2015-11-04 1 26 52" src="https://cloud.githubusercontent.com/assets/4736016/10914070/0ffe84ce-8294-11e5-8b2a-69d36276bedb.png">

After:
<img width="1268" alt="2015-11-04 0 48 07" src="https://cloud.githubusercontent.com/assets/4736016/10914080/1d8cfb7a-8294-11e5-9e09-ede25c2563e8.png">
<img width="1269" alt="2015-11-04 0 48 26" src="https://cloud.githubusercontent.com/assets/4736016/10914081/1d934e3a-8294-11e5-8b5e-e3dc37aaced3.png">

Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>

Closes #9437 from sarutak/SPARK-11480.

(cherry picked from commit 30f3cfda1cce8760f15c0a48a8c47f09a5167fca)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4abfe93
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4abfe93
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4abfe93

Branch: refs/heads/branch-1.6
Commit: e4abfe9324e3fdebace8b33cc6616f382ad7ac45
Parents: bb044ec
Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>
Authored: Mon Nov 16 16:59:16 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Nov 16 16:59:23 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e4abfe93/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index ca1eb1f..d5e8536 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -66,6 +66,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    */
   def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
     val f = new ComplexFutureAction[Seq[T]]
+    val callSite = self.context.getCallSite
 
     f.run {
       // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
@@ -73,6 +74,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       val results = new ArrayBuffer[T](num)
       val totalParts = self.partitions.length
       var partsScanned = 0
+      self.context.setCallSite(callSite)
       while (results.size < num && partsScanned < totalParts) {
         // The number of partitions to try in this iteration. It is ok for this number to be
         // greater than totalParts because we actually cap it at totalParts in runJob.
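
Editorial note: the patch reads the call site on the caller's thread and re-applies it inside the block that runs on another thread. The sketch below illustrates that capture-and-restore pattern in isolation, assuming the call site is tracked per thread. It is not Spark code: CallSiteDemo, its methods, and the sample call-site string are hypothetical stand-ins built on a plain java.lang.ThreadLocal.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for per-thread call-site tracking; not Spark's API.
object CallSiteDemo {
  private val callSite = new ThreadLocal[String] {
    override def initialValue(): String = "<unknown>"
  }

  def setCallSite(s: String): Unit = callSite.set(s)
  def getCallSite: String = callSite.get()

  // Without the fix: the worker thread reads its own (unset) value.
  def observedWithoutCapture()(implicit ec: ExecutionContext): Future[String] =
    Future { getCallSite }

  // With the fix: capture on the caller's thread, re-apply on the worker.
  def observedWithCapture()(implicit ec: ExecutionContext): Future[String] = {
    val captured = getCallSite // evaluated on the calling thread
    Future {
      setCallSite(captured)    // evaluated on the worker thread
      getCallSite
    }
  }

  // Returns (call site seen without capture, call site seen with capture).
  def demo(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      setCallSite("takeAsync at Example.scala:10")
      val before = Await.result(observedWithoutCapture(), 5.seconds)
      val after = Await.result(observedWithCapture(), 5.seconds)
      (before, after)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = demo()
    println(s"without capture: $before")
    println(s"with capture:    $after")
  }
}
```

In this sketch the first value is the thread-local default, because the pool thread never saw the caller's setting, while the second matches what the caller set. The capture has to happen outside the Future body; reading the value inside it would already be too late, which is why the patch places getCallSite before f.run and setCallSite inside it.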