[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-30 Thread ueshin
Github user ueshin commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50578802
  
Hi @marmbrus, thanks for the great work!
However, it seems to break the build.

I got the following result when I ran `sbt assembly` or `sbt publish-local`:

```
[error] (catalyst/compile:doc) Scaladoc generation failed
```

and I found a lot of error messages in the build log saying `value q is not 
a member of StringContext`.
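For context, the `q` interpolator comes from Scala quasiquotes: in Scala 2.10 it required the separate quasiquotes artifact plus the macro-paradise compiler plugin, while in 2.11+ it ships with `scala-reflect`. If that dependency is missing from a module, any use of `q"..."` fails exactly as above. A minimal sketch, assuming Scala 2.11 with `scala-reflect` on the classpath:

```scala
// Minimal sketch: the construct that produces "value q is not a member of
// StringContext" when quasiquote support is missing. Assumes Scala 2.11+,
// where the q interpolator ships with scala-reflect; on 2.10 it required
// the separate quasiquotes artifact and the macro-paradise plugin.
import scala.reflect.runtime.universe._

object QuasiquoteDemo {
  def main(args: Array[String]): Unit = {
    val tree = q"1 + 1"      // builds an AST for the expression 1 + 1
    println(showCode(tree))  // renders the tree back as Scala source
  }
}
```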


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-30 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50580695
  
@ueshin thanks for reporting. I've opened #1653 to fix this.




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50534281
  
QA tests have started for PR 993. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17376/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-29 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50563076
  
test this please




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-29 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50563149
  
QA tests have started for PR 993. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17402/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-29 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50571300
  
Thanks for looking at this everyone.  I've merged it into master!




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/993




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-28 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50379727
  
QA tests have started for PR 993. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17295/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50287338
  
QA tests have started for PR 993. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17251/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50287472
  
QA tests have started for PR 993. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17252/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-27 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50290093
  
QA results for PR 993:
- This patch PASSES unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17251/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-27 Thread liancheng
Github user liancheng commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15444048
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---
@@ -39,6 +39,18 @@ trait SQLConf {
   private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt
 
   /**
+   * When set to true, Spark SQL will use the Scala compiler at runtime to 
generate custom bytecode
+   * that evaluates expressions found in queries.  In general this custom 
code runs much faster
+   * than interpreted evaluation, but there are significant start-up costs 
due to compilation.
+   * As a result codegen is only beneficial when queries run for a long
time, or when the same
+   * expressions are used multiple times.
+   *
+   * Defaults to false as this feature is currently experimental.
+   */
+  private[spark] def codegenEnabled: Boolean =
+    if (get("spark.sql.codegen", "false") == "true") true else false
--- End diff --

I collected all of the Spark SQL configuration properties in [`object SQLConf`](https://github.com/apache/spark/blob/81fcdd22c8ef52889ed51b3ec5c2747708505fc2/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L94-L102) in the JDBC Thrift server PR. We can put this one there too.
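The refactoring suggested here might look roughly like the sketch below; `SQLConfConstants` and `SQLConfLike` are illustrative names, not Spark's actual identifiers. As a side benefit, the redundant `if (... == "true") true else false` in the diff collapses to a single `toBoolean`:

```scala
// Hedged sketch of collecting config keys in one object (names here are
// illustrative, not Spark's actual identifiers).
object SQLConfConstants {
  // Key and default live in one place instead of being inlined at each use.
  val CODEGEN_ENABLED = "spark.sql.codegen"
  val CODEGEN_DEFAULT = "false"
}

trait SQLConfLike {
  private val settings =
    new java.util.concurrent.ConcurrentHashMap[String, String]()

  def get(key: String, default: String): String =
    Option(settings.get(key)).getOrElse(default)

  def set(key: String, value: String): Unit = settings.put(key, value)

  // The diff's `if (get(...) == "true") true else false` reduces to:
  def codegenEnabled: Boolean =
    get(SQLConfConstants.CODEGEN_ENABLED, SQLConfConstants.CODEGEN_DEFAULT).toBoolean
}
```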




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15435513
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType =
--- End diff --

I'll agree with you that the use of `apply` after a `target` is a little confusing, but in all the other cases we are using `apply` in places where the object is acting like a stateful function.

In this particular case the `apply` is there so you can do something like `GenerateProjection(...)`, which is pretty standard factory method style. I have added a comment, though, to clarify what this function does.
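The companion-object `apply` factory style being described can be sketched as below; the names are hypothetical, and the real `CodeGenerator` uses a Guava `LoadingCache` guarded by a global lock rather than a `TrieMap`:

```scala
// Hypothetical sketch of a factory-method-style apply with caching, in the
// spirit of GenerateProjection(...). Not Spark's actual implementation.
import scala.collection.concurrent.TrieMap

abstract class Generator[In, Out] {
  // Subclasses supply the (possibly expensive) construction step.
  protected def create(in: In): Out

  private val cache = TrieMap.empty[In, Out]

  // Calling SomeGenerator(x) looks like a constructor call but hits the cache.
  def apply(in: In): Out = cache.getOrElseUpdate(in, create(in))
}

object Doubler extends Generator[Int, Int] {
  protected def create(in: Int): Int = in * 2
}
```

With this, `Doubler(21)` returns `42`, and repeated calls with the same input reuse the cached result rather than re-running `create`.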




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15435514
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+
+/**
+ * Generates bytecode that produces a new [[Row]] object based on a fixed 
set of input
+ * [[Expression Expressions]] and a given input [[Row]].  The returned 
[[Row]] object is custom
+ * generated based on the output types of the [[Expression]] to avoid 
boxing of primitive values.
+ */
+object GenerateProjection extends CodeGenerator[Seq[Expression], 
Projection] {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  protected def canonicalize(in: Seq[Expression]): Seq[Expression] =
+in.map(ExpressionCanonicalizer(_))
+
+  protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): 
Seq[Expression] =
+in.map(BindReferences.bindReference(_, inputSchema))
+
+  // Make Mutability optional...
+  protected def create(expressions: Seq[Expression]): Projection = {
+    val tupleLength = ru.Literal(Constant(expressions.length))
+    val lengthDef = q"final val length = $tupleLength"
+
+    /* TODO: Configurable...
+    val nullFunctions =
+      q"""
+        private final val nullSet = new org.apache.spark.util.collection.BitSet(length)
+        final def setNullAt(i: Int) = nullSet.set(i)
+        final def isNullAt(i: Int) = nullSet.get(i)
+      """
+     */
+
+    val nullFunctions =
+      q"""
+        private[this] var nullBits = new Array[Boolean](${expressions.size})
+        final def setNullAt(i: Int) = { nullBits(i) = true }
+        final def isNullAt(i: Int) = nullBits(i)
+      """.children
+
+    val tupleElements = expressions.zipWithIndex.flatMap {
+      case (e, i) =>
+        val elementName = newTermName(s"c$i")
+val evaluatedExpression = expressionEvaluator(e)
--- End diff --

`CodeGenerator`




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15435527
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType =
+    apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+    newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the result of the evaluation.
+   *  Not valid if `nullTerm` is set to `true`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can 

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15435534
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType =
+    apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+    newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the result of the evaluation.
+   *  Not valid if `nullTerm` is set to `true`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can 

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15435573
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+
+/**
+ * Overrides our expression evaluation tests to use code generation for 
evaluation.
+ */
+class GeneratedEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+      expression: Expression,
+      expected: Any,
+      inputRow: Row = EmptyRow): Unit = {
+    val plan = try {
+      GenerateMutableProjection(Alias(expression, s"Optimized($expression)")() :: Nil)()
+    } catch {
+      case e: Throwable =>
+        val evaluated = GenerateProjection.expressionEvaluator(expression)
+        fail(
+          s"""
+            |Code generation of $expression failed:
+            |${evaluated.code.mkString("\n")}
+            |$e
+          """.stripMargin)
+    }
+
+    val actual = plan(inputRow).apply(0)
+    if (actual != expected) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+    }
+  }
+
+
+  test("multithreaded eval") {
+    import scala.concurrent._
+    import ExecutionContext.Implicits.global
+    import scala.concurrent.duration._
+
+    val futures = (1 to 20).map { _ =>
+      future {
+        GeneratePredicate(EqualTo(Literal(1), Literal(1)))
+        GenerateProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+        GenerateMutableProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+        GenerateOrdering(Add(Literal(1), Literal(1)).asc :: Nil)
+      }
+    }
+
+    futures.foreach(Await.result(_, 10.seconds))
+  }
+}
+
+/**
+ * Overrides our expression evaluation tests to use generated code on 
mutable rows.
+ */
+class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+      expression: Expression,
+      expected: Any,
+      inputRow: Row = EmptyRow): Unit = {
+    lazy val evaluated = GenerateProjection.expressionEvaluator(expression)
+
+    val plan = try {
+      GenerateProjection(Alias(expression, s"Optimized($expression)")() :: Nil)
+    } catch {
+      case e: Throwable =>
+        fail(
+          s"""
+            |Code generation of $expression failed:
+            |${evaluated.code.mkString("\n")}
+            |$e
+          """.stripMargin)
+    }
+
+    val actual = plan(inputRow)
+    val expectedRow = new GenericRow(Array[Any](expected))
+    if (actual.hashCode() != expectedRow.hashCode()) {
+      fail(
+        s"""
+          |Mismatched hashCodes for values: $actual, $expectedRow
+          |Hash Codes: ${actual.hashCode()} != ${expectedRow.hashCode()}
+          |${evaluated.code.mkString("\n")}
+        """.stripMargin)
+    }
+    if (actual != expectedRow) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+    }
+  }
+}
--- End diff --

Yeah, we should make scalastyle check test files too... though I'm afraid 
there will be a fair amount of work to make that pass.



[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15435619
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -51,8 +82,46 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging {
*/
   def executeCollect(): Array[Row] = execute().map(_.copy()).collect()
 
-  protected def buildRow(values: Seq[Any]): Row =
-    new GenericRow(values.toArray)
+  protected def newProjection(
+      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
+    log.debug(
+      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      GenerateProjection(expressions, inputSchema)
+    } else {
+      new InterpretedProjection(expressions, inputSchema)
+    }
+  }
+
+  protected def newMutableProjection(
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): () => MutableProjection = {
+    log.debug(
+      s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      GenerateMutableProjection(expressions, inputSchema)
--- End diff --

This is a standard factory method in an object.




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15435630
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from an aggregation buffer tuple back onto
+ * itself. Currently only simple aggregations like SUM, COUNT, or AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+    val aggregatesToCompute = aggregateExpressions.flatMap { a =>
+      a.collect { case agg: AggregateExpression => agg }
+    }
+
+    val computeFunctions = aggregatesToCompute.map {
+      case c @ Count(expr) =>
+        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
+        val initialValue = Literal(0L)
+        val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
+        val result = currentCount
+
+        AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result)
+
+      case Sum(expr) =>
+        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
+        val initialValue = Cast(Literal(0L), expr.dataType)
+
+        // Coalesce avoids double calculation...
+        // but really, common sub expression elimination would be better
+        val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
+        val result = currentSum
+
+        AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result)
+
+      case a @ Average(expr) =>
+        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
+        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
+        val initialCount = Literal(0L)
+        val initialSum = Cast(Literal(0L), expr.dataType)
+        val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
+        val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
+
+        val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType))
+
+        AggregateEvaluation(
+          currentCount :: currentSum :: Nil,

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50247318
  
QA tests have started for PR 993. This patch merges cleanly. View progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17227/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-50247741
  
Hey @rxin, thanks for the careful review!  I think I've addressed most of 
your comments.  Regarding the GeneratedAggregate code, I'm happy to sit down 
and explain in more detail at some point.  One thing to note is that it's only 
used in a few circumstances at the moment, and it's pretty easy to switch off if 
we find it causes problems in the future.  That said, I think there is the 
possibility of a pretty huge speed-up here.  Here's an example that demonstrates 
how the rewrite happens for a simple query:

`hql("SELECT AVG(key) + 1 FROM src GROUP BY value")`

Partial Aggregation:
```
Initial values: 0,CAST(0, LongType)
Grouping Projection: value#25
Update Expressions: if (IS NOT NULL CAST(key#24, LongType)) 
(currentCount#30L + 1) else currentCount#30L,Coalesce((CAST(key#24, LongType) + 
currentSum#31L),currentSum#31L)
Result Projection: value#25,currentCount#30L AS 
PartialCount#27L,currentSum#31L AS PartialSum#26L
```

The updates compute the new currentSum and currentCount given an update 
buffer joined with the input row.

Final aggregation:
```
Initial values: CAST(0, LongType),CAST(0, LongType)
Grouping Projection: value#25
Update Expressions: Coalesce((PartialSum#26L + 
currentSum#28L),currentSum#28L),Coalesce((PartialCount#27L + 
currentSum#29L),currentSum#29L)
Result Projection: ((CAST(currentSum#28L, DoubleType) / 
CAST(currentSum#29L, DoubleType)) + 1.0) AS c_0#22
```

The updates calculate the sum of the counts and partial sums.  The result 
divides them and adds 1.
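The same two-phase shape can be sketched in plain Scala (the buffer class and names are illustrative, not Catalyst's): each group keeps a mutable (sum, count) buffer, partial buffers merge across partitions, and the division happens only in the final result projection.

```scala
// Toy stand-in for the aggregation buffer described above.
final case class AvgBuffer(var sum: Long = 0L, var count: Long = 0L) {
  def update(value: Option[Long]): Unit = value.foreach { v =>
    sum += v    // like Coalesce(Add(expr, currentSum), currentSum)
    count += 1  // like If(IsNotNull(expr), currentCount + 1, currentCount)
  }
  def merge(other: AvgBuffer): Unit = { sum += other.sum; count += other.count }
  def result: Double = sum.toDouble / count.toDouble  // final Result Projection
}

// Partial aggregation on two "partitions" of (value, key) rows, grouped by value.
val part1 = Seq(("a", Some(1L)), ("a", Some(3L)))
val part2 = Seq(("a", Some(5L)))

def partialAgg(rows: Seq[(String, Option[Long])]): Map[String, AvgBuffer] =
  rows.groupBy(_._1).map { case (k, vs) =>
    val buf = AvgBuffer()
    vs.foreach { case (_, v) => buf.update(v) }
    k -> buf
  }

// Final aggregation merges the partial buffers, then projects the result (+ 1).
val merged = (partialAgg(part1).toSeq ++ partialAgg(part2).toSeq)
  .groupBy(_._1)
  .map { case (k, bufs) =>
    val acc = AvgBuffer()
    bufs.foreach { case (_, b) => acc.merge(b) }
    k -> (acc.result + 1.0)
  }
// merged("a") == 4.0  (avg of 1, 3, 5 is 3.0, plus 1)
```

Because partial and final phases are both just projections over a buffer, the same generated-projection machinery covers both.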




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15437450
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---
@@ -47,23 +47,29 @@ case class Generate(
 }
   }
 
-  override def output =
+  // This must be a val since the generator output expr ids are not 
preserved by serialization.
+  override val output =
 if (join) child.output ++ generatorOutput else generatorOutput
 
+  val boundGenerator = BindReferences.bindReference(generator, 
child.output)
+
+  /** Codegenned rows are not serializable... */
--- End diff --

Actually it was a misunderstanding (the comment is misleading). I thought 
you'd want to code gen them in the future when we make it serializable. Maybe 
expand the comment to explain why codegen is disabled (what's the relationship 
between serializability and generate?)




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-26 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15437455
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -18,22 +18,53 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
 import org.apache.spark.sql.catalyst.plans.physical._
 
+
+object SparkPlan {
+  protected[sql] val currentContext = new ThreadLocal[SQLContext]()
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
+abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with 
Serializable {
+  self: Product =>
 
+  /**
+   * A handle to the SQL Context that was used to create this plan.   
Since many operators need
+   * access to the sqlContext for RDD operations or configuration this 
field is automatically
+   * populated by the query planning infrastructure.
+   */
+  @transient
+  protected val sqlContext = SparkPlan.currentContext.get()
+
+  protected def sparkContext = sqlContext.sparkContext
+
+  def logger = log
+
+  val codegenEnabled: Boolean = if(sqlContext != null) {
--- End diff --

would be great to just add that as inline comment.

also space after if




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15331132
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala
 ---
@@ -29,6 +29,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, 
children: Seq[Expressi
 
   override def eval(input: Row): Any = {
 children.size match {
+      case 0 => function.asInstanceOf[() => Any]()
--- End diff --

this is for another time, but if you add an explicit init to expressions, 
we can move all of these branches from the inner loop (once per row) directly 
to the outer loop (once per partition).
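That hoisting can be sketched as follows (types simplified; `compileUdf` is a hypothetical helper, not Spark's API): the arity match runs once, and the returned closure is what the per-row loop calls.

```scala
// Toy row type standing in for Spark's Row.
type Row = Seq[Any]

// Resolve the UDF's arity once (the "init"), outside the per-row loop,
// instead of pattern-matching on children.size for every row.
def compileUdf(function: AnyRef, arity: Int): Row => Any = arity match {
  // This match happens once per partition; the returned closure runs per row.
  case 0 => (_: Row) => function.asInstanceOf[() => Any]()
  case 1 => (row: Row) => function.asInstanceOf[Any => Any](row(0))
  case 2 => (row: Row) => function.asInstanceOf[(Any, Any) => Any](row(0), row(1))
}

val add: (Any, Any) => Any = (a, b) => a.asInstanceOf[Int] + b.asInstanceOf[Int]
val eval = compileUdf(add, 2)                       // branch taken once
val results = Seq(Seq(1, 2), Seq(3, 4)).map(eval)   // inner loop is branch-free
// results == Seq(3, 7)
```

The per-row cost drops to a single virtual call on the precomputed closure, which is the point of moving the dispatch to the outer loop.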




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15331160
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
--- End diff --

we should comment on the behavior of this cache so readers don't have to go 
read Guava documentation.
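For illustration, the behavior worth documenting (bounded size, compute-on-miss under a lock, canonicalized keys so cosmetically different expressions share one compiled result) can be sketched with a plain Scala stand-in; this is not the Guava API, just its contract:

```scala
import scala.collection.mutable

// Hypothetical stand-in for the Guava LoadingCache used by CodeGenerator.
class CompileCache[K, V](maxSize: Int, canonicalize: K => K, compile: K => V) {
  private val lock = new Object
  private val entries = mutable.LinkedHashMap.empty[K, V]  // insertion order ~ eviction order
  var compileCount = 0  // exposed so the sketch can demonstrate cache hits

  def get(key: K): V = lock.synchronized {
    val canonical = canonicalize(key)
    entries.getOrElseUpdate(canonical, {
      compileCount += 1
      if (entries.size >= maxSize) entries.remove(entries.head._1)  // crude eviction
      compile(canonical)
    })
  }
}

// Keys differing only cosmetically canonicalize to one entry: one "compile" total.
val cache = new CompileCache[String, Int](maxSize = 1000, _.toLowerCase, _.length)
val a = cache.get("Sum(x)")
val b = cache.get("sum(X)")
// a == 6, b == 6, cache.compileCount == 1
```

The real cache additionally evicts least-recently-used entries once `maximumSize` is hit, which is exactly the behavior the review asks to spell out in a comment.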




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15331362
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
--- End diff --

nit: space before =

Also would be great to add inline doc on what these `apply`s are doing, 
since `apply` is used quite a lot in Catalyst with different semantics.




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15332107
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+
+/**
+ * Generates bytecode that produces a new [[Row]] object based on a fixed 
set of input
+ * [[Expression Expressions]] and a given input [[Row]].  The returned 
[[Row]] object is custom
+ * generated based on the output types of the [[Expression]] to avoid 
boxing of primitive values.
+ */
+object GenerateProjection extends CodeGenerator[Seq[Expression], 
Projection] {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  protected def canonicalize(in: Seq[Expression]): Seq[Expression] =
+in.map(ExpressionCanonicalizer(_))
+
+  protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): 
Seq[Expression] =
+in.map(BindReferences.bindReference(_, inputSchema))
+
+  // Make Mutability optional...
+  protected def create(expressions: Seq[Expression]): Projection = {
+    val tupleLength = ru.Literal(Constant(expressions.length))
+    val lengthDef = q"final val length = $tupleLength"
+
+    /* TODO: Configurable...
+    val nullFunctions =
+      q"""
+        private final val nullSet = new org.apache.spark.util.collection.BitSet(length)
+        final def setNullAt(i: Int) = nullSet.set(i)
+        final def isNullAt(i: Int) = nullSet.get(i)
+      """
+     */
+
+    val nullFunctions =
+      q"""
+        private[this] var nullBits = new Array[Boolean](${expressions.size})
+        final def setNullAt(i: Int) = { nullBits(i) = true }
+        final def isNullAt(i: Int) = nullBits(i)
+      """.children
+
+    val tupleElements = expressions.zipWithIndex.flatMap {
+      case (e, i) =>
+        val elementName = newTermName(s"c$i")
+        val evaluatedExpression = expressionEvaluator(e)
--- End diff --

where is expressionEvaluator defined?




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15332175
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
--- End diff --

actually after reading through the code more, i'd argue we should rename 
this function from apply to something more informative. Unless it's semantically 
very explicit and obvious (e.g. array), the use of apply makes it harder to 
understand code around its usage.
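A tiny sketch of the suggested rename (names hypothetical, not from the PR): a verb-named method makes the expensive step visible at the call site where an overloaded `apply` would read like plain construction.

```scala
// Before: ExprCompiler(expr) -- reads like construction, hides the compile step.
// After: an explicit verb makes the codegen cost obvious wherever it is called.
object ExprCompiler {
  def compile(pattern: String): String => Boolean =
    s => s.contains(pattern)  // stand-in for real code generation
}

val predicate = ExprCompiler.compile("spark")  // intent is clear without context
val hit = predicate("apache spark")            // true
```

The trade-off is purely at the call site: `apply` keeps code terse, while a named method documents what happens, which matters when the operation is as heavyweight as compiling a class.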




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r1512
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+    newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the result of the evaluation.
+   *                      Not valid if `nullTerm` is set to `true`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can be 

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r1552
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = $
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s$prefix$javaSeparator${curId.getAndIncrement})
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `false`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * can be 
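[Editorial note: the generate-and-cache pattern in the quoted `CodeGenerator` (look up generated classes by a canonicalized key so cosmetically different inputs share one compiled class) can be sketched in a few lines of plain Scala. The names and types below are illustrative stand-ins, not Catalyst APIs; the real code uses Guava's `CacheBuilder` with a size bound and a global lock around compilation.]

```scala
import scala.collection.mutable

// Sketch of the generate-and-cache pattern: canonicalize the input, then
// either reuse a previously generated result or create (and record) a new one.
abstract class SimpleGenerator[In, Out] {
  /** The expensive step, standing in for actual code generation/compilation. */
  protected def create(in: In): Out

  /** Normalizes cosmetic differences so equivalent inputs share a cache entry. */
  protected def canonicalize(in: In): In

  private val cache = mutable.Map.empty[In, Out]
  var creations = 0 // counts cache misses, for demonstration

  def apply(in: In): Out = synchronized {
    val key = canonicalize(in)
    cache.getOrElseUpdate(key, { creations += 1; create(key) })
  }
}

// Toy instance: "generates" the length of an expression string, and treats
// whitespace as cosmetic so "1 + 1" and "1+1" are only generated once.
object ExprLenGenerator extends SimpleGenerator[String, Int] {
  protected def create(in: String): Int = in.length
  protected def canonicalize(in: String): String = in.replaceAll("\\s", "")
}
```

With this sketch, `ExprLenGenerator("1 + 1")` and `ExprLenGenerator("1+1")` both return 3 while `create` runs only once, which is the point of canonicalizing before the cache lookup.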

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r1579
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333476
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333508
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333558
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333592
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 ---
@@ -47,4 +47,30 @@ package org.apache.spark.sql.catalyst
  * ==Evaluation==
  * The result of expressions can be evaluated using the 
`Expression.apply(Row)` method.
  */
-package object expressions
+package object expressions {
+
+  /**
+   * Converts a [[Row]] to another Row given a sequence of expressions that define each column of
+   * the new row. If the schema of the input row is specified, then the given expression will be
+   * bound to that schema.
+   */
+  abstract class Projection extends (Row => Row)
+
+  /**
+   * Converts a [[Row]] to another Row given a sequence of expressions that define each column of
+   * the new row. If the schema of the input row is specified, then the given expression will be
+   * bound to that schema.
+   *
+   * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
+   * each time an input row is added.  This significantly reduces the cost of calculating the
+   * projection, but means that it is not safe to hold on to a reference to a [[Row]] after `next()`
+   * has been called on the [[Iterator]] that produced it. Instead, the user must call `Row.copy()`
+   * and hold on to the returned [[Row]] before calling `next()`.
+   */
+  abstract class MutableProjection extends Projection {
+    def currentValue: Row
+
+    /** Updates the target of this projection to a new MutableRow. */
--- End diff --

maybe ```Uses the given row to store the output of the projection.``` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
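[Editorial note: the copy-before-buffering contract described in the quoted doc comment can be shown with a small, self-contained sketch. `SimpleRow` and `DoublingProjection` are simplified stand-ins for Catalyst's `Row` and `MutableProjection`, not the real classes.]

```scala
// Simplified stand-in for Catalyst's Row; copy() snapshots the backing array.
case class SimpleRow(values: Array[Any]) {
  def copy(): SimpleRow = SimpleRow(values.clone())
}

// Like a MutableProjection, this reuses one output buffer across calls,
// so every returned row aliases the same storage.
class DoublingProjection {
  private val buffer = new Array[Any](1)
  def apply(input: Int): SimpleRow = {
    buffer(0) = input * 2
    SimpleRow(buffer) // same backing array on every call
  }
}
```

Buffering the returned rows directly (`(1 to 3).map(p(_))`) leaves every element aliasing the shared buffer, so all of them end up showing the last value, while calling `copy()` on each row before the next `apply` preserves 2, 4, 6. That is the trade the quoted comment describes: a much cheaper projection in exchange for a copy obligation on any row the caller retains.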


[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333688
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+
+/**
+ * Overrides our expression evaluation tests to use code generation for evaluation.
+ */
+class GeneratedEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+      expression: Expression,
+      expected: Any,
+      inputRow: Row = EmptyRow): Unit = {
+    val plan = try {
+      GenerateMutableProjection(Alias(expression, s"Optimized($expression)")() :: Nil)()
+    } catch {
+      case e: Throwable =>
+        val evaluated = GenerateProjection.expressionEvaluator(expression)
+        fail(
+          s"""
+            |Code generation of $expression failed:
+            |${evaluated.code.mkString("\n")}
+            |$e
+          """.stripMargin)
+    }
+
+    val actual  = plan(inputRow).apply(0)
+    if (actual != expected) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+    }
+  }
+
+
+  test("multithreaded eval") {
+    import scala.concurrent._
+    import ExecutionContext.Implicits.global
+    import scala.concurrent.duration._
+
+    val futures = (1 to 20).map { _ =>
+      future {
+        GeneratePredicate(EqualTo(Literal(1), Literal(1)))
+        GenerateProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+        GenerateMutableProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+        GenerateOrdering(Add(Literal(1), Literal(1)).asc :: Nil)
+      }
+    }
+
+    futures.foreach(Await.result(_, 10.seconds))
+  }
+}
+
+/**
+ * Overrides our expression evaluation tests to use generated code on mutable rows.
+ */
+class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite {
--- End diff --

Best to take this out into a separate file, since that's the common 
standard across all Spark modules (each test suite has its own file)




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333694
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala
 ---
+
+/**
+ * Overrides our expression evaluation tests to use generated code on mutable rows.
+ */
+class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+      expression: Expression,
+      expected: Any,
+      inputRow: Row = EmptyRow): Unit = {
+    lazy val evaluated = GenerateProjection.expressionEvaluator(expression)
+
+    val plan = try {
+      GenerateProjection(Alias(expression, s"Optimized($expression)")() :: Nil)
+    } catch {
+      case e: Throwable =>
+        fail(
+          s"""
+            |Code generation of $expression failed:
+            |${evaluated.code.mkString("\n")}
+            |$e
+          """.stripMargin)
+    }
+
+    val actual = plan(inputRow)
+    val expectedRow = new GenericRow(Array[Any](expected))
+    if (actual.hashCode() != expectedRow.hashCode()) {
+      fail(
+        s"""
+          |Mismatched hashCodes for values: $actual, $expectedRow
+          |Hash Codes: ${actual.hashCode()} != ${expectedRow.hashCode()}
+          |${evaluated.code.mkString("\n")}
+        """.stripMargin)
+    }
+    if (actual != expectedRow) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+    }
+  }
+}
--- End diff --

nitpick - add a new line




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333762
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---
@@ -47,23 +47,29 @@ case class Generate(
 }
   }
 
-  override def output =
+  // This must be a val since the generator output expr ids are not preserved by serialization.
+  override val output =
     if (join) child.output ++ generatorOutput else generatorOutput
 
+  val boundGenerator = BindReferences.bindReference(generator, child.output)
+
+  /** Codegenned rows are not serializable... */
--- End diff --

TODO?




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333803
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+    schema: Seq[Attribute],
+    initialValues: Seq[Expression],
+    update: Seq[Expression],
+    result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus code generation.
+ * Aggregations are converted into a set of projections from an aggregation buffer tuple back onto
+ * itself. Currently only simple aggregations like SUM, COUNT, or AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data without shuffling to
+ *                ensure all values where `groupingExpressions` are equal are present.
+ * @param groupingExpressions expressions that are evaluated to determine grouping.
+ * @param aggregateExpressions expressions that are computed for each group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+    partial: Boolean,
+    groupingExpressions: Seq[Expression],
+    aggregateExpressions: Seq[NamedExpression],
+    child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+    if (partial) {
+      UnspecifiedDistribution :: Nil
+    } else {
+      if (groupingExpressions == Nil) {
+        AllTuples :: Nil
+      } else {
+        ClusteredDistribution(groupingExpressions) :: Nil
+      }
+    }
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+    val aggregatesToCompute = aggregateExpressions.flatMap { a =>
+      a.collect { case agg: AggregateExpression => agg }
+    }
+
+    val computeFunctions = aggregatesToCompute.map {
+      case c @ Count(expr) =>
+        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
+        val initialValue = Literal(0L)
+        val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
+        val result = currentCount
+
+        AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result)
+
+      case Sum(expr) =>
+        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
+        val initialValue = Cast(Literal(0L), expr.dataType)
+
+        // Coalesce avoids double calculation...
+        // but really, common sub expression elimination would be better
+        val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
+        val result = currentSum
+
+        AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result)
+
+      case a @ Average(expr) =>
+        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
+        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
+        val initialCount = Literal(0L)
+        val initialSum = Cast(Literal(0L), expr.dataType)
+        val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
+        val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
+
+        val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType))
+
+        AggregateEvaluation(
+          currentCount :: currentSum :: Nil,
+   
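The SUM update rule quoted above, `Coalesce(Add(expr, currentSum) :: currentSum :: Nil)`, skips SQL NULL inputs by falling back to the current running sum. A minimal plain-Scala sketch of that semantics (illustrative names and `Option` standing in for NULL, not the Catalyst API):

```scala
// Sketch of the null-skipping SUM update: Coalesce(expr + sum, sum).
// When the input value is None (NULL), the running sum is left unchanged.
def updateSum(currentSum: Long, value: Option[Long]): Long =
  value.map(_ + currentSum).getOrElse(currentSum)

val inputs = List(Some(1L), None, Some(4L))
val total = inputs.foldLeft(0L)(updateSum)
println(total) // 5: the None row contributes nothing
```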

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333814
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333832
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333860
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333900
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333888
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala 
---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+schema: Seq[Attribute],
+initialValues: Seq[Expression],
+update: Seq[Expression],
+result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus 
code generation.
+ * Aggregations are converted into a set of projections from a aggregation 
buffer tuple back onto
+ * itself. Currently only used for simple aggregations like SUM, COUNT, or 
AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data 
without shuffling to
+ *ensure all values where `groupingExpressions` are equal 
are present.
+ * @param groupingExpressions expressions that are evaluated to determine 
grouping.
+ * @param aggregateExpressions expressions that are computed for each 
group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+partial: Boolean,
+groupingExpressions: Seq[Expression],
+aggregateExpressions: Seq[NamedExpression],
+child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partial) {
+  UnspecifiedDistribution :: Nil
+} else {
+  if (groupingExpressions == Nil) {
+AllTuples :: Nil
+  } else {
+ClusteredDistribution(groupingExpressions) :: Nil
+  }
+}
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+val aggregatesToCompute = aggregateExpressions.flatMap { a =
+  a.collect { case agg: AggregateExpression = agg}
+}
+
+val computeFunctions = aggregatesToCompute.map {
+  case c @ Count(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val initialValue = Literal(0L)
+val updateFunction = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val result = currentCount
+
+AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case Sum(expr) =
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialValue = Cast(Literal(0L), expr.dataType)
+
+// Coalasce avoids double calculation...
+// but really, common sub expression elimination would be 
better
+val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum 
:: Nil)
+val result = currentSum
+
+AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, 
updateFunction :: Nil, result)
+
+  case a @ Average(expr) =
+val currentCount = AttributeReference(currentCount, LongType, 
nullable = false)()
+val currentSum = AttributeReference(currentSum, expr.dataType, 
nullable = false)()
+val initialCount = Literal(0L)
+val initialSum = Cast(Literal(0L), expr.dataType)
+val updateCount = If(IsNotNull(expr), Add(currentCount, 
Literal(1L)), currentCount)
+val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: 
Nil)
+
+val result = Divide(Cast(currentSum, DoubleType), 
Cast(currentCount, DoubleType))
+
+AggregateEvaluation(
+  currentCount :: currentSum :: Nil,
+   

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333936
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -18,22 +18,53 @@
 package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
 import org.apache.spark.sql.catalyst.plans.physical._
 
+
+object SparkPlan {
+  protected[sql] val currentContext = new ThreadLocal[SQLContext]()
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
+abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
   self: Product =>
 
+  /**
+   * A handle to the SQLContext that was used to create this plan. Since many operators need
+   * access to the sqlContext for RDD operations or configuration, this field is automatically
+   * populated by the query planning infrastructure.
+   */
+  @transient
+  protected val sqlContext = SparkPlan.currentContext.get()
+
+  protected def sparkContext = sqlContext.sparkContext
+
+  def logger = log
+
+  val codegenEnabled: Boolean = if (sqlContext != null) {
--- End diff --

when is context null?


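One plausible answer to the question above, sketched with a stand-in for `SQLContext` (this is an assumption about the patch's intent, not quoted from it): the field is `@transient`, so a plan node constructed outside the planner's `currentContext.set(...)` window, for example when it is rebuilt by deserialization on an executor, reads `null` from the `ThreadLocal`.

```scala
// Illustrative model of the ThreadLocal handoff; String stands in for SQLContext.
object PlanContext {
  val currentContext = new ThreadLocal[String]()
}

class Plan {
  // Captured once at construction time, like SparkPlan's `sqlContext` field.
  val ctx: String = PlanContext.currentContext.get()
  val codegenEnabled: Boolean = if (ctx != null) ctx == "codegen=on" else false
}

PlanContext.currentContext.set("codegen=on")
val planned = new Plan   // built inside the planner's window: sees the context
PlanContext.currentContext.remove()
val rebuilt = new Plan   // built outside the window (e.g. deserialization): ctx is null
println(s"${planned.codegenEnabled} ${rebuilt.codegenEnabled}") // true false
```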


[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333966
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -51,8 +82,46 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
    */
   def executeCollect(): Array[Row] = execute().map(_.copy()).collect()
 
-  protected def buildRow(values: Seq[Any]): Row =
-    new GenericRow(values.toArray)
+  protected def newProjection(
+      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
+    log.debug(
+      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      GenerateProjection(expressions, inputSchema)
+    } else {
+      new InterpretedProjection(expressions, inputSchema)
+    }
+  }
+
+  protected def newMutableProjection(
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): () => MutableProjection = {
+    log.debug(
+      s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      GenerateMutableProjection(expressions, inputSchema)

This is another use of apply that is very confusing, because it is not obvious that `GenerateMutableProjection(...)` returns a closure.


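The pattern the reviewer flags can be reduced to a tiny example (hypothetical names, not the Catalyst classes): an object's `apply` makes the call site look like construction, while it actually builds and returns a closure.

```scala
// An object whose apply() reads like a constructor call but returns a function.
object GenerateIncrementer {
  def apply(start: Int): () => Int = {
    var current = start
    () => { current += 1; current } // the returned closure captures `current`
  }
}

val next = GenerateIncrementer(10) // looks like construction, is really a closure factory
println(next()) // 11
```

Wrapping such calls in an explicitly named method, as the surrounding `newMutableProjection(...)` helper does, is one way to make the closure-returning behavior obvious at the call site.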


[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15333978
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -192,9 +187,9 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     val relation =
       ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
     // Note: overwrite=false because otherwise the metadata we just created will be deleted
-    InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sqlContext) :: Nil
+    InsertIntoParquetTable(relation, planLater(child), overwrite=false) :: Nil
--- End diff --

nit: space after/before =




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15334051
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -300,8 +298,16 @@ case class LeftSemiJoinBNL(
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends 
BinaryNode {
   def output = left.output ++ right.output
 
-  def execute() = left.execute().map(_.copy()).cartesian(right.execute().map(_.copy())).map {
-    case (l: Row, r: Row) => buildRow(l ++ r)
+  def execute() = {
+    val leftResults = left.execute().map(_.copy())
+    val rightResults = right.execute().map(_.copy())
+
+    leftResults.cartesian(rightResults).mapPartitions { iter =>
+      val joinedRow = new JoinedRow
+      iter.map {
+        case (l: Row, r: Row) => joinedRow(l, r)
--- End diff --

maybe perf doesn't matter too much here since it is a cartesian product 
already, but you can remove the pattern matching to improve perf.




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15334075
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.parquet
 
+import org.apache.spark.sql.execution.SparkPlan
--- End diff --

import order




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-24 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15334407
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression 
evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform 
evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends 
Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = 
runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is 
not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression. Used to avoid double caching 
expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+.maximumSize(1000)
+.build(
+  new CacheLoader[InType, OutType]() {
+override def load(in: InType): OutType = globalLock.synchronized {
+   create(in)
+}
+  })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType =
+apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = 
cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a 
`CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to 
the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the 
expression.
+   * @param nullTerm A term that holds a boolean value representing 
whether the expression evaluated
+   * to null.
+   * @param primitiveTerm A term for a possible primitive value of the 
result of the evaluation. Not
+   *  valid if `nullTerm` is set to `true`.
+   * @param objectTerm A possibly boxed version of the result of 
evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+  code: Seq[Tree],
+  nullTerm: TermName,
+  primitiveTerm: TermName,
+  objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which 
contains Scala trees that
+   * 
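
The runtime-compilation machinery in the diff above can be exercised on its own. Below is a minimal sketch of the `ToolBox` pattern (Scala 2.x; it needs `scala-reflect` and `scala-compiler` on the classpath, and is not Spark's actual generator):

```scala
import scala.reflect.runtime.{universe => ru}
import scala.tools.reflect.ToolBox

object ToolBoxSketch {
  // One toolbox per classloader, mirroring the `toolBox` field in CodeGenerator.
  private val toolBox = ru.runtimeMirror(getClass.getClassLoader).mkToolBox()

  // Parse and compile a tiny function at runtime, analogous to how the
  // generator turns an expression tree into an executable Projection.
  def compileIncrement(): Int => Int = {
    val tree = toolBox.parse("(x: Int) => x + 1")
    toolBox.eval(tree).asInstanceOf[Int => Int]
  }
}
```

The compiled function behaves like any other `Int => Int`; the cost is paid once at compile time, which is why the quoted code fronts the toolbox with a Guava cache keyed on canonicalized expressions.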

[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-49949147
  
QA tests have started for PR 993. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17066/consoleFull




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-23 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-49949454
  
Okay, I think this is getting pretty close to merging in.  I've tested it 
with codegen turned on and it passes all of the tests.  I've also built it 
successfully with both maven and sbt.  I've also tried to address all of the 
previous comments.

@pwendell, I'd appreciate it if you could take a quick glance at the build 
changes.  Though they ended up being pretty minimal.
@rxin @concretevitamin @yhuai @liancheng I think you have all already 
looked at this, but if there is any other feedback please let me know.

I'd like to merge this in the next day or so as it makes some pretty 
sweeping changes which might cause conflicts as we get closer to the merge 
deadline.




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-23 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15322479
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala
 ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+
+/**
+ * Generates bytecode that produces a new [[Row]] object based on a fixed 
set of input
+ * [[Expression Expressions]] and a given input [[Row]].  The returned 
[[Row]] object is custom
+ * generated based on the output types of the [[Expression]] to avoid 
boxing of primitive values.
+ */
+object GenerateProjection extends CodeGenerator[Seq[Expression], 
Projection] {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  protected def canonicalize(in: Seq[Expression]): Seq[Expression] =
+in.map(ExpressionCanonicalizer(_))
+
+  protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): 
Seq[Expression] =
+in.map(BindReferences.bindReference(_, inputSchema))
+
+  // Make mutability optional...
+  protected def create(expressions: Seq[Expression]): Projection = {
+val tupleLength = ru.Literal(Constant(expressions.length))
+val lengthDef = q"final val length = $tupleLength"
+
+/* TODO: Configurable...
+val nullFunctions =
+  q"""
+private final val nullSet = new 
org.apache.spark.util.collection.BitSet(length)
+final def setNullAt(i: Int) = nullSet.set(i)
--- End diff --

I'd be pretty surprised if the JIT didn't take care of that for a function 
this small.
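
For context, a self-contained sketch of the bitset-backed null tracking the commented-out TODO describes, with `java.util.BitSet` standing in for Spark's `org.apache.spark.util.collection.BitSet`:

```scala
import java.util.BitSet

// Sketch of the null-tracking scheme from the TODO in the diff above;
// java.util.BitSet stands in for Spark's internal BitSet.
final class NullTrackingRow(length: Int) {
  private val values = new Array[Any](length)
  private val nullSet = new BitSet(length)

  // Small one-line delegations like this are prime candidates for JIT inlining.
  def setNullAt(i: Int): Unit = nullSet.set(i)
  def isNullAt(i: Int): Boolean = nullSet.get(i)
  def update(i: Int, v: Any): Unit = { nullSet.clear(i); values(i) = v }
  def apply(i: Int): Any = if (isNullAt(i)) null else values(i)
}
```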




[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...

2014-07-23 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/993#discussion_r15330980
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
 ---
@@ -28,61 +30,27 @@ import org.apache.spark.sql.Logging
  * to be retrieved more efficiently.  However, since operations like 
column pruning can change
  * the layout of intermediate tuples, BindReferences should be run after 
all such transformations.
  */
-case class BoundReference(ordinal: Int, baseReference: Attribute)
-  extends Attribute with trees.LeafNode[Expression] {
+case class BoundReference(ordinal: Int, dataType: DataType, nullable: 
Boolean)
+  extends Expression with trees.LeafNode[Expression] {
 
   type EvaluatedType = Any
 
-  override def nullable = baseReference.nullable
-  override def dataType = baseReference.dataType
-  override def exprId = baseReference.exprId
-  override def qualifiers = baseReference.qualifiers
-  override def name = baseReference.name
+  def references = Set.empty
--- End diff --

override here

