Hi, I think you'd be better off comparing the generated code of `df.filter` with the generated code of your own expression by using `.debugCodegen()`.
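A minimal sketch of that comparison, assuming Spark 2.x and the same `df` as in the quoted message. `debugCodegen()` comes from the `org.apache.spark.sql.execution.debug` package object; the session setup line and the name `baseFilterDf` are only there to make the snippet stand alone (in spark-shell, `getOrCreate()` simply returns the existing session). The custom-expression lines are commented out because they need the `codegenFuncs` object from the quoted message on the classpath.

```scala
import org.apache.spark.sql.SparkSession
// Brings the implicit debugCodegen() method into scope for Datasets.
import org.apache.spark.sql.execution.debug._

// Assumption: a local session is enough for inspecting plans.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("codegen-compare")
  .getOrCreate()

val df = spark.range(50000000L).withColumnRenamed("id", "smaller")

// Built-in comparison expression: print the whole-stage generated Java code.
val baseFilterDf = df.filter(df("smaller") < 10)
baseFilterDf.debugCodegen()

// The plans themselves (parsed, analyzed, optimized, physical).
baseFilterDf.explain(true)

// Custom expression from the quoted message; uncomment once
// org.apache.spark.sql.codegenFuncs is compiled and on the classpath.
// import org.apache.spark.sql.codegenFuncs._
// val myFunc = df.filter(genf_func(df("smaller")))
// myFunc.debugCodegen()
// myFunc.explain(true)
```

Diffing the two outputs should show where the plans diverge. One thing worth checking, for example, is whether a cast appears around `smaller` in the custom version, since `range` produces a long column while `genf` declares `IntegerType` as its input type.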
// maropu

On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson <assaf.mendel...@rsa.com> wrote:

> I am trying to create UDFs with improved performance, so I decided to
> compare several ways of doing it.
>
> In general, I created a dataframe using range with 50M elements, cached it,
> and counted it to materialize it.
>
> I then implemented a simple predicate (x < 10) in 4 different ways, counted
> the elements, and timed it.
>
> The 4 ways were:
>
> - Standard expression (took 90 milliseconds)
> - Udf (took 539 milliseconds)
> - Codegen (took 358 milliseconds)
> - Dataset filter (took 1022 milliseconds)
>
> I understand why filter is so much slower. I also understand why the UDF is
> slower (with the volcano model taking up processing time).
>
> I do not understand why the codegen I created is so slow. What am I
> missing?
>
> The code to generate the numbers follows:
>
> import org.apache.spark.sql.codegenFuncs._
>
> val df = spark.range(50000000).withColumnRenamed("id","smaller")
> df.cache().count()
>
> val base_filter_df = df.filter(df("smaller") < 10)
>
> import org.apache.spark.sql.functions.udf
> def asUdf=udf((x: Int) => x < 10)
> val udf_filter_df = df.filter(asUdf(df("smaller")))
>
> val my_func = df.filter(genf_func(df("smaller")))
>
> case class tmpclass(smaller: BigInt)
>
> val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => (x.smaller < 10))
>
> def time[R](block: => R) = {
>   val t0 = System.nanoTime()
>   val result = block // call-by-name
>   val t1 = System.nanoTime()
>   (t1 - t0)/1000000
> }
>
> def avgTime[R](block: => R) = {
>   val times = for (i <- 1 to 5) yield time(block)
>   times.sum / 5
> }
>
> println("base " + avgTime(base_filter_df.count()))
> //>> got a result of 90
> println("udf " + avgTime(udf_filter_df.count()))
> //>> got a result of 539
> println("codegen " + avgTime(my_func.count()))
> //>> got a result of 358
> println("filter " + avgTime(simpleFilter.count()))
> //>> got a result of 1022
>
> And the code for the genf_func:
>
> package org.apache.spark.sql
>
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.expressions._
>
> object codegenFuncs {
>   case class genf(child: Expression) extends UnaryExpression with Predicate
>     with ImplicitCastInputTypes {
>
>     override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)
>
>     override def toString: String = s"$child < 10"
>
>     override def eval(input: InternalRow): Any = {
>       val value = child.eval(input)
>       if (value == null) {
>         false
>       } else {
>         child.dataType match {
>           case IntegerType => value.asInstanceOf[Int] < 10
>         }
>       }
>     }
>
>     override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
>       defineCodeGen(ctx, ev, c => s"($c) < 10")
>     }
>   }
>
>   private def withExpr(expr: Expression): Column = Column(expr)
>
>   def genf_func(v: Column): Column = withExpr { genf(v.expr) }
> }
>
> ------------------------------
> View this message in context: UDF and native functions performance
> <http://apache-spark-developers-list.1001551.n3.nabble.com/UDF-and-native-functions-performance-tp18920.html>

--
---
Takeshi Yamamuro