GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21594#discussion_r197312423
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala ---
    @@ -143,9 +153,57 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
         df.count()
         df2.cache()
     
    -    val plan = df2.queryExecution.withCachedData
    -    assert(plan.isInstanceOf[InMemoryRelation])
    -    val internalPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
    -    assert(internalPlan.find(_.isInstanceOf[InMemoryTableScanExec]).isDefined)
    +    assertCacheDependency(df2)
    +  }
    +
    +  test("SPARK-24596 Non-cascading Cache Invalidation") {
    +    val df = Seq(("a", 1), ("b", 2)).toDF("s", "i")
    +    val df2 = df.filter('i > 1)
    +    val df3 = df.filter('i < 2)
    +
    +    df2.cache()
    +    df.cache()
    +    df.count()
    +    df3.cache()
    +
    +    df.unpersist()
    +
    +    // df un-cached; df2 and df3's cache plan re-compiled
    +    assert(df.storageLevel == StorageLevel.NONE)
    +    assertCacheDependency(df2, 0)
    +    assertCacheDependency(df3, 0)
    +  }
    +
    +  test("SPARK-24596 Non-cascading Cache Invalidation - verify cached data 
reuse") {
    +    val expensiveUDF = udf({ x: Int => Thread.sleep(5000); x })
    +    val df = spark.range(0, 10).toDF("a")
    +    val df1 = df.withColumn("b", expensiveUDF($"a"))
    +    val df2 = df1.groupBy('a).agg(sum('b))
    +    val df3 = df.agg(sum('a))
    +
    +    df1.cache()
    +    df2.cache()
    +    df2.collect()
    +    df3.cache()
    +
    +    assertCacheDependency(df2)
    +
    +    df1.unpersist(blocking = true)
    +
    +    // df1 un-cached; df2's cache plan re-compiled
    +    assert(df1.storageLevel == StorageLevel.NONE)
    +    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
    +
    +    val df4 = df1.groupBy('a).agg(sum('b)).select("sum(b)")
    +    assertCached(df4)
    +    // reuse loaded cache
    +    failAfter(3 seconds) {
    +      df4.collect()
    +    }
    +
    +    val df5 = df.agg(sum('a)).filter($"sum(a)" > 1)
    +    assertCached(df5)
    +    // first time use, load cache
    +    df5.collect()
    --- End diff --
    
    how do we prove this takes more than 5 seconds?
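    
    One way to show that explicitly (just a sketch, not part of this PR; the `invocations` and `countingUDF` names are hypothetical, and it assumes the DatasetCacheSuite context, i.e. `spark` and the implicits, is in scope): count UDF invocations with a `LongAccumulator` instead of relying on wall-clock time, so the test proves directly whether the expensive computation re-ran:
    
        import org.apache.spark.sql.functions.{sum, udf}
    
        // Driver-side counter; executors add to it whenever the UDF actually runs.
        val invocations = spark.sparkContext.longAccumulator("expensiveUDF calls")
        val countingUDF = udf { x: Int => invocations.add(1); Thread.sleep(5000); x }
    
        val df = spark.range(0, 10).toDF("a")
        val df1 = df.withColumn("b", countingUDF($"a"))
        val df2 = df1.groupBy('a).agg(sum('b))
    
        df1.cache()
        df2.cache()
        df2.collect()                       // builds both caches; the UDF runs here
        val afterBuild = invocations.value  // may exceed the row count on task retries
    
        df1.unpersist(blocking = true)
    
        val df4 = df1.groupBy('a).agg(sum('b)).select("sum(b)")
        df4.collect()                       // should be served from df2's cached data
        assert(invocations.value == afterBuild,
          "cached data should be reused; the expensive UDF must not re-run")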

