[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-04-14 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321270#comment-17321270
 ] 

Michael Chen commented on SPARK-34780:
--

Hi [~csun]. Sorry for the late reply as well. My understanding is that the 
config lets you ignore malformed columns if they aren't part of the schema. I 
thought this affected correctness because you would get different results with 
the config on or off, depending on the presence of malformed columns that 
aren't relevant to the query.
I also think the way to solve this issue would be to define equality for 
SQLConf.

> Cached Table (parquet) with old Configs Used
>
>              Key: SPARK-34780
>              URL: https://issues.apache.org/jira/browse/SPARK-34780
>          Project: Spark
>       Issue Type: Bug
>       Components: SQL
> Affects Versions: 2.4.4, 3.1.1
>         Reporter: Michael Chen
>         Priority: Major
>
> When a DataFrame is cached, the logical plan can contain copies of the Spark 
> session, meaning the SQLConfs are stored along with it. If a different 
> DataFrame can then replace parts of its logical plan with a cached logical 
> plan, the cached SQLConfs will be used to evaluate the cached logical plan. 
> This is because HadoopFsRelation ignores sparkSession for equality checks 
> (introduced in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
>     val tableDir = dir.getAbsoluteFile + "/table"
>     val df = Seq("a").toDF("key")
>     df.write.parquet(tableDir)
>
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1Stats = spark.read.parquet(tableDir).select("key")
>       .queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
>     val df2 = spark.read.parquet(tableDir).select("key")
>     df2.cache()
>     val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>       case l: LogicalRelation => l
>       case m: InMemoryRelation => m
>     }.map(_.computeStats())
>
>     SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
>     val compression1StatsWithCache = spark.read.parquet(tableDir).select("key")
>       .queryExecution.optimizedPlan.collect {
>         case l: LogicalRelation => l
>         case m: InMemoryRelation => m
>       }.map(_.computeStats())
>
>     // I expect these stats to be the same because the file compression
>     // factor is the same
>     assert(compression1Stats == compression1StatsWithCache)
>     // Instead, we can see the file compression factor is being cached and
>     // used along with the logical plan
>     assert(compression10Stats == compression1StatsWithCache)
>   }
> }
> {code}
>  
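The equality behaviour named in the description can be reproduced in plain Scala. The following is a minimal sketch using toy stand-ins ({{ToySession}} and {{ToyRelation}} are hypothetical names, not Spark classes): fields placed in a case class's second parameter list are left out of the generated {{equals}} and {{hashCode}}, which is the general mechanism by which a relation can compare equal while carrying a different session.

```scala
// Toy stand-ins for illustration only; these are NOT Spark's real classes.
final class ToySession(val compressionFactor: Double)

// Only the first parameter list takes part in the generated equals/hashCode.
// The session in the second list is carried along but never compared.
final case class ToyRelation(path: String, rawSizeInBytes: Long)(val session: ToySession) {
  def sizeInBytes: Long = (rawSizeInBytes * session.compressionFactor).toLong
}

object EqualityDemo {
  val withFactor10: ToyRelation = ToyRelation("/tmp/table", 100L)(new ToySession(10.0))
  val withFactor1: ToyRelation = ToyRelation("/tmp/table", 100L)(new ToySession(1.0))

  def main(args: Array[String]): Unit = {
    println(withFactor10 == withFactor1) // true: the sessions are not compared
    println(withFactor10.sizeInBytes)    // 1000
    println(withFactor1.sizeInBytes)     // 100
  }
}
```

Two relations that compare equal can therefore report different sizes, depending on which session each one happens to hold.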



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-04-07 Thread Chao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316714#comment-17316714
 ] 

Chao Sun commented on SPARK-34780:
--

Hi [~mikechen] (and sorry for the late reply again), thanks for providing 
another very useful code snippet! I'm not sure this qualifies as a correctness 
issue, though, since it is (to me) more like different interpretations of 
malformed columns in CSV.

My previous statement about {{SessionState}} was incorrect. It seems the conf in 
{{SessionState}} is always the most up-to-date one. The only solution I can 
think of is to take the conf into account when checking equality of 
{{HadoopFsRelation}} (and potentially others), which means we'd need to define 
equality for {{SQLConf}}.
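As a rough illustration of what conf-aware equality could look like, here is a minimal sketch with toy types. {{ConfView}}, {{ConfAwareRelation}} and the key {{toy.compressionFactor}} are hypothetical and not part of Spark; the sketch only assumes that the relevant settings can be captured as an immutable map.

```scala
object ConfAwareEqualityDemo {
  // Hypothetical immutable view of the settings a relation depends on.
  final case class ConfView(settings: Map[String, String])

  // The conf sits in the first parameter list here, so it participates in
  // the generated equals/hashCode.
  final case class ConfAwareRelation(path: String, conf: ConfView)

  private def relation(factor: String): ConfAwareRelation =
    ConfAwareRelation("/tmp/table", ConfView(Map("toy.compressionFactor" -> factor)))

  val cachedUnderFactor10: ConfAwareRelation = relation("10")
  val freshUnderFactor1: ConfAwareRelation = relation("1")
  val freshUnderFactor10: ConfAwareRelation = relation("10")

  def main(args: Array[String]): Unit = {
    println(cachedUnderFactor10 == freshUnderFactor1)  // false: different settings
    println(cachedUnderFactor10 == freshUnderFactor10) // true: same path and settings
  }
}
```

One trade-off to keep in mind with this kind of design: the more settings take part in the comparison, the more often a change to an unrelated setting would prevent a cache hit.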




[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-29 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311008#comment-17311008
 ] 

Michael Chen commented on SPARK-34780:
--

Hey [~csun]. I did find a conf that can cause a difference in correctness, but 
I'm not sure how serious it is. I didn't find other confs affecting 
correctness, but I also didn't look too hard, and I don't know whether people 
actually flip this conf. I added a test case that shows the problem, though.
{code:java}
test("cache uses old SQLConf") {
  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "true")
  val carsFile = "test-data/cars.csv"
  val cars = spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile))

  val numRows = cars.select("year").collect().length

  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "false")
  spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile)).cache().count()

  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "true")
  val numRowsReadCache = spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile)).select("year").collect().length

  assert(numRows == numRowsReadCache)
}
{code}




[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-26 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17309514#comment-17309514
 ] 

Michael Chen commented on SPARK-34780:
--

Yes, I think as long as there aren't any correctness issues it's fine. 




[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-25 Thread Chao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308854#comment-17308854
 ] 

Chao Sun commented on SPARK-34780:
--

[~mikechen], yes, you're right. I'm not sure this is a big concern, though, 
since it just means the plan fragment for the cache is executed with the stale 
conf. I guess as long as there is no correctness issue (and I'd be surprised 
to see one), it should be fine?

It seems a bit tricky to fix the issue, since the {{SparkSession}} is leaked to 
many places. I guess one way is to follow the idea of SPARK-33389 and change 
{{SessionState}} to always use the active conf. 




[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-25 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308811#comment-17308811
 ] 

Michael Chen commented on SPARK-34780:
--

Np [~csun]. If the cache isn't materialized until after the configs change, 
then I believe the input RDDs for InMemoryTableScanExec are still built with 
the stale confs, so the stale confs would also be used in the 
DataSourceScanExec? Even if the cache was materialized before the configs 
changed, reading an RDD that was created with a stale conf would be a concern 
if any of the confs can change results, right? (Not sure if this is possible.)




[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-24 Thread Chao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308262#comment-17308262
 ] 

Chao Sun commented on SPARK-34780:
--

Sorry for the late reply [~mikechen]! There's something I'm still not quite 
clear on: when the cache is retrieved, an {{InMemoryRelation}} will be used to 
replace the plan fragment that is matched. Therefore, how can the old stale 
conf still be used in places like {{DataSourceScanExec}}?




[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-19 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305162#comment-17305162
 ] 

Michael Chen commented on SPARK-34780:
--

I used computeStats because it was a simple way to demonstrate the problem. But 
any config that is read through the relation's Spark session would have the 
same problem: the cached config is read instead of the config set through 
SQLConf (or other mechanisms).
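To make the general point concrete, the following is a simplified sketch of a cache lookup that substitutes a cached relation for an equal fresh one. All names ({{ConfSnapshot}}, {{FileRelation}}, {{PlanCache}}, {{useCachedData}}) are hypothetical toy types, and modelling the session's settings as a snapshot captured at creation time is an assumption made for illustration, not a description of Spark's internals.

```scala
import scala.collection.mutable

object StaleConfDemo {
  // Hypothetical snapshot of the settings captured when a relation is created.
  final case class ConfSnapshot(compressionFactor: Double)

  // conf is in the second parameter list, so equals/hashCode ignore it.
  final case class FileRelation(path: String, rawBytes: Long)(val conf: ConfSnapshot) {
    // Any value derived from conf is read through the relation itself.
    def estimatedBytes: Long = (rawBytes * conf.compressionFactor).toLong
  }

  final class PlanCache {
    private val cached = mutable.ArrayBuffer.empty[FileRelation]

    def cache(r: FileRelation): Unit = if (!cached.contains(r)) cached += r

    // Returns the cached relation when an equal one exists, else the fresh one.
    def useCachedData(fresh: FileRelation): FileRelation =
      cached.find(_ == fresh).getOrElse(fresh)
  }

  // Returns (estimate from the fresh relation, estimate after the cache lookup).
  def run(): (Long, Long) = {
    val cache = new PlanCache
    cache.cache(FileRelation("/tmp/table", 100L)(ConfSnapshot(10.0)))

    // A later read of the same path under a different setting.
    val fresh = FileRelation("/tmp/table", 100L)(ConfSnapshot(1.0))
    val resolved = cache.useCachedData(fresh)
    (fresh.estimatedBytes, resolved.estimatedBytes)
  }

  def main(args: Array[String]): Unit = println(run()) // (100,1000)
}
```

In this toy model the lookup succeeds because the two relations compare equal, and every value later derived from the substituted relation reflects the older setting rather than the current one.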




[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-19 Thread Chao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305109#comment-17305109
 ] 

Chao Sun commented on SPARK-34780:
--

Thanks for reporting this, [~mikechen]; the test case you provided is very 
useful. 

I'm not sure, though, how severe the issue is, since it only affects 
{{computeStats}}, and when the cache is actually materialized (e.g., via 
{{df2.count()}} after {{df2.cache()}}), the value from {{computeStats}} will be 
different anyway. Could you give more details?




[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-18 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17304561#comment-17304561
 ] 

Hyukjin Kwon commented on SPARK-34780:
--

cc [~sunchao] [~maxgekk] FYI
