[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305109#comment-17305109 ]

Chao Sun commented on SPARK-34780:
----------------------------------

Thanks for the report, [~mikechen]; the test case you provided is very 
useful. 

I'm not sure, though, how severe the issue is, since it only affects 
{{computeStats}}: once the cache is actually materialized (e.g., via 
{{df2.count()}} after {{df2.cache()}}), the value from {{computeStats}} will be 
different anyway. Could you give more details?
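
For example (an untested sketch, reusing {{tableDir}} and the suite's 
{{spark}} from your test), after materialization the {{InMemoryRelation}} 
reports stats for the cached data itself rather than an estimate derived 
from the captured conf:

{code:java}
import org.apache.spark.sql.execution.columnar.InMemoryRelation

val df2 = spark.read.parquet(tableDir).select("key")
df2.cache()
df2.count() // materializes the cache

// Once materialized, the InMemoryRelation's stats reflect the cached
// data itself, not the file-size estimate taken under the captured conf.
val materializedStats = df2.queryExecution.optimizedPlan.collect {
  case m: InMemoryRelation => m
}.map(_.computeStats())
{code}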

> Cached Table (parquet) with old Configs Used
> --------------------------------------------
>
>                 Key: SPARK-34780
>                 URL: https://issues.apache.org/jira/browse/SPARK-34780
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.1.1
>            Reporter: Michael Chen
>            Priority: Major
>
> When a DataFrame is cached, its logical plan can contain copies of the Spark 
> session, which means the SQLConfs in effect at cache time are captured as 
> well. If a different DataFrame then has parts of its logical plan replaced 
> by the cached logical plan, the captured SQLConfs are used when evaluating 
> the cached plan. This happens because HadoopFsRelation ignores sparkSession 
> in equality checks (introduced in 
> https://issues.apache.org/jira/browse/SPARK-17358).
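> For illustration (a hypothetical stand-in, not Spark's actual class): fields 
> in a second parameter list are excluded from a case class's generated 
> {{equals}}/{{hashCode}}, so two relations that differ only in their captured 
> session compare equal.
> {code:java}
> // Hypothetical stand-in for HadoopFsRelation: `session` lives in a
> // curried parameter list, so it does not participate in equality.
> case class Relation(path: String)(val session: String)
>
> val a = Relation("/table")("session with fileCompressionFactor = 1")
> val b = Relation("/table")("session with fileCompressionFactor = 10")
> assert(a == b) // equal, despite carrying different sessions/configs
> {code}
> The following test shows the resulting behavior end-to-end: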
> {code:java}
> import org.apache.spark.sql.execution.columnar.InMemoryRelation
> import org.apache.spark.sql.execution.datasources.LogicalRelation
> import org.apache.spark.sql.internal.SQLConf
>
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
>     val tableDir = dir.getAbsolutePath + "/table"
>     val df = Seq("a").toDF("key")
>     df.write.parquet(tableDir)
>
>     // Stats with fileCompressionFactor = 1, no cache involved.
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1Stats = spark.read.parquet(tableDir).select("key")
>       .queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>
>     // Cache the plan while fileCompressionFactor = 10.
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
>     val df2 = spark.read.parquet(tableDir).select("key")
>     df2.cache()
>     val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>
>     // Back to fileCompressionFactor = 1; the cached plan is substituted in.
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1StatsWithCache = spark.read.parquet(tableDir).select("key")
>       .queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>
>     // I expect these stats to be the same because the file compression
>     // factor is the same.
>     assert(compression1Stats == compression1StatsWithCache)
>     // Instead, the file compression factor is cached and used along with
>     // the logical plan.
>     assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  


