Repository: spark Updated Branches: refs/heads/master 9734a928a -> ce233f18e
[SPARK-19463][SQL] refresh cache after the InsertIntoHadoopFsRelationCommand ## What changes were proposed in this pull request? If a DataSource table is cached and data is then inserted into that table, the cached data should be refreshed after the insert command completes. ## How was this patch tested? Unit tests added. Author: windpiger <song...@outlook.com> Closes #16809 from windpiger/refreshCacheAfterInsert. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce233f18 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce233f18 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce233f18 Branch: refs/heads/master Commit: ce233f18e381fa1ea00be74ca26e97d35baa6c9c Parents: 9734a92 Author: windpiger <song...@outlook.com> Authored: Tue Feb 28 11:59:18 2017 -0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Feb 28 11:59:18 2017 -0800 ---------------------------------------------------------------------- .../InsertIntoHadoopFsRelationCommand.scala | 3 +++ .../datasources/parquet/ParquetQuerySuite.scala | 4 ---- .../apache/spark/sql/sources/InsertSuite.scala | 18 +++++++++--------- .../apache/spark/sql/hive/CachedTableSuite.scala | 14 ++------------ 4 files changed, 14 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ce233f18/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index 652bcc8..19b51d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -147,7 +147,10 @@ case class InsertIntoHadoopFsRelationCommand( refreshFunction = refreshPartitionsCallback, options = options) + // refresh cached files in FileIndex fileIndex.foreach(_.refresh()) + // refresh data cache if table is cached + sparkSession.catalog.refreshByPath(outputPath.toString) } else { logInfo("Skipping insertion into a relation that already exists.") } http://git-wip-us.apache.org/repos/asf/spark/blob/ce233f18/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index d7d7176..200e356 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -77,8 +77,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext val df = spark.read.parquet(path).cache() assert(df.count() == 1000) spark.range(10).write.mode("overwrite").parquet(path) - assert(df.count() == 1000) - spark.catalog.refreshByPath(path) assert(df.count() == 10) assert(spark.read.parquet(path).count() == 10) } @@ -91,8 +89,6 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext val df = spark.read.parquet(path).cache() assert(df.count() == 1000) spark.range(10).write.mode("append").parquet(path) - assert(df.count() == 1000) - spark.catalog.refreshByPath(path) assert(df.count() == 1010) assert(spark.read.parquet(path).count() == 1010) } 
http://git-wip-us.apache.org/repos/asf/spark/blob/ce233f18/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 19835cd..2eae66d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -281,15 +281,15 @@ class InsertSuite extends DataSourceTest with SharedSQLContext { """.stripMargin) // jsonTable should be recached. assertCached(sql("SELECT * FROM jsonTable")) - // TODO we need to invalidate the cached data in InsertIntoHadoopFsRelation -// // The cached data is the new data. -// checkAnswer( -// sql("SELECT a, b FROM jsonTable"), -// sql("SELECT a * 2, b FROM jt").collect()) -// -// // Verify uncaching -// spark.catalog.uncacheTable("jsonTable") -// assertCached(sql("SELECT * FROM jsonTable"), 0) + + // The cached data is the new data. 
+ checkAnswer( + sql("SELECT a, b FROM jsonTable"), + sql("SELECT a * 2, b FROM jt").collect()) + + // Verify uncaching + spark.catalog.uncacheTable("jsonTable") + assertCached(sql("SELECT * FROM jsonTable"), 0) } test("it's not allowed to insert into a relation that is not an InsertableRelation") { http://git-wip-us.apache.org/repos/asf/spark/blob/ce233f18/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 3871b3d..8ccc2b7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -204,13 +204,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto assertCached(table("refreshTable")) // Append new data. table("src").write.mode(SaveMode.Append).parquet(tempPath.toString) - // We are still using the old data. assertCached(table("refreshTable")) - checkAnswer( - table("refreshTable"), - table("src").collect()) - // Refresh the table. - sql("REFRESH TABLE refreshTable") + // We are using the new data. assertCached(table("refreshTable")) checkAnswer( @@ -249,13 +244,8 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto assertCached(table("refreshTable")) // Append new data. table("src").write.mode(SaveMode.Append).parquet(tempPath.toString) - // We are still using the old data. assertCached(table("refreshTable")) - checkAnswer( - table("refreshTable"), - table("src").collect()) - // Refresh the table. - sql(s"REFRESH ${tempPath.toString}") + // We are using the new data. 
assertCached(table("refreshTable")) checkAnswer( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org