Repository: spark
Updated Branches:
  refs/heads/master 2f5358873 -> 1f39a6111


[SPARK-5068][SQL] Fix bug when querying data whose partition path doesn't exist in HiveContext

This PR follows up on PRs #3907, #3891 and #4356.
Following marmbrus's and liancheng's comments, I use fs.globStatus to 
retrieve all FileStatus objects under the path(s), and then do the filtering 
locally.

[1]. Derive the pathPattern from the partition path, and put it into pathPatternSet. 
(hdfs://cluster/user/demo/2016/08/12 -> hdfs://cluster/user/demo/*/*/*)
[2]. Retrieve all matching FileStatus objects, and cache them by updating existPathSet.
[3]. Do the filtering locally.
[4]. If we encounter a new pathPattern, do steps 1 and 2 again. (An external table may have 
more than one partition pathPattern.)

chenghao-intel jeanlyn

Author: lazymam500 <lazyman...@gmail.com>
Author: lazyman <lazyman...@gmail.com>

Closes #5059 from lazyman500/SPARK-5068 and squashes the following commits:

5bfcbfd [lazyman] move spark.sql.hive.verifyPartitionPath to SQLConf,fix scala 
style
e1d6386 [lazymam500] fix scala style
f23133f [lazymam500] bug fix
47e0023 [lazymam500] fix scala style,add config flag,break the chaining
04c443c [lazyman] SPARK-5068: fix bug when partition path doesn't exists #2
41f60ce [lazymam500] Merge pull request #1 from apache/master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f39a611
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f39a611
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f39a611

Branch: refs/heads/master
Commit: 1f39a61118184e136f38381a9f3ba0b2d5d589d9
Parents: 2f53588
Author: lazymam500 <lazyman...@gmail.com>
Authored: Sat Apr 11 18:33:14 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Sat Apr 11 18:33:14 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  6 ++
 .../org/apache/spark/sql/hive/TableReader.scala | 41 ++++++++++++-
 .../spark/sql/hive/QueryPartitionSuite.scala    | 64 ++++++++++++++++++++
 3 files changed, 110 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1f39a611/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 4815620..ee641bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -39,6 +39,8 @@ private[spark] object SQLConf {
   val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown"
   val PARQUET_USE_DATA_SOURCE_API = "spark.sql.parquet.useDataSourceApi"
 
+  val HIVE_VERIFY_PARTITIONPATH = "spark.sql.hive.verifyPartitionPath"
+
   val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord"
   val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout"
 
@@ -119,6 +121,10 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def parquetUseDataSourceApi =
     getConf(PARQUET_USE_DATA_SOURCE_API, "true").toBoolean
 
+  /** When true, uses verifyPartitionPath to prune partition paths that do not 
exist. */
+  private[spark] def verifyPartitionPath =
+    getConf(HIVE_VERIFY_PARTITIONPATH, "true").toBoolean
+
   /** When true the planner will use the external sort, which may spill to 
disk. */
   private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, 
"false").toBoolean
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1f39a611/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 3563472..d352915 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -142,7 +142,46 @@ class HadoopTableReader(
       partitionToDeserializer: Map[HivePartition,
       Class[_ <: Deserializer]],
       filterOpt: Option[PathFilter]): RDD[Row] = {
-    val hivePartitionRDDs = partitionToDeserializer.map { case (partition, 
partDeserializer) =>
+        
+    // SPARK-5068: get FileStatus and do the filtering locally when the path does 
not exist
+    def verifyPartitionPath(
+        partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
+        Map[HivePartition, Class[_ <: Deserializer]] = {
+      if (!sc.conf.verifyPartitionPath) {
+        partitionToDeserializer
+      } else {
+        var existPathSet = collection.mutable.Set[String]()
+        var pathPatternSet = collection.mutable.Set[String]()
+        partitionToDeserializer.filter {
+          case (partition, partDeserializer) =>
+            def updateExistPathSetByPathPattern(pathPatternStr: String) {
+              val pathPattern = new Path(pathPatternStr)
+              val fs = pathPattern.getFileSystem(sc.hiveconf)
+              val matches = fs.globStatus(pathPattern)
+              matches.foreach(fileStatus => existPathSet += 
fileStatus.getPath.toString)
+            }
+            // convert /demo/data/year/month/day to the glob pattern /demo/data/*/*/*/
+            def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
+              var path = tempPath
+              for (i <- (1 to parNum)) path = path.getParent
+              val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
+              path.toString + tails
+            }
+
+            val partPath = HiveShim.getDataLocationPath(partition)
+            val partNum = 
Utilities.getPartitionDesc(partition).getPartSpec.size();
+            var pathPatternStr = getPathPatternByPath(partNum, partPath)
+            if (!pathPatternSet.contains(pathPatternStr)) {
+              pathPatternSet += pathPatternStr
+              updateExistPathSetByPathPattern(pathPatternStr)
+            }
+            existPathSet.contains(partPath.toString)
+        }
+      }
+    }
+
+    val hivePartitionRDDs = verifyPartitionPath(partitionToDeserializer)
+      .map { case (partition, partDeserializer) =>
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = HiveShim.getDataLocationPath(partition)
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)

http://git-wip-us.apache.org/repos/asf/spark/blob/1f39a611/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
new file mode 100644
index 0000000..83f9712
--- /dev/null
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.sql.{QueryTest, _}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.util.Utils
+/* Implicits */
+import org.apache.spark.sql.hive.test.TestHive._
+
+
+
+class QueryPartitionSuite extends QueryTest {
+  import org.apache.spark.sql.hive.test.TestHive.implicits._
+
+  test("SPARK-5068: query data when path doesn't exists"){
+    val testData = TestHive.sparkContext.parallelize(
+      (1 to 10).map(i => TestData(i, i.toString))).toDF()
+    testData.registerTempTable("testData")
+
+    val tmpDir = Files.createTempDir()
+    // create the partitioned table used by this test
+    sql(s"CREATE TABLE table_with_partition(key int,value string) PARTITIONED 
by (ds string) location '${tmpDir.toURI.toString}' ")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='1') 
SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='2') 
SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='3') 
SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (ds='4') 
SELECT key,value FROM testData")
+
+    // all four partition paths exist, so every partition's rows are expected
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
+        ++ testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect)
+
+    // delete the directory of one partition
+    val folders = tmpDir.listFiles.filter(_.isDirectory)
+    Utils.deleteRecursively(folders(0))
+
+    // after the deletion, only the rows of the three remaining partitions are expected
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.toSchemaRDD.collect ++ testData.toSchemaRDD.collect
+        ++ testData.toSchemaRDD.collect)
+
+    sql("DROP TABLE table_with_partition")
+    sql("DROP TABLE createAndInsertTest")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to