What is the constraint framework?
How would I add the same optimization to the sample function I created?


From: rxin [via Apache Spark Developers List]
Sent: Tuesday, September 13, 2016 3:37 AM
To: Mendelson, Assaf
Subject: Re: UDF and native functions performance

Not sure if this is why but perhaps the constraint framework?
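
For reference, a sketch of how to peek at what the constraint framework derives, assuming Spark 2.x where logical plans expose a public constraints field (base_filter_df is the DataFrame defined further down the thread):

val plan = base_filter_df.queryExecution.optimizedPlan
// Catalyst attaches an ExpressionSet of predicates known to hold for the
// plan's output, e.g. isnotnull(smaller#3L) and (smaller#3L < 10).
println(plan.constraints)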

On Tuesday, September 13, 2016, Mendelson, Assaf <[hidden email]> wrote:
I did, they look the same:

scala> my_func.explain(true)
== Parsed Logical Plan ==
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Optimized Logical Plan ==
Filter smaller#3L < 10
+- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   :     +- *Range (0, 500000000, splits=1)

== Physical Plan ==
*Filter smaller#3L < 10
+- InMemoryTableScan [smaller#3L], [smaller#3L < 10]
   :  +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :     :  +- *Project [id#0L AS smaller#3L]
   :     :     +- *Range (0, 500000000, splits=1)

scala> base_filter_df.explain(true)
== Parsed Logical Plan ==
'Filter (smaller#3L < 10)
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter (smaller#3L < cast(10 as bigint))
+- Project [id#0L AS smaller#3L]
   +- Range (0, 500000000, splits=1)

== Optimized Logical Plan ==
Filter (smaller#3L < 10)
+- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   :     +- *Range (0, 500000000, splits=1)

== Physical Plan ==
*Filter (smaller#3L < 10)
+- InMemoryTableScan [smaller#3L], [(smaller#3L < 10)]
   :  +- InMemoryRelation [smaller#3L], true, 10000, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :     :  +- *Project [id#0L AS smaller#3L]
   :     :     +- *Range (0, 500000000, splits=1)


Also, when I run:

import org.apache.spark.sql.execution.debug._
df.debugCodegen

on both of them, the generated code is identical.
I did notice that if I change the code to use > instead of <, the two have almost the same performance, so I imagine this has something to do with an optimization that understands that the range is ordered: once the first condition fails for <, all subsequent rows fail too.
The problem is that I don't see this in the plan, nor can I find it in the code.
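
A minimal experiment to test that hypothesis, assuming the gap comes from batch-level pruning on the cached data (note the predicate [smaller#3L < 10] pushed into the InMemoryTableScan above): in-memory table scans can skip whole cached batches using per-batch min/max statistics, and on an ascending range nearly every batch fails smaller < 10.

// Disable in-memory batch pruning and re-run the timings; if < and > now
// perform alike, the speedup was stats-based pruning, not the optimizer.
spark.conf.set("spark.sql.inMemoryColumnarStorage.partitionPruning", "false")
println("base " + avgTime(base_filter_df.count()))
println("codegen " + avgTime(my_func.count()))

(avgTime and the DataFrames are the ones defined further down the thread.)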


From: Takeshi Yamamuro [mailto:linguin....@gmail.com]
Sent: Monday, September 12, 2016 7:12 PM
To: Mendelson, Assaf
Cc: dev@spark.apache.org
Subject: Re: UDF and native functions performance

Hi,

I think you'd be better off comparing the gen'd code of `df.filter` and your gen'd code by using .debugCodegen().
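
Spelled out, that comparison looks like this (a sketch using the DataFrames defined in the original mail below):

import org.apache.spark.sql.execution.debug._
base_filter_df.debugCodegen()  // whole-stage generated code for the native filter
my_func.debugCodegen()         // generated code for the custom expression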

// maropu

On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson <assaf.mendel...@rsa.com> wrote:
I am trying to create UDFs with improved performance, so I decided to compare several ways of doing it.
In general, I created a dataframe using range with 50M elements, cached it, and counted it to materialize it.

I then implemented a simple predicate (x < 10) in 4 different ways, counted the elements, and timed each one.
The 4 ways were:

- Standard expression (took 90 milliseconds)
- UDF (took 539 milliseconds)
- Codegen (took 358 milliseconds)
- Dataset filter (took 1022 milliseconds)

I understand why filter is so much slower. I also understand why UDF is slower 
(with volcano model taking up processing time).
I do not understand why the codegen I created is so slow. What am I missing?

The code to generate the numbers follows:

import org.apache.spark.sql.codegenFuncs._
val df = spark.range(50000000).withColumnRenamed("id","smaller")
df.cache().count()

val base_filter_df = df.filter(df("smaller") < 10)

import org.apache.spark.sql.functions.udf
def asUdf = udf((x: Int) => x < 10)
val udf_filter_df = df.filter(asUdf(df("smaller")))

val my_func = df.filter(genf_func(df("smaller")))

case class tmpclass(smaller: BigInt)

val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => x.smaller < 10)

def time[R](block: => R) = {
    val t0 = System.nanoTime()
    val result = block    // call-by-name
    val t1 = System.nanoTime()
    (t1 - t0)/1000000
}

def avgTime[R](block: => R) = {
    val times = for (i <- 1 to 5) yield time(block)
    times.sum / 5
}
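
One caveat with this harness: the first of the five runs also pays JIT and codegen warm-up cost. A variant that discards a warm-up run (my tweak, not part of the original measurements) gives steadier numbers:

def avgTimeWarm[R](block: => R) = {
    time(block)                                // warm-up run, discarded
    val times = for (_ <- 1 to 5) yield time(block)
    times.sum / 5
}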


println("base " + avgTime(base_filter_df.count()))
//>> got a result of 90
println("udf " + avgTime(udf_filter_df.count()))
//>> got a result of 539
println("codegen " + avgTime(my_func.count()))
//>> got a result of 358
println("filter " + avgTime(simpleFilter.count()))
//>> got a result of 1022

And the code for the genf_func:

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

object codegenFuncs {
  case class genf(child: Expression) extends UnaryExpression with Predicate 
with ImplicitCastInputTypes {

    override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)

    override def toString: String = s"$child < 10"

    override def eval(input: InternalRow): Any = {
      val value = child.eval(input)
      if (value == null) {
        false
      } else {
        child.dataType match {
          case IntegerType => value.asInstanceOf[Int] < 10
        }
      }
    }

    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
      defineCodeGen(ctx, ev, c => s"($c) < 10")
    }
  }

  private def withExpr(expr: Expression): Column = Column(expr)

  def genf_func(v: Column): Column = withExpr { genf(v.expr) }
}
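
A possible simplification, not from the thread: UnaryExpression already factors out the null check via nullSafeEval, so a variant of genf (named genf2 here, a hypothetical name) could drop the hand-written eval. It would sit next to genf inside codegenFuncs:

  case class genf2(child: Expression) extends UnaryExpression with Predicate
      with ImplicitCastInputTypes {

    override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)

    override def toString: String = s"$child < 10"

    // UnaryExpression.eval handles a null child for us; note it then yields
    // null rather than false, which a Filter treats the same way.
    override protected def nullSafeEval(value: Any): Any =
      value.asInstanceOf[Int] < 10

    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
      defineCodeGen(ctx, ev, c => s"($c) < 10")
  }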






--
---
Takeshi Yamamuro
