[jira] [Created] (SPARK-44897) Local Property Propagation to Subquery Broadcast Exec

2023-08-21 Thread Michael Chen (Jira)
Michael Chen created SPARK-44897:


 Summary: Local Property Propagation to Subquery Broadcast Exec
 Key: SPARK-44897
 URL: https://issues.apache.org/jira/browse/SPARK-44897
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.4.0
Reporter: Michael Chen


https://issues.apache.org/jira/browse/SPARK-32748 was opened to address this issue and then, I believe, 
mistakenly reverted. The claim was that propagating local properties to the dynamic 
pruning thread in SubqueryBroadcastExec is unnecessary because the broadcast threads 
will propagate them anyway. However, when the dynamic pruning thread is the first to 
initialize the broadcast relation future, the local properties are not propagated 
correctly, because the properties the broadcast threads would propagate are already 
incorrect at that point.
I do not have a good way of reproducing this consistently, because the 
SubqueryBroadcastExec is generally not the first to initialize the broadcast relation 
future, but by adding a Thread.sleep(1) to the doPrepare method of 
SubqueryBroadcastExec, the following test always fails.
{code:java}
withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
  withTable("a", "b") {
    val confKey = "spark.sql.y"
    val confValue1 = UUID.randomUUID().toString()
    val confValue2 = UUID.randomUUID().toString()
    Seq((confValue1, "1")).toDF("key", "value")
      .write
      .format("parquet")
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("a")
    val df1 = spark.table("a")

    def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
      val df = spark.range(1).mapPartitions { _ =>
        Iterator(TaskContext.get.getLocalProperty(confKey))
      }.filter($"value".contains(confValue)).as("c")
      df.hint("broadcast")
    }

    // set local property and assert
    val df2 = generateBroadcastDataFrame(confKey, confValue1)
    spark.sparkContext.setLocalProperty(confKey, confValue1)
    val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
    val checks = checkDF.collect()
    assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))

    // change local property and re-assert
    Seq((confValue2, "1")).toDF("key", "value")
      .write
      .format("parquet")
      .partitionBy("key")
      .mode("overwrite")
      .saveAsTable("b")
    val df3 = spark.table("b")
    val df4 = generateBroadcastDataFrame(confKey, confValue2)
    spark.sparkContext.setLocalProperty(confKey, confValue2)
    val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
    val checks2 = checks2DF.collect()
    assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
    assert(checks2.nonEmpty)
  }
} {code}
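
For clarity, the local change used for the reproduction looks roughly like this (a sketch only, not actual Spark source):
{code:java}
// Sketch of the temporary tweak described above: delay SubqueryBroadcastExec.doPrepare
// so the problematic initialization order described in this report is hit reliably.
override protected def doPrepare(): Unit = {
  Thread.sleep(1)
  // ... original doPrepare body ...
}
{code}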






[jira] [Created] (SPARK-37442) In AQE, wrong InMemoryRelation size estimation causes "Cannot broadcast the table that is larger than 8GB: 8 GB" failure

2021-11-22 Thread Michael Chen (Jira)
Michael Chen created SPARK-37442:


 Summary: In AQE, wrong InMemoryRelation size estimation causes 
"Cannot broadcast the table that is larger than 8GB: 8 GB" failure
 Key: SPARK-37442
 URL: https://issues.apache.org/jira/browse/SPARK-37442
 Project: Spark
  Issue Type: Bug
  Components: Optimizer, SQL
Affects Versions: 3.2.0, 3.1.1
Reporter: Michael Chen


There is a window of time in which an InMemoryRelation has its cached buffers loaded 
but its statistics are inaccurate (anywhere from 0 up to the size in bytes reported 
by the accumulators). When AQE is enabled, join planning can happen inside this 
window. In that scenario, the sizes of join children that include an InMemoryRelation 
are greatly underestimated, and a broadcast join can be planned when it shouldn't be. 
We have seen scenarios where a broadcast join is planned with a build side larger 
than 8GB because, at planning time, the optimizer believes the InMemoryRelation is 
0 bytes.

Here is an example test case where the broadcast threshold is ignored. It can mimic 
the 8GB error by increasing the size of the tables.
{code:java}
withSQLConf(
  SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1048584") {
  // Spark estimates a string column as 20 bytes, so with 60k rows these relations should be
  // estimated at ~1.2m bytes, which is greater than the broadcast join threshold
  Seq.fill(60000)("a").toDF("key")
    .createOrReplaceTempView("temp")
  Seq.fill(60000)("b").toDF("key")
    .createOrReplaceTempView("temp2")

  Seq("a").toDF("key").createOrReplaceTempView("smallTemp")
  spark.sql("SELECT key as newKey FROM temp").persist()

  val query =
    s"""
       |SELECT t3.newKey
       |FROM
       |  (SELECT t1.newKey
       |   FROM (SELECT key as newKey FROM temp) as t1
       |   JOIN
       |   (SELECT key FROM smallTemp) as t2
       |   ON t1.newKey = t2.key
       |  ) as t3
       |  JOIN
       |  (SELECT key FROM temp2) as t4
       |  ON t3.newKey = t4.key
       |UNION
       |SELECT t1.newKey
       |FROM
       |(SELECT key as newKey FROM temp) as t1
       |JOIN
       |(SELECT key FROM temp2) as t2
       |ON t1.newKey = t2.key
       |""".stripMargin
  val df = spark.sql(query)
  df.collect()
  val adaptivePlan = df.queryExecution.executedPlan
  val bhj = findTopLevelBroadcastHashJoin(adaptivePlan)
  assert(bhj.length == 1) {code}
 

 






[jira] [Created] (SPARK-36911) Add AQE Planning Times to SQL Metrics

2021-10-01 Thread Michael Chen (Jira)
Michael Chen created SPARK-36911:


 Summary: Add AQE Planning Times to SQL Metrics
 Key: SPARK-36911
 URL: https://issues.apache.org/jira/browse/SPARK-36911
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.2
Reporter: Michael Chen


Add metrics for the durations of "reOptimize", "generate explainString", and 
"createQueryStages" to the AdaptiveSparkPlanExec metrics, to make it easier to see 
the overhead of AQE for a query.






[jira] [Created] (SPARK-36795) Explain Formatted has Duplicated Node IDs with InMemoryRelation Present

2021-09-17 Thread Michael Chen (Jira)
Michael Chen created SPARK-36795:


 Summary: Explain Formatted has Duplicated Node IDs with 
InMemoryRelation Present
 Key: SPARK-36795
 URL: https://issues.apache.org/jira/browse/SPARK-36795
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.2
Reporter: Michael Chen


When a query contains an InMemoryRelation, the output of Explain Formatted will 
contain duplicate node IDs.


{code:java}
== Physical Plan ==
AdaptiveSparkPlan (14)
+- == Final Plan ==
   * BroadcastHashJoin Inner BuildLeft (9)
   :- BroadcastQueryStage (5)
   :  +- BroadcastExchange (4)
   :     +- * Filter (3)
   :        +- * ColumnarToRow (2)
   :           +- InMemoryTableScan (1)
   :                 +- InMemoryRelation (2)
   :                       +- * ColumnarToRow (4)
   :                          +- Scan parquet default.t1 (3)
   +- * Filter (8)
      +- * ColumnarToRow (7)
         +- Scan parquet default.t2 (6)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildLeft (13)
   :- BroadcastExchange (11)
   :  +- Filter (10)
   :     +- InMemoryTableScan (1)
   :           +- InMemoryRelation (2)
   :                 +- * ColumnarToRow (4)
   :                    +- Scan parquet default.t1 (3)
   +- Filter (12)
      +- Scan parquet default.t2 (6)
{code}
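
For reference, a plan of this shape can be reproduced with something like the following (table names and query are hypothetical, not taken from the original report):
{code:java}
// Hypothetical reproduction; t1/t2 and the query are illustrative only.
spark.range(10).toDF("id").write.format("parquet").saveAsTable("t1")
spark.range(10).toDF("id").write.format("parquet").saveAsTable("t2")
spark.table("t1").cache()

spark.sql("SELECT t1.id FROM t1 JOIN t2 ON t1.id = t2.id WHERE t1.id > 0")
  .explain("formatted") // node IDs in the InMemoryRelation subtree collide with IDs in the outer plan
{code}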






[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-04-14 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17321270#comment-17321270
 ] 

Michael Chen commented on SPARK-34780:
--

Hi [~csun]. Also, sorry for the late reply. My understanding is that the config lets 
you ignore malformed columns if they aren't part of the schema. I thought this 
affected correctness because you would get different results with the config on/off, 
depending on whether malformed columns that aren't relevant to the query are present.
I also think the way to solve this issue would be to define equality for SQLConf.

> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
> val tableDir = dir.getAbsoluteFile + "/table"
> val df = Seq("a").toDF("key")
> df.write.parquet(tableDir)
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1Stats = spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
> val df2 = spark.read.parquet(tableDir).select("key")
> df2.cache()
> val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1StatsWithCache = 
> spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> // I expect these stats to be the same because file compression factor is the same
> assert(compression1Stats == compression1StatsWithCache)
> // Instead, we can see the file compression factor is being cached and used along with
> // the logical plan
> assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  






[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-29 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17311008#comment-17311008
 ] 

Michael Chen commented on SPARK-34780:
--

Hey [~csun]. I did find a conf that can cause a diff in correctness, but I'm 
not sure how serious it is (I didn't find other confs affecting correctness, 
but also didn't look too hard/do people flip this conf?) I added a test case 
that shows the problem though.
{code:java}
test("cache uses old SQLConf") {
  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "true")
  val carsFile = "test-data/cars.csv"
  val cars = spark.read
.format("csv")
.option("multiLine", false)
.options(Map("header" -> "true", "mode" -> "dropmalformed"))
.load(testFile(carsFile))

  val numRows = cars.select("year").collect().length

  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "false")
  spark.read
.format("csv")
.option("multiLine", false)
.options(Map("header" -> "true", "mode" -> "dropmalformed"))
.load(testFile(carsFile)).cache().count()

  SQLConf.get.setConfString(SQLConf.CSV_PARSER_COLUMN_PRUNING.key, "true")
  val numRowsReadCache = spark.read
.format("csv")
.option("multiLine", false)
.options(Map("header" -> "true", "mode" -> "dropmalformed"))
.load(testFile(carsFile)).select("year").collect().length

  assert(numRows == numRowsReadCache)
}
{code}

> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
> val tableDir = dir.getAbsoluteFile + "/table"
> val df = Seq("a").toDF("key")
> df.write.parquet(tableDir)
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1Stats = spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
> val df2 = spark.read.parquet(tableDir).select("key")
> df2.cache()
> val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1StatsWithCache = 
> spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> // I expect these stats to be the same because file compression factor is the same
> assert(compression1Stats == compression1StatsWithCache)
> // Instead, we can see the file compression factor is being cached and used along with
> // the logical plan
> assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  






[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-26 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17309514#comment-17309514
 ] 

Michael Chen commented on SPARK-34780:
--

Yes, I think as long as there aren't any correctness issues it's fine. 

> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
> val tableDir = dir.getAbsoluteFile + "/table"
> val df = Seq("a").toDF("key")
> df.write.parquet(tableDir)
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1Stats = spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
> val df2 = spark.read.parquet(tableDir).select("key")
> df2.cache()
> val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1StatsWithCache = 
> spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> // I expect these stats to be the same because file compression factor is the same
> assert(compression1Stats == compression1StatsWithCache)
> // Instead, we can see the file compression factor is being cached and used along with
> // the logical plan
> assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  






[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-25 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17308811#comment-17308811
 ] 

Michael Chen commented on SPARK-34780:
--

Np [~csun]. If the cache isn't materialized until after the configs change, then I 
believe the input RDDs for InMemoryTableScanExec are still built with the old, stale 
confs, so the stale confs would also be used in the DataSourceScanExec? Even if the 
cache was materialized before the configs changed, reading an RDD that was created 
with a stale conf would be a concern if any of those confs can change results, right? 
(Not sure if this is possible.)

> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
> val tableDir = dir.getAbsoluteFile + "/table"
> val df = Seq("a").toDF("key")
> df.write.parquet(tableDir)
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1Stats = spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
> val df2 = spark.read.parquet(tableDir).select("key")
> df2.cache()
> val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1StatsWithCache = 
> spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> // I expect these stats to be the same because file compression factor is the same
> assert(compression1Stats == compression1StatsWithCache)
> // Instead, we can see the file compression factor is being cached and used along with
> // the logical plan
> assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  






[jira] [Comment Edited] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-19 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305162#comment-17305162
 ] 

Michael Chen edited comment on SPARK-34780 at 3/19/21, 8:00 PM:


I used computeStats because it was a simple way to display the problem. But any 
config that is read through the relation's spark session would have the same 
problem where the cached config is read instead of config set through SQLConf 
(or other mechanisms). For example, when building the file readers in 
DataSourceScanExec, the fsRelation.sparkSession is passed along so configs in 
these readers could be wrong.


was (Author: mikechen):
I used computeStats because it was a simple way to display the problem. But any 
config that is read through the relation's spark session would have the same 
problem where the cached config is read instead of config set through SQLConf 
(or other mechanisms)

> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
> val tableDir = dir.getAbsoluteFile + "/table"
> val df = Seq("a").toDF("key")
> df.write.parquet(tableDir)
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1Stats = spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
> val df2 = spark.read.parquet(tableDir).select("key")
> df2.cache()
> val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1StatsWithCache = 
> spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> // I expect these stats to be the same because file compression factor is the same
> assert(compression1Stats == compression1StatsWithCache)
> // Instead, we can see the file compression factor is being cached and used along with
> // the logical plan
> assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  






[jira] [Comment Edited] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-19 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305162#comment-17305162
 ] 

Michael Chen edited comment on SPARK-34780 at 3/19/21, 8:00 PM:


I used computeStats because it was a simple way to display the problem. But any 
config that is read through the relation's spark session would have the same 
problem where the cached config is read instead of config set through SQLConf 
(or other mechanisms). For example, when building the file readers in 
DataSourceScanExec, relation.sparkSession is passed along so configs in these 
readers could be wrong.


was (Author: mikechen):
I used computeStats because it was a simple way to display the problem. But any 
config that is read through the relation's spark session would have the same 
problem where the cached config is read instead of config set through SQLConf 
(or other mechanisms). For example, when building the file readers in 
DataSourceScanExec, the fsRelation.sparkSession is passed along so configs in 
these readers could be wrong.

> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
> val tableDir = dir.getAbsoluteFile + "/table"
> val df = Seq("a").toDF("key")
> df.write.parquet(tableDir)
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1Stats = spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
> val df2 = spark.read.parquet(tableDir).select("key")
> df2.cache()
> val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1StatsWithCache = 
> spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> // I expect these stats to be the same because file compression factor is the same
> assert(compression1Stats == compression1StatsWithCache)
> // Instead, we can see the file compression factor is being cached and used along with
> // the logical plan
> assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  






[jira] [Commented] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-19 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17305162#comment-17305162
 ] 

Michael Chen commented on SPARK-34780:
--

I used computeStats because it was a simple way to display the problem. But any 
config that is read through the relation's spark session would have the same 
problem where the cached config is read instead of config set through SQLConf 
(or other mechanisms)

> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
> val tableDir = dir.getAbsoluteFile + "/table"
> val df = Seq("a").toDF("key")
> df.write.parquet(tableDir)
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1Stats = spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
> val df2 = spark.read.parquet(tableDir).select("key")
> df2.cache()
> val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1StatsWithCache = 
> spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> // I expect these stats to be the same because file compression factor is the same
> assert(compression1Stats == compression1StatsWithCache)
> // Instead, we can see the file compression factor is being cached and used along with
> // the logical plan
> assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  






[jira] [Updated] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-18 Thread Michael Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Chen updated SPARK-34780:
-
Description: 
When a dataframe is cached, the logical plan can contain copies of the spark 
session meaning the SQLConfs are stored. Then if a different dataframe can 
replace parts of its logical plan with a cached logical plan, the cached
SQLConfs will be used for the evaluation of the cached logical plan. This is 
because HadoopFsRelation ignores sparkSession for equality checks (introduced 
in https://issues.apache.org/jira/browse/SPARK-17358).
{code:java}
test("cache uses old SQLConf") {
  import testImplicits._
  withTempDir { dir =>
val tableDir = dir.getAbsoluteFile + "/table"
val df = Seq("a").toDF("key")
df.write.parquet(tableDir)
SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1Stats = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
val df2 = spark.read.parquet(tableDir).select("key")
df2.cache()
val compression10Stats = df2.queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

// I expect these stats to be the same because file compression factor is the same
assert(compression1Stats == compression1StatsWithCache)
// Instead, we can see the file compression factor is being cached and used along with
// the logical plan
assert(compression10Stats == compression1StatsWithCache)
  }
}{code}
 

  was:
When a dataframe is cached, the logical plan can contain copies of the spark 
session meaning the SQLConfs are stored. Then if a different dataframe can 
replace parts of its logical plan with a cached logical plan, the cached
SQLConfs will be used for the evaluation of the cached logical plan. This is 
because HadoopFsRelation ignores sparkSession for equality checks (introduced 
in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also 
happens in other versions of Spark but haven't tested yet.
{code:java}
test("cache uses old SQLConf") {
  import testImplicits._
  withTempDir { dir =>
val tableDir = dir.getAbsoluteFile + "/table"
val df = Seq("a").toDF("key")
df.write.parquet(tableDir)
SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1Stats = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
val df2 = spark.read.parquet(tableDir).select("key")
df2.cache()
val compression10Stats = df2.queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

// I expect these stats to be the same because file compression factor is the same
assert(compression1Stats == compression1StatsWithCache)
// Instead, we can see the file compression factor is being cached and used along with
// the logical plan
assert(compression10Stats == compression1StatsWithCache)
  }
}{code}
 


> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358).
> {code:java}
> test("cache uses old SQLConf") {
>   import 

[jira] [Updated] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-18 Thread Michael Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Chen updated SPARK-34780:
-
Affects Version/s: 3.1.1

> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4, 3.1.1
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also 
> happens in other versions of Spark but haven't tested yet.
> {code:java}
> test("cache uses old SQLConf") {
>   import testImplicits._
>   withTempDir { dir =>
> val tableDir = dir.getAbsoluteFile + "/table"
> val df = Seq("a").toDF("key")
> df.write.parquet(tableDir)
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1Stats = spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
> val df2 = spark.read.parquet(tableDir).select("key")
> df2.cache()
> val compression10Stats = df2.queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
> val compression1StatsWithCache = 
> spark.read.parquet(tableDir).select("key").
>   queryExecution.optimizedPlan.collect {
>   case l: LogicalRelation => l
>   case m: InMemoryRelation => m
> }.map(_.computeStats())
> // I expect these stats to be the same because file compression factor is the same
> assert(compression1Stats == compression1StatsWithCache)
> // Instead, we can see the file compression factor is being cached and used along with
> // the logical plan
> assert(compression10Stats == compression1StatsWithCache)
>   }
> }{code}
>  






[jira] [Updated] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-17 Thread Michael Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-34780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Chen updated SPARK-34780:
-
Description: 
When a dataframe is cached, the logical plan can contain copies of the spark 
session meaning the SQLConfs are stored. Then if a different dataframe can 
replace parts of its logical plan with a cached logical plan, the cached
SQLConfs will be used for the evaluation of the cached logical plan. This is 
because HadoopFsRelation ignores sparkSession for equality checks (introduced 
in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also 
happens in other versions of Spark but haven't tested yet.
{code:java}
test("cache uses old SQLConf") {
  import testImplicits._
  withTempDir { dir =>
val tableDir = dir.getAbsoluteFile + "/table"
val df = Seq("a").toDF("key")
df.write.parquet(tableDir)
SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1Stats = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
val df2 = spark.read.parquet(tableDir).select("key")
df2.cache()
val compression10Stats = df2.queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

// I expect these stats to be the same because file compression factor is the same
assert(compression1Stats == compression1StatsWithCache)
// Instead, we can see the file compression factor is being cached and used along with
// the logical plan
assert(compression10Stats == compression1StatsWithCache)
  }
}{code}
 

  was:
When a dataframe is cached, the logical plan can contain copies of the spark 
session meaning the SQLConfs are stored. Then if a different dataframe can 
replace parts of its logical plan with a cached logical plan, the cached
SQLConfs will be used for the evaluation of the cached logical plan. This is 
because HadoopFsRelation ignores sparkSession for equality checks (introduced 
in https://issues.apache.org/jira/browse/SPARK-17358).
{code:java}
test("cache uses old SQLConf") {
  import testImplicits._
  withTempDir { dir =>
val tableDir = dir.getAbsoluteFile + "/table"
val df = Seq("a").toDF("key")
df.write.parquet(tableDir)
SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1Stats = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
val df2 = spark.read.parquet(tableDir).select("key")
df2.cache()
val compression10Stats = df2.queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

// I expect these stats to be the same because file compression factor is the same
assert(compression1Stats == compression1StatsWithCache)
// Instead, we can see the file compression factor is being cached and used along with
// the logical plan
assert(compression10Stats == compression1StatsWithCache)
  }
}{code}
 


> Cached Table (parquet) with old Configs Used
> 
>
> Key: SPARK-34780
> URL: https://issues.apache.org/jira/browse/SPARK-34780
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.4
>Reporter: Michael Chen
>Priority: Major
>
> When a dataframe is cached, the logical plan can contain copies of the spark 
> session meaning the SQLConfs are stored. Then if a different dataframe can 
> replace parts of its logical plan with a cached logical plan, the cached
> SQLConfs will be used for the evaluation of the cached logical plan. This is 
> because HadoopFsRelation ignores sparkSession for equality checks (introduced 
> in https://issues.apache.org/jira/browse/SPARK-17358). I suspect this also 
> happens in other versions of Spark but haven't 

[jira] [Created] (SPARK-34780) Cached Table (parquet) with old Configs Used

2021-03-17 Thread Michael Chen (Jira)
Michael Chen created SPARK-34780:


 Summary: Cached Table (parquet) with old Configs Used
 Key: SPARK-34780
 URL: https://issues.apache.org/jira/browse/SPARK-34780
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.4.4
Reporter: Michael Chen


When a dataframe is cached, the logical plan can contain copies of the spark 
session meaning the SQLConfs are stored. Then if a different dataframe can 
replace parts of its logical plan with a cached logical plan, the cached
SQLConfs will be used for the evaluation of the cached logical plan. This is 
because HadoopFsRelation ignores sparkSession for equality checks (introduced 
in https://issues.apache.org/jira/browse/SPARK-17358).
{code:java}
test("cache uses old SQLConf") {
  import testImplicits._
  withTempDir { dir =>
val tableDir = dir.getAbsoluteFile + "/table"
val df = Seq("a").toDF("key")
df.write.parquet(tableDir)
SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1Stats = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "10")
val df2 = spark.read.parquet(tableDir).select("key")
df2.cache()
val compression10Stats = df2.queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

SQLConf.get.setConfString(SQLConf.FILE_COMPRESSION_FACTOR.key, "1")
val compression1StatsWithCache = spark.read.parquet(tableDir).select("key").
  queryExecution.optimizedPlan.collect {
  case l: LogicalRelation => l
  case m: InMemoryRelation => m
}.map(_.computeStats())

// I expect these stats to be the same because file compression factor is the same
assert(compression1Stats == compression1StatsWithCache)
// Instead, we can see the file compression factor is being cached and used along with
// the logical plan
assert(compression10Stats == compression1StatsWithCache)
  }
}{code}
 






[jira] [Commented] (SPARK-29561) Large Case Statement Code Generation OOM

2019-10-29 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962361#comment-16962361
 ] 

Michael Chen commented on SPARK-29561:
--

If I increase the memory, it will run into the generated code grows beyond 64 
KB exception and disable whole stage code generation for the plan. So that is 
ok.
But if I increase the number of branches/complexity of the branches, it will 
just run into the OOM problem again.
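
Since the actual SQL attachment is not included in this thread, a statement of roughly that shape can be generated with a small sketch like the following (branch count and names are made up for illustration):
{code:java}
// Hypothetical generator for a wide CASE expression, mimicking the shape of the attached query.
val numBranches = 3000
val branches = (1 to numBranches)
  .map(i => s"WHEN id = $i THEN 'value_$i'")
  .mkString(" ")
val query = s"SELECT CASE $branches ELSE 'other' END AS mapped FROM range(100000)"
// Running this (e.g. spark.sql(query).write.parquet("/tmp/out")) pushes codegen to build one
// very large generated method, similar to what the attached statement triggers.
val df = spark.sql(query)
df.collect()
{code}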

> Large Case Statement Code Generation OOM
> 
>
> Key: SPARK-29561
> URL: https://issues.apache.org/jira/browse/SPARK-29561
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michael Chen
>Priority: Major
> Attachments: apacheSparkCase.sql
>
>
> Spark Configuration
> spark.driver.memory = 1g
>  spark.master = "local"
>  spark.deploy.mode = "client"
> Try to execute a case statement with 3000+ branches. Added sql statement as 
> attachment
>  Spark runs for a while before it OOM
> {noformat}
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
>   at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
>   at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
> 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.util.HashMap.newNode(HashMap.java:1750)
>   at java.util.HashMap.putVal(HashMap.java:631)
>   at java.util.HashMap.putMapEntries(HashMap.java:515)
>   at java.util.HashMap.putAll(HashMap.java:785)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198)
>   at 
> org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254)
>   at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212)
>   at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216)
>   at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198)
>   at org.codehaus.janino.Java$Block.accept(Java.java:2756)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260)
>   at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198)
>   at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
>   at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
> 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark 
> Context Cleaner
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
>   at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
>   at 
> 

[jira] [Comment Edited] (SPARK-29561) Large Case Statement Code Generation OOM

2019-10-29 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962361#comment-16962361
 ] 

Michael Chen edited comment on SPARK-29561 at 10/29/19 7:59 PM:


If I increase the memory, it will run into the generated code grows beyond 64 
KB exception and disable whole stage code generation for the plan.
 But if I increase the number of branches/complexity of the branches, it will 
just run into the OOM problem again.


was (Author: mikechen):
If I increase the memory, it will run into the generated code grows beyond 64 
KB exception and disable whole stage code generation for the plan. So that is 
ok.
But if I increase the number of branches/complexity of the branches, it will 
just run into the OOM problem again.

> Large Case Statement Code Generation OOM
> 
>
> Key: SPARK-29561
> URL: https://issues.apache.org/jira/browse/SPARK-29561
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michael Chen
>Priority: Major
> Attachments: apacheSparkCase.sql
>
>
> Spark Configuration
> spark.driver.memory = 1g
>  spark.master = "local"
>  spark.deploy.mode = "client"
> Try to execute a case statement with 3000+ branches. Added sql statement as 
> attachment
>  Spark runs for a while before it OOM
> {noformat}
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
>   at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
>   at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
> 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.util.HashMap.newNode(HashMap.java:1750)
>   at java.util.HashMap.putVal(HashMap.java:631)
>   at java.util.HashMap.putMapEntries(HashMap.java:515)
>   at java.util.HashMap.putAll(HashMap.java:785)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198)
>   at 
> org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254)
>   at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212)
>   at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216)
>   at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198)
>   at org.codehaus.janino.Java$Block.accept(Java.java:2756)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260)
>   at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198)
>   at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
>   at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
> 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark 
> Context Cleaner
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> 

[jira] [Comment Edited] (SPARK-29561) Large Case Statement Code Generation OOM

2019-10-29 Thread Michael Chen (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962361#comment-16962361
 ] 

Michael Chen edited comment on SPARK-29561 at 10/29/19 7:59 PM:


Yes it works when I increase the driver memory. It just runs into the generated 
code grows beyond 64 KB exception and then disables whole stage code generation 
for the plan.
 But if I increase the number of branches/complexity of the branches, it will 
just run into the OOM problem again.


was (Author: mikechen):
If I increase the memory, it will run into the generated code grows beyond 64 
KB exception and disable whole stage code generation for the plan.
 But if I increase the number of branches/complexity of the branches, it will 
just run into the OOM problem again.

> Large Case Statement Code Generation OOM
> 
>
> Key: SPARK-29561
> URL: https://issues.apache.org/jira/browse/SPARK-29561
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michael Chen
>Priority: Major
> Attachments: apacheSparkCase.sql
>
>
> Spark Configuration
> spark.driver.memory = 1g
>  spark.master = "local"
>  spark.deploy.mode = "client"
> Try to execute a case statement with 3000+ branches. Added sql statement as 
> attachment
>  Spark runs for a while before it OOM
> {noformat}
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
>   at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
>   at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
> 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.util.HashMap.newNode(HashMap.java:1750)
>   at java.util.HashMap.putVal(HashMap.java:631)
>   at java.util.HashMap.putMapEntries(HashMap.java:515)
>   at java.util.HashMap.putAll(HashMap.java:785)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198)
>   at 
> org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254)
>   at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212)
>   at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216)
>   at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198)
>   at org.codehaus.janino.Java$Block.accept(Java.java:2756)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260)
>   at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198)
>   at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
>   at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
> 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark 
> Context Cleaner
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> 

[jira] [Updated] (SPARK-29561) Large Case Statement Code Generation OOM

2019-10-22 Thread Michael Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Chen updated SPARK-29561:
-
Description: 
Spark Configuration

spark.driver.memory = 1g
 spark.master = "local"
 spark.deploy.mode = "client"

Try to execute a case statement with 3000+ branches; the SQL statement is added as 
an attachment.
 Spark runs for a while before it OOMs
{noformat}
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.HashMap.newNode(HashMap.java:1750)
at java.util.HashMap.putVal(HashMap.java:631)
at java.util.HashMap.putMapEntries(HashMap.java:515)
at java.util.HashMap.putAll(HashMap.java:785)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345)
at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198)
at 
org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254)
at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198)
at org.codehaus.janino.Java$Block.accept(Java.java:2756)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260)
at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217)
at 
org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198)
at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009)
at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
at 
org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark 
Context Cleaner
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at 
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73){noformat}
 Generated code looks like
{noformat}
/* 029 */   private void project_doConsume(InternalRow scan_row, UTF8String 
project_expr_0, boolean project_exprIsNull_0) throws java.io.IOException {
/* 030 */     byte project_caseWhenResultState = -1;
/* 031 */     do {
/* 032 */       boolean project_isNull1 = true;
/* 033 */       boolean project_value1 = false;
/* 034 */
/* 035 */       boolean project_isNull2 = project_exprIsNull_0;
/* 036 */       int project_value2 = -1;
/* 037 */       if (!project_exprIsNull_0) {
/* 038 */         UTF8String.IntWrapper project_intWrapper = new 
UTF8String.IntWrapper();
/* 039 */         if (project_expr_0.toInt(project_intWrapper)) {
/* 040 */           project_value2 = 

[jira] [Updated] (SPARK-29561) Large Case Statement Code Generation OOM

2019-10-22 Thread Michael Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Chen updated SPARK-29561:
-
Attachment: apacheSparkCase.sql

> Large Case Statement Code Generation OOM
> 
>
> Key: SPARK-29561
> URL: https://issues.apache.org/jira/browse/SPARK-29561
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Michael Chen
>Priority: Major
> Attachments: apacheSparkCase.sql
>
>
> Spark Configuration
> spark.driver.memory = 1g
>  spark.master = "local"
>  spark.deploy.mode = "client"
> Try to execute a case statement with 3000+ branches.
>  Spark runs for a while before it OOMs
> {noformat}
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
>   at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
>   at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
> 19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at java.util.HashMap.newNode(HashMap.java:1750)
>   at java.util.HashMap.putVal(HashMap.java:631)
>   at java.util.HashMap.putMapEntries(HashMap.java:515)
>   at java.util.HashMap.putAll(HashMap.java:785)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345)
>   at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198)
>   at 
> org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254)
>   at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212)
>   at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216)
>   at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198)
>   at org.codehaus.janino.Java$Block.accept(Java.java:2756)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260)
>   at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217)
>   at 
> org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198)
>   at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
>   at 
> org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186)
>   at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
>   at 
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
>   at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
>   at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
>   at 
> org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
>   at 
> org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
> 19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark 
> Context Cleaner
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>   at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
>   at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
>   at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
>   at 
> org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73){noformat}
>  Generated code looks like
> {noformat}
> /* 029 */   private void project_doConsume(InternalRow scan_row, UTF8String 
> project_expr_0, boolean project_exprIsNull_0) throws java.io.IOException {
> /* 030 */     byte project_caseWhenResultState = -1;
> 

[jira] [Updated] (SPARK-29561) Large Case Statement Code Generation OOM

2019-10-22 Thread Michael Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Chen updated SPARK-29561:
-
Description: 
Spark Configuration

spark.driver.memory = 1g
 spark.master = "local"
 spark.deploy.mode = "client"

Try to execute a case statement with 3000+ branches; the SQL statement is added as 
an attachment.
 Spark runs for a while before it OOMs
{noformat}
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.HashMap.newNode(HashMap.java:1750)
at java.util.HashMap.putVal(HashMap.java:631)
at java.util.HashMap.putMapEntries(HashMap.java:515)
at java.util.HashMap.putAll(HashMap.java:785)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345)
at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198)
at 
org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254)
at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198)
at org.codehaus.janino.Java$Block.accept(Java.java:2756)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260)
at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217)
at 
org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198)
at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009)
at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
at 
org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark 
Context Cleaner
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at 
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73){noformat}
 Generated code looks like
{noformat}
/* 029 */   private void project_doConsume(InternalRow scan_row, UTF8String 
project_expr_0, boolean project_exprIsNull_0) throws java.io.IOException {
/* 030 */     byte project_caseWhenResultState = -1;
/* 031 */     do {
/* 032 */       boolean project_isNull1 = true;
/* 033 */       boolean project_value1 = false;
/* 034 */
/* 035 */       boolean project_isNull2 = project_exprIsNull_0;
/* 036 */       int project_value2 = -1;
/* 037 */       if (!project_exprIsNull_0) {
/* 038 */         UTF8String.IntWrapper project_intWrapper = new 
UTF8String.IntWrapper();
/* 039 */         if (project_expr_0.toInt(project_intWrapper)) {
/* 040 */           project_value2 = 

[jira] [Created] (SPARK-29561) Large Case Statement Code Generation OOM

2019-10-22 Thread Michael Chen (Jira)
Michael Chen created SPARK-29561:


 Summary: Large Case Statement Code Generation OOM
 Key: SPARK-29561
 URL: https://issues.apache.org/jira/browse/SPARK-29561
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.0
Reporter: Michael Chen


Spark Configuration

spark.driver.memory = 1g
spark.master = "local"
spark.deploy.mode = "client"

Try to execute a case statement with 3000+ branches.
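A hypothetical sketch of a query of that shape (the actual reproduction is the 
attached apacheSparkCase.sql; the column name, branch count, literals, and output 
path below are illustrative assumptions):
{code:java}
// Hypothetical reproduction sketch; the real case is the attached apacheSparkCase.sql.
// Builds a single projection containing a CASE expression with 3000+ branches.
import org.apache.spark.sql.SparkSession

object LargeCaseWhenRepro {
  def main(args: Array[String]): Unit = {
    // Driver memory (1g in this report) must be set at JVM launch, e.g. --driver-memory 1g.
    val spark = SparkSession.builder()
      .master("local")
      .appName("large-case-when-repro")
      .getOrCreate()

    // WHEN id = 1 THEN 'branch_1' WHEN id = 2 THEN 'branch_2' ... (3000+ branches)
    val branches = (1 to 3000)
      .map(i => s"WHEN id = $i THEN 'branch_$i'")
      .mkString(" ")
    val caseExpr = s"CASE $branches ELSE 'other' END AS label"

    // Whole-stage codegen emits one very large method for this projection; writing the
    // result matches the FileFormatWriter failure in the log below.
    spark.range(0, 1000000L)
      .selectExpr("id", caseExpr)
      .write
      .mode("overwrite")
      .parquet("/tmp/large_case_when_repro")
  }
}
{code}
The OOM happens while Janino compiles the generated projection, as the stack trace 
below shows.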
Spark runs for a while before it OOMs
{noformat}
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.HashMap.newNode(HashMap.java:1750)
at java.util.HashMap.putVal(HashMap.java:631)
at java.util.HashMap.putMapEntries(HashMap.java:515)
at java.util.HashMap.putAll(HashMap.java:785)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345)
at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198)
at 
org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254)
at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198)
at org.codehaus.janino.Java$Block.accept(Java.java:2756)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260)
at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217)
at 
org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198)
at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009)
at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
at 
org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark 
Context Cleaner
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at 
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73){noformat}
 Generated code looks like
{noformat}
/* 029 */   private void project_doConsume(InternalRow scan_row, UTF8String 
project_expr_0, boolean project_exprIsNull_0) throws java.io.IOException {
/* 030 */     byte project_caseWhenResultState = -1;
/* 031 */     do {
/* 032 */       boolean project_isNull1 = true;
/* 033 */       boolean project_value1 = false;
/* 034 */
/* 035 */       boolean project_isNull2 = project_exprIsNull_0;
/* 036 */       int project_value2 = -1;
/* 037 */       if (!project_exprIsNull_0) {
/* 038 */         UTF8String.IntWrapper project_intWrapper = new 

[jira] [Updated] (SPARK-29561) Large Case Statement Code Generation OOM

2019-10-22 Thread Michael Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Chen updated SPARK-29561:
-
Description: 
Spark Configuration

spark.driver.memory = 1g
 spark.master = "local"
 spark.deploy.mode = "client"

Try to execute a case statement with 3000+ branches.
 Spark runs for a while before it OOMs
{noformat}
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
19/10/22 16:19:54 ERROR FileFormatWriter: Aborting job null.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.HashMap.newNode(HashMap.java:1750)
at java.util.HashMap.putVal(HashMap.java:631)
at java.util.HashMap.putMapEntries(HashMap.java:515)
at java.util.HashMap.putAll(HashMap.java:785)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3345)
at org.codehaus.janino.UnitCompiler.access$5000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3230)
at 
org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:3198)
at 
org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:3351)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3254)
at org.codehaus.janino.UnitCompiler.access$3900(UnitCompiler.java:212)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3216)
at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:3198)
at org.codehaus.janino.Java$Block.accept(Java.java:2756)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3260)
at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3217)
at 
org.codehaus.janino.UnitCompiler$8.visitDoStatement(UnitCompiler.java:3198)
at org.codehaus.janino.Java$DoStatement.accept(Java.java:3304)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3197)
at 
org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3186)
at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3009)
at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1336)
at 
org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1309)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:799)
at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:958)
at org.codehaus.janino.UnitCompiler.access$700(UnitCompiler.java:212)
at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:393)
at 
org.codehaus.janino.UnitCompiler$2.visitMemberClassDeclaration(UnitCompiler.java:385)
at 
org.codehaus.janino.Java$MemberClassDeclaration.accept(Java.java:1286)
19/10/22 16:19:54 ERROR Utils: throw uncaught fatal error in thread Spark 
Context Cleaner
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:182)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1320)
at 
org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at 
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73){noformat}
 Generated code looks like
{noformat}
/* 029 */   private void project_doConsume(InternalRow scan_row, UTF8String 
project_expr_0, boolean project_exprIsNull_0) throws java.io.IOException {
/* 030 */     byte project_caseWhenResultState = -1;
/* 031 */     do {
/* 032 */       boolean project_isNull1 = true;
/* 033 */       boolean project_value1 = false;
/* 034 */
/* 035 */       boolean project_isNull2 = project_exprIsNull_0;
/* 036 */       int project_value2 = -1;
/* 037 */       if (!project_exprIsNull_0) {
/* 038 */         UTF8String.IntWrapper project_intWrapper = new 
UTF8String.IntWrapper();
/* 039 */         if (project_expr_0.toInt(project_intWrapper)) {
/* 040 */           project_value2 = project_intWrapper.value;
/* 041 */         } else