[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308811#comment-17308811
 ] 

Michael Chen commented on SPARK-34780:
--------------------------------------

Np [~csun]. If the cache isn't materialized until after the configs change, 
then I believe the input RDDs for InMemoryTableScanExec are still built with 
the stale confs, so the stale confs would also be used in 
DataSourceScanExec? Even if the cache was materialized before the configs 
changed, reading an RDD that was created with a stale conf would still be a 
concern if any of the confs can change results, right? (I'm not sure whether 
that is possible.)
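
To make the concern concrete, here is a toy model in plain Scala with no Spark 
dependency. ToyConf and ToyRelation are hypothetical names, not Spark classes, 
and the map only stands in for the cache lookup. It assumes the relation 
excludes its conf from equality by keeping it in a second parameter list, 
which is the behaviour the issue description attributes to HadoopFsRelation. 
Treat it as a sketch of the mechanism, not as how Spark implements it.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration; none of these are Spark classes.
final class ToyConf(val compressionFactor: Double)

// The second parameter list is excluded from the generated equals/hashCode,
// so two relations over the same path compare equal even when they carry
// different confs.
final case class ToyRelation(path: String)(val conf: ToyConf) {
  // Size estimate derived from the conf captured at construction time.
  def estimatedSize(rawBytes: Long): Long =
    (rawBytes * conf.compressionFactor).toLong
}

object ToyCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache = mutable.Map.empty[ToyRelation, ToyRelation]

    // "Cache" a relation while the factor is 10.
    val cached = ToyRelation("/table")(new ToyConf(10.0))
    cache(cached) = cached

    // Later, a new relation over the same path is built with factor 1.
    val fresh = ToyRelation("/table")(new ToyConf(1.0))

    // The lookup hits because equality ignores the conf, so the cached
    // relation (and its stale conf) replaces the fresh one.
    val substituted = cache.getOrElse(fresh, fresh)

    println(fresh.estimatedSize(100L))       // 100, from the current conf
    println(substituted.estimatedSize(100L)) // 1000, from the stale conf
  }
}
```

Note that estimatedSize is only called after the conf has changed, yet it 
still uses the value captured when the relation was built. That is the same 
shape as a cache that is materialized late.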

> Cached Table (parquet) with old Configs Used
> --------------------------------------------
>
>                 Key: SPARK-34780
>                 URL: https://issues.apache.org/jira/browse/SPARK-34780
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.1.1
>            Reporter: Michael Chen
>            Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the Spark 
> session, meaning the SQLConfs are stored. Then, if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached 
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
>     val tableDir = dir.getAbsoluteFile + "/table"
>     val df = Seq("a").toDF("key")
>     df.write.parquet(tableDir)
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1Stats = spark.read.parquet(tableDir).select("key").
>       queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
>     val df2 = spark.read.parquet(tableDir).select("key")
>     df2.cache()
>     val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
>       queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>     // I expect these stats to be the same because the file compression factor
>     // is the same
>     assert(compression1Stats == compression1StatsWithCache)
>     // Instead, we can see the file compression factor is being cached and
>     // used along with the logical plan
>     assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
