[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-06 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945409#comment-14945409
 ] 

Xiangrui Meng commented on SPARK-10953:
---

[~JihongMA] Do you have time to benchmark this?

> Benchmark codegen vs. hand-written code for univariate statistics
> -
>
> Key: SPARK-10953
> URL: https://issues.apache.org/jira/browse/SPARK-10953
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Xiangrui Meng
>
> I checked the generated code for a simple stddev_pop call:
> {code}
> val df = sqlContext.range(100)
> df.select(stddev_pop(col("id"))).show()
> {code}
> This is the generated code for the merge part, which is very long and 
> complex. I'm not sure whether we benefit from code generation for 
> univariate statistics. We should benchmark it against a hand-written Scala 
> implementation.
> {code}
> 15/10/06 10:10:57 DEBUG GenerateMutableProjection: code for if 
> (isnull(input[1, DoubleType])) cast(0 as double) else input[1, DoubleType],if 
> (isnull(input[1, DoubleType])) input[6, DoubleType] else if (isnull(input[6, 
> DoubleType])) input[1, DoubleType] else (input[1, DoubleType] + input[6, 
> DoubleType]),if (isnull(input[3, DoubleType])) cast(0 as double) else 
> input[3, DoubleType],if (isnull(input[3, DoubleType])) input[8, DoubleType] 
> else if (isnull(input[8, DoubleType])) input[3, DoubleType] else (((input[3, 
> DoubleType] * input[0, DoubleType]) + (input[8, DoubleType] * input[6, 
> DoubleType])) / (input[0, DoubleType] + input[6, DoubleType])),if 
> (isnull(input[4, DoubleType])) input[9, DoubleType] else if (isnull(input[9, 
> DoubleType])) input[4, DoubleType] else ((input[4, DoubleType] + input[9, 
> DoubleType]) + input[8, DoubleType] - input[2, DoubleType]) * (input[8, 
> DoubleType] - input[2, DoubleType])) * (input[0, DoubleType] * input[6, 
> DoubleType])) / (input[0, DoubleType] + input[6, DoubleType]))):
> public Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] 
> expr) {
>   return new SpecificMutableProjection(expr);
> }
> class SpecificMutableProjection extends 
> org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection {
>   private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
>   private org.apache.spark.sql.catalyst.expressions.MutableRow mutableRow;
>   public 
> SpecificMutableProjection(org.apache.spark.sql.catalyst.expressions.Expression[]
>  expr) {
> expressions = expr;
> mutableRow = new 
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow(5);
>   }
>   public 
> org.apache.spark.sql.catalyst.expressions.codegen.BaseMutableProjection 
> target(org.apache.spark.sql.catalyst.expressions.MutableRow row) {
> mutableRow = row;
> return this;
>   }
>   /* Provide immutable access to the last projected row. */
>   public InternalRow currentValue() {
> return (InternalRow) mutableRow;
>   }
>   public Object apply(Object _i) {
> InternalRow i = (InternalRow) _i;
> /* if (isnull(input[1, DoubleType])) cast(0 as double) else input[1, 
> DoubleType] */
> /* isnull(input[1, DoubleType]) */
> /* input[1, DoubleType] */
> boolean isNull4 = i.isNullAt(1);
> double primitive5 = isNull4 ? -1.0 : (i.getDouble(1));
> boolean isNull0 = false;
> double primitive1 = -1.0;
> if (!false && isNull4) {
>   /* cast(0 as double) */
>   /* 0 */
>   boolean isNull6 = false;
>   double primitive7 = -1.0;
>   if (!false) {
> primitive7 = (double) 0;
>   }
>   isNull0 = isNull6;
>   primitive1 = primitive7;
> } else {
>   /* input[1, DoubleType] */
>   boolean isNull10 = i.isNullAt(1);
>   double primitive11 = isNull10 ? -1.0 : (i.getDouble(1));
>   isNull0 = isNull10;
>   primitive1 = primitive11;
> }
> if (isNull0) {
>   mutableRow.setNullAt(0);
> } else {
>   mutableRow.setDouble(0, primitive1);
> }
> /* if (isnull(input[1, DoubleType])) input[6, DoubleType] else if 
> (isnull(input[6, DoubleType])) input[1, DoubleType] else (input[1, 
> DoubleType] + input[6, DoubleType]) */
> /* isnull(input[1, DoubleType]) */
> /* input[1, DoubleType] */
> boolean isNull16 = i.isNullAt(1);
> double primitive17 = isNull16 ? -1.0 : (i.getDouble(1));
> boolean isNull12 = false;
> double primitive13 = -1.0;
> if (!false && isNull16) {
>   /* input[6, DoubleType] */
>   boolean isNull18 = i.isNullAt(6);
>   double primitive19 = isNull18 ? -1.0 : (i.getDouble(6));
>   isNull12 = isNull18;
>   primitive13 = primitive19;
> } else {
>   /* if (isnull(input[6, DoubleType])) input[1, DoubleType] else 
> (input[1, DoubleType] + input[6, DoubleType]) */
>   /* isnull(input[6, DoubleType])
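The merge expressions in the (truncated) dump above encode the standard pairwise combination of partial (count, mean, M2) statistics. A rough sketch of the underlying math follows, in plain Python for illustration; the function and variable names are mine, not Spark's:

```python
# Pairwise merge of partial statistics (count, mean, M2) -- the same math
# the generated projection above encodes as nested null-checked expressions.
# Illustrative sketch only, not Spark's actual implementation.

def partial(xs):
    """Build a partial aggregate (count, mean, M2) via Welford's online update."""
    n, mean, m2 = 0, 0.0, 0.0
    for x in xs:
        n += 1
        d = x - mean
        mean += d / n
        m2 += d * (x - mean)
    return (n, mean, m2)

def merge(a, b):
    """Merge two partial aggregates (Chan et al. pairwise update)."""
    (na, ma, m2a), (nb, mb, m2b) = a, b
    if na == 0:
        return b
    if nb == 0:
        return a
    n = na + nb
    delta = mb - ma
    mean = (ma * na + mb * nb) / n                  # weighted mean
    m2 = m2a + m2b + delta * delta * na * nb / n    # combined M2
    return (n, mean, m2)

def stddev_pop(agg):
    n, _, m2 = agg
    return (m2 / n) ** 0.5

# Mirror the df = sqlContext.range(100) example: split, aggregate, merge.
xs = list(range(100))
left, right = partial(xs[:40]), partial(xs[40:])
merged = merge(left, right)
```

The generated projection performs the same `merge` arithmetic, but with every operand wrapped in the null checks visible in the dump.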

[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-06 Thread Jihong MA (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945532#comment-14945532
 ] 

Jihong MA commented on SPARK-10953:
---

[~mengxr] Do you mean comparing an implementation that operates directly at the 
RDD level vs. one leveraging the UDAF framework, like what has been done under 
sql/core/src/main/scala/org/apache/spark/sql/execution/stat/? 


[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-06 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14945920#comment-14945920
 ] 

Xiangrui Meng commented on SPARK-10953:
---

The simple one is to benchmark StatCounter on RDDs against the StdDevAgg 
implementation in Spark master. Then we can add an alternative implementation 
based on AggregateFunction, similar to HyperLogLogPlusPlus under 
functions.scala.
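As a rough, Spark-free sketch of what such a benchmark measures: a fused hand-written update loop (StatCounter-style) versus the same update routed through per-row function layers, which stand in for evaluating an expression tree. Everything below is illustrative Python, not Spark code:

```python
import timeit

N = 100_000
data = [float(i) for i in range(N)]

def handwritten(xs):
    # Fused StatCounter-style pass: one loop updating (n, mean, M2).
    n, mean, m2 = 0, 0.0, 0.0
    for x in xs:
        n += 1
        d = x - mean
        mean += d / n
        m2 += d * (x - mean)
    return (m2 / n) ** 0.5

# "Expression-layer" stand-in: the same update split into small functions,
# mimicking the per-row overhead of walking an expression tree.
def upd_count(n): return n + 1
def upd_mean(mean, d, n): return mean + d / n
def upd_m2(m2, d, x, mean): return m2 + d * (x - mean)

def layered(xs):
    n, mean, m2 = 0, 0.0, 0.0
    for x in xs:
        n = upd_count(n)
        d = x - mean
        mean = upd_mean(mean, d, n)
        m2 = upd_m2(m2, d, x, mean)
    return (m2 / n) ** 0.5

t_hand = timeit.timeit(lambda: handwritten(data), number=3)
t_layer = timeit.timeit(lambda: layered(data), number=3)
print(f"handwritten: {t_hand:.3f}s  layered: {t_layer:.3f}s")
```

Both paths must of course return the same stddev_pop; the benchmark question is only how much the per-row indirection costs.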


[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-06 Thread Jihong MA (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14946061#comment-14946061
 ] 

Jihong MA commented on SPARK-10953:
---


It should not be too hard to put together an implementation based on the 
AggregateFunction2 interface (e.g. HyperLogLogPlusPlus). Would it make more 
sense to compare an AggregateFunction2 implementation vs. AlgebraicAggregate, 
if we are going to adopt AggregateFunction2 as the better alternative based on 
the result?

Or we could do as you suggested and compare rdd.stats() against df.describe(), 
where describe uses the AlgebraicAggregate-based UDAF internally. We need a 
bigger cluster (bare-metal or in the cloud), and I'm not sure when we can have 
it. Also, are there performance tools to generate data for testing purposes? 




[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-08 Thread Jihong MA (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14949797#comment-14949797
 ] 

Jihong MA commented on SPARK-10953:
---

We should have a cluster for testing next Monday. We will run the performance 
comparison between ImperativeAggregate vs. DeclarativeAggregate (initially 
named AggregateFunction2 vs. AlgebraicAggregate). 
[~yhuai] My understanding of the difference between the two: 
ImperativeAggregate directly manipulates the aggregation buffer, while 
DeclarativeAggregate expresses the aggregate as expressions; and at runtime 
DeclarativeAggregate goes through TungstenAggregate while ImperativeAggregate 
goes through SortBasedAggregate? Please clarify whether that is correct. Thanks!


[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-09 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14950600#comment-14950600
 ] 

Yin Huai commented on SPARK-10953:
--

[~JihongMA] Yes. DeclarativeAggregates are based on expressions, for which we 
do code-gen at runtime, while ImperativeAggregates implement their update and 
merge logic directly. We have a PR to make TungstenAggregate support certain 
ImperativeAggregates. You can find the PR at 
https://github.com/apache/spark/pull/9038. I guess it makes sense to do the 
test based on this one, since we can rule out performance impacts from 
different operator implementations and focus just on the aggregate expression 
implementation. I think here we can write an ImperativeAggregate version of 
Stddev and use TungstenAggregate to run the query, so we can see the 
performance difference between the ImperativeAggregate and 
DeclarativeAggregate versions.

Another way to test is to use SortBasedAggregate to run both versions. Maybe 
that is a good starting point while we are finalizing 
https://github.com/apache/spark/pull/9038.

> Benchmark codegen vs. hand-written code for univariate statistics
> -
>
> Key: SPARK-10953
> URL: https://issues.apache.org/jira/browse/SPARK-10953
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Xiangrui Meng
>Assignee: Jihong MA
>

[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-12 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14953842#comment-14953842
 ] 

Xiangrui Meng commented on SPARK-10953:
---

[~JihongMA] Any updates? I don't think it is necessary to benchmark this on a 
cluster. On a single machine with a huge number of values, we should be able 
to see the difference.

[~yhuai] A question for you. If we have the expressions:

{code}
a = b + c
d = a * a
e = a * d
{code}

`d` will be translated to `(b + c) * (b + c)` and `e` will be translated to `(b 
+ c) * (b + c) * (b + c)` without common subexpression elimination, correct?
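The duplicated work is easy to demonstrate outside Spark. In the hypothetical sketch below, `bc` stands in for the shared subexpression `a = b + c` and counts how often it is evaluated: inlined expansion evaluates it five times (twice for `d`, three times for `e`), while binding it once, as common subexpression elimination would, evaluates it a single time.

```python
calls = {"n": 0}

def bc(b, c):
    # Stand-in for the shared subexpression a = b + c; counts evaluations.
    calls["n"] += 1
    return b + c

def without_cse(b, c):
    # d and e each re-expand a = b + c inline.
    d = bc(b, c) * bc(b, c)
    e = bc(b, c) * bc(b, c) * bc(b, c)
    return d, e

def with_cse(b, c):
    a = bc(b, c)        # evaluated once, reused
    d = a * a
    e = a * d
    return d, e

calls["n"] = 0
without_cse(3, 4)       # recomputes b + c five times
n_without = calls["n"]

calls["n"] = 0
with_cse(3, 4)          # computes b + c once
n_with = calls["n"]
```

The same blow-up is what makes the generated merge code for stddev so long: each null-checked operand is re-expanded everywhere it appears.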



[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-12 Thread Yin Huai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14953852#comment-14953852
 ] 

Yin Huai commented on SPARK-10953:
--

Yes, I think so.

> Benchmark codegen vs. hand-written code for univariate statistics
> -
>
> Key: SPARK-10953
> URL: https://issues.apache.org/jira/browse/SPARK-10953
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Xiangrui Meng
>Assignee: Jihong MA
>

[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-12 Thread Jihong MA (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14954281#comment-14954281
 ] 

Jihong MA commented on SPARK-10953:
---

[~mengxr] As Yin indicated in his comment, we would like to merge the pull 
request that enables TungstenAggregate support for ImperativeAggregate (PR 
9038), so that the comparison between ImperativeAggregate and 
DeclarativeAggregate is fair and the performance impact of the runtime 
difference is eliminated (ImperativeAggregate used to run with 
SortBasedAggregate). I noticed there were issues with the pull request, and 
Josh merged a couple more commits later this afternoon. [~yhuai], would you 
say it is OK now to merge that code for perf testing? Thanks!


[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-12 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14954331#comment-14954331
 ] 

Xiangrui Meng commented on SPARK-10953:
---

We can run the benchmark without #9038. I tested the following on my laptop:

{code}
val n = 100000000 // 1e8
val rdd = sc.parallelize(0 until n, 4).map(_.toDouble)
rdd.stats()
{code}

{code}
val df = sqlContext.range(0, n, 1, 4)
df.select(stddev_samp(col("id"))).show()
{code}

The RDD version took 2.4s, while the DataFrame version took 10.5s. The 
question, then, is whether the gap is caused by using expressions. I suspect 
so: without subexpression elimination it is quite expensive to compute stddev 
this way, and it will be even more expensive for higher-order moments. So we 
can simply implement an ImperativeAggregate. I expect it to be much faster 
than the current implementation, with or without #9038.

[~JihongMA] Could you verify it?
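
The merge step that the generated projection in the description encodes 
(pooled count, mean, and sum of squared deviations) takes only a few lines 
when written by hand. Below is a sketch of what the buffer merge of a 
hand-written ImperativeAggregate stddev might look like; the names 
(`StatBuffer`, `ofValue`, `stddevPop`) are illustrative, not Spark's actual 
buffer layout or API.

```scala
// Sketch of the (count, mean, m2) merge behind stddev, using the standard
// pairwise update (Chan et al.). m2 is the sum of squared deviations from
// the mean; merging two buffers this way is order-independent.
case class StatBuffer(count: Double, mean: Double, m2: Double) {
  def merge(other: StatBuffer): StatBuffer =
    if (count == 0.0) other
    else if (other.count == 0.0) this
    else {
      val n = count + other.count
      val delta = other.mean - mean
      val newMean = mean + delta * other.count / n
      // Cross term corrects m2 for the difference between the two means.
      val newM2 = m2 + other.m2 + delta * delta * count * other.count / n
      StatBuffer(n, newMean, newM2)
    }

  def stddevPop: Double =
    if (count > 0.0) math.sqrt(m2 / count) else Double.NaN
}

object StatBuffer {
  // A single observation as a buffer, so a partition's buffer is a reduce.
  def ofValue(x: Double): StatBuffer = StatBuffer(1.0, x, 0.0)
}
```

For example, reducing per-partition buffers for the values 0 until 100 with 
`merge` yields the same population stddev as a global computation (the 
population variance of 0..99 is (100^2 - 1)/12 = 833.25).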


[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-12 Thread Jihong MA (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14954447#comment-14954447
 ] 

Jihong MA commented on SPARK-10953:
---

Had a quick run on my laptop with a stddev implementation based on 
ImperativeAggregate vs. DeclarativeAggregate, where the ImperativeAggregate 
version still uses SortBasedAggregate at runtime while the 
DeclarativeAggregate version uses TungstenAggregate.

With a single double column of a DataFrame (cached):

#rows   ImperativeAggregate   DeclarativeAggregate
100     58ms                  0.1s
1000    0.4s                  0.6s
1       4s                    7s

Overall, ImperativeAggregate seems to perform better. If the PR enabling 
TungstenAggregate support for ImperativeAggregate (PR 9038) is in good shape, 
I will merge it in and have another try.
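
Elapsed times like those in the table are presumably collected with a simple 
wall-clock wrapper around the query; the helper below is a generic sketch 
(`time` is not a Spark API), and a fair run would warm up first and average 
several iterations to amortize JIT and codegen cost.

```scala
// Generic wall-clock timing helper (illustrative; not part of Spark).
// Evaluates the by-name body once and returns its result with the elapsed
// time in milliseconds.
def time[A](body: => A): (A, Long) = {
  val start = System.nanoTime()
  val result = body // force evaluation of the by-name parameter
  val elapsedMs = (System.nanoTime() - start) / 1000000L
  (result, elapsedMs)
}
```

Usage would be e.g. `time(df.select(stddev_samp(col("id"))).collect())`, 
taking the average of the last few of several repeated runs.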


[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-14 Thread Jihong MA (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14958386#comment-14958386
 ] 

Jihong MA commented on SPARK-10953:
---

[~mengxr] I merged PR 9038 in. Below are the average elapsed times collected, 
which are in line with what we observed earlier. Seth has started preparing 
an ImperativeAggregate implementation for SPARK-10641 (skewness, kurtosis).

#rows   ImperativeAggregate   DeclarativeAggregate
100     90ms                  0.2s
1000    0.4s                  0.8s
1       4s                    7s


[jira] [Commented] (SPARK-10953) Benchmark codegen vs. hand-written code for univariate statistics

2015-10-16 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-10953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14961209#comment-14961209
 ] 

Xiangrui Meng commented on SPARK-10953:
---

That sounds good. I'm closing this for now since the conclusion is clear.

> Benchmark codegen vs. hand-written code for univariate statistics
> -
>
> Key: SPARK-10953
> URL: https://issues.apache.org/jira/browse/SPARK-10953
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: Xiangrui Meng
>Assignee: Jihong MA
> Fix For: 1.6.0
>