RE: UDF and native functions performance

2016-09-12 Thread assaf.mendelson
What is the constraint framework?
How would I add the same optimization to the sample function I created?
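One possible direction (a sketch, not a confirmed answer): the optimizer can
only reason about expressions it can pattern-match on, so a wrapper built from
stock Catalyst nodes stays visible to it. The hypothetical genf_func2 below
would live next to withExpr inside the codegenFuncs object quoted later in
this thread:

// Hypothetical variant of genf_func: build the predicate from the stock
// LessThan/Literal nodes instead of an opaque custom expression, so any
// optimizer rule that pattern-matches on LessThan can still fire.
def genf_func2(v: Column): Column = withExpr { LessThan(v.expr, Literal(10)) }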


From: rxin [via Apache Spark Developers List] 
Sent: Tuesday, September 13, 2016 3:37 AM
To: Mendelson, Assaf
Subject: Re: UDF and native functions performance

Not sure if this is why but perhaps the constraint framework?
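For reference, the constraint framework is Catalyst's per-operator set of
predicates that provably hold on an operator's output (QueryPlan.constraints),
which optimizer rules such as InferFiltersFromConstraints then exploit. A
minimal way to peek at it from a Spark 2.x shell (a sketch relying on internal
APIs; base_filter_df and my_func are the DataFrames defined later in this
thread):

// Print the constraints Catalyst derived for each optimized plan.
base_filter_df.queryExecution.optimizedPlan.constraints.foreach(println)
// e.g. isnotnull(smaller#3L), (smaller#3L < 10)
my_func.queryExecution.optimizedPlan.constraints.foreach(println)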


Re: UDF and native functions performance

2016-09-12 Thread Reynold Xin
Not sure if this is why but perhaps the constraint framework?


RE: UDF and native functions performance

2016-09-12 Thread Mendelson, Assaf
I did, they look the same:

scala> my_func.explain(true)
== Parsed Logical Plan ==
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 50000000, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 50000000, splits=1)

== Optimized Logical Plan ==
Filter smaller#3L < 10
+- InMemoryRelation [smaller#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   : +- *Range (0, 50000000, splits=1)

== Physical Plan ==
*Filter smaller#3L < 10
+- InMemoryTableScan [smaller#3L], [smaller#3L < 10]
   :  +- InMemoryRelation [smaller#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   : :  +- *Project [id#0L AS smaller#3L]
   : : +- *Range (0, 50000000, splits=1)

scala> base_filter_df.explain(true)
== Parsed Logical Plan ==
'Filter (smaller#3L < 10)
+- Project [id#0L AS smaller#3L]
   +- Range (0, 50000000, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter (smaller#3L < cast(10 as bigint))
+- Project [id#0L AS smaller#3L]
   +- Range (0, 50000000, splits=1)

== Optimized Logical Plan ==
Filter (smaller#3L < 10)
+- InMemoryRelation [smaller#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   : +- *Range (0, 50000000, splits=1)

== Physical Plan ==
*Filter (smaller#3L < 10)
+- InMemoryTableScan [smaller#3L], [(smaller#3L < 10)]
   :  +- InMemoryRelation [smaller#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   : :  +- *Project [id#0L AS smaller#3L]
   : : +- *Range (0, 50000000, splits=1)


Also when I do:

import org.apache.spark.sql.execution.debug._
df.debugCodegen

on both of them, the generated code is identical.
I did notice that if I change the code to use > instead of <, they have
almost the same performance, so I imagine this has something to do with an
optimization that understands that range is ordered and therefore, once the
first condition fails, all subsequent ones would fail.
The problem is I don’t see this in the plan, nor can I find it in the code.
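A quick way to probe that hypothesis (a sketch; df and avgTime are defined in
the quoted code later in this thread). Note the two directions also differ
enormously in selectivity: on a 0..50M range, `< 10` keeps 10 rows while `> 10`
keeps almost all of them, so a timing gap may reflect how many rows survive
the filter rather than any ordering-aware rule:

// Time both predicate directions against the same cached data.
println("lt " + avgTime(df.filter(df("smaller") < 10).count()))
println("gt " + avgTime(df.filter(df("smaller") > 10).count()))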


From: Takeshi Yamamuro [mailto:linguin@gmail.com]
Sent: Monday, September 12, 2016 7:12 PM
To: Mendelson, Assaf
Cc: dev@spark.apache.org
Subject: Re: UDF and native functions performance

Hi,

I think you'd better off comparing the gen'd code of `df.filter` and your gen'd 
code
by using .debugCodegen().

// maropu

On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson 
<assaf.mendel...@rsa.com<mailto:assaf.mendel...@rsa.com>> wrote:
I am trying to create UDFs with improved performance. So I decided to compare 
several ways of doing it.
In general I created a dataframe using range with 50M elements, cached it and 
counted it to manifest it.

I then implemented a simple predicate (x<10) in 4 different ways, counted the 
elements and timed it.
The 4 ways were:

-  Standard expression (took 90 millisonds)

-  Udf  (took 539 miliseconds)

-  Codegen (took 358 miliseconds)

-  Dataset filter (took 1022 miliseconds)

I understand why filter is so much slower. I also understand why UDF is slower 
(with volcano model taking up processing time).
I do not understand why the codegen I created is so slow. What am I missing?

The code to generate the numbers is followed:

import org.apache.spark.sql.codegenFuncs._
val df = spark.range(5000).withColumnRenamed("id","smaller")
df.cache().count()

val base_filter_df = df.filter(df("smaller") < 10)

import org.apache.spark.sql.functions.udf
def asUdf=udf((x: Int) => x < 10)
val udf_filter_df = df.filter(asUdf(df("smaller")))

val my_func = df.filter(genf_func(df("smaller")))

case class tmpclass(smaller: BigInt)

val simpleFilter = df.as<http://df.as>[tmpclass].filter((x: tmpclass) => 
(x.smaller < 10))

def time[R](block: => R) = {
val t0 = System.nanoTime()
val result = block// call-by-name
val t1 = System.nanoTime()
(t1 - t0)/100
}

def avgTime[R](block: => R) = {
val times = for (i <- 1 to 5) yield time(block)
times.sum / 5
}


println("base " + avgTime(base_filter_df.count()))
//>> got a result of 90
println("udf " + avgTime(udf_filter_df.count()))
//>> got a result of 539
println("codegen " + avgTime(my_func.count()))
//>> got a result of 358
println("filter " + avgTime(simpleFilter.count()))
//>> got a result of 1022

And the code for the genf_func:

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

object codegenFuncs {
  case class genf(child: Expression) extends UnaryE

Re: UDF and native functions performance

2016-09-12 Thread Takeshi Yamamuro
Hi,

I think you'd be better off comparing the gen'd code of `df.filter` and your
gen'd code by using .debugCodegen().

// maropu
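A concrete way to run that comparison end to end (a sketch: Console.withOut
is plain Scala, but whether Spark's debug output routes through it is an
assumption worth verifying; base_filter_df and my_func come from the quoted
code below):

import java.io.{ByteArrayOutputStream, PrintStream}
import org.apache.spark.sql.execution.debug._

// Capture a plan's generated code as a string so the two dumps can be
// diffed mechanically instead of eyeballed.
def captured(body: => Unit): String = {
  val buf = new ByteArrayOutputStream()
  Console.withOut(new PrintStream(buf)) { body }
  buf.toString
}

val nativeCode = captured(base_filter_df.debugCodegen())
val customCode = captured(my_func.debugCodegen())
println(nativeCode == customCode) // true => byte-identical generated code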

On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson wrote:

> I am trying to create UDFs with improved performance. So I decided to
> compare several ways of doing it.
>
> In general I created a dataframe using range with 50M elements, cached it,
> and counted it to materialize the cache.
>
>
>
> I then implemented a simple predicate (x<10) in 4 different ways, counted
> the elements and timed it.
>
> The 4 ways were:
>
> -  Standard expression (took 90 milliseconds)
>
> -  UDF (took 539 milliseconds)
>
> -  Codegen (took 358 milliseconds)
>
> -  Dataset filter (took 1022 milliseconds)
>
>
>
> I understand why the Dataset filter is so much slower. I also understand why
> the UDF is slower (the Volcano iterator model takes up processing time).
>
> I do not understand why the codegen I created is so slow. What am I
> missing?
>
>
>
> The code used to generate the numbers follows:
>
>
>
> import org.apache.spark.sql.codegenFuncs._
>
> val df = spark.range(50000000).withColumnRenamed("id", "smaller") // 50M rows
>
> df.cache().count()
>
>
>
> val base_filter_df = df.filter(df("smaller") < 10)
>
>
>
> import org.apache.spark.sql.functions.udf
>
> def asUdf = udf((x: Int) => x < 10)
>
> val udf_filter_df = df.filter(asUdf(df("smaller")))
>
>
>
> val my_func = df.filter(genf_func(df("smaller")))
>
>
>
> case class tmpclass(smaller: BigInt)
>
>
>
> val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => (x.smaller <
> 10))
>
>
>
> def time[R](block: => R) = {
>   val t0 = System.nanoTime()
>   val result = block // call-by-name, forces evaluation
>   val t1 = System.nanoTime()
>   (t1 - t0) / 1000000 // nanoseconds -> milliseconds
> }
>
> def avgTime[R](block: => R) = {
>   val times = for (i <- 1 to 5) yield time(block)
>   times.sum / 5
> }
>
>
>
>
>
> println("base " + avgTime(base_filter_df.count()))
>
> //>> got a result of 90
>
> println("udf " + avgTime(udf_filter_df.count()))
>
> //>> got a result of 539
>
> println("codegen " + avgTime(my_func.count()))
>
> //>> got a result of 358
>
> println("filter " + avgTime(simpleFilter.count()))
>
> //>> got a result of 1022
>
>
>
> And the code for the genf_func:
>
>
>
> package org.apache.spark.sql
>
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.expressions._
>
> object codegenFuncs {
>   case class genf(child: Expression) extends UnaryExpression
>       with Predicate with ImplicitCastInputTypes {
>
>     override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)
>
>     override def toString: String = s"$child < 10"
>
>     override def eval(input: InternalRow): Any = {
>       val value = child.eval(input)
>       if (value == null) {
>         false
>       } else {
>         child.dataType match {
>           case IntegerType => value.asInstanceOf[Int] < 10
>         }
>       }
>     }
>
>     override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
>       defineCodeGen(ctx, ev, c => s"($c) < 10")
>     }
>   }
>
>   private def withExpr(expr: Expression): Column = Column(expr)
>
>   def genf_func(v: Column): Column = withExpr { genf(v.expr) }
> }
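One detail in the quoted expression that is easy to miss: inputTypes is
Seq(IntegerType), while spark.range produces a bigint (Long) column, so
ImplicitCastInputTypes may insert a cast before genf ever runs. A hypothetical
variant specialized to the column's native type, meant to sit inside the same
codegenFuncs object, would rule that cast out as a source of overhead:

// Hypothetical LongType variant of the quoted expression: no implicit
// cast is needed for spark.range's bigint output.
case class genfLong(child: Expression) extends UnaryExpression
    with Predicate with ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(LongType)

  override def toString: String = s"$child < 10"

  override def eval(input: InternalRow): Any = {
    val value = child.eval(input)
    if (value == null) false
    else value.asInstanceOf[Long] < 10L
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    defineCodeGen(ctx, ev, c => s"($c) < 10L")
}

def genfLong_func(v: Column): Column = withExpr { genfLong(v.expr) }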



-- 
---
Takeshi Yamamuro