[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21594


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197600101
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query     The [[Dataset]] to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark     The Spark session.
+   * @param plan      The plan to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  plan; otherwise un-cache the given plan only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+    cascade: Boolean, blocking: Boolean): Unit = writeLock {
+    val shouldRemove: LogicalPlan => Boolean =
+      if (cascade) {
+        _.find(_.sameResult(plan)).isDefined
+      } else {
+        _.sameResult(plan)
+      }
     val it = cachedData.iterator()
     while (it.hasNext) {
       val cd = it.next()
-      if (cd.plan.find(_.sameResult(plan)).isDefined) {
+      if (shouldRemove(cd.plan)) {
         cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
         it.remove()
       }
     }
+    // Re-compile dependent cached queries after removing the cached query.
+    if (!cascade) {
+      val it = cachedData.iterator()
+      val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
+      while (it.hasNext) {
+        val cd = it.next()
+        if (cd.plan.find(_.sameResult(plan)).isDefined) {
+          it.remove()
+          val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
+          val newCache = InMemoryRelation(
+            cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+            logicalPlan = cd.plan)
+          needToRecache += cd.copy(cachedRepresentation = newCache)
+        }
+      }
+      needToRecache.foreach(cachedData.add)
--- End diff --

It's almost the same logic as "recache", except that it tries to reuse the
cached buffer here. It would be nice to integrate these two, but it wouldn't look
so clean given the inconvenience of copying a CacheBuilder. I'll try, though.
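
For reference, a rough sketch of what such a shared private helper could look like, folding the remove-and-rebuild loop from the diff above into one place (the name `recacheByCondition` and the exact shape are hypothetical, not the merged code):

```
// Hypothetical sketch: one helper shared by the non-cascading uncache path and
// "recache", parameterized by a predicate over the cached plans.
private def recacheByCondition(spark: SparkSession, condition: LogicalPlan => Boolean): Unit = {
  val it = cachedData.iterator()
  val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
  while (it.hasNext) {
    val cd = it.next()
    if (condition(cd.plan)) {
      it.remove()
      // Re-compile the plan and rebuild the InMemoryRelation, reusing the
      // already-materialized cache buffer via withCachedPlan.
      val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
      val newCache = InMemoryRelation(
        cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
        logicalPlan = cd.plan)
      needToRecache += cd.copy(cachedRepresentation = newCache)
    }
  }
  needToRecache.foreach(cachedData.add)
}
```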


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197591957
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
@@ -490,7 +494,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     // cached version and make the new version cached lazily.
     if (isCached(table)) {
       // Uncache the logicalPlan.
-      sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)
+      sparkSession.sharedState.cacheManager.uncacheQuery(table, true, blocking = true)
--- End diff --

the same here.


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197591943
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---
@@ -438,7 +440,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * @since 2.0.0
    */
   override def uncacheTable(tableName: String): Unit = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    sparkSession.sharedState.cacheManager.uncacheQuery(
+      sparkSession.table(tableName), !sessionCatalog.isTemporaryTable(tableIdent))
--- End diff --

`val cascade = !sessionCatalog.isTemporaryTable(tableIdent)`

...
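
Spelled out, the suggested refactor would make the call site read roughly like this (a sketch of the naming suggestion, not the final code):

```
override def uncacheTable(tableName: String): Unit = {
  val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
  // Dropping a temp view should not cascade; a persistent table or view should.
  val cascade = !sessionCatalog.isTemporaryTable(tableIdent)
  sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName), cascade)
}
```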


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197591886
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -204,9 +205,10 @@ case class DropTableCommand(
       }
     }

-    if (catalog.isTemporaryTable(tableName) || catalog.tableExists(tableName)) {
+    if (isTempTable || catalog.tableExists(tableName)) {
       try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
+        sparkSession.sharedState.cacheManager.uncacheQuery(
+          sparkSession.table(tableName), !isTempTable)
--- End diff --

`cascade = !isTempTable`
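
That is, with a named argument the call would read (sketch):

```
sparkSession.sharedState.cacheManager.uncacheQuery(
  sparkSession.table(tableName), cascade = !isTempTable)
```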


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197591842
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -189,8 +189,9 @@ case class DropTableCommand(

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
+    val isTempTable = catalog.isTemporaryTable(tableName)
--- End diff --

rename it to `isTempView`


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197591661
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2971,7 +2971,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
+    sparkSession.sharedState.cacheManager.uncacheQuery(this, cascade = false, blocking)
--- End diff --

Also update the comments at line 2966 and line 2979 to explain the new behavior.


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197586914
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -493,7 +493,7 @@ case class TruncateTableCommand(
     spark.sessionState.refreshTable(tableName.unquotedString)
     // Also try to drop the contents of the table from the columnar cache
     try {
-      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier))
+      spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), true)
--- End diff --

named argument. `cascade = true`
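
That is (sketch):

```
spark.sharedState.cacheManager.uncacheQuery(spark.table(table.identifier), cascade = true)
```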




---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197586970
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query     The [[Dataset]] to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark     The Spark session.
+   * @param plan      The plan to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  plan; otherwise un-cache the given plan only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+    cascade: Boolean, blocking: Boolean): Unit = writeLock {
+    val shouldRemove: LogicalPlan => Boolean =
+      if (cascade) {
+        _.find(_.sameResult(plan)).isDefined
+      } else {
+        _.sameResult(plan)
+      }
     val it = cachedData.iterator()
     while (it.hasNext) {
       val cd = it.next()
-      if (cd.plan.find(_.sameResult(plan)).isDefined) {
+      if (shouldRemove(cd.plan)) {
         cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
         it.remove()
       }
     }
+    // Re-compile dependent cached queries after removing the cached query.
+    if (!cascade) {
+      val it = cachedData.iterator()
+      val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
+      while (it.hasNext) {
+        val cd = it.next()
+        if (cd.plan.find(_.sameResult(plan)).isDefined) {
+          it.remove()
+          val plan = spark.sessionState.executePlan(AnalysisBarrier(cd.plan)).executedPlan
+          val newCache = InMemoryRelation(
+            cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+            logicalPlan = cd.plan)
+          needToRecache += cd.copy(cachedRepresentation = newCache)
+        }
+      }
+      needToRecache.foreach(cachedData.add)
--- End diff --

create a private function from line 144 and line 158?


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197586499
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query     The [[Dataset]] to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
--- End diff --

indent 


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-22 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197289750
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query     The [[Dataset]] to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark     The Spark session.
+   * @param plan      The plan to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  plan; otherwise un-cache the given plan only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+    cascade: Boolean, blocking: Boolean): Unit = writeLock {
--- End diff --

indent. 


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197314689
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -801,4 +800,67 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }
     assert(cachedData.collect === Seq(1001))
   }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - uncache temporary view") {
+    withView("t1", "t2") {
--- End diff --

Yes.. good catch! A mistake caused by copy-paste.


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197314556
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala ---
@@ -143,9 +153,57 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     df.count()
     df2.cache()

-    val plan = df2.queryExecution.withCachedData
-    assert(plan.isInstanceOf[InMemoryRelation])
-    val internalPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
-    assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
+    assertCacheDependency(df2)
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation") {
+    val df = Seq(("a", 1), ("b", 2)).toDF("s", "i")
+    val df2 = df.filter('i > 1)
+    val df3 = df.filter('i < 2)
+
+    df2.cache()
+    df.cache()
+    df.count()
+    df3.cache()
+
+    df.unpersist()
+
+    // df un-cached; df2 and df3's cache plan re-compiled
+    assert(df.storageLevel == StorageLevel.NONE)
+    assertCacheDependency(df2, 0)
+    assertCacheDependency(df3, 0)
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - verify cached data reuse") {
+    val expensiveUDF = udf({ x: Int => Thread.sleep(5000); x })
+    val df = spark.range(0, 10).toDF("a")
+    val df1 = df.withColumn("b", expensiveUDF($"a"))
+    val df2 = df1.groupBy('a).agg(sum('b))
+    val df3 = df.agg(sum('a))
+
+    df1.cache()
+    df2.cache()
+    df2.collect()
+    df3.cache()
+
+    assertCacheDependency(df2)
+
+    df1.unpersist(blocking = true)
+
+    // df1 un-cached; df2's cache plan re-compiled
+    assert(df1.storageLevel == StorageLevel.NONE)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+
+    val df4 = df1.groupBy('a).agg(sum('b)).select("sum(b)")
+    assertCached(df4)
+    // reuse loaded cache
+    failAfter(3 seconds) {
+      df4.collect()
+    }
+
+    val df5 = df.agg(sum('a)).filter($"sum(a)" > 1)
+    assertCached(df5)
+    // first time use, load cache
+    df5.collect()
--- End diff --

We just need to prove the new InMemoryRelation works alright for building 
cache (since the plan has been re-compiled) ... maybe we should check result 
though. Plus, I deliberately made this dataframe not dependent on the UDF so it 
can finish quickly.
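
For instance, a result check could be added along these lines (a sketch; `checkAnswer` is the usual QueryTest helper, and the sum of 0..9 over `spark.range(0, 10)` is 45):

```
// df5 = df.agg(sum('a)).filter($"sum(a)" > 1), so the single expected row is 45.
checkAnswer(df5, Row(45L) :: Nil)
```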


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197311829
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -801,4 +800,67 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }
     assert(cachedData.collect === Seq(1001))
   }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - uncache temporary view") {
+    withView("t1", "t2") {
--- End diff --

`withTempView`


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197312423
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala ---
@@ -143,9 +153,57 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     df.count()
     df2.cache()

-    val plan = df2.queryExecution.withCachedData
-    assert(plan.isInstanceOf[InMemoryRelation])
-    val internalPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
-    assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
+    assertCacheDependency(df2)
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation") {
+    val df = Seq(("a", 1), ("b", 2)).toDF("s", "i")
+    val df2 = df.filter('i > 1)
+    val df3 = df.filter('i < 2)
+
+    df2.cache()
+    df.cache()
+    df.count()
+    df3.cache()
+
+    df.unpersist()
+
+    // df un-cached; df2 and df3's cache plan re-compiled
+    assert(df.storageLevel == StorageLevel.NONE)
+    assertCacheDependency(df2, 0)
+    assertCacheDependency(df3, 0)
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - verify cached data reuse") {
+    val expensiveUDF = udf({ x: Int => Thread.sleep(5000); x })
+    val df = spark.range(0, 10).toDF("a")
+    val df1 = df.withColumn("b", expensiveUDF($"a"))
+    val df2 = df1.groupBy('a).agg(sum('b))
+    val df3 = df.agg(sum('a))
+
+    df1.cache()
+    df2.cache()
+    df2.collect()
+    df3.cache()
+
+    assertCacheDependency(df2)
+
+    df1.unpersist(blocking = true)
+
+    // df1 un-cached; df2's cache plan re-compiled
+    assert(df1.storageLevel == StorageLevel.NONE)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+
+    val df4 = df1.groupBy('a).agg(sum('b)).select("sum(b)")
+    assertCached(df4)
+    // reuse loaded cache
+    failAfter(3 seconds) {
+      df4.collect()
+    }
+
+    val df5 = df.agg(sum('a)).filter($"sum(a)" > 1)
+    assertCached(df5)
+    // first time use, load cache
+    df5.collect()
--- End diff --

how do we prove this takes more than 5 seconds?


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197311907
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---
@@ -801,4 +800,67 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }
     assert(cachedData.collect === Seq(1001))
   }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - uncache temporary view") {
+    withView("t1", "t2") {
+      sql("CACHE TABLE t1 AS SELECT * FROM testData WHERE key > 1")
+      sql("CACHE TABLE t2 as SELECT * FROM t1 WHERE value > 1")
+
+      assert(spark.catalog.isCached("t1"))
+      assert(spark.catalog.isCached("t2"))
+      sql("UNCACHE TABLE t1")
+      assert(!spark.catalog.isCached("t1"))
+      assert(spark.catalog.isCached("t2"))
+    }
+  }
+
+  test("SPARK-24596 Non-cascading Cache Invalidation - drop temporary view") {
+    withView("t1", "t2") {
--- End diff --

ditto


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197247925
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query     The [[Dataset]] to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark     The Spark session.
+   * @param plan      The plan to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  plan; otherwise un-cache the given plan only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+    cascade: Boolean, blocking: Boolean): Unit = writeLock {
+    val condition: LogicalPlan => Boolean =
--- End diff --

`condition` -> `shouldUnCache`?


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197247763
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query     The [[Dataset]] to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param spark     The Spark session.
+   * @param plan      The plan to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  plan; otherwise un-cache the given plan only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+    cascade: Boolean, blocking: Boolean): Unit = writeLock {
--- End diff --

ditto


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197247440
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -2971,7 +2971,7 @@ class Dataset[T] private[sql](
    * @since 1.6.0
    */
   def unpersist(blocking: Boolean): this.type = {
-    sparkSession.sharedState.cacheManager.uncacheQuery(this, blocking)
+    sparkSession.sharedState.cacheManager.uncacheQuery(this, false, blocking)
--- End diff --

nit: it's clearer to write `cascade = false`


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r197247632
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -105,24 +105,58 @@ class CacheManager extends Logging {
   }

   /**
-   * Un-cache all the cache entries that refer to the given plan.
+   * Un-cache the given plan or all the cache entries that refer to the given plan.
+   * @param query     The [[Dataset]] to be un-cached.
+   * @param cascade   If true, un-cache all the cache entries that refer to the given
+   *                  [[Dataset]]; otherwise un-cache the given [[Dataset]] only.
+   * @param blocking  Whether to block until all blocks are deleted.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
--- End diff --

nit
```
def f(
    param1: X,
    param2: Y)
```
4 space indentation.


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-20 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r196899266
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -107,22 +107,35 @@ class CacheManager extends Logging {
   /**
    * Un-cache all the cache entries that refer to the given plan.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }

   /**
    * Un-cache all the cache entries that refer to the given plan.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+    cascade: Boolean, blocking: Boolean): Unit = writeLock {
     val it = cachedData.iterator()
+    val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
         it.remove()
+        if (cascade || cd.plan.sameResult(plan)) {
+          cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
+        } else {
+          val plan = spark.sessionState.executePlan(cd.plan).executedPlan
+          val newCache = InMemoryRelation(
--- End diff --

Yes, you are right, although it wouldn't lead to any error, just like all
other compiled dataframes that refer to this old InMemoryRelation. I'll change
this piece of code. But you've brought up another interesting question.
Consider a scenario similar to what you've mentioned:
```
df1 = ...
df2 = df1.filter(...)
df2.cache()
df1.cache()
df1.collect()
```
Here we cache the dependent query first and the query being depended upon
next. Optimally, when you do df2.collect(), you would like df2 to use the
cached data in df1, but it doesn't work like this now, since df2's execution
plan has already been generated before we call df1.cache(). It might be worth
revisiting the caches and updating their plans if necessary when we call
cacheQuery().
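
To make the ordering problem concrete, a hypothetical illustration (not code from the patch):

```
val df1 = spark.range(100).toDF("a")
val df2 = df1.filter($"a" > 50)

df2.cache()   // df2's cached physical plan is compiled now, before df1 is cached,
df1.cache()   // so it contains no InMemoryTableScan over df1's cache
df1.collect() // materializes df1's cache

df2.collect() // recomputes df2 from the source instead of reading df1's cached data
```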


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r196799526
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -107,22 +107,35 @@ class CacheManager extends Logging {
   /**
    * Un-cache all the cache entries that refer to the given plan.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }

   /**
    * Un-cache all the cache entries that refer to the given plan.
    */
-  def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock {
+  def uncacheQuery(spark: SparkSession, plan: LogicalPlan,
+    cascade: Boolean, blocking: Boolean): Unit = writeLock {
     val it = cachedData.iterator()
+    val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     while (it.hasNext) {
       val cd = it.next()
       if (cd.plan.find(_.sameResult(plan)).isDefined) {
-        cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
         it.remove()
+        if (cascade || cd.plan.sameResult(plan)) {
+          cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
+        } else {
+          val plan = spark.sessionState.executePlan(cd.plan).executedPlan
+          val newCache = InMemoryRelation(
--- End diff --

hmm, if the plan to uncache is iterated after a plan containing it, doesn't 
this still use its cached plan?


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21594#discussion_r196795998
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---
@@ -107,22 +107,35 @@ class CacheManager extends Logging {
   /**
    * Un-cache all the cache entries that refer to the given plan.
    */
-  def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
-    uncacheQuery(query.sparkSession, query.logicalPlan, blocking)
+  def uncacheQuery(query: Dataset[_],
+    cascade: Boolean, blocking: Boolean = true): Unit = writeLock {
+    uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }

   /**
    * Un-cache all the cache entries that refer to the given plan.

We should update this document.


---




[GitHub] spark pull request #21594: [SPARK-24596][SQL] Non-cascading Cache Invalidati...

2018-06-19 Thread maryannxue
GitHub user maryannxue opened a pull request:

https://github.com/apache/spark/pull/21594

[SPARK-24596][SQL] Non-cascading Cache Invalidation

## What changes were proposed in this pull request?

1. Add a parameter 'cascade' in CacheManager.uncacheQuery(). Under 'cascade=false' mode, only invalidate the current cache; for other dependent caches, rebuild the execution plan and reuse the cached buffer.
2. Pass true/false from callers in the different uncache scenarios:
- Drop tables and regular (persistent) views: regular mode
- Drop temporary views: non-cascading mode
- Modify table contents (INSERT/UPDATE/MERGE/DELETE): regular mode
- Call DataSet.unpersist(): non-cascading mode

Note that a regular (persistent) view is a database object just like a
table, so after dropping a regular view (whether cached or not), any
query referring to that view should no longer be valid. Hence, if a cached
persistent view is dropped, we need to invalidate all the dependent caches so
that exceptions will be thrown for any later reference. On the other hand, a
temporary view is in fact equivalent to an unnamed DataSet, and dropping a
temporary view should have no impact on queries referencing that view. Thus we
should do non-cascading uncaching for temporary views, which also guarantees
consistent uncaching behavior between temporary views and unnamed DataSets.
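
As a rough illustration of the DataSet.unpersist() scenario above (a sketch of the intended semantics; the variable names are illustrative):

```
import org.apache.spark.storage.StorageLevel

val df = spark.range(10).toDF("a")
val df2 = df.filter($"a" > 5)

df.cache()
df2.cache()
df2.count()    // materializes both caches

df.unpersist() // non-cascading: only df's cache is invalidated; df2 stays
               // cached, with its plan re-compiled to no longer use df's cache
assert(df.storageLevel == StorageLevel.NONE)
assert(df2.storageLevel != StorageLevel.NONE)
```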

## How was this patch tested?

New tests in CachedTableSuite and DatasetCacheSuite.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maryannxue/spark noncascading-cache

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21594.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21594


commit 27e484b97ec5f9fdbfdaa5c8c1d9f45233cbbdbe
Author: Maryann Xue 
Date:   2018-06-19T04:32:11Z

noncascading cache

commit 483008c577c0ec7335b0a9a1c567f60311bb83a6
Author: Maryann Xue 
Date:   2018-06-19T18:18:06Z

code refine

commit a782aacd5d4943b8bbfadde27a9c9e9d30c223fe
Author: Maryann Xue 
Date:   2018-06-19T18:24:57Z

Merge remote-tracking branch 'origin/master' into noncascading-cache

commit 0cd8dc10eb85b6df1704e13084f53f9cefe410b3
Author: Maryann Xue 
Date:   2018-06-19T21:36:29Z

refine test cases

commit 71b93ed598833d760955e972894685c089af297b
Author: Maryann Xue 
Date:   2018-06-19T22:19:05Z

refine test cases




---
