Repository: spark
Updated Branches:
  refs/heads/master 9734a928a -> ce233f18e


[SPARK-19463][SQL] refresh cache after the InsertIntoHadoopFsRelationCommand

## What changes were proposed in this pull request?

If we first cache a DataSource table, then we insert some data into the table, 
we should refresh the data in the cache after the insert command.

## How was this patch tested?
unit test added

Author: windpiger <song...@outlook.com>

Closes #16809 from windpiger/refreshCacheAfterInsert.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce233f18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce233f18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce233f18

Branch: refs/heads/master
Commit: ce233f18e381fa1ea00be74ca26e97d35baa6c9c
Parents: 9734a92
Author: windpiger <song...@outlook.com>
Authored: Tue Feb 28 11:59:18 2017 -0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Feb 28 11:59:18 2017 -0800

----------------------------------------------------------------------
 .../InsertIntoHadoopFsRelationCommand.scala       |  3 +++
 .../datasources/parquet/ParquetQuerySuite.scala   |  4 ----
 .../apache/spark/sql/sources/InsertSuite.scala    | 18 +++++++++---------
 .../apache/spark/sql/hive/CachedTableSuite.scala  | 14 ++------------
 4 files changed, 14 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ce233f18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 652bcc8..19b51d4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -147,7 +147,10 @@ case class InsertIntoHadoopFsRelationCommand(
         refreshFunction = refreshPartitionsCallback,
         options = options)
 
+      // refresh cached files in FileIndex
       fileIndex.foreach(_.refresh())
+      // refresh data cache if table is cached
+      sparkSession.catalog.refreshByPath(outputPath.toString)
     } else {
       logInfo("Skipping insertion into a relation that already exists.")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce233f18/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index d7d7176..200e356 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -77,8 +77,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest 
with SharedSQLContext
       val df = spark.read.parquet(path).cache()
       assert(df.count() == 1000)
       spark.range(10).write.mode("overwrite").parquet(path)
-      assert(df.count() == 1000)
-      spark.catalog.refreshByPath(path)
       assert(df.count() == 10)
       assert(spark.read.parquet(path).count() == 10)
     }
@@ -91,8 +89,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest 
with SharedSQLContext
       val df = spark.read.parquet(path).cache()
       assert(df.count() == 1000)
       spark.range(10).write.mode("append").parquet(path)
-      assert(df.count() == 1000)
-      spark.catalog.refreshByPath(path)
       assert(df.count() == 1010)
       assert(spark.read.parquet(path).count() == 1010)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/ce233f18/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 19835cd..2eae66d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -281,15 +281,15 @@ class InsertSuite extends DataSourceTest with 
SharedSQLContext {
       """.stripMargin)
     // jsonTable should be recached.
     assertCached(sql("SELECT * FROM jsonTable"))
-    // TODO we need to invalidate the cached data in InsertIntoHadoopFsRelation
-//    // The cached data is the new data.
-//    checkAnswer(
-//      sql("SELECT a, b FROM jsonTable"),
-//      sql("SELECT a * 2, b FROM jt").collect())
-//
-//    // Verify uncaching
-//    spark.catalog.uncacheTable("jsonTable")
-//    assertCached(sql("SELECT * FROM jsonTable"), 0)
+
+    // The cached data is the new data.
+    checkAnswer(
+      sql("SELECT a, b FROM jsonTable"),
+      sql("SELECT a * 2, b FROM jt").collect())
+
+    // Verify uncaching
+    spark.catalog.uncacheTable("jsonTable")
+    assertCached(sql("SELECT * FROM jsonTable"), 0)
   }
 
   test("it's not allowed to insert into a relation that is not an 
InsertableRelation") {

http://git-wip-us.apache.org/repos/asf/spark/blob/ce233f18/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 3871b3d..8ccc2b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -204,13 +204,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleto
     assertCached(table("refreshTable"))
     // Append new data.
     table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
-    // We are still using the old data.
     assertCached(table("refreshTable"))
-    checkAnswer(
-      table("refreshTable"),
-      table("src").collect())
-    // Refresh the table.
-    sql("REFRESH TABLE refreshTable")
+
     // We are using the new data.
     assertCached(table("refreshTable"))
     checkAnswer(
@@ -249,13 +244,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleto
     assertCached(table("refreshTable"))
     // Append new data.
     table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
-    // We are still using the old data.
     assertCached(table("refreshTable"))
-    checkAnswer(
-      table("refreshTable"),
-      table("src").collect())
-    // Refresh the table.
-    sql(s"REFRESH ${tempPath.toString}")
+
     // We are using the new data.
     assertCached(table("refreshTable"))
     checkAnswer(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to