[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Chen updated SPARK-34780:
---------------------------------
Description: 
When a dataframe is cached, its logical plan can contain copies of the Spark session, which means the SQLConfs are stored along with it. If a different dataframe can then replace part of its logical plan with the cached logical plan, the cached SQLConfs will be used when evaluating that cached portion of the plan. This is because HadoopFsRelation ignores sparkSession for equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also happens in other versions of Spark but haven't tested yet.

{code:java}
test("cache uses old SQLConf") {
  import testImplicits._
  withTempDir { dir =>
    val tableDir = dir.getAbsoluteFile + "/table"
    val df = Seq("a").toDF("key")
    df.write.parquet(tableDir)

    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
    val compression1Stats = spark.read.parquet(tableDir).select("key").
      queryExecution.optimizedPlan.collect {
        case l: LogicalRelation => l
        case m: InMemoryRelation => m
      }.map(_.computeStats())

    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
    val df2 = spark.read.parquet(tableDir).select("key")
    df2.cache()
    val compression10Stats = df2.queryExecution.optimizedPlan.collect {
      case l: LogicalRelation => l
      case m: InMemoryRelation => m
    }.map(_.computeStats())

    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
    val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
      queryExecution.optimizedPlan.collect {
        case l: LogicalRelation => l
        case m: InMemoryRelation => m
      }.map(_.computeStats())

    // I expect these stats to be the same because the file compression factor is the same
    assert(compression1Stats == compression1StatsWithCache)
    // Instead, we can see the file compression factor is cached and used along with
    // the logical plan
    assert(compression10Stats == compression1StatsWithCache)
  }
}
{code}
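To make the failure mode easier to see in isolation, here is a minimal, self-contained sketch in plain Scala with no Spark dependency. FakeSession, FakeRelation, and PlanCache are hypothetical stand-ins (for SparkSession/SQLConf, HadoopFsRelation, and the plan cache); the sketch only illustrates the interaction reported above, namely that equality which ignores the session lets a cache lookup return a relation that captured an old conf value:

{code:java}
// Minimal sketch, NOT Spark source. FakeSession, FakeRelation and PlanCache
// are hypothetical stand-ins for SparkSession/SQLConf, HadoopFsRelation and
// the plan cache.
object StaleConfSketch {

  // Stand-in for a session carrying one conf value.
  final class FakeSession(val fileCompressionFactor: Double)

  // Stand-in for HadoopFsRelation: equality is based on the path only and
  // deliberately ignores the session, mirroring the SPARK-17358 behavior.
  final class FakeRelation(val path: String, val session: FakeSession) {
    override def equals(other: Any): Boolean = other match {
      case r: FakeRelation => r.path == path
      case _               => false
    }
    override def hashCode(): Int = path.hashCode

    // Stand-in for a stats estimate: on-disk size scaled by the factor
    // captured when the relation was built.
    def estimatedSizeInBytes(onDiskBytes: Long): Long =
      (onDiskBytes * session.fileCompressionFactor).toLong
  }

  // Stand-in for the cache: entries are found by relation equality.
  final class PlanCache {
    private var cached: Option[FakeRelation] = None
    def put(r: FakeRelation): Unit = cached = Some(r)
    def lookup(r: FakeRelation): FakeRelation =
      cached.filter(_ == r).getOrElse(r)
  }

  def main(args: Array[String]): Unit = {
    val cache = new PlanCache
    // A relation is cached while the factor is 10.
    cache.put(new FakeRelation("/tmp/table", new FakeSession(10.0)))
    // A new query is built after the factor is set back to 1 ...
    val fresh = new FakeRelation("/tmp/table", new FakeSession(1.0))
    // ... but equality ignores the session, so the lookup hits the cached
    // relation and the stale factor wins: this prints 1000, not 100.
    println(cache.lookup(fresh).estimatedSizeInBytes(100L))
  }
}
{code}

If Spark's size estimate for a file-based relation multiplies the on-disk size by spark.sql.sources.fileCompressionFactor, as the failing assertions above suggest, then a relation cached under factor 10 keeps reporting the 10x size even after the conf is set back to 1.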
was:
When a dataframe is cached, its logical plan can contain copies of the Spark session, which means the SQLConfs are stored along with it. If a different dataframe can then replace part of its logical plan with the cached logical plan, the cached SQLConfs will be used when evaluating that cached portion of the plan. This is because HadoopFsRelation ignores sparkSession for equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358).


> Cached Table (parquet) with old Configs Used
> --------------------------------------------
>
>                 Key: SPARK-34780
>                 URL: https://issues.apache.org/jira/browse/SPARK-34780
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4
>            Reporter: Michael Chen
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org