Repository: spark Updated Branches: refs/heads/branch-1.6 4381e2121 -> 6ea7d4bd3
[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in file listing ## What changes were proposed in this pull request? Spark silently drops exceptions during file listing. This is very bad behavior because it can mask legitimate errors, and the resulting plan will silently have 0 rows. This patch changes it to not silently drop the errors. After making partition discovery not silently drop exceptions, HiveMetastoreCatalog can trigger partition discovery on empty tables, which causes FileNotFoundExceptions (these exceptions were previously dropped silently by partition discovery). To address this issue, this PR introduces two **hacks** to work around the issue. These two hacks try to avoid triggering partition discovery on empty tables in HiveMetastoreCatalog. ## How was this patch tested? Manually tested. **Note: This is a backport of https://github.com/apache/spark/pull/13987** Author: Yin Huai <yh...@databricks.com> Closes #14139 from yhuai/SPARK-16313-branch-1.6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ea7d4bd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ea7d4bd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ea7d4bd Branch: refs/heads/branch-1.6 Commit: 6ea7d4bd393911d2d15b61e78df7473a7ea9b161 Parents: 4381e21 Author: Yin Huai <yh...@databricks.com> Authored: Thu Jul 14 12:00:31 2016 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Thu Jul 14 12:00:31 2016 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/sources/interfaces.scala | 6 ++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 30 +++++++++++++++++--- .../sql/hive/MetastoreDataSourcesSuite.scala | 23 ++++++++++++--- 4 files changed, 49 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/6ea7d4bd/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index ce5f3dc..5aba55c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -453,9 +453,9 @@ abstract class HadoopFsRelation private[sql]( val jobConf = new JobConf(hadoopConf, this.getClass()) val pathFilter = FileInputFormat.getInputPathFilter(jobConf) if (pathFilter != null) { - Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty) + fs.listStatus(qualified, pathFilter) } else { - Try(fs.listStatus(qualified)).getOrElse(Array.empty) + fs.listStatus(qualified) } }.filterNot { status => val name = status.getPath.getName @@ -903,7 +903,7 @@ private[sql] object HadoopFsRelation extends Logging { val hdfsPath = new Path(path) val fs = hdfsPath.getFileSystem(serializableConfiguration.value) val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) - Try(listLeafFiles(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty) + listLeafFiles(fs, fs.getFileStatus(qualified)) }.map { status => FakeFileStatus( status.getPath.toString, http://git-wip-us.apache.org/repos/asf/spark/blob/6ea7d4bd/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6fec580..be23043 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1757,7 +1757,7 @@ class SQLQuerySuite extends 
QueryTest with SharedSQLContext { val e3 = intercept[AnalysisException] { sql("select * from json.invalid_file") } - assert(e3.message.contains("No input paths specified")) + assert(e3.message.contains("invalid_file does not exist")) } test("SortMergeJoin returns wrong results when using UnsafeRows") { http://git-wip-us.apache.org/repos/asf/spark/blob/6ea7d4bd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 0562e33..03720c9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -273,6 +273,22 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive serdeProperties = options) } + def hasPartitionColumns(relation: HadoopFsRelation): Boolean = { + try { + // HACK for "[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in + // file listing" https://github.com/apache/spark/pull/14139 + // Calling hadoopFsRelation.partitionColumns will trigger the refresh call of + // the HadoopFsRelation, which will validate input paths. However, when we create + // an empty table, the dir of the table has not been created, which will + // cause a FileNotFoundException. So, at here we will catch the FileNotFoundException + // and return false. 
+ relation.partitionColumns.nonEmpty + } catch { + case _: java.io.FileNotFoundException => + false + } + } + def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = { def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = { schema.map { field => @@ -284,12 +300,18 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive } assert(partitionColumns.isEmpty) - assert(relation.partitionColumns.isEmpty) + assert(!hasPartitionColumns(relation)) HiveTable( specifiedDatabase = Option(dbName), name = tblName, - schema = schemaToHiveColumn(relation.schema), + // HACK for "[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in + // file listing" https://github.com/apache/spark/pull/14139 + // Since the table is not partitioned, we use dataSchema instead of using schema. + // Using schema which will trigger partition discovery on the path that + // may not be created causing FileNotFoundException. So, we just get dataSchema + // instead of calling relation.schema. + schema = schemaToHiveColumn(relation.dataSchema), partitionColumns = Nil, tableType = tableType, properties = tableProperties.toMap, @@ -312,14 +334,14 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive (None, message) case (Some(serde), relation: HadoopFsRelation) - if relation.paths.length == 1 && relation.partitionColumns.isEmpty => + if relation.paths.length == 1 && !hasPartitionColumns(relation) => val hiveTable = newHiveCompatibleMetastoreTable(relation, serde) val message = s"Persisting data source relation $qualifiedTableName with a single input path " + s"into Hive metastore in Hive compatible format. Input path: ${relation.paths.head}." 
(Some(hiveTable), message) - case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty => + case (Some(serde), relation: HadoopFsRelation) if hasPartitionColumns(relation) => val message = s"Persisting partitioned data source relation $qualifiedTableName into " + "Hive metastore in Spark SQL specific format, which is NOT compatible with Hive. " + http://git-wip-us.apache.org/repos/asf/spark/blob/6ea7d4bd/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index cd83178..21bc956 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -24,13 +24,13 @@ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.fs.Path import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable} import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types._ -import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.util.Utils /** @@ -696,19 +696,34 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv } } + test("a table with an invalid path can be still dropped") { + val schema = StructType(StructField("int", IntegerType, true) :: Nil) + val tableIdent = TableIdentifier("test_drop_table_with_invalid_path") + catalog.createDataSourceTable( + tableIdent = tableIdent, + userSpecifiedSchema = 
Some(schema), + partitionColumns = Array.empty[String], + provider = "json", + options = Map("path" -> "an invalid path"), + isExternal = false) + + sql("DROP TABLE test_drop_table_with_invalid_path") + } + test("SPARK-6024 wide schema support") { withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") { withTable("wide_schema") { // We will need 80 splits for this schema if the threshold is 4000. val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true))) - + val tableIdent = TableIdentifier("wide_schema") + val path = catalog.hiveDefaultTableFilePath(tableIdent) // Manually create a metastore data source table. catalog.createDataSourceTable( - tableIdent = TableIdentifier("wide_schema"), + tableIdent = tableIdent, userSpecifiedSchema = Some(schema), partitionColumns = Array.empty[String], provider = "json", - options = Map("path" -> "just a dummy path"), + options = Map("path" -> path), isExternal = false) invalidateTable("wide_schema") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org