Hi,

I think you'd be better off comparing the generated code of `df.filter` with
your own generated code by using .debugCodegen().
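
A minimal sketch of that comparison, assuming a Spark 2.x session. The local
master, the small range, the Long-typed UDF and the helper name `dumpCodegen`
are assumptions made for illustration; they are not taken from the quoted
message:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.execution.debug._ // brings debugCodegen() into scope
import org.apache.spark.sql.functions.udf

// Assumption: a local session; in spark-shell getOrCreate() reuses the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("codegen-compare")
  .getOrCreate()

// Assumption: a small range keeps the run short; the generated code
// does not depend on the row count.
val df = spark.range(1000).withColumnRenamed("id", "smaller")
df.cache().count()

// Built-in comparison expression.
val builtin = df.filter(df("smaller") < 10)

// Plain Scala UDF; it takes a Long because spark.range yields a bigint column.
val lessThanTen = udf((x: Long) => x < 10)
val viaUdf = df.filter(lessThanTen(df("smaller")))

// Hypothetical helper: print the physical plan, then the generated Java source.
def dumpCodegen(label: String, ds: Dataset[_]): Unit = {
  println(s"===== $label =====")
  ds.explain()
  ds.debugCodegen()
}

dumpCodegen("built-in filter", builtin)
dumpCodegen("udf filter", viaUdf)
// The same call would apply to the custom expression from the quoted message:
// dumpCodegen("custom expression", df.filter(genf_func(df("smaller"))))
```

When reading the output, it may be worth checking whether a cast is wrapped
around the column and how the null check is generated in each variant.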

// maropu

On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson <assaf.mendel...@rsa.com>
wrote:

> I am trying to create UDFs with improved performance. So I decided to
> compare several ways of doing it.
>
> In general I created a dataframe using range with 50M elements, cached it
> and counted it to materialize it.
>
>
>
> I then implemented a simple predicate (x<10) in 4 different ways, counted
> the elements and timed it.
>
> The 4 ways were:
>
> - Standard expression (took 90 milliseconds)
>
> - UDF (took 539 milliseconds)
>
> - Codegen (took 358 milliseconds)
>
> - Dataset filter (took 1022 milliseconds)
>
>
>
> I understand why the Dataset filter is so much slower. I also understand why
> the UDF is slower (the volcano-style iterator model takes up processing time).
>
> I do not understand why the codegen I created is so slow. What am I
> missing?
>
>
>
> The code used to generate the numbers follows:
>
>
>
> import org.apache.spark.sql.codegenFuncs._
>
> val df = spark.range(50000000).withColumnRenamed("id","smaller")
>
> df.cache().count()
>
>
>
> val base_filter_df = df.filter(df("smaller") < 10)
>
>
>
> import org.apache.spark.sql.functions.udf
>
> def asUdf=udf((x: Int) => x < 10)
>
> val udf_filter_df = df.filter(asUdf(df("smaller")))
>
>
>
> val my_func = df.filter(genf_func(df("smaller")))
>
>
>
> case class tmpclass(smaller: BigInt)
>
>
>
> val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => (x.smaller < 10))
>
>
>
> def time[R](block: => R) = {
>
>     val t0 = System.nanoTime()
>
>     val result = block    // call-by-name
>
>     val t1 = System.nanoTime()
>
>     (t1 - t0)/1000000
>
> }
>
>
>
> def avgTime[R](block: => R) = {
>
>     val times = for (i <- 1 to 5) yield time(block)
>
>     times.sum / 5
>
> }
>
>
>
>
>
> println("base " + avgTime(base_filter_df.count()))
>
> //>> got a result of 90
>
> println("udf " + avgTime(udf_filter_df.count()))
>
> //>> got a result of 539
>
> println("codegen " + avgTime(my_func.count()))
>
> //>> got a result of 358
>
> println("filter " + avgTime(simpleFilter.count()))
>
> //>> got a result of 1022
>
>
>
> And the code for the genf_func:
>
>
>
> package org.apache.spark.sql
>
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.expressions._
>
> object codegenFuncs {
>   case class genf(child: Expression) extends UnaryExpression with Predicate
>     with ImplicitCastInputTypes {
>
>     override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)
>
>     override def toString: String = s"$child < 10"
>
>     override def eval(input: InternalRow): Any = {
>       val value = child.eval(input)
>       if (value == null) {
>         false
>       } else {
>         child.dataType match {
>           case IntegerType => value.asInstanceOf[Int] < 10
>         }
>       }
>     }
>
>     override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
>       defineCodeGen(ctx, ev, c => s"($c) < 10")
>     }
>   }
>
>   private def withExpr(expr: Expression): Column = Column(expr)
>
>   def genf_func(v: Column): Column = withExpr { genf(v.expr) }
> }
>
>
>
>
>
> ------------------------------
> View this message in context: UDF and native functions performance
> <http://apache-spark-developers-list.1001551.n3.nabble.com/UDF-and-native-functions-performance-tp18920.html>
> Sent from the Apache Spark Developers List mailing list archive
> <http://apache-spark-developers-list.1001551.n3.nabble.com/> at
> Nabble.com.
>



-- 
---
Takeshi Yamamuro
