Repository: spark Updated Branches: refs/heads/branch-2.1 03cc18ba1 -> 58a8a379d
[SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions ## What changes were proposed in this pull request? Don't leave thread pool running from AlterTableRecoverPartitionsCommand DDL command ## How was this patch tested? Existing tests. Author: Sean Owen <so...@cloudera.com> Closes #18216 from srowen/SPARK-20920. (cherry picked from commit 7b7c85ede398996aafffb126440e5f0c67f67210) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58a8a379 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58a8a379 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58a8a379 Branch: refs/heads/branch-2.1 Commit: 58a8a379df96aef934ecc63881d4af39c25ac6ff Parents: 03cc18b Author: Sean Owen <so...@cloudera.com> Authored: Tue Jun 13 10:48:07 2017 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Jun 13 10:48:25 2017 +0100 ---------------------------------------------------------------------- .../spark/sql/execution/command/ddl.scala | 21 ++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/58a8a379/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index f9afe46..a59560e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command import scala.collection.{GenMap, GenSeq} import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.forkjoin.ForkJoinPool import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -34,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.types._ -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} // Note: The definition of these commands are based on the ones described in // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL @@ -508,8 +507,15 @@ case class AlterTableRecoverPartitionsCommand( val threshold = spark.conf.get("spark.rdd.parallelListingThreshold", "10").toInt val hadoopConf = spark.sparkContext.hadoopConfiguration val pathFilter = getPathFilter(hadoopConf) - val partitionSpecsAndLocs = scanPartitions(spark, fs, pathFilter, root, Map(), - table.partitionColumnNames, threshold, spark.sessionState.conf.resolver) + + val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8) + val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = + try { + scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold, + spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq + } finally { + evalPool.shutdown() + } val total = partitionSpecsAndLocs.length logInfo(s"Found $total partitions in $root") @@ -530,8 +536,6 @@ case class AlterTableRecoverPartitionsCommand( Seq.empty[Row] } - @transient private lazy val evalTaskSupport = new ForkJoinTaskSupport(new ForkJoinPool(8)) - private def scanPartitions( spark: SparkSession, fs: FileSystem, @@ -540,7 +544,8 @@ case class AlterTableRecoverPartitionsCommand( spec: TablePartitionSpec, partitionNames: Seq[String], threshold: Int, - resolver: Resolver): GenSeq[(TablePartitionSpec, Path)] = { + resolver: Resolver, + evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = { if (partitionNames.isEmpty) { return Seq(spec -> path) } @@ -564,7 +569,7 @@ case class AlterTableRecoverPartitionsCommand( val value = ExternalCatalogUtils.unescapePathName(ps(1)) if (resolver(columnName, partitionNames.head)) { scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value), - partitionNames.drop(1), threshold, resolver) + partitionNames.drop(1), threshold, resolver, evalTaskSupport) } else { logWarning( s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org