Not sure if this is why, but perhaps it is the constraint framework?

On Tuesday, September 13, 2016, Mendelson, Assaf <assaf.mendel...@rsa.com>
wrote:

> I did, they look the same:
>
>
>
> scala> my_func.explain(true)
>
> == Parsed Logical Plan ==
>
> Filter smaller#3L < 10
>
> +- Project [id#0L AS smaller#3L]
>
>    +- Range (0, 500000000, splits=1)
>
>
>
> == Analyzed Logical Plan ==
>
> smaller: bigint
>
> Filter smaller#3L < 10
>
> +- Project [id#0L AS smaller#3L]
>
>    +- Range (0, 500000000, splits=1)
>
>
>
> == Optimized Logical Plan ==
>
> Filter smaller#3L < 10
>
> +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>
>    :  +- *Project [id#0L AS smaller#3L]
>
>    :     +- *Range (0, 500000000, splits=1)
>
>
>
> == Physical Plan ==
>
> *Filter smaller#3L < 10
>
> +- InMemoryTableScan [smaller#3L], [smaller#3L < 10]
>
>    :  +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>
>    :     :  +- *Project [id#0L AS smaller#3L]
>
>    :     :     +- *Range (0, 500000000, splits=1)
>
>
>
> scala> base_filter_df.explain(true)
>
> == Parsed Logical Plan ==
>
> 'Filter (smaller#3L < 10)
>
> +- Project [id#0L AS smaller#3L]
>
>    +- Range (0, 500000000, splits=1)
>
>
>
> == Analyzed Logical Plan ==
>
> smaller: bigint
>
> Filter (smaller#3L < cast(10 as bigint))
>
> +- Project [id#0L AS smaller#3L]
>
>    +- Range (0, 500000000, splits=1)
>
>
>
> == Optimized Logical Plan ==
>
> Filter (smaller#3L < 10)
>
> +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>
>    :  +- *Project [id#0L AS smaller#3L]
>
>    :     +- *Range (0, 500000000, splits=1)
>
>
>
> == Physical Plan ==
>
> *Filter (smaller#3L < 10)
>
> +- InMemoryTableScan [smaller#3L], [(smaller#3L < 10)]
>
>    :  +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>
>    :     :  +- *Project [id#0L AS smaller#3L]
>
>    :     :     +- *Range (0, 500000000, splits=1)
>
>
>
>
>
> Also when I do:
>
>
>
> import org.apache.spark.sql.execution.debug._
>
> df.debugCodegen
>
>
>
> on both of them, the output is identical.
>
> I did notice that if I change the code to use > instead of <, the two have
> almost the same performance, so I imagine this has something to do with an
> optimization that understands that the range is ordered and that, once the
> condition fails for the first time, it fails for all remaining rows.
>
> The problem is I don’t see this in the plan, nor can I find it in the code.
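
For what it's worth, the behaviour you describe is what min/max batch pruning would produce. This is an assumption on my part, not something I have confirmed in the Spark source: if the cached relation keeps min/max statistics per batch, a comparison whose bound is visible to the planner can skip whole batches, while a custom expression is opaque and forces a full scan. A minimal plain-Scala sketch of that idea (all names are hypothetical and none of this is Spark code):

```scala
// Illustrative sketch only: plain Scala, not Spark's implementation.
object BatchPruningSketch {
  // A cached batch of values with precomputed min/max statistics (hypothetical type).
  final case class Batch(values: Array[Long]) {
    val min: Long = values.min
    val max: Long = values.max
  }

  // Split the ordered range [0, n) into batches of at most batchSize rows.
  def makeBatches(n: Long, batchSize: Int): Vector[Batch] =
    (0L until n).grouped(batchSize).map(g => Batch(g.toArray)).toVector

  // A predicate whose bound is visible: a batch whose minimum is already
  // >= bound cannot contain a match, so it is skipped without reading rows.
  // Returns (matching row count, number of batches actually scanned).
  def countLessThan(batches: Seq[Batch], bound: Long): (Long, Int) = {
    val candidates = batches.filter(b => b.min < bound)
    val count = candidates.map(b => b.values.count(v => v < bound).toLong).sum
    (count, candidates.size)
  }

  // The mirrored comparison: only batches with max <= bound can be skipped.
  def countGreaterThan(batches: Seq[Batch], bound: Long): (Long, Int) = {
    val candidates = batches.filter(b => b.max > bound)
    val count = candidates.map(b => b.values.count(v => v > bound).toLong).sum
    (count, candidates.size)
  }

  // An opaque predicate (a black-box function): the statistics say nothing
  // about it, so every batch has to be scanned.
  def countOpaque(batches: Seq[Batch], p: Long => Boolean): (Long, Int) = {
    val count = batches.map(b => b.values.count(p).toLong).sum
    (count, batches.size)
  }

  def main(args: Array[String]): Unit = {
    val batches = makeBatches(100000L, 10000)
    println(countLessThan(batches, 10L))                // scans 1 of 10 batches
    println(countOpaque(batches, (x: Long) => x < 10L)) // scans all 10 batches
    println(countGreaterThan(batches, 10L))             // scans all 10 batches
  }
}
```

With > instead of <, nearly every batch has a maximum above the bound, so nothing can be skipped. That would be consistent with both variants then performing about the same.
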
>
>
>
>
>
> *From:* Takeshi Yamamuro [mailto:linguin....@gmail.com]
> *Sent:* Monday, September 12, 2016 7:12 PM
> *To:* Mendelson, Assaf
> *Cc:* dev@spark.apache.org
> *Subject:* Re: UDF and native functions performance
>
>
>
> Hi,
>
>
>
> I think you'd be better off comparing the generated code of `df.filter` with
> your own generated code by using .debugCodegen().
>
>
>
> // maropu
>
>
>
> On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson <assaf.mendel...@rsa.com> wrote:
>
> I am trying to create UDFs with improved performance. So I decided to
> compare several ways of doing it.
>
> I created a DataFrame of 50M elements using range, cached it, and counted it
> to materialize the cache.
>
>
>
> I then implemented a simple predicate (x<10) in 4 different ways, counted
> the elements and timed it.
>
> The 4 ways were:
>
> -          Standard expression (took 90 milliseconds)
>
> -          UDF (took 539 milliseconds)
>
> -          Codegen (took 358 milliseconds)
>
> -          Dataset filter (took 1022 milliseconds)
>
>
>
> I understand why the Dataset filter is so much slower. I also understand why
> the UDF is slower (the volcano model takes up processing time).
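
To make the volcano remark concrete, here is a rough plain-Scala sketch (hypothetical names, not Spark's operators). It contrasts an iterator pipeline, where every row passes through one call per operator, with the same work fused into a single loop, which is roughly what whole-stage code generation aims for:

```scala
// Illustrative sketch only: plain Scala, not Spark's operators.
object VolcanoVsFusedSketch {
  // Volcano style: each operator is an iterator that pulls rows from its
  // child, so every row costs a hasNext/next call per operator.
  def volcanoCount(n: Long, bound: Long): Long = {
    val scan: Iterator[Long] = Iterator.iterate(0L)(_ + 1).takeWhile(_ < n)
    val filtered: Iterator[Long] = scan.filter(_ < bound)
    var count = 0L
    while (filtered.hasNext) {
      filtered.next()
      count += 1
    }
    count
  }

  // Fused style: scan, filter and count collapsed into one tight loop with
  // no per-row calls between operators.
  def fusedCount(n: Long, bound: Long): Long = {
    var i = 0L
    var count = 0L
    while (i < n) {
      if (i < bound) count += 1
      i += 1
    }
    count
  }

  def main(args: Array[String]): Unit = {
    println(volcanoCount(1000000L, 10L))
    println(fusedCount(1000000L, 10L))
  }
}
```

Both functions return the same count; only the shape of the inner loop differs.
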
>
> I do not understand why the codegen I created is so slow. What am I
> missing?
>
>
>
> The code used to generate the numbers follows:
>
>
>
> import org.apache.spark.sql.codegenFuncs._
>
> val df = spark.range(50000000).withColumnRenamed("id","smaller")
>
> df.cache().count()
>
>
>
> val base_filter_df = df.filter(df("smaller") < 10)
>
>
>
> import org.apache.spark.sql.functions.udf
>
> def asUdf=udf((x: Int) => x < 10)
>
> val udf_filter_df = df.filter(asUdf(df("smaller")))
>
>
>
> val my_func = df.filter(genf_func(df("smaller")))
>
>
>
> case class tmpclass(smaller: BigInt)
>
>
>
> val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => (x.smaller < 10))
>
>
>
> def time[R](block: => R) = {
>
>     val t0 = System.nanoTime()
>
>     val result = block    // call-by-name
>
>     val t1 = System.nanoTime()
>
>     (t1 - t0)/1000000
>
> }
>
>
>
> def avgTime[R](block: => R) = {
>
>     val times = for (i <- 1 to 5) yield time(block)
>
>     times.sum / 5
>
> }
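
One possible refinement of the helpers above, offered as a sketch rather than a fix: run a few unrecorded warm-up iterations first and keep the individual timings, so that JIT compilation during the first runs does not skew the average. The names `timings` and `median` are made up for this example:

```scala
// Illustrative sketch only: a variant of the time/avgTime helpers.
object TimingSketch {
  // Evaluates `block` `warmup` times without recording anything, then
  // `runs` more times, returning each elapsed time in milliseconds.
  def timings[R](warmup: Int, runs: Int)(block: => R): Vector[Long] = {
    (1 to warmup).foreach { _ =>
      val ignored = block // call-by-name: evaluated on every iteration
    }
    Vector.fill(runs) {
      val t0 = System.nanoTime()
      val result = block // call-by-name: evaluated on every iteration
      val t1 = System.nanoTime()
      (t1 - t0) / 1000000
    }
  }

  // Middle element of the sorted measurements (the upper middle one when
  // the number of measurements is even).
  def median(xs: Seq[Long]): Long = {
    require(xs.nonEmpty, "need at least one measurement")
    xs.sorted.apply(xs.size / 2)
  }
}
```

It could be used as `TimingSketch.median(TimingSketch.timings(2, 5)(base_filter_df.count()))`, and likewise for the other three DataFrames.
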
>
>
>
>
>
> println("base " + avgTime(base_filter_df.count()))
>
> //>> got a result of 90
>
> println("udf " + avgTime(udf_filter_df.count()))
>
> //>> got a result of 539
>
> println("codegen " + avgTime(my_func.count()))
>
> //>> got a result of 358
>
> println("filter " + avgTime(simpleFilter.count()))
>
> //>> got a result of 1022
>
>
>
> And the code for the genf_func:
>
>
>
> package org.apache.spark.sql
>
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.catalyst.expressions._
>
> object codegenFuncs {
>   case class genf(child: Expression) extends UnaryExpression with Predicate
>       with ImplicitCastInputTypes {
>
>     override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)
>
>     override def toString: String = s"$child < 10"
>
>     override def eval(input: InternalRow): Any = {
>       val value = child.eval(input)
>       if (value == null) {
>         false
>       } else {
>         child.dataType match {
>           case IntegerType => value.asInstanceOf[Int] < 10
>         }
>       }
>     }
>
>     override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
>       defineCodeGen(ctx, ev, c => s"($c) < 10")
>     }
>   }
>
>   private def withExpr(expr: Expression): Column = Column(expr)
>
>   def genf_func(v: Column): Column = withExpr { genf(v.expr) }
> }
>
>
>
>
>
>
>
>
>
>
>
> --
>
> ---
> Takeshi Yamamuro
>
