I am trying to create UDFs with better performance, so I decided to compare
several ways of implementing the same predicate.
I created a dataframe using range with 50M elements, cached it, and counted it
to materialize the cache. I then implemented a simple predicate (x < 10) in
four different ways, counted the matching elements, and timed each.
The four ways were:
- Standard expression (took 90 milliseconds)
- UDF (took 539 milliseconds)
- Codegen expression (took 358 milliseconds)
- Dataset filter (took 1022 milliseconds)
I understand why the Dataset filter is so much slower. I also understand why
the UDF is slower (it falls back to the Volcano iterator model, which adds
per-row processing overhead). What I do not understand is why the codegen
expression I created is so slow. What am I missing?
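For anyone who wants to dig into this, Spark's debug helpers can dump the Java
source that whole-stage codegen actually produces, which should make it
possible to diff what the built-in < and my expression compile to. A minimal
sketch, assuming Spark 2.x:

import org.apache.spark.sql.execution.debug._

// Prints the generated Java source for each whole-stage-codegen subtree
df.filter(genf_func(df("smaller"))).debugCodegen()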
The code used to generate the numbers follows:
import org.apache.spark.sql.codegenFuncs._

// 50M rows, cached and counted up front so timing measures only the filter
val df = spark.range(50000000).withColumnRenamed("id", "smaller")
df.cache().count()

// 1. Standard (built-in) expression
val base_filter_df = df.filter(df("smaller") < 10)

// 2. Scala UDF
import org.apache.spark.sql.functions.udf
def asUdf = udf((x: Int) => x < 10)
val udf_filter_df = df.filter(asUdf(df("smaller")))

// 3. Custom codegen expression (source below)
val my_func = df.filter(genf_func(df("smaller")))

// 4. Typed Dataset filter
case class tmpclass(smaller: BigInt)
val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => x.smaller < 10)
def time[R](block: => R) = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  (t1 - t0) / 1000000 // elapsed milliseconds
}

def avgTime[R](block: => R) = {
  val times = for (i <- 1 to 5) yield time(block)
  times.sum / 5
}
println("base " + avgTime(base_filter_df.count()))
//>> got a result of 90
println("udf " + avgTime(udf_filter_df.count()))
//>> got a result of 539
println("codegen " + avgTime(my_func.count()))
//>> got a result of 358
println("filter " + avgTime(simpleFilter.count()))
//>> got a result of 1022
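A caveat on these numbers: avgTime folds the first run, which includes codegen
compilation and JIT warm-up, into the average. A variant that discards one
warm-up run first (untested sketch):

def avgTimeWarm[R](block: => R) = {
  time(block) // discard one cold run before measuring
  val times = for (i <- 1 to 5) yield time(block)
  times.sum / 5
}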
And here is the code for genf_func:
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

object codegenFuncs {

  case class genf(child: Expression) extends UnaryExpression with Predicate
      with ImplicitCastInputTypes {

    override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)

    override def toString: String = s"$child < 10"

    // Interpreted (non-codegen) fallback path
    override def eval(input: InternalRow): Any = {
      val value = child.eval(input)
      if (value == null) {
        false
      } else {
        child.dataType match {
          case IntegerType => value.asInstanceOf[Int] < 10
        }
      }
    }

    // Codegen path: emits `(<child>) < 10` into the generated Java source
    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
      defineCodeGen(ctx, ev, c => s"($c) < 10")
    }
  }

  private def withExpr(expr: Expression): Column = Column(expr)

  def genf_func(v: Column): Column = withExpr { genf(v.expr) }
}
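One detail that may be relevant: spark.range produces a LongType column, so
with inputTypes = Seq(IntegerType) the analyzer has to wrap the child in a
Cast to Int before genf ever sees it (the UDF declared on Int presumably pays
a similar cast). A variant that declares LongType would rule that cast out;
genfLong below is an untested sketch that would go inside codegenFuncs:

  case class genfLong(child: Expression) extends UnaryExpression with Predicate
      with ImplicitCastInputTypes {

    override def inputTypes: Seq[AbstractDataType] = Seq(LongType)

    override def toString: String = s"$child < 10"

    // Interpreted fallback: no cast needed, the input is already a Long
    override def eval(input: InternalRow): Any = {
      val value = child.eval(input)
      if (value == null) false else value.asInstanceOf[Long] < 10L
    }

    // Codegen path: emits `(<child>) < 10L` into the generated Java source
    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
      defineCodeGen(ctx, ev, c => s"($c) < 10L")
  }

  def genfLong_func(v: Column): Column = withExpr { genfLong(v.expr) }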