Repository: spark
Updated Branches:
  refs/heads/master 8ccca9170 -> 064fadd2a


[SPARK-19059][SQL] Unable to retrieve data from parquet table whose name 
starts with underscore

## What changes were proposed in this pull request?
The initial shouldFilterOut() invocation filtered on the root path name (the 
table name in the initial call) and dropped the path if its name starts with _. 
I moved the check one level below, so it first lists the files/directories in 
the given root path and then applies the filter to them.
(Please fill in changes proposed in this fix)

## How was this patch tested?
Added a new test case for this scenario.
(Please explain how this patch was tested. E.g. unit tests, integration tests, 
manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: jayadevanmurali <jayadeva...@tcs.com>
Author: jayadevan <jayadeva...@tcs.com>

Closes #16635 from jayadevanmurali/branch-0.1-SPARK-19059.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/064fadd2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/064fadd2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/064fadd2

Branch: refs/heads/master
Commit: 064fadd2a25d1c118e062e505a0ed56be31bdf34
Parents: 8ccca91
Author: jayadevanmurali <jayadeva...@tcs.com>
Authored: Thu Jan 19 20:07:52 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Jan 19 20:07:52 2017 +0800

----------------------------------------------------------------------
 .../PartitioningAwareFileIndex.scala            | 91 ++++++++++----------
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  8 ++
 2 files changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/064fadd2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 82c1599..fe9c657 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -385,55 +385,54 @@ object PartitioningAwareFileIndex extends Logging {
     logTrace(s"Listing $path")
     val fs = path.getFileSystem(hadoopConf)
     val name = path.getName.toLowerCase
-    if (shouldFilterOut(name)) {
-      Seq.empty[FileStatus]
-    } else {
-      // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't 
exist
-      // Note that statuses only include FileStatus for the files and dirs 
directly under path,
-      // and does not include anything else recursively.
-      val statuses = try fs.listStatus(path) catch {
-        case _: FileNotFoundException =>
-          logWarning(s"The directory $path was not found. Was it deleted very 
recently?")
-          Array.empty[FileStatus]
-      }
 
-      val allLeafStatuses = {
-        val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
-        val nestedFiles: Seq[FileStatus] = sessionOpt match {
-          case Some(session) =>
-            bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, 
session).flatMap(_._2)
-          case _ =>
-            dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, 
sessionOpt))
-        }
-        val allFiles = topLevelFiles ++ nestedFiles
-        if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) 
else allFiles
-      }
+    // [SPARK-17599] Prevent InMemoryFileIndex from failing if path doesn't 
exist
+    // Note that statuses only include FileStatus for the files and dirs 
directly under path,
+    // and does not include anything else recursively.
+    val statuses = try fs.listStatus(path) catch {
+      case _: FileNotFoundException =>
+        logWarning(s"The directory $path was not found. Was it deleted very 
recently?")
+        Array.empty[FileStatus]
+    }
 
-      allLeafStatuses.filterNot(status => 
shouldFilterOut(status.getPath.getName)).map {
-        case f: LocatedFileStatus =>
-          f
-
-        // NOTE:
-        //
-        // - Although S3/S3A/S3N file system can be quite slow for remote file 
metadata
-        //   operations, calling `getFileBlockLocations` does no harm here 
since these file system
-        //   implementations don't actually issue RPC for this method.
-        //
-        // - Here we are calling `getFileBlockLocations` in a sequential 
manner, but it should not
-        //   be a big deal since we always use to `listLeafFilesInParallel` 
when the number of
-        //   paths exceeds threshold.
-        case f =>
-          // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(),
-          // which is very slow on some file system (RawLocalFileSystem, which 
is launch a
-          // subprocess and parse the stdout).
-          val locations = fs.getFileBlockLocations(f, 0, f.getLen)
-          val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, 
f.getReplication, f.getBlockSize,
-            f.getModificationTime, 0, null, null, null, null, f.getPath, 
locations)
-          if (f.isSymlink) {
-            lfs.setSymlink(f.getSymlink)
-          }
-          lfs
+    val filteredStatuses = statuses.filterNot(status => 
shouldFilterOut(status.getPath.getName))
+
+    val allLeafStatuses = {
+      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
+      val nestedFiles: Seq[FileStatus] = sessionOpt match {
+        case Some(session) =>
+          bulkListLeafFiles(dirs.map(_.getPath), hadoopConf, filter, 
session).flatMap(_._2)
+        case _ =>
+          dirs.flatMap(dir => listLeafFiles(dir.getPath, hadoopConf, filter, 
sessionOpt))
       }
+      val allFiles = topLevelFiles ++ nestedFiles
+      if (filter != null) allFiles.filter(f => filter.accept(f.getPath)) else 
allFiles
+    }
+
+    allLeafStatuses.filterNot(status => 
shouldFilterOut(status.getPath.getName)).map {
+      case f: LocatedFileStatus =>
+        f
+
+      // NOTE:
+      //
+      // - Although S3/S3A/S3N file system can be quite slow for remote file 
metadata
+      //   operations, calling `getFileBlockLocations` does no harm here since 
these file system
+      //   implementations don't actually issue RPC for this method.
+      //
+      // - Here we are calling `getFileBlockLocations` in a sequential manner, 
but it should not
+      //   be a big deal since we always use to `listLeafFilesInParallel` when 
the number of
+      //   paths exceeds threshold.
+      case f =>
+        // The other constructor of LocatedFileStatus will call 
FileStatus.getPermission(),
+        // which is very slow on some file system (RawLocalFileSystem, which 
is launch a
+        // subprocess and parse the stdout).
+        val locations = fs.getFileBlockLocations(f, 0, f.getLen)
+        val lfs = new LocatedFileStatus(f.getLen, f.isDirectory, 
f.getReplication, f.getBlockSize,
+          f.getModificationTime, 0, null, null, null, null, f.getPath, 
locations)
+        if (f.isSymlink) {
+          lfs.setSymlink(f.getSymlink)
+        }
+        lfs
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/064fadd2/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 10607b8..8f1beaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2513,4 +2513,12 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-19059: read file based table whose name starts with underscore") 
{
+    withTable("_tbl") {
+      sql("CREATE TABLE `_tbl`(i INT) USING parquet")
+      sql("INSERT INTO `_tbl` VALUES (1), (2), (3)")
+      checkAnswer( sql("SELECT * FROM `_tbl`"), Row(1) :: Row(2) :: Row(3) :: 
Nil)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to