[jira] [Updated] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-03-24 Thread Valentin Nikotin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentin Nikotin updated SPARK-23791:
-
Summary: Sub-optimal generated code for sum aggregating  (was: Sub-optimal 
generated code when aggregating nullable columns)

> Sub-optimal generated code for sum aggregating
> --
>
> Key: SPARK-23791
> URL: https://issues.apache.org/jira/browse/SPARK-23791
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Valentin Nikotin
>Priority: Major
>  Labels: performance
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> It appears that with whole-stage codegen enabled, a simple Spark job 
> performing a sum aggregation over 50 nullable columns runs ~4 times slower 
> than without whole-stage codegen.
> Please see the test case below. Note that the udf is used only to prevent 
> constant-elimination optimizations that would otherwise be applied to literals.
> {code:scala}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.{Column, DataFrame, SparkSession}
> import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
> object SPARK_23791 {
>   def main(args: Array[String]): Unit = {
> val spark = SparkSession
>   .builder()
>   .master("local[4]")
>   .appName("test")
>   .getOrCreate()
> def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame) =
>   (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value))
> val dummy = udf(() => Option.empty[Int])
> def test(cnt: Int = 50, rows: Int = 500, grps: Int = 1000): Double = {
>   val t0 = System.nanoTime()
>   spark.range(rows).toDF()
> .withColumn("grp", col("id").mod(grps))
> .transform(addConstColumns("null_", cnt, dummy()))
> .groupBy("grp")
> .agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*)
> .collect()
>   val t1 = System.nanoTime()
>   (t1 - t0) / 1e9
> }
> val timings = for (i <- 1 to 3) yield {
>   spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
>   val with_wholestage = test()
>   spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
>   val without_wholestage = test()
>   (with_wholestage, without_wholestage)
> }
> timings.foreach(println)
> println("Press enter ...")
> System.in.read()
>   }
> }
> {code}
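To inspect the generated code this benchmark exercises, one option (a sketch only, assuming the same local SparkSession as in the repro above) is Spark's codegen debug helper:

```scala
import org.apache.spark.sql.execution.debug._  // adds debugCodegen() to Dataset
import org.apache.spark.sql.functions._

// Assuming `spark` is the SparkSession built in the repro above.
// debugCodegen() prints the whole-stage generated Java source for the plan,
// so the per-column null handling inside the sum aggregation can be read directly.
val df = spark.range(500).toDF()
  .withColumn("grp", col("id").mod(1000))
  .groupBy("grp")
  .agg(sum("id"))
df.debugCodegen()
```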



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-23791) Sub-optimal generated code for sum aggregating

2018-03-24 Thread Valentin Nikotin (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Valentin Nikotin updated SPARK-23791:
-
Description: 
It appears that with whole-stage codegen enabled, a simple Spark job 
performing a sum aggregation over 50 columns runs ~4 times slower than 
without whole-stage codegen.

Please see the test case below. Note that the udf is used only to prevent 
constant-elimination optimizations that would otherwise be applied to literals.


{code:scala}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED

object SPARK_23791 {

  def main(args: Array[String]): Unit = {

val spark = SparkSession
  .builder()
  .master("local[4]")
  .appName("test")
  .getOrCreate()

def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame) =
  (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value))

val dummy = udf(() => Option.empty[Int])

def test(cnt: Int = 50, rows: Int = 500, grps: Int = 1000): Double = {
  val t0 = System.nanoTime()
  spark.range(rows).toDF()
.withColumn("grp", col("id").mod(grps))
.transform(addConstColumns("null_", cnt, dummy()))
.groupBy("grp")
.agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*)
.collect()
  val t1 = System.nanoTime()
  (t1 - t0) / 1e9
}

val timings = for (i <- 1 to 3) yield {
  spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
  val with_wholestage = test()
  spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
  val without_wholestage = test()
  (with_wholestage, without_wholestage)
}

timings.foreach(println)

println("Press enter ...")
System.in.read()
  }
}
{code}
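On the note about the udf above: a minimal sketch (a hypothetical comparison, not part of the original repro, assuming the same local SparkSession) of why a plain literal would not work. Catalyst cannot evaluate an arbitrary Scala udf at planning time, so the udf column survives optimization, whereas a literal null can be simplified away before codegen ever sees it:

```scala
import org.apache.spark.sql.functions._

// A plain literal null is visible to Catalyst's constant-folding /
// null-propagation rules, so the aggregate input may be simplified away ...
val folded = spark.range(10)
  .withColumn("c", lit(null).cast("int"))
  .groupBy().agg(sum("c"))
folded.explain(true)  // compare the optimized logical plan here ...

// ... whereas a udf is opaque to the optimizer: every sum over a udf-produced
// column is kept in the plan and actually exercises the generated aggregation code.
val dummy = udf(() => Option.empty[Int])
val kept = spark.range(10)
  .withColumn("c", dummy())
  .groupBy().agg(sum("c"))
kept.explain(true)    // ... with the udf call preserved in this one
```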


  was:
It appears that with whole-stage codegen enabled, a simple Spark job 
performing a sum aggregation over 50 nullable columns runs ~4 times slower 
than without whole-stage codegen.

Please see the test case below. Note that the udf is used only to prevent 
constant-elimination optimizations that would otherwise be applied to literals.


{code:scala}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED

object SPARK_23791 {

  def main(args: Array[String]): Unit = {

val spark = SparkSession
  .builder()
  .master("local[4]")
  .appName("test")
  .getOrCreate()

def addConstColumns(prefix: String, cnt: Int, value: Column)(inputDF: DataFrame) =
  (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"$prefix$idx", value))

val dummy = udf(() => Option.empty[Int])

def test(cnt: Int = 50, rows: Int = 500, grps: Int = 1000): Double = {
  val t0 = System.nanoTime()
  spark.range(rows).toDF()
.withColumn("grp", col("id").mod(grps))
.transform(addConstColumns("null_", cnt, dummy()))
.groupBy("grp")
.agg(sum("null_0"), (1 until cnt).map(idx => sum(s"null_$idx")): _*)
.collect()
  val t1 = System.nanoTime()
  (t1 - t0) / 1e9
}

val timings = for (i <- 1 to 3) yield {
  spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, true)
  val with_wholestage = test()
  spark.sessionState.conf.setConf(WHOLESTAGE_CODEGEN_ENABLED, false)
  val without_wholestage = test()
  (with_wholestage, without_wholestage)
}

timings.foreach(println)

println("Press enter ...")
System.in.read()
  }
}
{code}


