What is the constraint framework? How would I add the same optimization to the sample function I created?

From: rxin [via Apache Spark Developers List] [mailto:ml-node+s1001551n18932...@n3.nabble.com]
Sent: Tuesday, September 13, 2016 3:37 AM
To: Mendelson, Assaf
Subject: Re: UDF and native functions performance

Not sure if this is why but perhaps the constraint framework?

On Tuesday, September 13, 2016, Mendelson, Assaf <[hidden email]> wrote:

I did, they look the same:

scala> my_func.explain(true)
== Parsed Logical Plan ==
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Optimized Logical Plan ==
Filter smaller#3L < 10
+- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   :     +- *Range (0, 500000000, splits=1)

== Physical Plan ==
*Filter smaller#3L < 10
+- InMemoryTableScan [smaller#3L], [smaller#3L < 10]
   :  +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  +- *Project [id#0L AS smaller#3L]
   :     :     +- *Range (0, 500000000, splits=1)

scala> base_filter_df.explain(true)
== Parsed Logical Plan ==
'Filter (smaller#3L < 10)
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter (smaller#3L < cast(10 as bigint))
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Optimized Logical Plan ==
Filter (smaller#3L < 10)
+- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   :     +- *Range (0, 500000000, splits=1)

== Physical Plan ==
*Filter (smaller#3L < 10)
+- InMemoryTableScan [smaller#3L], [(smaller#3L < 10)]
   :  +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :     :  +- *Project [id#0L AS smaller#3L]
   :     :     +- *Range (0, 500000000, splits=1)

Also, when I do:

import org.apache.spark.sql.execution.debug._
df.debugCodegen

on both of them, the output is identical.

I did notice that if I change the code to use > instead of <, the two have almost the same performance. So I imagine this has something to do with some optimization that understands that range is ordered and therefore, once the first condition fails, all would fail.
The problem is I don’t see this in the plan, nor can I find it in the code.

From: Takeshi Yamamuro [mailto:linguin.m.s@...]
Sent: Monday, September 12, 2016 7:12 PM
To: Mendelson, Assaf
Cc: dev@...
Subject: Re: UDF and native functions performance

Hi,

I think you'd be better off comparing the generated code of `df.filter` with your generated code by using .debugCodegen().

// maropu

On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson <assaf.mendelson@...> wrote:

I am trying to create UDFs with improved performance, so I decided to compare several ways of doing it.
In general, I created a dataframe using range with 50M elements, cached it and counted it to materialize it.

I then implemented a simple predicate (x<10) in 4 different ways, counted the elements and timed it.
The 4 ways were:

- Standard expression (took 90 milliseconds)
- Udf (took 539 milliseconds)
- Codegen (took 358 milliseconds)
- Dataset filter (took 1022 milliseconds)

I understand why filter is so much slower. I also understand why UDF is slower (with volcano model taking up processing time).
I do not understand why the codegen I created is so slow. What am I missing?

The code to generate the numbers follows:

import org.apache.spark.sql.codegenFuncs._

val df = spark.range(50000000).withColumnRenamed("id","smaller")
df.cache().count()

val base_filter_df = df.filter(df("smaller") < 10)

import org.apache.spark.sql.functions.udf
def asUdf=udf((x: Int) => x < 10)
val udf_filter_df = df.filter(asUdf(df("smaller")))

val my_func = df.filter(genf_func(df("smaller")))

case class tmpclass(smaller: BigInt)
val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => (x.smaller < 10))

def time[R](block: => R) = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  (t1 - t0)/1000000
}

def avgTime[R](block: => R) = {
  val times = for (i <- 1 to 5) yield time(block)
  times.sum / 5
}

println("base " + avgTime(base_filter_df.count()))
//>> got a result of 90
println("udf " + avgTime(udf_filter_df.count()))
//>> got a result of 539
println("codegen " + avgTime(my_func.count()))
//>> got a result of 358
println("filter " + avgTime(simpleFilter.count()))
//>> got a result of 1022

And the code for the genf_func:

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

object codegenFuncs {
  case class genf(child: Expression) extends UnaryExpression with Predicate with ImplicitCastInputTypes {

    override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)

    override def toString: String = s"$child < 10"

    override def eval(input: InternalRow): Any = {
      val value = child.eval(input)
      if (value == null) {
        false
      } else {
        child.dataType match {
          case IntegerType => value.asInstanceOf[Int] < 10
        }
      }
    }

    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
      defineCodeGen(ctx, ev, c => s"($c) < 10")
    }
  }

  private def withExpr(expr: Expression): Column = Column(expr)

  def genf_func(v: Column): Column = withExpr {
    genf(v.expr)
  }
}

________________________________
View this message in context: UDF and native functions performance <http://apache-spark-developers-list.1001551.n3.nabble.com/UDF-and-native-functions-performance-tp18920.html>
Sent from the Apache Spark Developers List mailing list archive <http://apache-spark-developers-list.1001551.n3.nabble.com/> at Nabble.com.

--
---
Takeshi Yamamuro

--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/UDF-and-native-functions-performance-tp18920p18933.html
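
A note on the time and avgTime helpers quoted above: they average five runs with integer division and include the first (cold) run in the average. Below is a minimal sketch of a variant that discards a few warm-up runs and reports fractional milliseconds. The names TimingSketch, timeMs, avgTimeMs, warmup and runs are hypothetical and not part of the original code, and this is only an assumption about how one might reduce measurement noise; it is not claimed to explain the difference between the four timings.

```scala
object TimingSketch {
  /** Runs `block` once and returns the elapsed wall-clock time in milliseconds. */
  def timeMs[R](block: => R): Double = {
    val t0 = System.nanoTime()
    block // call-by-name: the expression is evaluated here, its result is discarded
    (System.nanoTime() - t0) / 1e6
  }

  /** Evaluates `block` `warmup` times untimed, then averages `runs` timed evaluations. */
  def avgTimeMs[R](warmup: Int, runs: Int)(block: => R): Double = {
    require(warmup >= 0, "warmup must be non-negative")
    require(runs > 0, "runs must be positive")
    (1 to warmup).foreach(_ => block)               // warm-up runs, not measured
    val times = (1 to runs).map(_ => timeMs(block)) // measured runs
    times.sum / runs
  }
}
```

With the dataframes from the quoted code in scope, it could be called as TimingSketch.avgTimeMs(warmup = 2, runs = 5) { base_filter_df.count() }.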