Repository: spark
Updated Branches:
  refs/heads/branch-2.0 cc100ab54 -> 0754ccb2b


[SPARK-16311][SQL] Metadata refresh should work on temporary views

## What changes were proposed in this pull request?
This patch fixes a bug where the REFRESH command did not work on temporary
views. It is based on https://github.com/apache/spark/pull/13989, but removes
the public Dataset.refresh() API and improves test coverage.

Note that I actually think the public refresh() API is very useful. We can
implement it in the future by also invalidating the lazy vals in QueryExecution
(or, alternatively, by just creating a new QueryExecution).
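
For illustration, a minimal sketch of the user-facing flow this patch fixes
(it mirrors the re-enabled MetadataCacheSuite test; the path and view name are
made up for the example):

    // Register a temporary view over a Parquet directory.
    spark.read.parquet("/tmp/events").createOrReplaceTempView("view_refresh")

    // If files under /tmp/events are deleted or replaced behind Spark's back,
    // queries fail with FileNotFoundException until the cached file listing is
    // refreshed. Before this patch the call below was a no-op for temporary
    // views; now it invalidates the cached listing.
    spark.catalog.refreshTable("view_refresh")
    spark.sql("SELECT count(*) FROM view_refresh").show()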

## How was this patch tested?
Re-enabled a previously ignored test, and added a new Hive test suite that
tests the behavior of temporary views against MetastoreRelation.

Author: Reynold Xin <r...@databricks.com>
Author: petermaxlee <petermax...@gmail.com>

Closes #14009 from rxin/SPARK-16311.

(cherry picked from commit 16a2a7d714f945b06978e3bd20a58ea32f0621ac)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0754ccb2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0754ccb2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0754ccb2

Branch: refs/heads/branch-2.0
Commit: 0754ccb2be79e90bc746de54b01aa6af55f3291f
Parents: cc100ab
Author: Reynold Xin <r...@databricks.com>
Authored: Tue Jul 5 11:36:05 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Jul 5 11:36:20 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 16 ++---
 .../catalyst/plans/logical/LogicalPlan.scala    |  5 ++
 .../spark/sql/execution/command/ddl.scala       |  2 +-
 .../spark/sql/execution/command/tables.scala    |  4 +-
 .../execution/datasources/LogicalRelation.scala |  5 ++
 .../spark/sql/internal/SessionState.scala       |  4 +-
 .../apache/spark/sql/MetadataCacheSuite.scala   |  8 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  4 --
 .../spark/sql/hive/HiveSessionCatalog.scala     |  5 +-
 .../spark/sql/hive/HiveMetadataCacheSuite.scala | 62 ++++++++++++++++++++
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 16 ++---
 .../apache/spark/sql/hive/parquetSuites.scala   |  6 +-
 12 files changed, 101 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8c620d3..e1d4991 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -462,17 +462,17 @@ class SessionCatalog(
     }
   }
 
-  // TODO: It's strange that we have both refresh and invalidate here.
-
   /**
    * Refresh the cache entry for a metastore table, if any.
    */
-  def refreshTable(name: TableIdentifier): Unit = { /* no-op */ }
-
-  /**
-   * Invalidate the cache entry for a metastore table, if any.
-   */
-  def invalidateTable(name: TableIdentifier): Unit = { /* no-op */ }
+  def refreshTable(name: TableIdentifier): Unit = {
+    // Go through temporary tables and invalidate them.
+    // If the database is defined, this is definitely not a temp table.
+    // If the database is not defined, there is a good chance this is a temp table.
+    if (name.database.isEmpty) {
+      tempTables.get(name.table).foreach(_.refresh())
+    }
+  }
 
   /**
    * Drop all existing temporary tables.
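
The heuristic above relies on temporary views never being database-qualified.
A hedged illustration (assuming a SessionCatalog instance named catalog):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Unqualified: may resolve to a temp view, so the registry is consulted.
    catalog.refreshTable(TableIdentifier("events"))
    // Database-qualified: definitely not a temp view; a no-op at this layer.
    catalog.refreshTable(TableIdentifier("events", Some("prod")))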

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4984f23..d0b2b5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -265,6 +265,11 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
           s"Reference '$name' is ambiguous, could be: $referenceNames.")
     }
   }
+
+  /**
+   * Refreshes (or invalidates) any metadata/data cached in the plan recursively.
+   */
+  def refresh(): Unit = children.foreach(_.refresh())
 }
 
 /**
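
The default implementation simply recurses into children, so interior nodes
need no changes; only leaves that hold cached metadata override it (see the
LogicalRelation change below). A hedged sketch of the pattern for a
hypothetical custom leaf node (MyFileIndex and its refresh() are assumptions):

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LeafNode

    case class MyScanNode(index: MyFileIndex) extends LeafNode {
      override def output: Seq[Attribute] = Nil
      // A leaf has no children, so the inherited refresh() is vacuous;
      // drop the externally cached state here instead.
      override def refresh(): Unit = index.refresh()
    }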

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index fc00912..226f61e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -206,7 +206,7 @@ case class DropTableCommand(
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
-      catalog.invalidateTable(tableName)
+      catalog.refreshTable(tableName)
       catalog.dropTable(tableName, ifExists)
     }
     Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 30dc7e8..1483604 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -172,7 +172,7 @@ case class AlterTableRenameCommand(
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
-      catalog.invalidateTable(oldName)
+      catalog.refreshTable(oldName)
       catalog.renameTable(oldName, newName)
       if (wasCached) {
         sparkSession.catalog.cacheTable(newName.unquotedString)
@@ -373,7 +373,7 @@ case class TruncateTableCommand(
     }
     // After deleting the data, invalidate the table to make sure we don't keep around a stale
     // file relation in the metastore cache.
-    spark.sessionState.invalidateTable(tableName.unquotedString)
+    spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
       spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 39c8606..90711f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -85,5 +85,10 @@ case class LogicalRelation(
       expectedOutputAttributes,
       metastoreTableIdentifier).asInstanceOf[this.type]
 
+  override def refresh(): Unit = relation match {
+    case fs: HadoopFsRelation => fs.refresh()
+    case _ =>  // Do nothing.
+  }
+
   override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 5f5cf5c..01cc13f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -166,8 +166,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
 
   def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
 
-  def invalidateTable(tableName: String): Unit = {
-    catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
+  def refreshTable(tableName: String): Unit = {
+    catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
   }
 
   def addJar(path: String): Unit = {
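
Callers that only have an unparsed name use this String overload; it accepts
the same qualified or unqualified forms as the SQL parser. A hedged usage
sketch (note SessionState is private[sql], so this runs only from within
Spark's own sql package; the names are examples):

    // Unqualified name: can reach a temporary view.
    spark.sessionState.refreshTable("view_refresh")
    // Qualified name: resolved against the named database, never a temp view.
    spark.sessionState.refreshTable("prod.events")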

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
index d872f4b..3f8cc81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -59,8 +59,8 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  ignore("SPARK-16337 temporary view refresh") {
-    withTempPath { (location: File) =>
+  test("SPARK-16337 temporary view refresh") {
+    withTempTable("view_refresh") { withTempPath { (location: File) =>
       // Create a Parquet directory
       spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
         .write.parquet(location.getAbsolutePath)
@@ -77,12 +77,12 @@ class MetadataCacheSuite extends QueryTest with SharedSQLContext {
         sql("select count(*) from view_refresh").first()
       }
       assert(e.getMessage.contains("FileNotFoundException"))
-      assert(e.getMessage.contains("refresh()"))
+      assert(e.getMessage.contains("REFRESH"))
 
       // Refresh and we should be able to read it again.
       spark.catalog.refreshTable("view_refresh")
       val newCount = sql("select count(*) from view_refresh").first().getLong(0)
       assert(newCount > 0 && newCount < 100)
-    }
+    }}
   }
 }
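
The re-enabled test wraps the old body in withTempTable so the view is dropped
even when an assertion fails, which is why the closing braces double up. The
idiom, sketched (withTempTable and withTempPath come from SQLTestUtils /
SharedSQLContext in this codebase):

    withTempTable("view_refresh") {
      withTempPath { location =>
        // ...write files under location, create the temp view, assert...
      }
    } // view_refresh is dropped here regardless of the test outcome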

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2e0b5d5..d6c5325 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -147,10 +147,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     // it is better here to invalidate the cache to avoid confusing warning logs from the
     // cache loader (e.g. cannot find data source provider, which is only defined for
     // data source tables).
-    invalidateTable(tableIdent)
-  }
-
-  def invalidateTable(tableIdent: TableIdentifier): Unit = {
     cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 2589b9d..ea818b5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -90,13 +90,10 @@ private[sql] class HiveSessionCatalog(
   val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
 
   override def refreshTable(name: TableIdentifier): Unit = {
+    super.refreshTable(name)
     metastoreCatalog.refreshTable(name)
   }
 
-  override def invalidateTable(name: TableIdentifier): Unit = {
-    metastoreCatalog.invalidateTable(name)
-  }
-
   def invalidateCache(): Unit = {
     metastoreCatalog.cachedDataSourceTables.invalidateAll()
   }
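
With the separate invalidateTable gone, a single refreshTable call now clears
both layers. A hedged trace of what HiveSessionCatalog.refreshTable(name)
touches after this change:

    // 1. super.refreshTable(name): if name is unqualified, look it up in the
    //    session's temp view registry and call LogicalPlan.refresh(), which
    //    recursively drops cached state (e.g. a HadoopFsRelation file listing).
    // 2. metastoreCatalog.refreshTable(name): invalidate the entry for the
    //    qualified name in cachedDataSourceTables, the metastore-side
    //    relation cache.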

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
new file mode 100644
index 0000000..5714d06
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetadataCacheSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
+
+/**
+ * Test suite for metadata cache related behavior.
+ */
+class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
+
+  test("SPARK-16337 temporary view refresh") {
+    withTempTable("view_refresh") {
+      withTable("view_table") {
+        // Create a Parquet directory
+        spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
+          .write.saveAsTable("view_table")
+
+        // Read the table in
+        spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh")
+        assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)
+
+        // Delete a file using the Hadoop file system interface since the path returned by
+        // inputFiles is not recognizable by Java IO.
+        val p = new Path(spark.table("view_table").inputFiles.head)
+        assert(p.getFileSystem(hiveContext.sessionState.newHadoopConf()).delete(p, false))
+
+        // Read it again and now we should see a FileNotFoundException
+        val e = intercept[SparkException] {
+          sql("select count(*) from view_refresh").first()
+        }
+        assert(e.getMessage.contains("FileNotFoundException"))
+        assert(e.getMessage.contains("REFRESH"))
+
+        // Refresh and we should be able to read it again.
+        spark.catalog.refreshTable("view_refresh")
+        val newCount = sql("select count(*) from view_refresh").first().getLong(0)
+        assert(newCount > 0 && newCount < 100)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index b028d49..12d250d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -255,13 +255,13 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
         // Discard the cached relation.
-        sessionState.invalidateTable("jsonTable")
+        sessionState.refreshTable("jsonTable")
 
         checkAnswer(
           sql("SELECT * FROM jsonTable"),
           sql("SELECT `c_!@(3)` FROM expectedJsonTable").collect().toSeq)
 
-        sessionState.invalidateTable("jsonTable")
+        sessionState.refreshTable("jsonTable")
         val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
 
         assert(expectedSchema === table("jsonTable").schema)
@@ -349,7 +349,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
            """.stripMargin)
 
         // Discard the cached relation.
-        sessionState.invalidateTable("ctasJsonTable")
+        sessionState.refreshTable("ctasJsonTable")
 
         // Schema should not be changed.
         assert(table("ctasJsonTable").schema === table("jsonTable").schema)
@@ -424,7 +424,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
         (6 to 10).map(i => Row(i, s"str$i")))
 
-      sessionState.invalidateTable("savedJsonTable")
+      sessionState.refreshTable("savedJsonTable")
 
       checkAnswer(
         sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -710,7 +710,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             options = Map("path" -> tempDir.getCanonicalPath),
             isExternal = false)
 
-          sessionState.invalidateTable("wide_schema")
+          sessionState.refreshTable("wide_schema")
 
           val actualSchema = table("wide_schema").schema
           assert(schema === actualSchema)
@@ -743,7 +743,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       sharedState.externalCatalog.createTable("default", hiveTable, ignoreIfExists = false)
 
-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val actualSchema = table(tableName).schema
       assert(schema === actualSchema)
 
@@ -758,7 +758,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
     withTable(tableName) {
       df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
 
@@ -793,7 +793,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .bucketBy(8, "d", "b")
         .sortBy("c")
         .saveAsTable(tableName)
-      sessionState.invalidateTable(tableName)
+      sessionState.refreshTable(tableName)
       val metastoreTable = sharedState.externalCatalog.getTable("default", tableName)
       val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
       val expectedSortByColumns = StructType(df.schema("c") :: Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/0754ccb2/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index fe7253d..a78a285 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -462,7 +462,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
     checkCached(tableIdentifier)
     // For insert into non-partitioned table, we will do the conversion,
     // so the converted test_insert_parquet should be cached.
-    sessionState.invalidateTable("test_insert_parquet")
+    sessionState.refreshTable("test_insert_parquet")
     assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
     sql(
       """
@@ -475,7 +475,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       sql("select * from test_insert_parquet"),
       sql("select a, b from jt").collect())
     // Invalidate the cache.
-    sessionState.invalidateTable("test_insert_parquet")
+    sessionState.refreshTable("test_insert_parquet")
     assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
     // Create a partitioned table.
@@ -525,7 +525,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
           |select b, '2015-04-02', a FROM jt
         """.stripMargin).collect())
 
-    sessionState.invalidateTable("test_parquet_partitioned_cache_test")
+    sessionState.refreshTable("test_parquet_partitioned_cache_test")
     assert(sessionState.catalog.getCachedDataSourceTable(tableIdentifier) === null)
 
     dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")

