[jira] [Created] (SPARK-44897) Local Property Propagation to Subquery Broadcast Exec
Michael Chen created SPARK-44897: Summary: Local Property Propagation to Subquery Broadcast Exec Key: SPARK-44897 URL: https://issues.apache.org/jira/browse/SPARK-44897 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.4.0 Reporter: Michael Chen https://issues.apache.org/jira/browse/SPARK-32748 was opened to address this issue and then, I believe, mistakenly reverted. The claim was that propagating local properties to the dynamic pruning thread in SubqueryBroadcastExec is not necessary because the broadcast threads will propagate them anyway. However, when the dynamic pruning thread is the first to initialize the broadcast relation future, the local properties are not propagated correctly, because the properties handed to the broadcast threads are already incorrect at that point. I do not have a good way of reproducing this consistently because the SubqueryBroadcastExec is generally not the first to initialize the broadcast relation future, but after adding a Thread.sleep(1) to the doPrepare method of SubqueryBroadcastExec, the following test always fails.
{code:java}
withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
  withTable("a", "b") {
    val confKey = "spark.sql.y"
    val confValue1 = UUID.randomUUID().toString()
    val confValue2 = UUID.randomUUID().toString()
    Seq((confValue1, "1")).toDF("key", "value")
      .write
      .format("parquet")
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("a")
    val df1 = spark.table("a")

    def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
      val df = spark.range(1).mapPartitions { _ =>
        Iterator(TaskContext.get.getLocalProperty(confKey))
      }.filter($"value".contains(confValue)).as("c")
      df.hint("broadcast")
    }

    // set local property and assert
    val df2 = generateBroadcastDataFrame(confKey, confValue1)
    spark.sparkContext.setLocalProperty(confKey, confValue1)
    val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
    val checks = checkDF.collect()
    assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))

    // change local property and re-assert
    Seq((confValue2, "1")).toDF("key", "value")
      .write
      .format("parquet")
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("b")
    val df3 = spark.table("b")
    val df4 = generateBroadcastDataFrame(confKey, confValue2)
    spark.sparkContext.setLocalProperty(confKey, confValue2)
    val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
    val checks2 = checks2DF.collect()
    assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
    assert(checks2.nonEmpty)
  }
}
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
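For readers unfamiliar with the mechanics, the sketch below is a hypothetical, standalone illustration (not the SubqueryBroadcastExec code) of why a reused worker thread can see stale local properties and how capturing them on the caller and re-applying them on the worker avoids it. All names in it are made up for the example.
{code:java}
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession

object LocalPropertySketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
  val sc = spark.sparkContext
  // Single reusable worker thread, standing in for the dynamic pruning / broadcast pool.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

  // Submits a tiny job from the worker thread and reports the property the tasks saw.
  def seenByTasks(): String = Await.result(Future {
    sc.parallelize(Seq(0), 1)
      .map(_ => TaskContext.get.getLocalProperty("spark.sql.y"))
      .collect().head
  }, Duration.Inf)

  sc.setLocalProperty("spark.sql.y", "v1")
  println(seenByTasks()) // "v1": the worker thread is created here and snapshots the caller's properties

  sc.setLocalProperty("spark.sql.y", "v2")
  println(seenByTasks()) // still "v1": the reused worker thread kept the old snapshot

  // Fix pattern: capture on the caller, re-apply on the worker before submitting the job.
  val captured = sc.getLocalProperty("spark.sql.y")
  val fixed = Await.result(Future {
    sc.setLocalProperty("spark.sql.y", captured)
    sc.parallelize(Seq(0), 1)
      .map(_ => TaskContext.get.getLocalProperty("spark.sql.y"))
      .collect().head
  }, Duration.Inf)
  println(fixed) // "v2"
  spark.stop()
}
{code}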
[jira] [Created] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure
Michael Chen created SPARK-37442: Summary: In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure Key: SPARK-37442 URL: https://issues.apache.org/jira/browse/SPARK-37442 Project: Spark Issue Type: Bug Components: Optimizer, SQL Affects Versions: 3.2.0, 3.1.1 Reporter: Michael Chen There is a period of time where an InMemoryRelation will have its cached buffers loaded, but the statistics will be inaccurate (anywhere between 0 and the size in bytes reported by the accumulators). When AQE is enabled, it is possible that join planning happens in this window. In that case, the sizes of join children that include an InMemoryRelation are greatly underestimated and a broadcast join can be planned when it shouldn't be. We have seen scenarios where a broadcast join is planned with a build side larger than 8GB because, at planning time, the optimizer believes the InMemoryRelation is 0 bytes. Here is an example test case where the broadcast threshold is being ignored. It can mimic the 8GB error by increasing the size of the tables.
{code:java}
withSQLConf(
    SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
    SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
  // Spark estimates a string column as 20 bytes so with 60k rows, these relations should be
  // estimated at ~1.2m bytes which is greater than the broadcast join threshold
  Seq.fill(60000)("a").toDF("key")
    .createOrReplaceTempView("temp")
  Seq.fill(60000)("b").toDF("key")
    .createOrReplaceTempView("temp2")
  Seq("a").toDF("key").createOrReplaceTempView("smallTemp")

  spark.sql("SELECT key as newKey FROM temp").persist()

  val query =
    s"""
       |SELECT t3.newKey
       |FROM
       | (SELECT t1.newKey
       |  FROM (SELECT key as newKey FROM temp) as t1
       |  JOIN
       |  (SELECT key FROM smallTemp) as t2
       |  ON t1.newKey = t2.key
       | ) as t3
       | JOIN
       | (SELECT key FROM temp2) as t4
       | ON t3.newKey = t4.key
       |UNION
       |SELECT t1.newKey
       |FROM
       |(SELECT key as newKey FROM temp) as t1
       |JOIN
       |(SELECT key FROM temp2) as t2
       |ON t1.newKey = t2.key
       |""".stripMargin
  val df = spark.sql(query)
  df.collect()
  val adaptivePlan = df.queryExecution.executedPlan
  val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
  assert(bhj.length == 1)
}
{code}
-- This message was sent by Atlassian Jira (v8.20.1#820001) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
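As background, the following hypothetical, self-contained sketch (names are illustrative) shows the mechanism behind the mis-estimation rather than the race itself: the InMemoryRelation's size estimate switches from the child plan's statistics to the cache accumulators once the buffers are built, and the bad window is when AQE reads the estimate at the wrong moment.
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.columnar.InMemoryRelation

object CachedStatsSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
  import spark.implicits._

  val dir = java.nio.file.Files.createTempDirectory("tbl").toString
  (1 to 60000).map(i => (i, "x" * 20)).toDF("id", "payload").write.parquet(dir)

  spark.read.parquet(dir).cache() // declares the cache entry; no buffers are built yet

  // Pulls the InMemoryRelation out of a fresh plan and asks it for its size estimate.
  def cachedSizeEstimate(): BigInt =
    spark.read.parquet(dir).queryExecution.optimizedPlan.collectFirst {
      case m: InMemoryRelation => m.computeStats().sizeInBytes
    }.getOrElse(BigInt(-1))

  val before = cachedSizeEstimate() // falls back to the child plan's estimate
  spark.read.parquet(dir).count()   // materializes the cached buffers and fills the size accumulator
  val after = cachedSizeEstimate()  // now backed by the accumulator value

  println(s"estimate before materialization: $before, after: $after")
  spark.stop()
}
{code}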
[jira] [Created] (SPARK-36911) Add AQE Planning Times to SQL Metrics
Michael Chen created SPARK-36911: Summary: Add AQE Planning Times to SQL Metrics Key: SPARK-36911 URL: https://issues.apache.org/jira/browse/SPARK-36911 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.1.2 Reporter: Michael Chen Add metrics for the durations of "reOptimize", "generate explainString", and "createQueryStages" to the AdaptiveSparkPlanExec metrics, to make it easier to see the overhead of AQE for a query. -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
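A rough sketch of the shape such a change could take (hypothetical names, not the actual AdaptiveSparkPlanExec patch): a timing metric is created against the SparkContext and bumped around the phase being measured.
{code:java}
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative only: one timing metric per AQE phase, filled by wrapping the phase body.
class AqePhaseTimings(sc: SparkContext) {
  val reOptimizeTimeMs: SQLMetric =
    SQLMetrics.createTimingMetric(sc, "time to re-optimize the logical plan")

  // Runs `body`, adds the elapsed milliseconds to `metric`, and returns the result.
  def timed[T](metric: SQLMetric)(body: => T): T = {
    val start = System.nanoTime()
    try body
    finally metric.add((System.nanoTime() - start) / 1000000L)
  }
}
{code}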
[jira] [Created] (SPARK-36795) Explain Formatted has Duplicated Node IDs with InMemoryRelation Present
Michael Chen created SPARK-36795: Summary: Explain Formatted has Duplicated Node IDs with InMemoryRelation Present Key: SPARK-36795 URL: https://issues.apache.org/jira/browse/SPARK-36795 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.1.2 Reporter: Michael Chen When a query contains an InMemoryRelation, the output of Explain Formatted will contain duplicate node IDs.
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (14)
+- == Final Plan ==
   * BroadcastHashJoin Inner BuildLeft (9)
   :- BroadcastQueryStage (5)
   :  +- BroadcastExchange (4)
   :     +- * Filter (3)
   :        +- * ColumnarToRow (2)
   :           +- InMemoryTableScan (1)
   :                 +- InMemoryRelation (2)
   :                       +- * ColumnarToRow (4)
   :                          +- Scan parquet default.t1 (3)
   +- * Filter (8)
      +- * ColumnarToRow (7)
         +- Scan parquet default.t2 (6)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildLeft (13)
   :- BroadcastExchange (11)
   :  +- Filter (10)
   :     +- InMemoryTableScan (1)
   :           +- InMemoryRelation (2)
   :                 +- * ColumnarToRow (4)
   :                    +- Scan parquet default.t1 (3)
   +- Filter (12)
      +- Scan parquet default.t2 (6)
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
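A hypothetical repro sketch (table names and data made up): cache one side of a join and ask for the formatted explain; with an InMemoryRelation in the plan, the printed node IDs may repeat as in the report above.
{code:java}
import org.apache.spark.sql.SparkSession

object DuplicateExplainIdsSketch extends App {
  val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()

  spark.range(10).write.saveAsTable("t1")
  spark.range(10).write.saveAsTable("t2")

  val cached = spark.table("t1").cache()
  cached.count() // materialize so the plan carries an InMemoryRelation

  val joined = cached.join(spark.table("t2"), "id")
  joined.explain("formatted") // inspect the node IDs inside the InMemoryRelation subtree
  spark.stop()
}
{code}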
[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321270#comment-17321270 ] Michael Chen commented on SPARK-34780: -- Hi [~csun]. Also sorry for late reply. My understanding is the config lets you ignore malformed columns if they aren't part of the schema. I thought this affected correctness because you would get different results with the config on/off based on the presence of malformed columns that aren't relevant to the query. I also think the way to solve this issue would be defining equality for SQLConf. > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). > {code:java} > test("cache uses old SQLConf") { > import testImplicits._ > withTempDir { dir => > val tableDir = dir.getAbsoluteFile + "/table" > val df = Seq("a").toDF("key") > df.write.parquet(tableDir) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1Stats = spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") > val df2 = spark.read.parquet(tableDir).select("key") > df2.cache() > val compression10Stats = df2.queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1StatsWithCache = > spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > // I expect these stats to be the same because file compression factor is > the same > assert(compression1Stats == compression1StatsWithCache) > // Instead, we can see the file compression factor is being cached and > used along with > // the logical plan > assert(compression10Stats == compression1StatsWithCache) > } > }{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
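On "defining equality for SQLConf", a rough sketch of what that could mean for cache matching follows. The `relevantKeys` list and `cachedPlanConf` are assumptions for illustration; Spark does not currently expose such a notion.
{code:java}
import org.apache.spark.sql.internal.SQLConf

// Hypothetical: two confs are "equal enough" for cache reuse if every setting
// that can change query results matches. Which keys belong in `relevantKeys`
// is exactly the open question in this discussion.
def confCompatible(a: SQLConf, b: SQLConf, relevantKeys: Seq[String]): Boolean =
  relevantKeys.forall { key =>
    a.getConfString(key, "<unset>") == b.getConfString(key, "<unset>")
  }

// e.g. confCompatible(cachedPlanConf, SQLConf.get, Seq(SQLConf.CSV_PARSER_COLUMN_PRUNING.key))
{code}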
[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311008#comment-17311008 ] Michael Chen commented on SPARK-34780: -- Hey [~csun]. I did find a conf that can cause a difference in correctness, but I'm not sure how serious it is (I didn't find other confs affecting correctness, but I also didn't look too hard; do people flip this conf?). I added a test case that shows the problem:
{code:java}
test("cache uses old SQLConf") {
  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "true")
  val carsFile = "test-data/cars.csv"
  val cars = spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile))
  val numRows = cars.select("year").collect().length

  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "false")
  spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile)).cache().count()

  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "true")
  val numRowsReadCache = spark.read
    .format("csv")
    .option("multiLine", false)
    .options(Map("header" -> "true", "mode" -> "dropmalformed"))
    .load(testFile(carsFile)).select("year").collect().length
  assert(numRows == numRowsReadCache)
}
{code}
> Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). > {code:java} > test("cache uses old SQLConf") { > import testImplicits._ > withTempDir { dir => > val tableDir = dir.getAbsoluteFile + "/table" > val df = Seq("a").toDF("key") > df.write.parquet(tableDir) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1Stats = spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") > val df2 = spark.read.parquet(tableDir).select("key") > df2.cache() > val compression10Stats = df2.queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1StatsWithCache = > spark.read.parquet(tableDir).select("key"). 
> queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > // I expect these stats to be the same because file compression factor is > the same > assert(compression1Stats == compression1StatsWithCache) > // Instead, we can see the file compression factor is being cached and > used along with > // the logical plan > assert(compression10Stats == compression1StatsWithCache) > } > }{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17309514#comment-17309514 ] Michael Chen commented on SPARK-34780: -- Yes, I think as long as there aren't any correctness issues it's fine. > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). > {code:java} > test("cache uses old SQLConf") { > import testImplicits._ > withTempDir { dir => > val tableDir = dir.getAbsoluteFile + "/table" > val df = Seq("a").toDF("key") > df.write.parquet(tableDir) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1Stats = spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") > val df2 = spark.read.parquet(tableDir).select("key") > df2.cache() > val compression10Stats = df2.queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1StatsWithCache = > spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > // I expect these stats to be the same because file compression factor is > the same > assert(compression1Stats == compression1StatsWithCache) > // Instead, we can see the file compression factor is being cached and > used along with > // the logical plan > assert(compression10Stats == compression1StatsWithCache) > } > }{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17308811#comment-17308811 ] Michael Chen commented on SPARK-34780: -- Np [~csun]. If the cache isn't materialized until after the configs change, then I believe the input RDDs for InMemoryTableScanExec are still built with the old stale confs so the stale confs would also be used in the DataSourceScanExec? Even if the cache was materialized before the configs changed, reading an RDD that was created with a stale conf would be a concern if any of the confs can change results right? (not sure if this is possible) > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). > {code:java} > test("cache uses old SQLConf") { > import testImplicits._ > withTempDir { dir => > val tableDir = dir.getAbsoluteFile + "/table" > val df = Seq("a").toDF("key") > df.write.parquet(tableDir) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1Stats = spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") > val df2 = spark.read.parquet(tableDir).select("key") > df2.cache() > val compression10Stats = df2.queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1StatsWithCache = > spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > // I expect these stats to be the same because file compression factor is > the same > assert(compression1Stats == compression1StatsWithCache) > // Instead, we can see the file compression factor is being cached and > used along with > // the logical plan > assert(compression10Stats == compression1StatsWithCache) > } > }{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305162#comment-17305162 ] Michael Chen edited comment on SPARK-34780 at 3/19/21, 8:00 PM: I used computeStats because it was a simple way to display the problem. But any config that is read through the relation's spark session would have the same problem where the cached config is read instead of config set through SQLConf (or other mechanisms). For example, when building the file readers in DataSourceScanExec, the fsRelation.sparkSession is passed along so configs in these readers could be wrong. was (Author: mikechen): I used computeStats because it was a simple way to display the problem. But any config that is read through the relation's spark session would have the same problem where the cached config is read instead of config set through SQLConf (or other mechanisms) > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). > {code:java} > test("cache uses old SQLConf") { > import testImplicits._ > withTempDir { dir => > val tableDir = dir.getAbsoluteFile + "/table" > val df = Seq("a").toDF("key") > df.write.parquet(tableDir) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1Stats = spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") > val df2 = spark.read.parquet(tableDir).select("key") > df2.cache() > val compression10Stats = df2.queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1StatsWithCache = > spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > // I expect these stats to be the same because file compression factor is > the same > assert(compression1Stats == compression1StatsWithCache) > // Instead, we can see the file compression factor is being cached and > used along with > // the logical plan > assert(compression10Stats == compression1StatsWithCache) > } > }{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305162#comment-17305162 ] Michael Chen edited comment on SPARK-34780 at 3/19/21, 8:00 PM: I used computeStats because it was a simple way to display the problem. But any config that is read through the relation's spark session would have the same problem where the cached config is read instead of config set through SQLConf (or other mechanisms). For example, when building the file readers in DataSourceScanExec, relation.sparkSession is passed along so configs in these readers could be wrong. was (Author: mikechen): I used computeStats because it was a simple way to display the problem. But any config that is read through the relation's spark session would have the same problem where the cached config is read instead of config set through SQLConf (or other mechanisms). For example, when building the file readers in DataSourceScanExec, the fsRelation.sparkSession is passed along so configs in these readers could be wrong. > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). > {code:java} > test("cache uses old SQLConf") { > import testImplicits._ > withTempDir { dir => > val tableDir = dir.getAbsoluteFile + "/table" > val df = Seq("a").toDF("key") > df.write.parquet(tableDir) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1Stats = spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") > val df2 = spark.read.parquet(tableDir).select("key") > df2.cache() > val compression10Stats = df2.queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1StatsWithCache = > spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > // I expect these stats to be the same because file compression factor is > the same > assert(compression1Stats == compression1StatsWithCache) > // Instead, we can see the file compression factor is being cached and > used along with > // the logical plan > assert(compression10Stats == compression1StatsWithCache) > } > }{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17305162#comment-17305162 ] Michael Chen commented on SPARK-34780: -- I used computeStats because it was a simple way to display the problem. But any config that is read through the relation's spark session would have the same problem where the cached config is read instead of config set through SQLConf (or other mechanisms) > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). > {code:java} > test("cache uses old SQLConf") { > import testImplicits._ > withTempDir { dir => > val tableDir = dir.getAbsoluteFile + "/table" > val df = Seq("a").toDF("key") > df.write.parquet(tableDir) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1Stats = spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") > val df2 = spark.read.parquet(tableDir).select("key") > df2.cache() > val compression10Stats = df2.queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1StatsWithCache = > spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > // I expect these stats to be the same because file compression factor is > the same > assert(compression1Stats == compression1StatsWithCache) > // Instead, we can see the file compression factor is being cached and > used along with > // the logical plan > assert(compression10Stats == compression1StatsWithCache) > } > }{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Chen updated SPARK-34780: - Description: When a dataframe is cached, the logical plan can contain copies of the spark session meaning the SQLConfs are stored. Then if a different dataframe can replace parts of it's logical plan with a cached logical plan, the cached SQLConfs will be used for the evaluation of the cached logical plan. This is because HadoopFsRelation ignores sparkSession for equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358). {code:java} test("cache uses old SQLConf") { import testImplicits._ withTempDir { dir => val tableDir = dir.getAbsoluteFile + "/table" val df = Seq("a").toDF("key") df.write.parquet(tableDir) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") val compression1Stats = spark.read.parquet(tableDir).select("key"). queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") val df2 = spark.read.parquet(tableDir).select("key") df2.cache() val compression10Stats = df2.queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") val compression1StatsWithCache = spark.read.parquet(tableDir).select("key"). queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) // I expect these stats to be the same because file compression factor is the same assert(compression1Stats == compression1StatsWithCache) // Instead, we can see the file compression factor is being cached and used along with // the logical plan assert(compression10Stats == compression1StatsWithCache) } }{code} was: When a dataframe is cached, the logical plan can contain copies of the spark session meaning the SQLConfs are stored. Then if a different dataframe can replace parts of it's logical plan with a cached logical plan, the cached SQLConfs will be used for the evaluation of the cached logical plan. This is because HadoopFsRelation ignores sparkSession for equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also happens in other versions of Spark but haven't tested yet. {code:java} test("cache uses old SQLConf") { import testImplicits._ withTempDir { dir => val tableDir = dir.getAbsoluteFile + "/table" val df = Seq("a").toDF("key") df.write.parquet(tableDir) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") val compression1Stats = spark.read.parquet(tableDir).select("key"). queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") val df2 = spark.read.parquet(tableDir).select("key") df2.cache() val compression10Stats = df2.queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") val compression1StatsWithCache = spark.read.parquet(tableDir).select("key"). 
queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) // I expect these stats to be the same because file compression factor is the same assert(compression1Stats == compression1StatsWithCache) // Instead, we can see the file compression factor is being cached and used along with // the logical plan assert(compression10Stats == compression1StatsWithCache) } }{code} > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). > {code:java} > test("cache uses old SQLConf") { > import test
[jira] [Updated] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Chen updated SPARK-34780: - Affects Version/s: 3.1.1 > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4, 3.1.1 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also > happens in other versions of Spark but haven't tested yet. > {code:java} > test("cache uses old SQLConf") { > import testImplicits._ > withTempDir { dir => > val tableDir = dir.getAbsoluteFile + "/table" > val df = Seq("a").toDF("key") > df.write.parquet(tableDir) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1Stats = spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") > val df2 = spark.read.parquet(tableDir).select("key") > df2.cache() > val compression10Stats = df2.queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") > val compression1StatsWithCache = > spark.read.parquet(tableDir).select("key"). > queryExecution.optimizedPlan.collect { > case l: LogicalRelation => l > case m: InMemoryRelation => m > }.map(_.computeStats()) > // I expect these stats to be the same because file compression factor is > the same > assert(compression1Stats == compression1StatsWithCache) > // Instead, we can see the file compression factor is being cached and > used along with > // the logical plan > assert(compression10Stats == compression1StatsWithCache) > } > }{code} > -- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-34780) Cached Table (parquet) with old Configs Used
[ https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Chen updated SPARK-34780: - Description: When a dataframe is cached, the logical plan can contain copies of the spark session meaning the SQLConfs are stored. Then if a different dataframe can replace parts of it's logical plan with a cached logical plan, the cached SQLConfs will be used for the evaluation of the cached logical plan. This is because HadoopFsRelation ignores sparkSession for equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also happens in other versions of Spark but haven't tested yet. {code:java} test("cache uses old SQLConf") { import testImplicits._ withTempDir { dir => val tableDir = dir.getAbsoluteFile + "/table" val df = Seq("a").toDF("key") df.write.parquet(tableDir) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") val compression1Stats = spark.read.parquet(tableDir).select("key"). queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") val df2 = spark.read.parquet(tableDir).select("key") df2.cache() val compression10Stats = df2.queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") val compression1StatsWithCache = spark.read.parquet(tableDir).select("key"). queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) // I expect these stats to be the same because file compression factor is the same assert(compression1Stats == compression1StatsWithCache) // Instead, we can see the file compression factor is being cached and used along with // the logical plan assert(compression10Stats == compression1StatsWithCache) } }{code} was: When a dataframe is cached, the logical plan can contain copies of the spark session meaning the SQLConfs are stored. Then if a different dataframe can replace parts of it's logical plan with a cached logical plan, the cached SQLConfs will be used for the evaluation of the cached logical plan. This is because HadoopFsRelation ignores sparkSession for equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358). {code:java} test("cache uses old SQLConf") { import testImplicits._ withTempDir { dir => val tableDir = dir.getAbsoluteFile + "/table" val df = Seq("a").toDF("key") df.write.parquet(tableDir) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") val compression1Stats = spark.read.parquet(tableDir).select("key"). queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10") val df2 = spark.read.parquet(tableDir).select("key") df2.cache() val compression10Stats = df2.queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1") val compression1StatsWithCache = spark.read.parquet(tableDir).select("key"). 
queryExecution.optimizedPlan.collect { case l: LogicalRelation => l case m: InMemoryRelation => m }.map(_.computeStats()) // I expect these stats to be the same because file compression factor is the same assert(compression1Stats == compression1StatsWithCache) // Instead, we can see the file compression factor is being cached and used along with // the logical plan assert(compression10Stats == compression1StatsWithCache) } }{code} > Cached Table (parquet) with old Configs Used > > > Key: SPARK-34780 > URL: https://issues.apache.org/jira/browse/SPARK-34780 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.4.4 >Reporter: Michael Chen >Priority: Major > > When a dataframe is cached, the logical plan can contain copies of the spark > session meaning the SQLConfs are stored. Then if a different dataframe can > replace parts of it's logical plan with a cached logical plan, the cached > SQLConfs will be used for the evaluation of the cached logical plan. This is > because HadoopFsRelation ignores sparkSession for equality checks (introduced > in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also > happens in other versions of Spark but haven't t
[jira] [Created] (SPARK-34780) Cached Table (parquet) with old Configs Used
Michael Chen created SPARK-34780: Summary: Cached Table (parquet) with old Configs Used Key: SPARK-34780 URL: https://issues.apache.org/jira/browse/SPARK-34780 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.4 Reporter: Michael Chen When a dataframe is cached, the logical plan can contain copies of the spark session, meaning the SQLConfs are stored with it. Then if a different dataframe can replace parts of its logical plan with a cached logical plan, the cached SQLConfs will be used for the evaluation of the cached logical plan. This is because HadoopFsRelation ignores sparkSession for equality checks (introduced in https://issues.apache.org/jira/browse/SPARK-17358).
{code:java}
test("cache uses old SQLConf") {
  import testImplicits._
  withTempDir { dir =>
    val tableDir = dir.getAbsoluteFile + "/table"
    val df = Seq("a").toDF("key")
    df.write.parquet(tableDir)

    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
    val compression1Stats = spark.read.parquet(tableDir).select("key").
      queryExecution.optimizedPlan.collect {
        case l: LogicalRelation => l
        case m: InMemoryRelation => m
      }.map(_.computeStats())

    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
    val df2 = spark.read.parquet(tableDir).select("key")
    df2.cache()
    val compression10Stats = df2.queryExecution.optimizedPlan.collect {
      case l: LogicalRelation => l
      case m: InMemoryRelation => m
    }.map(_.computeStats())

    SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
    val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
      queryExecution.optimizedPlan.collect {
        case l: LogicalRelation => l
        case m: InMemoryRelation => m
      }.map(_.computeStats())

    // I expect these stats to be the same because file compression factor is the same
    assert(compression1Stats == compression1StatsWithCache)
    // Instead, we can see the file compression factor is being cached and used along with
    // the logical plan
    assert(compression10Stats == compression1StatsWithCache)
  }
}
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
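A simplified model (not Spark's actual class) of the equality behaviour the description points at: HadoopFsRelation keeps its SparkSession in a second parameter list, so the generated equals/hashCode ignore it, and two relations built under sessions with different conf values still compare equal, which is what lets the cache substitute one plan for the other.
{code:java}
// Toy stand-in for the relation: only the first parameter list participates in
// the case-class equals/hashCode; the session in the second list is ignored.
case class FsRelationModel(paths: Seq[String], schemaCols: Seq[String])(val session: String)

val builtWithFactor1  = FsRelationModel(Seq("/data/table"), Seq("key"))("session with factor 1")
val builtWithFactor10 = FsRelationModel(Seq("/data/table"), Seq("key"))("session with factor 10")

assert(builtWithFactor1 == builtWithFactor10) // equal despite the differing sessions/confs
{code}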
[jira] [Commented] (SPARK-29561) Large Case Statement Code Generation OOM
[ https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962361#comment-16962361 ] Michael Chen commented on SPARK-29561: -- If I increase the memory, it will run into the generated code grows beyond 64 KB exception and disable whole stage code generation for the plan. So that is ok. But if I increase the number of branches/complexity of the branches, it will just run into the OOM problem again. > Large Case Statement Code Generation OOM > > > Key: SPARK-29561 > URL: https://issues.apache.org/jira/browse/SPARK-29561 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michael Chen >Priority: Major > Attachments: apacheSparkCase.sql > > > Spark Configuration > spark.driver.memory = 1g > spark.master = "local" > spark.deploy.mode = "client" > Try to execute a case statement with 3000+ branches. Added sql statement as > attachment > Spark runs for a while before it OOM > {noformat} > java.lang.OutOfMemoryError: GC overhead limit exceeded > at > org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182) > at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320) > at > org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178) > at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73) > 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null. > java.lang.OutOfMemoryError: GC overhead limit exceeded > at java.util.HashMap.newNode(HashMap.java:1750) > at java.util.HashMap.putVal(HashMap.java:631) > at java.util.HashMap.putMapEntries(HashMap.java:515) > at java.util.HashMap.putAll(HashMap.java:785) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345) > at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230) > at > org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198) > at > org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254) > at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212) > at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216) > at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198) > at org.codehaus.janino.Java$Block.accept(Java.java:2756) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260) > at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217) > at > org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198) > at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336) > at > 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958) > at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385) > at > org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286) > 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark > Context Cleaner > java.lang.OutOfMemoryError: GC overhead limit exceeded > at > org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182) > at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320) > at > org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178) > at >
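The attached SQL is not reproduced in this archive; as a rough stand-in, a statement of the same shape can be generated with the hypothetical sketch below (all names are made up, and the branch count can be raised to push the compiler harder).
{code:java}
import org.apache.spark.sql.SparkSession

// Builds a CASE expression with thousands of branches and forces it through
// planning/codegen. Driver memory must be set at JVM launch (e.g. via
// spark-submit --driver-memory 1g); it cannot be lowered from inside the app.
object LargeCaseSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("large-case").getOrCreate()
  import spark.implicits._

  (1 to 100).toDF("id").createOrReplaceTempView("t")

  val branches = (1 to 3000).map(i => s"WHEN id = $i THEN 'branch_$i'").mkString(" ")
  spark.sql(s"SELECT CASE $branches ELSE 'other' END AS label FROM t").collect()
  spark.stop()
}
{code}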
[jira] [Comment Edited] (SPARK-29561) Large Case Statement Code Generation OOM
[ https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962361#comment-16962361 ] Michael Chen edited comment on SPARK-29561 at 10/29/19 7:59 PM: If I increase the memory, it will run into the generated code grows beyond 64 KB exception and disable whole stage code generation for the plan. But if I increase the number of branches/complexity of the branches, it will just run into the OOM problem again. was (Author: mikechen): If I increase the memory, it will run into the generated code grows beyond 64 KB exception and disable whole stage code generation for the plan. So that is ok. But if I increase the number of branches/complexity of the branches, it will just run into the OOM problem again. > Large Case Statement Code Generation OOM > > > Key: SPARK-29561 > URL: https://issues.apache.org/jira/browse/SPARK-29561 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michael Chen >Priority: Major > Attachments: apacheSparkCase.sql > > > Spark Configuration > spark.driver.memory = 1g > spark.master = "local" > spark.deploy.mode = "client" > Try to execute a case statement with 3000+ branches. Added sql statement as > attachment > Spark runs for a while before it OOM > {noformat} > java.lang.OutOfMemoryError: GC overhead limit exceeded > at > org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182) > at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320) > at > org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178) > at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73) > 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null. 
> java.lang.OutOfMemoryError: GC overhead limit exceeded > at java.util.HashMap.newNode(HashMap.java:1750) > at java.util.HashMap.putVal(HashMap.java:631) > at java.util.HashMap.putMapEntries(HashMap.java:515) > at java.util.HashMap.putAll(HashMap.java:785) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345) > at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230) > at > org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198) > at > org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254) > at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212) > at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216) > at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198) > at org.codehaus.janino.Java$Block.accept(Java.java:2756) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260) > at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217) > at > org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198) > at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958) > at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385) > at > org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286) > 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark > Context Cleaner > java.lang.OutOfMemoryError: GC overhead limit exceeded > at > o
[jira] [Comment Edited] (SPARK-29561) Large Case Statement Code Generation OOM
[ https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16962361#comment-16962361 ] Michael Chen edited comment on SPARK-29561 at 10/29/19 7:59 PM: Yes it works when I increase the driver memory. It just runs into the generated code grows beyond 64 KB exception and then disables whole stage code generation for the plan. But if I increase the number of branches/complexity of the branches, it will just run into the OOM problem again. was (Author: mikechen): If I increase the memory, it will run into the generated code grows beyond 64 KB exception and disable whole stage code generation for the plan. But if I increase the number of branches/complexity of the branches, it will just run into the OOM problem again. > Large Case Statement Code Generation OOM > > > Key: SPARK-29561 > URL: https://issues.apache.org/jira/browse/SPARK-29561 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michael Chen >Priority: Major > Attachments: apacheSparkCase.sql > > > Spark Configuration > spark.driver.memory = 1g > spark.master = "local" > spark.deploy.mode = "client" > Try to execute a case statement with 3000+ branches. Added sql statement as > attachment > Spark runs for a while before it OOM > {noformat} > java.lang.OutOfMemoryError: GC overhead limit exceeded > at > org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182) > at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320) > at > org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178) > at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73) > 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null. 
> java.lang.OutOfMemoryError: GC overhead limit exceeded > at java.util.HashMap.newNode(HashMap.java:1750) > at java.util.HashMap.putVal(HashMap.java:631) > at java.util.HashMap.putMapEntries(HashMap.java:515) > at java.util.HashMap.putAll(HashMap.java:785) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345) > at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230) > at > org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198) > at > org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254) > at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212) > at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216) > at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198) > at org.codehaus.janino.Java$Block.accept(Java.java:2756) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260) > at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217) > at > org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198) > at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) > at > org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958) > at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393) > at > org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385) > at > org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286) > 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark > Context Cleaner > java.lang.OutOfMemoryError: GC overhead limit exceeded
[jira] [Updated] (SPARK-29561) Large Case Statement Code Generation OOM
[ https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Chen updated SPARK-29561: - Description: Spark Configuration spark.driver.memory = 1g spark.master = "local" spark.deploy.mode = "client" Try to execute a case statement with 3000+ branches. Added sql statement as attachment Spark runs for a while before it OOM {noformat} java.lang.OutOfMemoryError: GC overhead limit exceeded at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320) at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178) at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73) 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null. java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.HashMap.newNode(HashMap.java:1750) at java.util.HashMap.putVal(HashMap.java:631) at java.util.HashMap.putMapEntries(HashMap.java:515) at java.util.HashMap.putAll(HashMap.java:785) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345) at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230) at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198) at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254) at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216) at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198) at org.codehaus.janino.Java$Block.accept(Java.java:2756) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260) at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217) at org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198) at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958) at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393) at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385) at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286) 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner java.lang.OutOfMemoryError: GC overhead limit exceeded at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320) at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178) at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73){noformat} Generated code looks like {noformat} /* 029 */ private void project_doConsume(InternalRow scan_row, UTF8String project_expr_0, boolean project_exprIsNull_0) throws java.io.IOException { /* 030 */ byte project_caseWhenResultState = -1; /* 031 */ do { /* 032 */ boolean project_isNull1 = true; /* 033 */ boolean project_value1 = false; /* 034 */ /* 035 */ boolean project_isNull2 = project_exprIsNull_0; /* 036 */ int project_value2 = -1; /* 037 */ if (!project_exprIsNull_0) { /* 038 */ UTF8String.IntWrapper project_intWrapper = new UTF8String.IntWrapper(); /* 039 */ if (project_expr_0.toInt(project_intWrapper)) { /* 040 */ project_value2 = project_intWrappe
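For anyone who wants to look at the generated project_doConsume method without waiting for the failure, the short sketch below uses Spark's codegen debug helper. It assumes the spark session and the largeCaseSql value from the earlier sketch; the exact output format varies across Spark versions.

{code:scala}
// Sketch: dump the whole-stage-generated Java source for the plan so the size of the
// single CASE projection method can be inspected directly.
import org.apache.spark.sql.execution.debug._  // adds debugCodegen() to Dataset

val df = spark.sql(largeCaseSql)  // largeCaseSql: the 3000+ branch query from the sketch above
df.debugCodegen()                 // prints code similar to the snippet quoted in the description
{code}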
[jira] [Updated] (SPARK-29561) Large Case Statement Code Generation OOM
[ https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Chen updated SPARK-29561: - Attachment: apacheSparkCase.sql > Large Case Statement Code Generation OOM > > > Key: SPARK-29561 > URL: https://issues.apache.org/jira/browse/SPARK-29561 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Michael Chen >Priority: Major > Attachments: apacheSparkCase.sql
[jira] [Created] (SPARK-29561) Large Case Statement Code Generation OOM
Michael Chen created SPARK-29561: Summary: Large Case Statement Code Generation OOM Key: SPARK-29561 URL: https://issues.apache.org/jira/browse/SPARK-29561 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.3.0 Reporter: Michael Chen Spark Configuration spark.driver.memory = 1g spark.master = "local" spark.deploy.mode = "client" Try to execute a case statement with 3000+ branches. Spark runs for a while before it OOM
[jira] [Updated] (SPARK-29561) Large Case Statement Code Generation OOM
[ https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Chen updated SPARK-29561: - Description: Spark Configuration spark.driver.memory = 1g spark.master = "local" spark.deploy.mode = "client" Try to execute a case statement with 3000+ branches. Spark runs for a while before it OOM {noformat} java.lang.OutOfMemoryError: GC overhead limit exceeded at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320) at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178) at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73) 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null. java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.HashMap.newNode(HashMap.java:1750) at java.util.HashMap.putVal(HashMap.java:631) at java.util.HashMap.putMapEntries(HashMap.java:515) at java.util.HashMap.putAll(HashMap.java:785) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345) at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230) at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198) at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254) at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216) at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198) at org.codehaus.janino.Java$Block.accept(Java.java:2756) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260) at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217) at org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198) at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197) at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958) at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393) at org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385) at org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286) 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark Context Cleaner java.lang.OutOfMemoryError: GC overhead limit exceeded at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320) at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178) at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73){noformat} Generated code looks like {noformat} /* 029 */ private void project_doConsume(InternalRow scan_row, UTF8String project_expr_0, boolean project_exprIsNull_0) throws java.io.IOException { /* 030 */ byte project_caseWhenResultState = -1; /* 031 */ do { /* 032 */ boolean project_isNull1 = true; /* 033 */ boolean project_value1 = false; /* 034 */ /* 035 */ boolean project_isNull2 = project_exprIsNull_0; /* 036 */ int project_value2 = -1; /* 037 */ if (!project_exprIsNull_0) { /* 038 */ UTF8String.IntWrapper project_intWrapper = new UTF8String.IntWrapper(); /* 039 */ if (project_expr_0.toInt(project_intWrapper)) { /* 040 */ project_value2 = project_intWrapper.value; /* 041 */ } else {
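The comment earlier in this thread notes that raising driver memory only moves the failure to the 64 KB code-size fallback. The sketch below lists two configuration knobs that influence that fallback; they are offered as things to experiment with, not as confirmed fixes for this report, since expression code generation can still produce a very large method even when whole-stage codegen is disabled.

{code:scala}
// Experiment only: these settings change when Spark abandons whole-stage codegen;
// they are not a confirmed fix for the compilation-time OOM described in this issue.
spark.conf.set("spark.sql.codegen.wholeStage", "false")      // do not fuse operators into one generated method
spark.conf.set("spark.sql.codegen.hugeMethodLimit", "8000")  // fall back earlier on large generated bytecode (Spark 2.3+)

spark.sql(largeCaseSql).collect()  // largeCaseSql: the 3000+ branch query from the earlier sketch
{code}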