[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-04-02 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423312#comment-16423312
 ] 

Takeshi Yamamuro commented on SPARK-23791:
--

I checked that the root cause of this ticket is the same as SPARK-21870, by running 
the query above on my PR: 
https://github.com/apache/spark/compare/master...maropu:SPARK-21870-2
So I'll close this as a duplicate.

> Sub-optimal generated code for sum aggregating
> --
>
> Key: SPARK-23791
> URL: https://issues.apache.org/jira/browse/SPARK-23791
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Valentin Nikotin
>Priority: Major
>  Labels: performance
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It appears that with whole-stage codegen enabled, a simple Spark job 
> performing a sum aggregation over 50 columns runs ~4 times slower than without 
> whole-stage codegen.
> Please check the test case code below. Note that the UDF is used only to prevent 
> constant-elimination optimizations that would otherwise be applied to literals. 
> {code:scala}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.{Column, DataFrame, SparkSession}
> import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
>
> object SPARK_23791 {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder()
>       .master("local[4]")
>       .appName("test")
>       .getOrCreate()
>
>     // Append `cnt` constant columns named s"$prefix$idx" to the input DataFrame.
>     def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame) =
>       (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value))
>
>     // UDF returning null, used only to defeat constant folding of literals.
>     val dummy = udf(() => Option.empty[Int])
>
>     def test(cnt: Int = 50, rows: Int = 500, grps: Int = 1000): Double = {
>       val t0 = System.nanoTime()
>       spark.range(rows).toDF()
>         .withColumn("grp", col("id").mod(grps))
>         .transform(addConstColumns("null_", cnt, dummy()))
>         .groupBy("grp")
>         .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*)
>         .collect()
>       val t1 = System.nanoTime()
>       (t1 - t0) / 1e9
>     }
>
>     val timings = for (i <- 1 to 3) yield {
>       spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
>       val with_wholestage = test()
>       spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
>       val without_wholestage = test()
>       (with_wholestage, without_wholestage)
>     }
>     timings.foreach(println)
>
>     println("Press enter ...")
>     System.in.read()
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-04-02 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422095#comment-16422095
 ] 

Takeshi Yamamuro commented on SPARK-23791:
--

Reopened SPARK-21870 (https://issues.apache.org/jira/browse/SPARK-21870)




[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-04-02 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422074#comment-16422074
 ] 

Takeshi Yamamuro commented on SPARK-23791:
--

Sure, I will.




[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-04-02 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422068#comment-16422068
 ] 

Marco Gaido commented on SPARK-23791:
-

Yes, I think you're right [~maropu]. Do you want to reopen your PR and go on 
with it?




[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-04-02 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422064#comment-16422064
 ] 

Marco Gaido commented on SPARK-23791:
-

Thanks, [~rednikotin]. The error you noticed in the 84-99 column range has already 
been fixed by SPARK-23628 (fixed in 2.3.1 and 2.4.0). I think I know the 
reason for the performance issue; I will try to work on it in the coming days. 
Thanks.




[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-04-02 Thread Takeshi Yamamuro (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16422062#comment-16422062
 ] 

Takeshi Yamamuro commented on SPARK-23791:
--

This issue is probably related to 
https://issues.apache.org/jira/browse/SPARK-21870;
my PR was https://github.com/apache/spark/pull/19082




[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-03-31 Thread Valentin Nikotin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421407#comment-16421407
 ] 

Valentin Nikotin commented on SPARK-23791:
--

Hi Marco Gaido, 

I tested performance with 2.2 and 2.3; there was no real difference between 
these two versions. 
1. I am running a test right now to get timings for 1 to 105 columns (GCP 
Dataproc 1+4 node cluster with Spark 2.2). 
I had noticed this before and have now confirmed it: 

* 1-13 cols: wholeStage runs faster
* starting from 14 cols, the timing jumps ~4-5x, while with wholeStage 
disabled it grows linearly
* 84-99 cols: the error above
* 100+ cols: both run with the same timings

2. I have not tried current master yet. I would like to test it next week.

I actually found a very similar issue, but without much detail: 
https://issues.apache.org/jira/browse/SPARK-20479
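
For anyone reproducing these timings outside the test harness above, whole-stage 
codegen can also be toggled per-session through the public SQL conf key 
{{spark.sql.codegen.wholeStage}} instead of the internal 
{{WHOLESTAGE_CODEGEN_ENABLED}} constant. A minimal sketch (assuming an already 
running SparkSession named {{spark}}):

{code:scala}
// Sketch: the same toggle the reproduction uses, via the public conf key.
// Assumes an existing SparkSession named `spark`.
spark.conf.set("spark.sql.codegen.wholeStage", "false") // run the wide aggregation interpreted
// ... run the slow 50-column sum aggregation here ...
spark.conf.set("spark.sql.codegen.wholeStage", "true")  // restore the default
{code}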




[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-03-30 Thread Marco Gaido (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16420335#comment-16420335
 ] 

Marco Gaido commented on SPARK-23791:
-

Hi [~rednikotin]. Thanks for reporting this. The error you mentioned in the 
comment should already be fixed in current master.

May I kindly ask you for some more details?
 1 - Do you see the same perf issue with a low number of columns, only with a 
large number of columns, or with a "middle" number of columns?
 2 - Have you checked whether this is still true on the current master branch?

Thanks.




[jira] [Commented] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-03-24 Thread Valentin Nikotin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16412799#comment-16412799
 ] 

Valentin Nikotin commented on SPARK-23791:
--

When testing the aggregation with different numbers of columns (v2.3.0), 
I found that with 100 columns both cases take approximately the same time.
With 90 columns the Spark job failed with 
{noformat}
18/03/25 00:11:33 ERROR Executor: Exception in task 117.0 in stage 1.0 (TID 4)
java.lang.ClassFormatError: Too many arguments in method signature in class 
file 
org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage2
at java.lang.ClassLoader.defineClass1(Native Method)
{noformat}
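
The generated class named in that stack trace can be inspected directly, which 
makes it easy to see the method whose argument list grows with the column count. 
A minimal sketch using Spark's debug package (assuming a DataFrame {{df}} built 
as in the reproduction above):

{code:scala}
// Sketch: dump the Java source that whole-stage codegen produced for a plan.
// Assumes an existing DataFrame `df` from the reproduction above.
import org.apache.spark.sql.execution.debug._

df.debugCodegen()  // prints each codegen subtree together with its generated Java source
{code}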

