Repository: spark
Updated Branches:
  refs/heads/branch-2.0 798825c09 -> a08715c7a


[SPARK-15678] Add support to REFRESH data source paths

## What changes were proposed in this pull request?

Spark currently, and incorrectly, continues to serve cached data even after the
underlying data has been overwritten.

Current behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
sqlContext.read.parquet(dir).count() // outputs 1000 <---- We are still using the cached dataset
```

This patch fixes this bug by adding support for `REFRESH path` that invalidates 
and refreshes all the cached data (and the associated metadata) for any 
dataframe that contains the given data source path.

Expected behavior:
```scala
val dir = "/tmp/test"
sqlContext.range(1000).write.mode("overwrite").parquet(dir)
val df = sqlContext.read.parquet(dir).cache()
df.count() // outputs 1000
sqlContext.range(10).write.mode("overwrite").parquet(dir)
spark.catalog.refreshByPath(dir)
sqlContext.read.parquet(dir).count() // outputs 10 <---- We are no longer using the cached dataset
```
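
The same refresh is also exposed as a SQL command through the new `REFRESH path` grammar rule. A minimal sketch of the SQL form (the path is illustrative):
```scala
val dir = "/tmp/test"
// Equivalent to spark.catalog.refreshByPath(dir): any cached plan that reads
// files under `dir` is unpersisted and re-cached against the refreshed file listing.
spark.sql(s"REFRESH $dir")
```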

## How was this patch tested?

Unit tests for overwrites and appends in `ParquetQuerySuite` and 
`CachedTableSuite`.

Author: Sameer Agarwal <sam...@databricks.com>

Closes #13566 from sameeragarwal/refresh-path-2.

(cherry picked from commit 468da03e23a01e02718608f05d778386cbb8416b)
Signed-off-by: Davies Liu <davies....@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a08715c7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a08715c7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a08715c7

Branch: refs/heads/branch-2.0
Commit: a08715c7a79ce1953b8d64a9cf0ec1c513d56eec
Parents: 798825c
Author: Sameer Agarwal <sam...@databricks.com>
Authored: Fri Jun 10 20:43:18 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Fri Jun 10 20:43:26 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  1 +
 .../org/apache/spark/sql/catalog/Catalog.scala  |  7 +++
 .../spark/sql/execution/CacheManager.scala      | 51 +++++++++++++++++++-
 .../spark/sql/execution/SparkSqlParser.scala    |  9 +++-
 .../spark/sql/execution/datasources/ddl.scala   |  9 ++++
 .../apache/spark/sql/internal/CatalogImpl.scala | 10 ++++
 .../datasources/parquet/ParquetQuerySuite.scala | 28 +++++++++++
 .../spark/sql/hive/CachedTableSuite.scala       | 45 +++++++++++++++++
 8 files changed, 158 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index d102559..044f910 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -113,6 +113,7 @@ statement
     | (DESC | DESCRIBE) option=(EXTENDED | FORMATTED)?
         tableIdentifier partitionSpec? describeColName?                #describeTable
     | REFRESH TABLE tableIdentifier                                    #refreshTable
+    | REFRESH .*?                                                      #refreshResource
     | CACHE LAZY? TABLE identifier (AS? query)?                        #cacheTable
     | UNCACHE TABLE identifier                                         #uncacheTable
     | CLEAR CACHE                                                      #clearCache

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 6ddb1a7..083a63c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -226,4 +226,11 @@ abstract class Catalog {
    */
   def refreshTable(tableName: String): Unit
 
+  /**
+   * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that
+   * contains the given data source path.
+   *
+   * @since 2.0.0
+   */
+  def refreshByPath(path: String): Unit
 }
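
For reference, a minimal usage sketch of the new `refreshByPath` API (the session and path below are illustrative, not part of the patch):
```scala
// Assumes an active SparkSession `spark` and a directory of Parquet files.
val dir = "/tmp/test"
val df = spark.read.parquet(dir).cache()
df.count()                        // materializes the cache

// After another job overwrites the files under `dir`:
spark.catalog.refreshByPath(dir)  // unpersists and re-caches every plan reading `dir`
df.count()                        // now reflects the rewritten data
```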

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c8bdb0d..b584cf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,10 +19,14 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import org.apache.hadoop.fs.{FileSystem, Path}
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
 
@@ -157,4 +161,49 @@ private[sql] class CacheManager extends Logging {
       case _ =>
     }
   }
+
+  /**
+   * Invalidates the cache of any data that contains `resourcePath` in one or more
+   * `HadoopFsRelation` node(s) as part of its logical plan.
+   */
+  private[sql] def invalidateCachedPath(
+      sparkSession: SparkSession, resourcePath: String): Unit = writeLock {
+    val (fs, qualifiedPath) = {
+      val path = new Path(resourcePath)
+      val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
+      (fs, path.makeQualified(fs.getUri, fs.getWorkingDirectory))
+    }
+
+    cachedData.foreach {
+      case data if data.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined =>
+        val dataIndex = cachedData.indexWhere(cd => data.plan.sameResult(cd.plan))
+        if (dataIndex >= 0) {
+          data.cachedRepresentation.cachedColumnBuffers.unpersist(blocking = true)
+          cachedData.remove(dataIndex)
+        }
+        sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
+      case _ => // Do Nothing
+    }
+  }
+
+  /**
+   * Traverses a given `plan` and searches for the occurrences of `qualifiedPath` in the
+   * [[org.apache.spark.sql.execution.datasources.FileCatalog]] of any [[HadoopFsRelation]] nodes
+   * in the plan. If found, we refresh the metadata and return true. Otherwise, this method returns
+   * false.
+   */
+  private def lookupAndRefresh(plan: LogicalPlan, fs: FileSystem, qualifiedPath: Path): Boolean = {
+    plan match {
+      case lr: LogicalRelation => lr.relation match {
+        case hr: HadoopFsRelation =>
+          val invalidate = hr.location.paths
+            .map(_.makeQualified(fs.getUri, fs.getWorkingDirectory))
+            .contains(qualifiedPath)
+          if (invalidate) hr.location.refresh()
+          invalidate
+        case _ => false
+      }
+      case _ => false
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index dc74222..06d8f15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -25,7 +25,6 @@ import org.antlr.v4.runtime.tree.TerminalNode
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -210,6 +209,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
+   * Create a [[RefreshResource]] logical plan.
+   */
+  override def visitRefreshResource(ctx: RefreshResourceContext): LogicalPlan = withOrigin(ctx) {
+    val resourcePath = remainder(ctx.REFRESH.getSymbol).trim
+    RefreshResource(resourcePath)
+  }
+
+  /**
    * Create a [[CacheTableCommand]] logical plan.
    */
   override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index aa42eae..31a2075 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -102,6 +102,15 @@ case class RefreshTable(tableIdent: TableIdentifier)
   }
 }
 
+case class RefreshResource(path: String)
+  extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.catalog.refreshByPath(path)
+    Seq.empty[Row]
+  }
+}
+
 /**
  * Builds a map in which keys are case insensitive
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 70e17b1..f42fd17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -373,6 +373,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     }
   }
 
+  /**
+   * Refresh the cache entry and the associated metadata for all dataframes (if any), that contain
+   * the given data source path.
+   *
+   * @group cachemgmt
+   * @since 2.0.0
+   */
+  override def refreshByPath(resourcePath: String): Unit = {
+    sparkSession.sharedState.cacheManager.invalidateCachedPath(sparkSession, resourcePath)
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index ea57f71..b4fd0ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -68,6 +68,34 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       TableIdentifier("tmp"), ignoreIfNotExists = true)
   }
 
+  test("SPARK-15678: not use cache on overwrite") {
+    withTempDir { dir =>
+      val path = dir.toString
+      spark.range(1000).write.mode("overwrite").parquet(path)
+      val df = spark.read.parquet(path).cache()
+      assert(df.count() == 1000)
+      spark.range(10).write.mode("overwrite").parquet(path)
+      assert(df.count() == 1000)
+      spark.catalog.refreshByPath(path)
+      assert(df.count() == 10)
+      assert(spark.read.parquet(path).count() == 10)
+    }
+  }
+
+  test("SPARK-15678: not use cache on append") {
+    withTempDir { dir =>
+      val path = dir.toString
+      spark.range(1000).write.mode("append").parquet(path)
+      val df = spark.read.parquet(path).cache()
+      assert(df.count() == 1000)
+      spark.range(10).write.mode("append").parquet(path)
+      assert(df.count() == 1000)
+      spark.catalog.refreshByPath(path)
+      assert(df.count() == 1010)
+      assert(spark.read.parquet(path).count() == 1010)
+    }
+  }
+
   test("self-join") {
     // 4 rows, cells of column 1 of row 2 and row 4 are null
     val data = (1 to 4).map { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/a08715c7/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 52ba90f..5121440 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -206,6 +206,51 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
     Utils.deleteRecursively(tempPath)
   }
 
+  test("SPARK-15678: REFRESH PATH") {
+    val tempPath: File = Utils.createTempDir()
+    tempPath.delete()
+    table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
+    sql("DROP TABLE IF EXISTS refreshTable")
+    sparkSession.catalog.createExternalTable("refreshTable", 
tempPath.toString, "parquet")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Cache the table.
+    sql("CACHE TABLE refreshTable")
+    assertCached(table("refreshTable"))
+    // Append new data.
+    table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
+    // We are still using the old data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Refresh the table.
+    sql(s"REFRESH ${tempPath.toString}")
+    // We are using the new data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").union(table("src")).collect())
+
+    // Drop the table and create it again.
+    sql("DROP TABLE refreshTable")
+    sparkSession.catalog.createExternalTable("refreshTable", 
tempPath.toString, "parquet")
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+    // Refresh the table. The REFRESH command should not make an uncached
+    // table cached.
+    sql(s"REFRESH ${tempPath.toString}")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").union(table("src")).collect())
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+    sql("DROP TABLE refreshTable")
+    Utils.deleteRecursively(tempPath)
+  }
+
   test("SPARK-11246 cache parquet table") {
     sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")
 

