[ 
https://issues.apache.org/jira/browse/SPARK-15678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877735#comment-15877735
 ] 

Gen TANG commented on SPARK-15678:
----------------------------------

Hi, All 

It seems that refreshByPath(_path_) has to be called n times if there are n 
cache operations on DataFrames derived from the _path_

{code:title=non-working code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

// Reproduction helper (non-working variant): refreshes `path` ONCE, reads the parquet data
// under `path`, then caches two chained filtered DataFrames and prints a count after each step.
// Returns the second cached DataFrame (rows with id > 11).
def f(path: String, spark: SparkSession): DataFrame = {
  // Single refresh, even though two DataFrames derived from `path` are cached below.
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)  // always correct
  // First cache operation on data derived from `path`.
  val df = data.filter("id>10")
  df.cache
  println(df.count) // always correct
  // Second cache operation, built on top of the first cached DataFrame.
  val df1 = df.filter("id>11")
  df1.cache
  // NOTE(review): presumably this is the count that goes stale after the data is overwritten,
  // since the caller's count on the returned DataFrame is reported as stale -- confirm.
  println(df1.count)
  df1
}

// Directory that is written, read through f, and then overwritten with different data.
val dir = "/tmp/test"
// First run: 100 rows written; 88 of them satisfy id > 11.
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct

// Second run: the same path is overwritten with 1000 rows, so 988 rows should satisfy id > 11.
spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is incorrect (stale result from the first run; expected 988)
{code}

{code:title=working code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession

// Reproduction helper (working variant): refreshes `path` TWICE -- once per cache operation
// performed below -- then reads the parquet data under `path`, caches two chained filtered
// DataFrames and prints a count after each step.
// Returns the second cached DataFrame (rows with id > 11).
def f(path: String, spark: SparkSession): DataFrame = {
  // Two refresh calls, matching the two cache operations below; per this report the
  // repeated call is what makes the second run return up-to-date counts.
  spark.catalog.refreshByPath(path)
  spark.catalog.refreshByPath(path)
  val data = spark.read.option("mergeSchema", "true").parquet(path)
  println(data.count)
  // First cache operation on data derived from `path`.
  val df = data.filter("id>10")
  df.cache
  println(df.count)
  // Second cache operation, built on top of the first cached DataFrame.
  val df1 = df.filter("id>11")
  df1.cache
  println(df1.count)
  df1
}

// Directory that is written, read through f, and then overwritten with different data.
val dir = "/tmp/test"
// First run: 100 rows written; 88 of them satisfy id > 11.
spark.range(100).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 88 which is correct

// Second run: the same path is overwritten with 1000 rows; 988 of them satisfy id > 11.
spark.range(1000).write.mode("overwrite").parquet(dir)
f(dir, spark).count // output 988 which is correct
{code}


> Not use cache on appends and overwrites
> ---------------------------------------
>
>                 Key: SPARK-15678
>                 URL: https://issues.apache.org/jira/browse/SPARK-15678
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Sameer Agarwal
>            Assignee: Sameer Agarwal
>             Fix For: 2.0.0
>
>
> SparkSQL currently doesn't drop caches if the underlying data is overwritten.
> {code}
> val dir = "/tmp/test"
> sqlContext.range(1000).write.mode("overwrite").parquet(dir)
> val df = sqlContext.read.parquet(dir).cache()
> df.count() // outputs 1000
> sqlContext.range(10).write.mode("overwrite").parquet(dir)
> sqlContext.read.parquet(dir).count() // outputs 1000 instead of 10 <---- We 
> are still using the cached dataset
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to