[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311008#comment-17311008 ]
Michael Chen commented on SPARK-34780:
--------------------------------------

Hey [~csun]. I did find a conf that can cause a correctness difference, but I'm not sure how serious it is (I didn't find other confs affecting correctness, but I also didn't look too hard; do people actually flip this conf?). I added a test case that demonstrates the problem:
{code:java}
test("cache uses old SQLConf") {
  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "true")
  val carsFile = "test-data/cars.csv"
  val cars = spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile))
  val numRows = cars.select("year").collect().length

  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "false")
  spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile)).cache().count()

  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "true")
  val numRowsReadCache = spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile)).select("year").collect().length
  assert(numRows == numRowsReadCache)
}
{code}

> Cached Table (parquet) with old Configs Used
> --------------------------------------------
>
>                 Key: SPARK-34780
>                 URL: https://issues.apache.org/jira/browse/SPARK-34780
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.4, 3.1.1
>            Reporter: Michael Chen
>            Priority: Major
>
> When a DataFrame is cached, its logical plan can contain copies of the Spark
> session, which means the SQLConfs are stored along with it. If a different
> DataFrame later substitutes part of its logical plan with the cached logical
> plan, the cached SQLConfs are used when evaluating that cached portion. This
> happens because HadoopFsRelation ignores sparkSession in equality checks
> (introduced in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
>     val tableDir = dir.getAbsoluteFile + "/table"
>     val df = Seq("a").toDF("key")
>     df.write.parquet(tableDir)
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1Stats = spark.read.parquet(tableDir).select("key").
>       queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
>     val df2 = spark.read.parquet(tableDir).select("key")
>     df2.cache()
>     val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
>       queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>     // I expect these stats to be the same because the file compression factor is the same
>     assert(compression1Stats == compression1StatsWithCache)
>     // Instead, we can see the file compression factor is being cached and used along with
>     // the logical plan
>     assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
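
To see the mechanism in isolation, here is a minimal sketch of the behavior described above. The classes are hypothetical stand-ins, not Spark's actual LogicalRelation or CacheManager; the point is only that a cache keyed on plan equality that ignores the session will hand back a relation carrying whatever confs were captured at cache time.
{code:java}
import scala.collection.mutable

object StaleConfSketch {
  // Stand-in for the SQLConf values captured inside a SparkSession.
  case class Conf(fileCompressionFactor: Double)

  // Equality deliberately ignores `conf`, mirroring how HadoopFsRelation
  // ignores sparkSession in its equality check (SPARK-17358).
  class Relation(val path: String, val conf: Conf) {
    override def equals(other: Any): Boolean = other match {
      case r: Relation => r.path == path // conf is NOT compared
      case _ => false
    }
    override def hashCode(): Int = path.hashCode
    def estimatedSizeInBytes(rawBytes: Long): Long =
      (rawBytes * conf.fileCompressionFactor).toLong
  }

  def main(args: Array[String]): Unit = {
    val cache = mutable.Map.empty[Relation, Relation]

    // A plan cached while fileCompressionFactor was 10.
    val cached = new Relation("/data/table", Conf(10.0))
    cache(cached) = cached

    // A new plan built after the conf was reset to 1. The lookup still
    // matches the cached entry because equality ignores the conf...
    val fresh = new Relation("/data/table", Conf(1.0))
    val resolved = cache.getOrElse(fresh, fresh)

    // ...so size estimation runs with the stale factor of 10, not 1.
    println(resolved.estimatedSizeInBytes(100)) // prints 1000, not 100
  }
}
{code}
This is the same shape as both tests above: the lookup succeeds on path equality alone, so the confs that were live when the plan was cached keep being used afterwards.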