Not sure if this is the cause, but perhaps it's the constraint framework?
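If that's the suspicion, one quick check from the shell (a minimal sketch, assuming Spark 2.x; my_func and base_filter_df are the plans from the thread below):

    // Compare the constraint sets the optimizer derives for the two plans;
    // a custom expression that doesn't participate in constraint propagation
    // could show up as a difference here.
    println(my_func.queryExecution.optimizedPlan.constraints)
    println(base_filter_df.queryExecution.optimizedPlan.constraints)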
On Tuesday, September 13, 2016, Mendelson, Assaf <assaf.mendel...@rsa.com> wrote:

I did; they look the same:

scala> my_func.explain(true)
== Parsed Logical Plan ==
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Optimized Logical Plan ==
Filter smaller#3L < 10
+- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   :     +- *Range (0, 500000000, splits=1)

== Physical Plan ==
*Filter smaller#3L < 10
+- InMemoryTableScan [smaller#3L], [smaller#3L < 10]
   :  +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  +- *Project [id#0L AS smaller#3L]
   :     :     +- *Range (0, 500000000, splits=1)

scala> base_filter_df.explain(true)
== Parsed Logical Plan ==
'Filter (smaller#3L < 10)
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter (smaller#3L < cast(10 as bigint))
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Optimized Logical Plan ==
Filter (smaller#3L < 10)
+- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   :     +- *Range (0, 500000000, splits=1)

== Physical Plan ==
*Filter (smaller#3L < 10)
+- InMemoryTableScan [smaller#3L], [(smaller#3L < 10)]
   :  +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  +- *Project [id#0L AS smaller#3L]
   :     :     +- *Range (0, 500000000, splits=1)

Also, when I do:

import org.apache.spark.sql.execution.debug._
df.debugCodegen

on both of them, the output is identical.

I did notice that if I change the code to use > instead of <, they have almost the same performance, so I imagine this has something to do with some optimization that understands that the range is ordered and therefore, once the first condition fails, all later rows would fail too. The problem is that I don't see this in the plan, nor can I find it in the code.
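One way to probe that ordering hypothesis (a rough sketch, not verified; it reuses df and avgTime from the original message quoted below) is to re-materialize the data in random order and re-time both comparisons. If the < vs > gap survives on shuffled data, ordering is unlikely to be the cause:

    import org.apache.spark.sql.functions.rand

    // Hypothetical check: break the physical row ordering of the cached data.
    val shuffled = df.orderBy(rand(42)).cache()
    shuffled.count()  // materialize the shuffled cache

    println("shuffled < " + avgTime(shuffled.filter(shuffled("smaller") < 10).count()))
    println("shuffled > " + avgTime(shuffled.filter(shuffled("smaller") > 10).count()))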
From: Takeshi Yamamuro [mailto:linguin....@gmail.com]
Sent: Monday, September 12, 2016 7:12 PM
To: Mendelson, Assaf
Cc: dev@spark.apache.org
Subject: Re: UDF and native functions performance

Hi,

I think you'd be better off comparing the generated code of `df.filter` and your generated code by using .debugCodegen().

// maropu

On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson <assaf.mendel...@rsa.com> wrote:

I am trying to create UDFs with improved performance, so I decided to compare several ways of doing it.

In general, I created a dataframe using range with 50M elements, cached it and counted it to materialize it.

I then implemented a simple predicate (x < 10) in 4 different ways, counted the elements and timed each one. The 4 ways were:

- Standard expression (took 90 milliseconds)
- UDF (took 539 milliseconds)
- Codegen (took 358 milliseconds)
- Dataset filter (took 1022 milliseconds)

I understand why the Dataset filter is so much slower. I also understand why the UDF is slower (the volcano model taking up processing time). I do not understand why the codegen I created is so slow. What am I missing?

The code to generate the numbers follows:

import org.apache.spark.sql.codegenFuncs._
val df = spark.range(50000000).withColumnRenamed("id", "smaller")
df.cache().count()

val base_filter_df = df.filter(df("smaller") < 10)

import org.apache.spark.sql.functions.udf
def asUdf = udf((x: Int) => x < 10)
val udf_filter_df = df.filter(asUdf(df("smaller")))

val my_func = df.filter(genf_func(df("smaller")))

case class tmpclass(smaller: BigInt)

val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => x.smaller < 10)

def time[R](block: => R) = {
  val t0 = System.nanoTime()
  val result = block  // call-by-name
  val t1 = System.nanoTime()
  (t1 - t0) / 1000000
}

def avgTime[R](block: => R) = {
  val times = for (i <- 1 to 5) yield time(block)
  times.sum / 5
}

println("base " + avgTime(base_filter_df.count()))
//>> got a result of 90
println("udf " + avgTime(udf_filter_df.count()))
//>> got a result of 539
println("codegen " + avgTime(my_func.count()))
//>> got a result of 358
println("filter " + avgTime(simpleFilter.count()))
//>> got a result of 1022

And the code for genf_func (a LongType variant is sketched after this message):

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

object codegenFuncs {
  case class genf(child: Expression) extends UnaryExpression
      with Predicate with ImplicitCastInputTypes {

    override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)

    override def toString: String = s"$child < 10"

    override def eval(input: InternalRow): Any = {
      val value = child.eval(input)
      if (value == null) {
        false
      } else {
        child.dataType match {
          case IntegerType => value.asInstanceOf[Int] < 10
        }
      }
    }

    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
      defineCodeGen(ctx, ev, c => s"($c) < 10")
    }
  }

  private def withExpr(expr: Expression): Column = Column(expr)

  def genf_func(v: Column): Column = withExpr { genf(v.expr) }
}

--
---
Takeshi Yamamuro
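One thing worth ruling out (an assumption, not a confirmed cause): spark.range produces a bigint (LongType) column, while genf above declares inputTypes = Seq(IntegerType). A minimal sketch of the same predicate typed for LongType, to be added inside the codegenFuncs object above (genfLong and genf_long_func are hypothetical names, not from the thread):

    // A minimal sketch, assuming the same imports and enclosing object
    // codegenFuncs as above. The predicate is typed for LongType so the
    // bigint column needs no implicit cast and no Int-typed match in eval.
    case class genfLong(child: Expression) extends UnaryExpression
        with Predicate with ImplicitCastInputTypes {

      override def inputTypes: Seq[AbstractDataType] = Seq(LongType)

      override def toString: String = s"$child < 10"

      // Interpreted fallback: a null input yields false,
      // otherwise compare as Long.
      override def eval(input: InternalRow): Any = {
        val value = child.eval(input)
        if (value == null) false
        else value.asInstanceOf[Long] < 10L
      }

      // Generated code: a plain Java long comparison.
      override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
        defineCodeGen(ctx, ev, c => s"($c) < 10L")
    }

    def genf_long_func(v: Column): Column = withExpr { genfLong(v.expr) }

Timing df.filter(genf_long_func(df("smaller"))).count() with the same avgTime harness as the other variants would show whether the Int/Long mismatch accounts for any of the gap.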