Repository: spark
Updated Branches:
  refs/heads/master 4e3f3cebe -> aff8f15c1
[SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS

## What changes were proposed in this pull request?

In the PR, I propose to not perform recursive parallel listing of files in the `scanPartitions` method because it can cause a deadlock. Instead, I propose to do `scanPartitions` in parallel for top-level partitions only.

## How was this patch tested?

I extended an existing test to trigger the deadlock.

Author: Maxim Gekk <maxim.g...@databricks.com>

Closes #22233 from MaxGekk/fix-recover-partitions.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aff8f15c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aff8f15c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aff8f15c

Branch: refs/heads/master
Commit: aff8f15c153f8031ceaffa237c60e040c6f8115f
Parents: 4e3f3ce
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Tue Aug 28 11:29:05 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue Aug 28 11:29:05 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/command/ddl.scala       | 34 +++++------
 .../spark/sql/execution/command/DDLSuite.scala  | 59 ++++++++++++--------
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 15 ++---
 3 files changed, 61 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
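Some context on the failure mode the description alludes to: the old implementation drove `parmap` through an `ExecutionContext` wrapped around the fork-join pool, so every recursion level of `scanPartitions` blocked a pool thread while waiting for futures that themselves needed a pool thread. The sketch below reproduces that pattern in isolation; it is illustrative only (the pool size, depth, and all names are made up, not Spark code) and hangs when run:

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Every level submits child tasks to the same bounded pool, then blocks
// waiting for them. Once all pool threads are parked in Await, the queued
// child tasks can never be scheduled, so the recursion hangs forever.
object RecursiveListingDeadlock {
  private val poolSize = 2
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(poolSize))

  // Stand-in for scanPartitions: recurse over "subdirectories" in parallel.
  def scan(depth: Int): Seq[Int] = {
    if (depth == 0) return Seq(1)
    val children = (1 to poolSize).map(_ => Future(scan(depth - 1)))
    children.flatMap(f => Await.result(f, Duration.Inf)) // pins one thread per level
  }

  def main(args: Array[String]): Unit =
    println(scan(3).sum) // hangs: both workers blocked, four children queued
}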
http://git-wip-us.apache.org/repos/asf/spark/blob/aff8f15c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7a6f574..e1faece 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
 
 import scala.collection.{GenMap, GenSeq}
-import scala.concurrent.ExecutionContext
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
-import org.apache.spark.util.ThreadUtils.parmap
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand(
     val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
     val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
       try {
-        implicit val ec = ExecutionContext.fromExecutor(evalPool)
         scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
-          spark.sessionState.conf.resolver)
+          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
       } finally {
         evalPool.shutdown()
       }
@@ -656,13 +654,23 @@ case class AlterTableRecoverPartitionsCommand(
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
       threshold: Int,
-      resolver: Resolver)(implicit ec: ExecutionContext): Seq[(TablePartitionSpec, Path)] = {
+      resolver: Resolver,
+      evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
 
-    val statuses = fs.listStatus(path, filter).toSeq
-    def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = {
+    val statuses = fs.listStatus(path, filter)
+    val statusPar: GenSeq[FileStatus] =
+      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        // parallelize the list of partitions here, then we can have better parallelism later.
+        val parArray = statuses.par
+        parArray.tasksupport = evalTaskSupport
+        parArray
+      } else {
+        statuses
+      }
+    statusPar.flatMap { st =>
       val name = st.getPath.getName
       if (st.isDirectory && name.contains("=")) {
         val ps = name.split("=", 2)
@@ -671,7 +679,7 @@ case class AlterTableRecoverPartitionsCommand(
           val value = ExternalCatalogUtils.unescapePathName(ps(1))
           if (resolver(columnName, partitionNames.head)) {
             scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-              partitionNames.drop(1), threshold, resolver)
+              partitionNames.drop(1), threshold, resolver, evalTaskSupport)
           } else {
             logWarning(
               s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
@@ -682,14 +690,6 @@ case class AlterTableRecoverPartitionsCommand(
         Seq.empty
       }
     }
-    val result = if (partitionNames.length > 1 &&
-      statuses.length > threshold || partitionNames.length > 2) {
-      parmap(statuses)(handleStatus _)
-    } else {
-      statuses.map(handleStatus)
-    }
-
-    result.flatten
   }
 
   private def gatherPartitionStats(
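Why the replacement is safe: parallel collections driven by a `ForkJoinTaskSupport` join their subtasks through the fork-join framework, so a worker that reaches a nested parallel operation helps run pending tasks instead of merely parking its thread, and recursion cannot starve the pool. A standalone sketch of the same pattern (assumes Scala 2.12 parallel collections; the names and sizes are again illustrative, not the patch's code):

import java.util.concurrent.ForkJoinPool
import scala.collection.GenSeq
import scala.collection.parallel.ForkJoinTaskSupport

// All recursion levels share one ForkJoinTaskSupport, mirroring how the
// patch threads evalTaskSupport through scanPartitions. Joins inside a
// fork-join worker are cooperative (the worker steals and executes pending
// subtasks while it waits), so the nested .par calls cannot deadlock.
object RecursiveParListing {
  private val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(2))

  def scan(depth: Int): GenSeq[Int] = {
    if (depth == 0) return Seq(1)
    val children = (1 to 4).par        // analogous to statuses.par
    children.tasksupport = taskSupport // reuse the shared pool
    children.flatMap(_ => scan(depth - 1))
  }

  def main(args: Array[String]): Unit =
    println(scan(3).seq.sum) // completes and prints 64
}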
http://git-wip-us.apache.org/repos/asf/spark/blob/aff8f15c/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 78df1db..f8d98de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -52,23 +52,24 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
 
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): CatalogTable = {
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
     val storage = CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
     val metadata = new MetadataBuilder()
       .putString("key", "value")
       .build()
+    val schema = new StructType()
+      .add("col1", "int", nullable = true, metadata = metadata)
+      .add("col2", "string")
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = new StructType()
-        .add("col1", "int", nullable = true, metadata = metadata)
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "int"),
+      schema = schema.copy(
+        fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
       provider = Some("parquet"),
-      partitionColumnNames = Seq("a", "b"),
+      partitionColumnNames = partitionCols,
       createTime = 0L,
       createVersion = org.apache.spark.SPARK_VERSION,
       tracksPartitionsInCatalog = true)
@@ -176,7 +177,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   protected def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): CatalogTable
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable
 
   private val escapedIdentifier = "`(.+)`".r
 
@@ -228,8 +230,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   private def createTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): Unit = {
-    catalog.createTable(generateTable(catalog, name, isDataSource), ignoreIfExists = false)
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): Unit = {
+    catalog.createTable(
+      generateTable(catalog, name, isDataSource, partitionCols), ignoreIfExists = false)
   }
 
   private def createTablePartition(
@@ -1131,7 +1135,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   test("alter table: recover partition (parallel)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "0") {
       testRecoverPartitions()
     }
   }
@@ -1144,23 +1148,32 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
 
     val tableIdent = TableIdentifier("tab1")
-    createTable(catalog, tableIdent)
-    val part1 = Map("a" -> "1", "b" -> "5")
+    createTable(catalog, tableIdent, partitionCols = Seq("a", "b", "c"))
+    val part1 = Map("a" -> "1", "b" -> "5", "c" -> "19")
     createTablePartition(catalog, part1, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
-    val part2 = Map("a" -> "2", "b" -> "6")
+    val part2 = Map("a" -> "2", "b" -> "6", "c" -> "31")
     val root = new Path(catalog.getTableMetadata(tableIdent).location)
     val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
     // valid
-    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
-    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv"))  // file
-    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS"))  // file
-    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv"))  // file
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv"))  // file
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile"))  // file
-    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+    fs.mkdirs(new Path(new Path(new Path(root, "a=1"), "b=5"), "c=19"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "a.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "_SUCCESS"))  // file
+
+    fs.mkdirs(new Path(new Path(new Path(root, "A=2"), "B=6"), "C=31"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "b.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "c.csv"))  // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), ".hiddenFile"))  // file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6/C=31"), "_temporary"))
+
+    val parts = (10 to 100).map { a =>
+      val part = Map("a" -> a.toString, "b" -> "5", "c" -> "42")
+      fs.mkdirs(new Path(new Path(new Path(root, s"a=$a"), "b=5"), "c=42"))
+      fs.createNewFile(new Path(new Path(root, s"a=$a/b=5/c=42"), "a.csv"))  // file
+      createTablePartition(catalog, part, tableIdent)
+      part
+    }
 
     // invalid
     fs.mkdirs(new Path(new Path(root, "a"), "b"))  // bad name
@@ -1174,7 +1187,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     try {
       sql("ALTER TABLE tab1 RECOVER PARTITIONS")
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2))
+        Set(part1, part2) ++ parts)
       if (!isUsingHiveMetastore) {
         assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
         assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
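Condensing the test changes above: a third partition column makes `partitionNames.length > 2` hold at the root, a threshold of 0 keeps `statuses.length > threshold` true at deeper levels, and the a=10..100 directories supply far more parallel tasks than the 8 pool threads. Using the suite's `withSQLConf` and `sql` helpers, the trigger boils down to:

// Inside testRecoverPartitions(), after creating the a=<n>/b=5/c=42
// directories: this combination deadlocked before the patch and must
// now recover all partitions.
withSQLConf("spark.rdd.parallelListingThreshold" -> "0") {
  sql("ALTER TABLE tab1 RECOVER PARTITIONS")
}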
http://git-wip-us.apache.org/repos/asf/spark/blob/aff8f15c/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7288177..6708a50 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean): CatalogTable = {
+      isDataSource: Boolean,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
     val storage =
       if (isDataSource) {
         val serde = HiveSerDe.sourceToSerDe("parquet")
@@ -84,17 +85,17 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
     val metadata = new MetadataBuilder()
       .putString("key", "value")
       .build()
+    val schema = new StructType()
+      .add("col1", "int", nullable = true, metadata = metadata)
+      .add("col2", "string")
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = new StructType()
-        .add("col1", "int", nullable = true, metadata = metadata)
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "int"),
+      schema = schema.copy(
+        fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
       provider = if (isDataSource) Some("parquet") else Some("hive"),
-      partitionColumnNames = Seq("a", "b"),
+      partitionColumnNames = partitionCols,
       createTime = 0L,
       createVersion = org.apache.spark.SPARK_VERSION,
       tracksPartitionsInCatalog = true)