[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423312#comment-16423312 ] Takeshi Yamamuro commented on SPARK-23791:
--
I checked that the root cause of this ticket is the same as SPARK-21870's by running the query above on my PR: https://github.com/apache/spark/compare/master...maropu:SPARK-21870-2. So, I'll close this as a duplicate.

> Sub-optimal generated code for sum aggregating
> --
>
> Key: SPARK-23791
> URL: https://issues.apache.org/jira/browse/SPARK-23791
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.2.0, 2.3.0
> Reporter: Valentin Nikotin
> Priority: Major
> Labels: performance
> Original Estimate: 24h
> Remaining Estimate: 24h
>
> It appears that with wholeStage codegen enabled, a simple Spark job
> performing sum aggregation of 50 columns runs ~4 times slower than without
> wholeStage codegen.
> Please check the test case code. Please note that the udf is only there to prevent
> elimination optimizations that could be applied to literals.
> {code:scala}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.{Column, DataFrame, SparkSession}
> import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
>
> object SPARK_23791 {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder()
>       .master("local[4]")
>       .appName("test")
>       .getOrCreate()
>
>     def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame) =
>       (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value))
>
>     val dummy = udf(() => Option.empty[Int])
>
>     def test(cnt: Int = 50, rows: Int = 500, grps: Int = 1000): Double = {
>       val t0 = System.nanoTime()
>       spark.range(rows).toDF()
>         .withColumn("grp", col("id").mod(grps))
>         .transform(addConstColumns("null_", cnt, dummy()))
>         .groupBy("grp")
>         .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*)
>         .collect()
>       val t1 = System.nanoTime()
>       (t1 - t0) / 1e9
>     }
>
>     val timings = for (i <- 1 to 3) yield {
>       spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
>       val with_wholestage = test()
>       spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
>       val without_wholestage = test()
>       (with_wholestage, without_wholestage)
>     }
>
>     timings.foreach(println)
>     println("Press enter ...")
>     System.in.read()
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
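[Editor's note] The `addConstColumns` helper in the repro uses a `foldLeft` over the column indices, appending one `withColumn` call per step. The same pattern, stripped of Spark and shown on a plain list of column names (a minimal sketch using only the standard library):

```scala
// Minimal sketch of the foldLeft pattern from addConstColumns, without Spark:
// each fold step appends one generated column name, just as each step in the
// repro appends one withColumn call to the DataFrame.
object FoldSketch {
  def addConstNames(prefix: String, cnt: Int)(names: List[String]): List[String] =
    (0 until cnt).foldLeft(names)((acc, idx) => acc :+ s"$prefix$idx")

  def main(args: Array[String]): Unit = {
    val cols = addConstNames("null_", 3)(List("id", "grp"))
    println(cols.mkString(","))  // id,grp,null_0,null_1,null_2
  }
}
```

This also makes it easy to see that `cnt` directly controls how many columns (and hence how many aggregate buffers) the generated code has to handle.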
[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422095#comment-16422095 ] Takeshi Yamamuro commented on SPARK-23791:
--
Reopened (https://issues.apache.org/jira/browse/SPARK-21870)
[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422074#comment-16422074 ] Takeshi Yamamuro commented on SPARK-23791:
--
Sure, I will.
[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422068#comment-16422068 ] Marco Gaido commented on SPARK-23791:
--
Yes, I think you're right [~maropu]. Do you want to reopen your PR and go on with it?
[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422064#comment-16422064 ] Marco Gaido commented on SPARK-23791:
--
Thanks, [~rednikotin]. The error you noticed in the 84-99 column range has already been solved in SPARK-23628 (it is fixed in 2.3.1 and 2.4.0). I think I know the reason for the performance issue; I will try to work on it in the coming days. Thanks.
[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422062#comment-16422062 ] Takeshi Yamamuro commented on SPARK-23791:
--
This issue might be related to https://issues.apache.org/jira/browse/SPARK-21870; my PR was here: https://github.com/apache/spark/pull/19082
[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421407#comment-16421407 ] Valentin Nikotin commented on SPARK-23791:
--
Hi Marco Gaido, I tested performance with 2.2 and 2.3; there was no real difference between the two versions.
1. I am running a test right now to get timings for 1 to 105 columns (GCP Dataproc 1+4 node cluster with Spark 2.2). I had noticed this before and have confirmed it now:
* 1-13 cols: wholeStage runs faster
* starting from 14 cols, timing jumps ~4-5 times, while with wholeStage disabled it grows linearly
* 84-99 cols: the error above
* 100+ cols: both run with the same timings
2. I have not tried current master yet. I would like to test it next week. I actually found a very similar issue, but without much detail: https://issues.apache.org/jira/browse/SPARK-20479
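[Editor's note] The 1-to-105-column sweep described above can be driven by a small harness around the repro's `test` method. A minimal sketch (the `test` body here is a stand-in stub, not the Spark benchmark, so the loop shape is runnable on its own):

```scala
// Hypothetical sweep harness around the repro's test(cnt) method.
// test is stubbed so the harness runs without a Spark session; in the real
// sweep it would be the benchmark from the issue description.
object SweepSketch {
  def test(cnt: Int): Double = cnt.toDouble  // stand-in for the Spark benchmark

  def sweep(max: Int): Seq[(Int, Double)] =
    (1 to max).map(cnt => cnt -> test(cnt))

  def main(args: Array[String]): Unit =
    sweep(105).foreach { case (c, s) => println(s"$c cols: $s sec") }
}
```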
[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420335#comment-16420335 ] Marco Gaido commented on SPARK-23791:
--
Hi [~rednikotin]. Thanks for reporting this. The error you mentioned in the comment should have been fixed in current master. May I ask you for some more details?
1 - Do you see the same perf issue with a low number of columns? Or only with a large number of columns? Or a "middle" number of columns?
2 - Have you checked whether this is still true using the current master branch?
Thanks.
[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating
[ https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412799#comment-16412799 ] Valentin Nikotin commented on SPARK-23791:
--
When testing aggregation with different numbers of columns (v2.3.0), I found that 100 columns takes approximately the same time in both cases. With 90 columns, the Spark job failed with
{noformat}
18/03/25 00:11:33 ERROR Executor: Exception in task 117.0 in stage 1.0 (TID 4)
java.lang.ClassFormatError: Too many arguments in method signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage2
	at java.lang.ClassLoader.defineClass1(Native Method)
{noformat}
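[Editor's note] The `ClassFormatError` above stems from the JVM's hard cap of 255 parameter slots in a method descriptor, where a `long` or `double` occupies two slots. As a back-of-the-envelope sketch only (the exact per-column slot count in Spark's generated code is an assumption here, not taken from the ticket):

```scala
// Back-of-the-envelope sketch (assumption, not a measurement): if the split-out
// generated method passed each sum's accumulator as (long value, boolean isNull),
// each aggregate column would cost roughly 3 parameter slots (2 for the long,
// 1 for the boolean), against the JVM's 255-slot method-descriptor limit.
object SlotSketch {
  val MaxSlots = 255                            // JVM method-descriptor limit
  def slotsFor(cnt: Int): Int = cnt * (2 + 1)   // assumed per-column cost

  def main(args: Array[String]): Unit =
    (80 to 100 by 5).foreach(c => println(s"$c cols -> ${slotsFor(c)} slots, fits: ${slotsFor(c) <= MaxSlots}"))
}
```

Under this assumed accounting the limit lands in the 80-to-90-column region, which is at least consistent with the 90-column failure reported above.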