Repository: spark
Updated Branches:
  refs/heads/master 7fc8881b0 -> 32da87dfa

[SPARK-25286][CORE] Removing the dangerous parmap

## What changes were proposed in this pull request?

I propose to remove the overload of `parmap` that accepts an execution context as a parameter. Removing it eliminates the deadlocks that can occur when `parmap` is called recursively on thread pools of restricted size.

Closes #22292 from MaxGekk/remove-overloaded-parmap.

Authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32da87df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32da87df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32da87df

Branch: refs/heads/master
Commit: 32da87dfa451fff677ed9316f740be2abdbff6a4
Parents: 7fc8881
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Fri Aug 31 10:43:30 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Fri Aug 31 10:43:30 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/UnionRDD.scala   | 17 ++++++-----
 .../org/apache/spark/util/ThreadUtils.scala     | 32 +++-----------------
 .../streaming/util/FileBasedWriteAheadLog.scala |  5 +--
 3 files changed, 16 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 4b6f732..60e383a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,13 +20,12 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ThreadUtils.parmap
 import org.apache.spark.util.Utils
 
 /**
@@ -60,7 +59,8 @@ private[spark] class UnionPartition[T: ClassTag](
 }
 
 object UnionRDD {
-  private[spark] lazy val threadPool = new ForkJoinPool(8)
+  private[spark] lazy val partitionEvalTaskSupport =
+    new ForkJoinTaskSupport(new ForkJoinPool(8))
 }
 
 @DeveloperApi
@@ -74,13 +74,14 @@ class UnionRDD[T: ClassTag](
     rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
 
   override def getPartitions: Array[Partition] = {
-    val partitionLengths = if (isPartitionListingParallel) {
-      implicit val ec = ExecutionContext.fromExecutor(UnionRDD.threadPool)
-      parmap(rdds)(_.partitions.length)
+    val parRDDs = if (isPartitionListingParallel) {
+      val parArray = rdds.par
+      parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
+      parArray
     } else {
-      rdds.map(_.partitions.length)
+      rdds
    }
-    val array = new Array[Partition](partitionLengths.sum)
+    val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
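
The UnionRDD change above replaces `parmap` with the stock Scala parallel-collections idiom: attach a `ForkJoinTaskSupport` backed by a bounded pool, map in parallel, and return to a sequential view with `.seq`. Below is a minimal, self-contained sketch of that pattern; the object name and `inputs` data are illustrative only, and the `scala.concurrent.forkjoin` import assumes the Scala 2.11 line Spark targeted at the time:

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

object ParCollectionSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative stand-in for `rdds`: any sequence whose per-element
    // work we want to run in parallel (here, just .length).
    val inputs = Vector(Seq(1, 2, 3), Seq(4, 5), Seq(6))

    val parInputs = inputs.par
    // Bound the parallelism with a dedicated 8-thread pool, as
    // UnionRDD.partitionEvalTaskSupport does in the patch.
    parInputs.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))

    // map runs on that pool; .seq switches back to a sequential view
    // before summing, mirroring parRDDs.map(_.partitions.length).seq.sum.
    val total = parInputs.map(_.length).seq.sum
    println(total) // prints 6
  }
}

Because the task support is attached to the collection itself, the bounded pool stays an implementation detail of UnionRDD rather than a parameter callers can misuse, which is exactly the property the removed `parmap` overload lacked.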

http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index f0e5add..cb0c205 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -284,36 +284,12 @@ private[spark] object ThreadUtils {
 
     try {
       implicit val ec = ExecutionContext.fromExecutor(pool)
-      parmap(in)(f)
+      val futures = in.map(x => Future(f(x)))
+      val futureSeq = Future.sequence(futures)
+
+      awaitResult(futureSeq, Duration.Inf)
     } finally {
       pool.shutdownNow()
     }
   }
-
-  /**
-   * Transforms input collection by applying the given function to each element in parallel fashion.
-   * Comparing to the map() method of Scala parallel collections, this method can be interrupted
-   * at any time. This is useful on canceling of task execution, for example.
-   *
-   * @param in - the input collection which should be transformed in parallel.
-   * @param f - the lambda function will be applied to each element of `in`.
-   * @param ec - an execution context for parallel applying of the given function `f`.
-   * @tparam I - the type of elements in the input collection.
-   * @tparam O - the type of elements in resulted collection.
-   * @return new collection in which each element was given from the input collection `in` by
-   *         applying the lambda function `f`.
-   */
-  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
-      (in: Col[I])
-      (f: I => O)
-      (implicit
-        cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
-        cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]], // for Future.sequence
-        ec: ExecutionContext
-      ): Col[O] = {
-    val futures = in.map(x => Future(f(x)))
-    val futureSeq = Future.sequence(futures)
-
-    awaitResult(futureSeq, Duration.Inf)
-  }
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bba071e..f0161e1 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -312,10 +312,11 @@ private[streaming] object FileBasedWriteAheadLog {
       handler: I => Iterator[O]): Iterator[O] = {
     val taskSupport = new ExecutionContextTaskSupport(executionContext)
     val groupSize = taskSupport.parallelismLevel.max(8)
-    implicit val ec = executionContext
 
     source.grouped(groupSize).flatMap { group =>
-      ThreadUtils.parmap(group)(handler)
+      val parallelCollection = group.par
+      parallelCollection.tasksupport = taskSupport
+      parallelCollection.map(handler)
     }.flatten
   }
 }
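
For readers wondering what deadlock motivated the removal: the deleted overload ran on a caller-supplied execution context and blocked until its futures completed, so a recursive call could block every thread of a size-limited pool on futures that same pool could never schedule. The sketch below is not Spark code; the simplified `parmap`, the single-thread pool, and the 5-second timeout standing in for `Duration.Inf` are all illustrative assumptions, chosen so the hang surfaces as a visible timeout rather than a permanent freeze:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration._

object ParmapDeadlockSketch {
  def main(args: Array[String]): Unit = {
    // A pool restricted to one thread, standing in for any bounded pool a
    // caller might have passed to the removed parmap overload.
    val pool = Executors.newFixedThreadPool(1)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    // Simplified parmap: fan out on the given context, then block for results.
    def parmap[I, O](in: Seq[I])(f: I => O)(implicit ec: ExecutionContext): Seq[O] =
      Await.result(Future.sequence(in.map(x => Future(f(x)))), 5.seconds)

    try {
      // The outer call occupies the pool's only thread, which then blocks
      // inside the inner parmap waiting on futures that can never be
      // scheduled on that same, fully occupied pool.
      parmap(Seq(1, 2)) { i => parmap(Seq(i, i + 1))(_ * 2).sum }
      println("unreachable with a 1-thread pool")
    } catch {
      case _: TimeoutException =>
        println("Deadlocked: the inner futures were never scheduled")
    } finally {
      pool.shutdownNow()
    }
  }
}

The `parmap` overload kept in ThreadUtils (the try/finally visible in the diff above) sidesteps this trap by creating its own pool per call and shutting it down in `finally`, so recursive calls never contend for a shared, caller-sized pool.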