Repository: spark
Updated Branches:
  refs/heads/branch-1.6 4381e2121 -> 6ea7d4bd3


[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in file listing

## What changes were proposed in this pull request?
Spark silently drops exceptions during file listing. This is very bad
behavior because it can mask legitimate errors, leaving the resulting plan
to silently return 0 rows. This patch changes file listing to stop silently
dropping these errors.
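
Concretely, the listing code wrapped each `FileSystem.listStatus` /
`listLeafFiles` call in a `Try` and substituted an empty result on failure,
so any `IOException` became an empty file list. A minimal before/after
sketch of the pattern (mirroring the `interfaces.scala` hunk below; the two
helper names are illustrative only):

```scala
import scala.util.Try
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Before: any listing failure is swallowed and becomes "0 files".
def listSwallowed(fs: FileSystem, path: Path): Array[FileStatus] =
  Try(fs.listStatus(path)).getOrElse(Array.empty)

// After: the call is made directly, so a FileNotFoundException (or any
// other IOException) propagates to the caller instead of vanishing.
def listPropagated(fs: FileSystem, path: Path): Array[FileStatus] =
  fs.listStatus(path)
```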

After making partition discovery stop silently dropping exceptions,
HiveMetastoreCatalog can trigger partition discovery on empty tables, which
causes FileNotFoundExceptions (previously, partition discovery silently
dropped these exceptions). To address this issue, this PR introduces two
**hacks** to work around it. Both hacks avoid triggering partition
discovery on empty tables in HiveMetastoreCatalog.
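
Both hacks rely on a guard of the following shape (a simplified sketch of
the new `hasPartitionColumns` helper in the `HiveMetastoreCatalog` hunk
below): probing `partitionColumns` may trigger file listing, so a table
whose directory has not been created yet is treated as unpartitioned
instead of failing.

```scala
import org.apache.spark.sql.sources.HadoopFsRelation

def hasPartitionColumns(relation: HadoopFsRelation): Boolean = {
  try {
    // May trigger partition discovery, and hence file listing, on the
    // table's path; for a freshly created empty table the directory may
    // not exist yet.
    relation.partitionColumns.nonEmpty
  } catch {
    case _: java.io.FileNotFoundException => false
  }
}
```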

## How was this patch tested?
Manually tested.
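
For reference, the user-visible change can be reproduced from SQL (a sketch
assuming a Spark 1.6 `SQLContext` named `sqlContext`; the expected message
matches the updated `SQLQuerySuite` assertion below):

```scala
import org.apache.spark.sql.AnalysisException

try {
  sqlContext.sql("select * from json.invalid_file").collect()
} catch {
  case e: AnalysisException =>
    // Previously the listing error was swallowed and the message was
    // "No input paths specified"; with this patch the missing path
    // surfaces in the error message.
    assert(e.message.contains("invalid_file does not exist"))
}
```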

**Note: This is a backport of https://github.com/apache/spark/pull/13987**

Author: Yin Huai <yh...@databricks.com>

Closes #14139 from yhuai/SPARK-16313-branch-1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6ea7d4bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6ea7d4bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6ea7d4bd

Branch: refs/heads/branch-1.6
Commit: 6ea7d4bd393911d2d15b61e78df7473a7ea9b161
Parents: 4381e21
Author: Yin Huai <yh...@databricks.com>
Authored: Thu Jul 14 12:00:31 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Jul 14 12:00:31 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/sources/interfaces.scala   |  6 ++--
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 30 +++++++++++++++++---
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 23 ++++++++++++---
 4 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6ea7d4bd/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index ce5f3dc..5aba55c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -453,9 +453,9 @@ abstract class HadoopFsRelation private[sql](
           val jobConf = new JobConf(hadoopConf, this.getClass())
           val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
           if (pathFilter != null) {
-            Try(fs.listStatus(qualified, pathFilter)).getOrElse(Array.empty)
+            fs.listStatus(qualified, pathFilter)
           } else {
-            Try(fs.listStatus(qualified)).getOrElse(Array.empty)
+            fs.listStatus(qualified)
           }
         }.filterNot { status =>
           val name = status.getPath.getName
@@ -903,7 +903,7 @@ private[sql] object HadoopFsRelation extends Logging {
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(serializableConfiguration.value)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      Try(listLeafFiles(fs, fs.getFileStatus(qualified))).getOrElse(Array.empty)
+      listLeafFiles(fs, fs.getFileStatus(qualified))
     }.map { status =>
       FakeFileStatus(
         status.getPath.toString,

http://git-wip-us.apache.org/repos/asf/spark/blob/6ea7d4bd/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6fec580..be23043 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1757,7 +1757,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     val e3 = intercept[AnalysisException] {
       sql("select * from json.invalid_file")
     }
-    assert(e3.message.contains("No input paths specified"))
+    assert(e3.message.contains("invalid_file does not exist"))
   }
 
   test("SortMergeJoin returns wrong results when using UnsafeRows") {

http://git-wip-us.apache.org/repos/asf/spark/blob/6ea7d4bd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 0562e33..03720c9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -273,6 +273,22 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         serdeProperties = options)
     }
 
+    def hasPartitionColumns(relation: HadoopFsRelation): Boolean = {
+      try {
+        // HACK for "[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in
+        // file listing" https://github.com/apache/spark/pull/14139
+        // Calling hadoopFsRelation.partitionColumns will trigger the refresh call of
+        // the HadoopFsRelation, which will validate input paths. However, when we create
+        // an empty table, the dir of the table has not been created, which will
+        // cause a FileNotFoundException. So, at here we will catch the FileNotFoundException
+        // and return false.
+        relation.partitionColumns.nonEmpty
+      } catch {
+        case _: java.io.FileNotFoundException =>
+          false
+      }
+    }
+
     def newHiveCompatibleMetastoreTable(relation: HadoopFsRelation, serde: HiveSerDe): HiveTable = {
       def schemaToHiveColumn(schema: StructType): Seq[HiveColumn] = {
         schema.map { field =>
@@ -284,12 +300,18 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
       }
 
       assert(partitionColumns.isEmpty)
-      assert(relation.partitionColumns.isEmpty)
+      assert(!hasPartitionColumns(relation))
 
       HiveTable(
         specifiedDatabase = Option(dbName),
         name = tblName,
-        schema = schemaToHiveColumn(relation.schema),
+        // HACK for "[SPARK-16313][SQL][BRANCH-1.6] Spark should not silently drop exceptions in
+        // file listing" https://github.com/apache/spark/pull/14139
+        // Since the table is not partitioned, we use dataSchema instead of using schema.
+        // Using schema which will trigger partition discovery on the path that
+        // may not be created causing FileNotFoundException. So, we just get dataSchema
+        // instead of calling relation.schema.
+        schema = schemaToHiveColumn(relation.dataSchema),
         partitionColumns = Nil,
         tableType = tableType,
         properties = tableProperties.toMap,
@@ -312,14 +334,14 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         (None, message)
 
       case (Some(serde), relation: HadoopFsRelation)
-        if relation.paths.length == 1 && relation.partitionColumns.isEmpty =>
+        if relation.paths.length == 1 && !hasPartitionColumns(relation) =>
         val hiveTable = newHiveCompatibleMetastoreTable(relation, serde)
         val message =
           s"Persisting data source relation $qualifiedTableName with a single 
input path " +
             s"into Hive metastore in Hive compatible format. Input path: 
${relation.paths.head}."
         (Some(hiveTable), message)
 
-      case (Some(serde), relation: HadoopFsRelation) if relation.partitionColumns.nonEmpty =>
+      case (Some(serde), relation: HadoopFsRelation) if hasPartitionColumns(relation) =>
         val message =
           s"Persisting partitioned data source relation $qualifiedTableName 
into " +
             "Hive metastore in Spark SQL specific format, which is NOT 
compatible with Hive. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/6ea7d4bd/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index cd83178..21bc956 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -24,13 +24,13 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.util.Utils
 
 /**
@@ -696,19 +696,34 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
     }
   }
 
+  test("a table with an invalid path can be still dropped") {
+    val schema = StructType(StructField("int", IntegerType, true) :: Nil)
+    val tableIdent = TableIdentifier("test_drop_table_with_invalid_path")
+    catalog.createDataSourceTable(
+      tableIdent = tableIdent,
+      userSpecifiedSchema = Some(schema),
+      partitionColumns = Array.empty[String],
+      provider = "json",
+      options = Map("path" -> "an invalid path"),
+      isExternal = false)
+
+    sql("DROP TABLE test_drop_table_with_invalid_path")
+  }
+
   test("SPARK-6024 wide schema support") {
     withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD.key -> "4000") {
       withTable("wide_schema") {
         // We will need 80 splits for this schema if the threshold is 4000.
        val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
-
+        val tableIdent = TableIdentifier("wide_schema")
+        val path = catalog.hiveDefaultTableFilePath(tableIdent)
         // Manually create a metastore data source table.
         catalog.createDataSourceTable(
-          tableIdent = TableIdentifier("wide_schema"),
+          tableIdent = tableIdent,
           userSpecifiedSchema = Some(schema),
           partitionColumns = Array.empty[String],
           provider = "json",
-          options = Map("path" -> "just a dummy path"),
+          options = Map("path" -> path),
           isExternal = false)
 
         invalidateTable("wide_schema")

