[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user ueshin commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50578802 Hi @marmbrus, thanks for the great work! But it seems to break the build. I got the following result when I ran `sbt assembly` or `sbt publish-local`: ``` [error] (catalyst/compile:doc) Scaladoc generation failed ``` and I found a lot of error messages in the build log saying `value q is not a member of StringContext`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50580695 @ueshin thanks for reporting. I've opened #1653 to fix this.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50534281 QA tests have started for PR 993. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17376/consoleFull
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50563076 test this please
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50563149 QA tests have started for PR 993. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17402/consoleFull
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50571300 Thanks for looking at this everyone. I've merged it into master!
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/993
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50379727 QA tests have started for PR 993. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17295/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50287338 QA tests have started for PR 993. This patch DID NOT merge cleanly! View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17251/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50287472 QA tests have started for PR 993. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17252/consoleFull
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50290093 QA results for PR 993: - This patch PASSES unit tests. For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17251/consoleFull
Github user liancheng commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15444048

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---
```
@@ -39,6 +39,18 @@ trait SQLConf {
   private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, 200).toInt
 
   /**
+   * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode
+   * that evaluates expressions found in queries.  In general this custom code runs much faster
+   * than interpreted evaluation, but there are significant start-up costs due to compilation.
+   * As a result codegen is only beneficial when queries run for a long time, or when the same
+   * expressions are used multiple times.
+   *
+   * Defaults to false as this feature is currently experimental.
+   */
+  private[spark] def codegenEnabled: Boolean =
+    if (get("spark.sql.codegen", "false") == "true") true else false
```

--- End diff --

Collected all Spark SQL configuration properties in [`object SQLConf`](https://github.com/apache/spark/blob/81fcdd22c8ef52889ed51b3ec5c2747708505fc2/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala#L94-L102) in the JDBC Thrift server PR. We can put this one there too.
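The flag discussed above follows SQLConf's string-keyed pattern: every setting is stored as a string and parsed on read, so `codegenEnabled` reduces to a string comparison against `"true"`. A minimal sketch of that pattern (class and method names here are illustrative stand-ins, not Spark's actual SQLConf):

```scala
// Sketch of a string-keyed boolean setting, assuming a simple mutable store.
import scala.collection.mutable

class MiniConf {
  private val settings = mutable.Map[String, String]()

  def set(key: String, value: String): Unit = settings(key) = value

  // Like SQLConf's get(key, default): every value is stored as a string.
  def get(key: String, default: String): String = settings.getOrElse(key, default)

  // Mirrors the codegenEnabled flag: defaults to false while experimental.
  def codegenEnabled: Boolean = get("spark.sql.codegen", "false") == "true"
}
```

Storing all values as strings keeps the conf trivially serializable; the cost is that every typed accessor must parse on each read.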
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15435513

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression.  Used to avoid double caching expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+    .maximumSize(1000)
+    .build(
+      new CacheLoader[InType, OutType]() {
+        override def load(in: InType): OutType = globalLock.synchronized {
+          create(in)
+        }
+      })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType =
```

--- End diff --

I'll agree with you that the use of `apply` after a target is a little confusing, but in all the other cases we are using apply in places where the object is acting like a stateful function. In this particular case the apply is so you can do something like `GenerateProjection(...)`, which is pretty standard factory method style. I have added a comment though to clarify what this function does.
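The CodeGenerator skeleton quoted above combines three ideas: canonicalize the input, look it up in a bounded cache, and only run the expensive `create` (compilation) step on a miss. A rough sketch of that caching-factory shape, using a plain ConcurrentHashMap in place of Guava's size-bounded cache; names and the `misses` counter are illustrative, not Spark's:

```scala
// Caching factory sketch: apply = canonicalize, then cache lookup, then create.
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

abstract class CachingFactory[In <: AnyRef, Out <: AnyRef] {
  val misses = new AtomicInteger()                 // counts actual `create` calls
  private val cache = new ConcurrentHashMap[In, Out]()

  protected def create(in: In): Out                // the expensive step (e.g. compilation)
  protected def canonicalize(in: In): In           // avoid caching cosmetic variants twice

  def apply(in: In): Out =
    cache.computeIfAbsent(canonicalize(in), (k: In) => {
      misses.incrementAndGet()
      create(k)
    })
}
```

Canonicalizing before the lookup is what lets two cosmetically different inputs share one compiled artifact, which matters when compilation dominates query start-up cost.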
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15435514

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala ---
```
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+
+/**
+ * Generates bytecode that produces a new [[Row]] object based on a fixed set of input
+ * [[Expression Expressions]] and a given input [[Row]].  The returned [[Row]] object is custom
+ * generated based on the output types of the [[Expression]] to avoid boxing of primitive values.
+ */
+object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  protected def canonicalize(in: Seq[Expression]): Seq[Expression] =
+    in.map(ExpressionCanonicalizer(_))
+
+  protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
+    in.map(BindReferences.bindReference(_, inputSchema))
+
+  // Make mutability optional...
+  protected def create(expressions: Seq[Expression]): Projection = {
+    val tupleLength = ru.Literal(Constant(expressions.length))
+    val lengthDef = q"final val length = $tupleLength"
+
+    /* TODO: Configurable...
+    val nullFunctions =
+      q"""
+        private final val nullSet = new org.apache.spark.util.collection.BitSet(length)
+        final def setNullAt(i: Int) = nullSet.set(i)
+        final def isNullAt(i: Int) = nullSet.get(i)
+      """
+     */
+
+    val nullFunctions =
+      q"""
+        private[this] var nullBits = new Array[Boolean](${expressions.size})
+        final def setNullAt(i: Int) = { nullBits(i) = true }
+        final def isNullAt(i: Int) = nullBits(i)
+      """.children
+
+    val tupleElements = expressions.zipWithIndex.flatMap {
+      case (e, i) =>
+        val elementName = newTermName(s"c$i")
+        val evaluatedExpression = expressionEvaluator(e)
```

--- End diff --

`CodeGenerator`
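The quasiquotes in the diff above splice per-column fields and a `nullBits` array into the generated row class, so reading a value never boxes a primitive. A hand-written analogue for a single Int column shows what the emitted class amounts to; this is an illustrative sketch, not the actual generated code:

```scala
// Hand-written analogue of a generated row with one Int column:
// primitive storage plus a separate null-tracking array, so no boxing occurs.
class OneIntRow {
  private[this] var c0: Int = 0
  private[this] val nullBits = new Array[Boolean](1)

  final def setNullAt(i: Int): Unit = { nullBits(i) = true }
  final def isNullAt(i: Int): Boolean = nullBits(i)

  final def setInt(i: Int, v: Int): Unit = { c0 = v; nullBits(i) = false }
  final def getInt(i: Int): Int = c0   // primitive return: no Integer allocation
}
```

The commented-out TODO in the diff would swap the `Array[Boolean]` for a BitSet, trading one byte per column for one bit per column.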
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15435527

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
```
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is not cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression.  Used to avoid double caching expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+    .maximumSize(1000)
+    .build(
+      new CacheLoader[InType, OutType]() {
+        override def load(in: InType): OutType = globalLock.synchronized {
+          create(in)
+        }
+      })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType =
+    apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a `CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+    newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the expression.
+   * @param nullTerm A term that holds a boolean value representing whether the expression evaluated
+   *                 to null.
+   * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not
+   *                      valid if `nullTerm` is set to `true`.
+   * @param objectTerm A possibly boxed version of the result of evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+      code: Seq[Tree],
+      nullTerm: TermName,
+      primitiveTerm: TermName,
+      objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that
+   * can
```
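The `freshName` helper quoted above can be sketched in isolation: an AtomicInteger plus a `$` separator yields identifiers that are unique per generator instance, which keeps spliced terms from colliding when many expressions are compiled into one class. An illustrative stand-alone version (returning plain strings rather than reflection `TermName`s):

```scala
// Stand-alone sketch of the freshName scheme: prefix + "$" + monotonically
// increasing counter, unique within one generator instance.
import java.util.concurrent.atomic.AtomicInteger

class NameGen {
  private val curId = new AtomicInteger()
  private val javaSeparator = "$"

  def freshName(prefix: String): String =
    s"$prefix$javaSeparator${curId.getAndIncrement}"
}
```

Because the counter is per-instance rather than global, two generators can emit the same names without conflict; uniqueness is only needed within the single class each one compiles.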
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15435534 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15435573

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala ---
```
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
+
+/**
+ * Overrides our expression evaluation tests to use code generation for evaluation.
+ */
+class GeneratedEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+      expression: Expression,
+      expected: Any,
+      inputRow: Row = EmptyRow): Unit = {
+    val plan = try {
+      GenerateMutableProjection(Alias(expression, s"Optimized($expression)")() :: Nil)()
+    } catch {
+      case e: Throwable =>
+        val evaluated = GenerateProjection.expressionEvaluator(expression)
+        fail(
+          s"""
+            |Code generation of $expression failed:
+            |${evaluated.code.mkString("\n")}
+            |$e
+          """.stripMargin)
+    }
+
+    val actual = plan(inputRow).apply(0)
+    if (actual != expected) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+    }
+  }
+
+
+  test("multithreaded eval") {
+    import scala.concurrent._
+    import ExecutionContext.Implicits.global
+    import scala.concurrent.duration._
+
+    val futures = (1 to 20).map { _ =>
+      future {
+        GeneratePredicate(EqualTo(Literal(1), Literal(1)))
+        GenerateProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+        GenerateMutableProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
+        GenerateOrdering(Add(Literal(1), Literal(1)).asc :: Nil)
+      }
+    }
+
+    futures.foreach(Await.result(_, 10.seconds))
+  }
+}
+
+/**
+ * Overrides our expression evaluation tests to use generated code on mutable rows.
+ */
+class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite {
+  override def checkEvaluation(
+      expression: Expression,
+      expected: Any,
+      inputRow: Row = EmptyRow): Unit = {
+    lazy val evaluated = GenerateProjection.expressionEvaluator(expression)
+
+    val plan = try {
+      GenerateProjection(Alias(expression, s"Optimized($expression)")() :: Nil)
+    } catch {
+      case e: Throwable =>
+        fail(
+          s"""
+            |Code generation of $expression failed:
+            |${evaluated.code.mkString("\n")}
+            |$e
+          """.stripMargin)
+    }
+
+    val actual = plan(inputRow)
+    val expectedRow = new GenericRow(Array[Any](expected))
+    if (actual.hashCode() != expectedRow.hashCode()) {
+      fail(
+        s"""
+          |Mismatched hashCodes for values: $actual, $expectedRow
+          |Hash Codes: ${actual.hashCode()} != ${expectedRow.hashCode()}
+          |${evaluated.code.mkString("\n")}
+        """.stripMargin)
+    }
+    if (actual != expectedRow) {
+      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
+      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
+    }
+  }
+}
```

--- End diff --

Yeah, we should make scalastyle check test files too... though I'm afraid there will be a fair amount of work to make that pass.
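GeneratedMutableEvaluationSuite compares the generated row's hashCode against a GenericRow's because generated and generic rows must hash and compare consistently to interoperate in joins and aggregations, where rows of both kinds can end up as keys in the same hash table. A sketch of one plausible Seq-style hash over row values (an assumed scheme for illustration, not Spark's exact implementation):

```scala
// Illustrative value-based row hash: any two row representations that agree
// on their values must agree on this hash, or mixed-row hash tables misbehave.
def rowHash(values: Seq[Any]): Int =
  values.foldLeft(1)((h, v) => 31 * h + (if (v == null) 0 else v.hashCode))
```

The key property the test enforces is exactly this value-dependence: equal contents imply equal hashes, regardless of which concrete Row class holds them.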
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15435619

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
```
@@ -51,8 +82,46 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
    */
   def executeCollect(): Array[Row] = execute().map(_.copy()).collect()
 
-  protected def buildRow(values: Seq[Any]): Row =
-    new GenericRow(values.toArray)
+  protected def newProjection(
+      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
+    log.debug(
+      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      GenerateProjection(expressions, inputSchema)
+    } else {
+      new InterpretedProjection(expressions, inputSchema)
+    }
+  }
+
+  protected def newMutableProjection(
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): () => MutableProjection = {
+    log.debug(
+      s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      GenerateMutableProjection(expressions, inputSchema)
```

--- End diff --

This is a standard factory method in an object.
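Note that `newMutableProjection` returns a `() => MutableProjection` rather than a projection directly: a mutable projection reuses a single output row, so each consumer (for example, each partition or thread) must build its own private instance from the factory. A toy illustration of why that indirection matters; the types here are invented for the example:

```scala
// A "mutable projection" in miniature: it writes every result into the same
// reused buffer, so an instance must never be shared across consumers.
class MutableDoubler {
  private val out = new Array[Int](1)                // reused output buffer
  def apply(in: Int): Array[Int] = { out(0) = in * 2; out }
}

// The factory indirection: call it once per consumer for a private instance.
val newDoubler: () => MutableDoubler = () => new MutableDoubler
```

Calling the factory twice yields distinct instances with distinct buffers; handing out one shared instance would let one consumer's `apply` clobber another's still-in-use result.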
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15435630 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.types._

case class AggregateEvaluation(
    schema: Seq[Attribute],
    initialValues: Seq[Expression],
    update: Seq[Expression],
    result: Expression)

/**
 * :: DeveloperApi ::
 * Alternate version of aggregation that leverages projection and thus code generation.
 * Aggregations are converted into a set of projections from an aggregation buffer tuple back onto
 * itself. Currently only simple aggregations like SUM, COUNT, or AVERAGE are supported.
 *
 * @param partial if true then aggregation is done partially on local data without shuffling to
 *                ensure all values where `groupingExpressions` are equal are present.
 * @param groupingExpressions expressions that are evaluated to determine grouping.
 * @param aggregateExpressions expressions that are computed for each group.
 * @param child the input data source.
 */
@DeveloperApi
case class GeneratedAggregate(
    partial: Boolean,
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: SparkPlan)
  extends UnaryNode {

  override def requiredChildDistribution =
    if (partial) {
      UnspecifiedDistribution :: Nil
    } else {
      if (groupingExpressions == Nil) {
        AllTuples :: Nil
      } else {
        ClusteredDistribution(groupingExpressions) :: Nil
      }
    }

  override def output = aggregateExpressions.map(_.toAttribute)

  override def execute() = {
    val aggregatesToCompute = aggregateExpressions.flatMap { a =>
      a.collect { case agg: AggregateExpression => agg }
    }

    val computeFunctions = aggregatesToCompute.map {
      case c @ Count(expr) =>
        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
        val initialValue = Literal(0L)
        val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
        val result = currentCount

        AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result)

      case Sum(expr) =>
        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
        val initialValue = Cast(Literal(0L), expr.dataType)

        // Coalesce avoids double calculation...
        // but really, common sub expression elimination would be better
        val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
        val result = currentSum

        AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result)

      case a @ Average(expr) =>
        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
        val initialCount = Literal(0L)
        val initialSum = Cast(Literal(0L), expr.dataType)
        val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
        val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)

        val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType))

        AggregateEvaluation(
          currentCount :: currentSum :: Nil,
```

(message truncated)
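The aggregation-buffer pattern in the diff above can be illustrated with a small stand-alone sketch. This is a hypothetical model with made-up names (`AggBuffer`, `update`), not the PR's code: each aggregate contributes an initial value plus an update rule, and folding the update over the input rows plays the role of the generated projection.

```scala
// Hypothetical model of GeneratedAggregate's buffer updates, not Spark code.
// COUNT's update mirrors If(IsNotNull(expr), Add(currentCount, 1), currentCount);
// SUM's update mirrors Coalesce(Add(expr, currentSum), currentSum).
case class AggBuffer(currentCount: Long, currentSum: Long)

def update(buf: AggBuffer, input: Option[Long]): AggBuffer = input match {
  case Some(v) => AggBuffer(buf.currentCount + 1, buf.currentSum + v) // non-null: count and add
  case None    => buf // null input leaves both buffer slots unchanged
}

val rows = Seq(Some(1L), None, Some(3L))
val buf = rows.foldLeft(AggBuffer(0L, 0L))(update) // initial values: 0, 0

// AVERAGE's result projection divides the two buffer slots:
val avg = buf.currentSum.toDouble / buf.currentCount
```

Note how the null handling of COUNT and SUM falls out of the same two update expressions the diff builds for AVERAGE.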
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50247318 QA tests have started for PR 993. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17227/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-50247741 Hey @rxin, thanks for the careful review! I think I've addressed most of your comments. Regarding the GeneratedAggregate code, I'm happy to sit down and explain in more detail at some point. One thing to note is that it's only used in a few circumstances at the moment, and it's pretty easy to switch off if we find it causes problems in the future. That said, I think there is the possibility of a pretty huge speedup here. Here's an example that demonstrates how the rewrite happens for a simple query: `hql("SELECT AVG(key) + 1 FROM src GROUP BY value")`

Partial aggregation:
```
Initial values: 0,CAST(0, LongType)
Grouping Projection: value#25
Update Expressions: if (IS NOT NULL CAST(key#24, LongType)) (currentCount#30L + 1) else currentCount#30L,Coalesce((CAST(key#24, LongType) + currentSum#31L),currentSum#31L)
Result Projection: value#25,currentCount#30L AS PartialCount#27L,currentSum#31L AS PartialSum#26L
```
The updates compute the new currentSum and currentCount given an update buffer joined with the input row.

Final aggregation:
```
Initial values: CAST(0, LongType),CAST(0, LongType)
Grouping Projection: value#25
Update Expressions: Coalesce((PartialSum#26L + currentSum#28L),currentSum#28L),Coalesce((PartialCount#27L + currentSum#29L),currentSum#29L)
Result Projection: ((CAST(currentSum#28L, DoubleType) / CAST(currentSum#29L, DoubleType)) + 1.0) AS c_0#22
```
The updates calculate the sum of the counts and partial sums. The result divides them and adds 1.
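The partial/final split described above can be mimicked with plain collections. This is a sketch under assumed shapes, not the PR's API: each partition emits a (PartialSum, PartialCount) pair, and the final phase merges the pairs before the result projection divides and adds 1.

```scala
// Sketch only: stands in for the partial and final GeneratedAggregate phases.
val partitions = Seq(Seq(1L, 2L), Seq(3L, 4L, 5L))

// Partial aggregation: one (PartialSum, PartialCount) per partition, before the shuffle.
val partial = partitions.map(p => (p.sum, p.length.toLong))

// Final aggregation: the update expressions just sum the partial sums and counts.
val (sum, count) = partial.reduce((a, b) => (a._1 + b._1, a._2 + b._2))

// Result projection for "AVG(key) + 1": divide, then add the literal.
val result = sum.toDouble / count + 1.0
```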
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15437450 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---

```
@@ -47,23 +47,29 @@ case class Generate(
     }
   }

-  override def output =
+  // This must be a val since the generator output expr ids are not preserved by serialization.
+  override val output =
     if (join) child.output ++ generatorOutput else generatorOutput

+  val boundGenerator = BindReferences.bindReference(generator, child.output)
+
+  /** Codegenned rows are not serializable... */
```

End diff -- Actually it was a misunderstanding (the comment is misleading). I thought you'd want to code gen them in the future when we make it serializable. Maybe expand the comment to explain why codegen is disabled (what's the relationship between serializability and Generate?)
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15437455 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---

```
@@ -18,22 +18,53 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
 import org.apache.spark.sql.catalyst.plans.physical._

+object SparkPlan {
+  protected[sql] val currentContext = new ThreadLocal[SQLContext]()
+}
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
+abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
   self: Product =>

+  /**
+   * A handle to the SQL Context that was used to create this plan. Since many operators need
+   * access to the sqlContext for RDD operations or configuration this field is automatically
+   * populated by the query planning infrastructure.
+   */
+  @transient
+  protected val sqlContext = SparkPlan.currentContext.get()
+
+  protected def sparkContext = sqlContext.sparkContext
+
+  def logger = log
+
+  val codegenEnabled: Boolean = if(sqlContext != null) {
```

End diff -- would be great to just add that as an inline comment. Also: space after `if`.
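The `SparkPlan.currentContext` ThreadLocal in the diff above is a handoff pattern: the planner sets the context on the current thread before constructing plans, so the `sqlContext` field can be captured in the constructor without threading it through every operator. A minimal sketch with hypothetical names (`PlanContext`, `Plan`), not Spark code:

```scala
// Hypothetical sketch of the ThreadLocal handoff used by SparkPlan.currentContext.
object PlanContext {
  val currentContext = new ThreadLocal[String]() // stands in for ThreadLocal[SQLContext]
}

class Plan {
  // Read at construction time, like SparkPlan's `protected val sqlContext`.
  val ctx: String = PlanContext.currentContext.get()
}

PlanContext.currentContext.set("ctx-1") // the planner sets this before building plans
val plan = new Plan                     // the field is populated "automatically"
```

The trade-off, visible in the diff's `if(sqlContext != null)` guard, is that plans constructed outside the planner (e.g. in tests or after deserialization) see a null context and must fall back to defaults.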
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15331132 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUdf.scala ---

```
@@ -29,6 +29,7 @@ case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expressi
   override def eval(input: Row): Any = {
     children.size match {
+      case 0 => function.asInstanceOf[() => Any]()
```

End diff -- this is for another time, but if you add an explicit init to expressions, we can move all of these branches from the inner loop (once per row) directly to the outer loop (once per partition).
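The suggestion above can be sketched as follows. The `makeEval` helper is hypothetical (not Spark's API): it resolves the UDF's arity once, so the per-row path is a single call with no `match`.

```scala
// Hypothetical helper: chooses the invocation strategy once (outer loop, per partition)
// instead of matching on children.size for every row (inner loop).
def makeEval(function: AnyRef, arity: Int): Seq[Any] => Any = arity match {
  case 0 => _    => function.asInstanceOf[() => Any]()
  case 1 => args => function.asInstanceOf[Any => Any](args(0))
  case 2 => args => function.asInstanceOf[(Any, Any) => Any](args(0), args(1))
  // ...one case per supported arity
}

val eval = makeEval((x: Any) => x.toString.length, 1) // resolved once per partition
val out = eval(Seq("spark"))                          // per-row: just a call
```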
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15331160 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---

```scala
package org.apache.spark.sql.catalyst.expressions.codegen

import com.google.common.cache.{CacheLoader, CacheBuilder}

import scala.language.existentials

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._

/**
 * A base class for generators of byte code to perform expression evaluation. Includes a set of
 * helpers for referring to Catalyst types and building trees that perform evaluation of individual
 * expressions.
 */
abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
  import scala.reflect.runtime.{universe => ru}
  import scala.reflect.runtime.universe._

  import scala.tools.reflect.ToolBox

  protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()

  protected val rowType = typeOf[Row]
  protected val mutableRowType = typeOf[MutableRow]
  protected val genericRowType = typeOf[GenericRow]
  protected val genericMutableRowType = typeOf[GenericMutableRow]

  protected val projectionType = typeOf[Projection]
  protected val mutableProjectionType = typeOf[MutableProjection]

  private val curId = new java.util.concurrent.atomic.AtomicInteger()
  private val javaSeparator = "$"

  /**
   * Generates a class for a given input expression. Called when there is not cached code
   * already available.
   */
  protected def create(in: InType): OutType

  /**
   * Canonicalizes an input expression. Used to avoid double caching expressions that differ only
   * cosmetically.
   */
  protected def canonicalize(in: InType): InType

  /** Binds an input expression to a given input schema */
  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType

  protected val cache = CacheBuilder.newBuilder()
    .maximumSize(1000)
    .build(
      new CacheLoader[InType, OutType]() {
        override def load(in: InType): OutType = globalLock.synchronized {
          create(in)
        }
      })
```

End diff -- we should comment on the behavior of this cache so readers don't have to go read the Guava documentation.
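To answer the question above in code form: Guava's `CacheBuilder.maximumSize(1000)` gives approximately-LRU eviction once the size bound is exceeded, and `CacheLoader.load` builds a missing entry on first `get`. A stdlib-only analogue of that behavior, as a sketch with hypothetical names (`LoadingLru`, `getOrLoad`), not Guava itself:

```scala
import java.util.{LinkedHashMap, Map => JMap}

// An access-ordered LinkedHashMap evicts the least-recently-used entry once maxSize
// is exceeded; getOrLoad computes a missing value on first access, like CacheLoader.load.
class LoadingLru[K, V](maxSize: Int)(load: K => V)
    extends LinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ true) {

  override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size() > maxSize

  def getOrLoad(key: K): V = this.synchronized {
    if (!containsKey(key)) put(key, load(key)) // load under a lock, like the PR's globalLock
    get(key)
  }
}

val cache = new LoadingLru[String, Int](2)(_.length)
cache.getOrLoad("ab")
cache.getOrLoad("abc")
cache.getOrLoad("abcd") // exceeds maxSize, so "ab" (least recently used) is evicted
```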
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15331362 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- (quoted diff is the same as above; the comment is on this line:)

```scala
  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
```

End diff -- nit: space before `=`. Also would be great to add inline doc on what these `apply`'s are doing, since `apply`'s are used quite a lot in catalyst with different semantics.
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15332107 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala ---

```scala
package org.apache.spark.sql.catalyst.expressions.codegen

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._

/**
 * Generates bytecode that produces a new [[Row]] object based on a fixed set of input
 * [[Expression Expressions]] and a given input [[Row]]. The returned [[Row]] object is custom
 * generated based on the output types of the [[Expression]] to avoid boxing of primitive values.
 */
object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
  import scala.reflect.runtime.{universe => ru}
  import scala.reflect.runtime.universe._

  protected def canonicalize(in: Seq[Expression]): Seq[Expression] =
    in.map(ExpressionCanonicalizer(_))

  protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
    in.map(BindReferences.bindReference(_, inputSchema))

  // Make Mutability optional...
  protected def create(expressions: Seq[Expression]): Projection = {
    val tupleLength = ru.Literal(Constant(expressions.length))
    val lengthDef = q"final val length = $tupleLength"

    /* TODO: Configurable...
    val nullFunctions =
      q"""
        private final val nullSet = new org.apache.spark.util.collection.BitSet(length)
        final def setNullAt(i: Int) = nullSet.set(i)
        final def isNullAt(i: Int) = nullSet.get(i)
      """
     */

    val nullFunctions =
      q"""
        private[this] var nullBits = new Array[Boolean](${expressions.size})
        final def setNullAt(i: Int) = { nullBits(i) = true }
        final def isNullAt(i: Int) = nullBits(i)
      """.children

    val tupleElements = expressions.zipWithIndex.flatMap {
      case (e, i) =>
        val elementName = newTermName(s"c$i")
        val evaluatedExpression = expressionEvaluator(e)
```

End diff -- where is `expressionEvaluator` defined?
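The `toolBox` in `CodeGenerator` is what turns quasiquote trees like the ones above into running bytecode. A minimal stand-alone example of that mechanism (assumes `scala-reflect` and `scala-compiler` are on the classpath, as they are for catalyst):

```scala
import scala.reflect.runtime.{currentMirror, universe => ru}
import scala.tools.reflect.ToolBox
import ru._

// Build an AST with a quasiquote, then compile and evaluate it at runtime.
val toolBox = currentMirror.mkToolBox()
val tree = q"(x: Int) => x + 1"
val inc = toolBox.eval(tree).asInstanceOf[Int => Int]
val out = inc(41)
```

Each `toolBox.eval` invokes the compiler, which is why the generated classes are cached and why `create` runs under a lock.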
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15332175 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- (quoted diff is the same as above, on the `apply` overloads)

End diff -- actually, after reading through the code more, I'd argue we should rename this function from `apply` to something more informative. Unless it is semantically very obvious (e.g. array indexing), the use of `apply` makes it harder to understand code around its usage.
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r1512 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- (the quoted diff repeats the one above and continues with:)

```scala
  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType=
    apply(bind(expressions, inputSchema))

  def apply(expressions: InType): OutType = cache.get(canonicalize(expressions))

  /**
   * Returns a term name that is unique within this instance of a `CodeGenerator`.
   *
   * (Since we aren't in a macro context we do not seem to have access to the built in `freshName`
   * function.)
   */
  protected def freshName(prefix: String): TermName = {
    newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
  }

  /**
   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
   *
   * @param code The sequence of statements required to evaluate the expression.
   * @param nullTerm A term that holds a boolean value representing whether the expression evaluated
   *                 to null.
   * @param primitiveTerm A term for a possible primitive value of the result of the evaluation. Not
   *                      valid if `nullTerm` is set to `false`.
   * @param objectTerm A possibly boxed version of the result of evaluating this expression.
   */
  protected case class EvaluatedExpression(
      code: Seq[Tree],
      nullTerm: TermName,
      primitiveTerm: TermName,
      objectTerm: TermName)
```

(message truncated; the review comment is missing)
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r1552 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- (quoted diff duplicates the one above; the review comment is missing from this truncated message)
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r1579 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions.codegen

import com.google.common.cache.{CacheLoader, CacheBuilder}

import scala.language.existentials

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types._

/**
 * A base class for generators of byte code to perform expression evaluation. Includes a set of
 * helpers for referring to Catalyst types and building trees that perform evaluation of individual
 * expressions.
 */
```
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333476 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333508 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333558 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333592 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala ---

```scala
 * ==Evaluation==
 * The result of expressions can be evaluated using the `Expression.apply(Row)` method.
 */
package object expressions {

  /**
   * Converts a [[Row]] to another Row given a sequence of expressions that define each column of
   * the new row. If the schema of the input row is specified, then the given expressions will be
   * bound to that schema.
   */
  abstract class Projection extends (Row => Row)

  /**
   * Converts a [[Row]] to another Row given a sequence of expressions that define each column of
   * the new row. If the schema of the input row is specified, then the given expressions will be
   * bound to that schema.
   *
   * In contrast to a normal projection, a MutableProjection reuses the same underlying row object
   * each time an input row is added. This significantly reduces the cost of calculating the
   * projection, but means that it is not safe to hold on to a reference to a [[Row]] after
   * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
   * `Row.copy()` and hold on to the returned [[Row]] before calling `next()`.
   */
  abstract class MutableProjection extends Projection {
    def currentValue: Row

    /** Updates the target of this projection to a new MutableRow */
```

--- End diff --

maybe ```Uses the given row to store the output of the projection.``` ?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
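The row-reuse hazard called out in the `MutableProjection` scaladoc can be demonstrated without Spark. Below, a plain function that reuses one `Array[Any]` buffer plays the role of the mutable target row; all names here are illustrative:

```scala
// A "row" here is just an Array[Any]; the projection reuses one output
// buffer, as a MutableProjection reuses its target row.
class ReusingProjection extends (Array[Any] => Array[Any]) {
  private val target = new Array[Any](1)
  def apply(input: Array[Any]): Array[Any] = {
    target(0) = input(0)
    target
  }
}

val project = new ReusingProjection

// Holding on to returned rows without copying: every element aliases the
// same buffer, so all observed values collapse to the last one written.
val unsafe = Iterator(Array[Any](1), Array[Any](2), Array[Any](3))
  .map(project).toList.map(_(0))

// Copying (the analogue of Row.copy()) before advancing keeps each value.
val safe = Iterator(Array[Any](1), Array[Any](2), Array[Any](3))
  .map(r => project(r).clone()).toList.map(_(0))

assert(unsafe == List(3, 3, 3)) // stale aliases into the shared buffer
assert(safe == List(1, 2, 3))
```

This is the trade the scaladoc describes: the reused buffer avoids an allocation per input row, at the cost of forcing callers who retain rows to copy them first.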
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333688 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala ---

```scala
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._

/**
 * Overrides our expression evaluation tests to use code generation for evaluation.
 */
class GeneratedEvaluationSuite extends ExpressionEvaluationSuite {
  override def checkEvaluation(
      expression: Expression,
      expected: Any,
      inputRow: Row = EmptyRow): Unit = {
    val plan = try {
      GenerateMutableProjection(Alias(expression, s"Optimized($expression)")() :: Nil)()
    } catch {
      case e: Throwable =>
        val evaluated = GenerateProjection.expressionEvaluator(expression)
        fail(
          s"""
            |Code generation of $expression failed:
            |${evaluated.code.mkString("\n")}
            |$e
          """.stripMargin)
    }

    val actual = plan(inputRow).apply(0)
    if (actual != expected) {
      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
    }
  }

  test("multithreaded eval") {
    import scala.concurrent._
    import ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    val futures = (1 to 20).map { _ =>
      future {
        GeneratePredicate(EqualTo(Literal(1), Literal(1)))
        GenerateProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
        GenerateMutableProjection(EqualTo(Literal(1), Literal(1)) :: Nil)
        GenerateOrdering(Add(Literal(1), Literal(1)).asc :: Nil)
      }
    }

    futures.foreach(Await.result(_, 10.seconds))
  }
}

/**
 * Overrides our expression evaluation tests to use generated code on mutable rows.
 */
class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite {
```

--- End diff --

Best to take this out into a separate file, since that's the common standard across all Spark modules (each test suite has its own file)
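The `multithreaded eval` test above exists because generator lookups can race (hence the `globalLock.synchronized` in the cache loader). A rough model of the same guarantee in plain Scala, with `ConcurrentHashMap.computeIfAbsent` standing in for the Guava cache and at most one compile per key under 20 concurrent callers:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

val compiles = new AtomicInteger()
val cache = new ConcurrentHashMap[String, String]()

// computeIfAbsent runs the loader at most once per key even under
// contention, playing the role of the Guava cache plus `globalLock`.
def generate(key: String): String =
  cache.computeIfAbsent(key, k => { compiles.incrementAndGet(); s"compiled($k)" })

val futures = (1 to 20).map(_ => Future(generate("EqualTo(1, 1)")))
futures.foreach(Await.result(_, 10.seconds))

assert(compiles.get == 1) // 20 callers, one compilation
assert(cache.get("EqualTo(1, 1)") == "compiled(EqualTo(1, 1))")
```

The real code needs the coarser `globalLock` because the Scala 2.10 runtime toolbox itself is not thread-safe, not just the cache.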
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333694 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/GeneratedEvaluationSuite.scala ---

```scala
/**
 * Overrides our expression evaluation tests to use generated code on mutable rows.
 */
class GeneratedMutableEvaluationSuite extends ExpressionEvaluationSuite {
  override def checkEvaluation(
      expression: Expression,
      expected: Any,
      inputRow: Row = EmptyRow): Unit = {
    lazy val evaluated = GenerateProjection.expressionEvaluator(expression)

    val plan = try {
      GenerateProjection(Alias(expression, s"Optimized($expression)")() :: Nil)
    } catch {
      case e: Throwable =>
        fail(
          s"""
            |Code generation of $expression failed:
            |${evaluated.code.mkString("\n")}
            |$e
          """.stripMargin)
    }

    val actual = plan(inputRow)
    val expectedRow = new GenericRow(Array[Any](expected))
    if (actual.hashCode() != expectedRow.hashCode()) {
      fail(
        s"""
          |Mismatched hashCodes for values: $actual, $expectedRow
          |Hash Codes: ${actual.hashCode()} != ${expectedRow.hashCode()}
          |${evaluated.code.mkString("\n")}
        """.stripMargin)
    }
    if (actual != expectedRow) {
      val input = if (inputRow == EmptyRow) "" else s", input: $inputRow"
      fail(s"Incorrect Evaluation: $expression, actual: $actual, expected: $expected$input")
    }
  }
}
```

--- End diff --

nitpick - add a new line
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333762 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/Generate.scala ---

```scala
  // This must be a val since the generator output expr ids are not preserved by serialization.
  override val output =
    if (join) child.output ++ generatorOutput else generatorOutput

  val boundGenerator = BindReferences.bindReference(generator, child.output)

  /** Codegenned rows are not serializable... */
```

--- End diff --

TODO?
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333803 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---

```scala
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.types._

case class AggregateEvaluation(
    schema: Seq[Attribute],
    initialValues: Seq[Expression],
    update: Seq[Expression],
    result: Expression)

/**
 * :: DeveloperApi ::
 * Alternate version of aggregation that leverages projection and thus code generation.
 * Aggregations are converted into a set of projections from an aggregation buffer tuple back onto
 * itself. Currently only simple aggregations like SUM, COUNT, or AVERAGE are supported.
 *
 * @param partial if true then aggregation is done partially on local data without shuffling to
 *                ensure all values where `groupingExpressions` are equal are present.
 * @param groupingExpressions expressions that are evaluated to determine grouping.
 * @param aggregateExpressions expressions that are computed for each group.
 * @param child the input data source.
 */
@DeveloperApi
case class GeneratedAggregate(
    partial: Boolean,
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: SparkPlan)
  extends UnaryNode {

  override def requiredChildDistribution =
    if (partial) {
      UnspecifiedDistribution :: Nil
    } else {
      if (groupingExpressions == Nil) {
        AllTuples :: Nil
      } else {
        ClusteredDistribution(groupingExpressions) :: Nil
      }
    }

  override def output = aggregateExpressions.map(_.toAttribute)

  override def execute() = {
    val aggregatesToCompute = aggregateExpressions.flatMap { a =>
      a.collect { case agg: AggregateExpression => agg }
    }

    val computeFunctions = aggregatesToCompute.map {
      case c @ Count(expr) =>
        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
        val initialValue = Literal(0L)
        val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
        val result = currentCount

        AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result)

      case Sum(expr) =>
        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
        val initialValue = Cast(Literal(0L), expr.dataType)

        // Coalesce avoids double calculation...
        // but really, common sub expression elimination would be better
        val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
        val result = currentSum

        AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result)

      case a @ Average(expr) =>
        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
        val initialCount = Literal(0L)
        val initialSum = Cast(Literal(0L), expr.dataType)
        val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
        val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)

        val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType))

        AggregateEvaluation(
          currentCount :: currentSum :: Nil,
```
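The `Coalesce(Add(expr, currentSum) :: currentSum :: Nil)` update works because `Add` evaluates to null when `expr` is null, so `Coalesce` falls back to the old buffer value and null rows are skipped. A plain-Scala model of that fold, using `Option` for SQL NULL (illustrative only, not Catalyst API):

```scala
// Model SQL values with Option: None is NULL.
def add(a: Option[Long], b: Option[Long]): Option[Long] =
  for (x <- a; y <- b) yield x + y // Add: null if either side is null

// Coalesce: first non-null argument, else null.
def coalesce(xs: Option[Long]*): Option[Long] = xs.find(_.isDefined).flatten

// update = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
def update(currentSum: Option[Long], expr: Option[Long]): Option[Long] =
  coalesce(add(expr, currentSum), currentSum)

val initial: Option[Long] = Some(0L) // Cast(Literal(0L), expr.dataType)
val input: Seq[Option[Long]] = Seq(Some(1L), None, Some(2L))

val total = input.foldLeft(initial)(update)
assert(total == Some(3L)) // the NULL row leaves the running sum unchanged
```

As the comment in the diff notes, this evaluates `Add` even when the result is discarded; common subexpression elimination would avoid recomputing `expr` twice per row.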
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333814

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.catalyst.types._
+
+case class AggregateEvaluation(
+    schema: Seq[Attribute],
+    initialValues: Seq[Expression],
+    update: Seq[Expression],
+    result: Expression)
+
+/**
+ * :: DeveloperApi ::
+ * Alternate version of aggregation that leverages projection and thus code generation.
+ * Aggregations are converted into a set of projections from an aggregation buffer tuple back onto
+ * itself. Currently only simple aggregations like SUM, COUNT, or AVERAGE are supported.
+ *
+ * @param partial if true then aggregation is done partially on local data without shuffling to
+ *                ensure all values where `groupingExpressions` are equal are present.
+ * @param groupingExpressions expressions that are evaluated to determine grouping.
+ * @param aggregateExpressions expressions that are computed for each group.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class GeneratedAggregate(
+    partial: Boolean,
+    groupingExpressions: Seq[Expression],
+    aggregateExpressions: Seq[NamedExpression],
+    child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+    if (partial) {
+      UnspecifiedDistribution :: Nil
+    } else {
+      if (groupingExpressions == Nil) {
+        AllTuples :: Nil
+      } else {
+        ClusteredDistribution(groupingExpressions) :: Nil
+      }
+    }
+
+  override def output = aggregateExpressions.map(_.toAttribute)
+
+  override def execute() = {
+    val aggregatesToCompute = aggregateExpressions.flatMap { a =>
+      a.collect { case agg: AggregateExpression => agg }
+    }
+
+    val computeFunctions = aggregatesToCompute.map {
+      case c @ Count(expr) =>
+        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
+        val initialValue = Literal(0L)
+        val updateFunction = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
+        val result = currentCount
+
+        AggregateEvaluation(currentCount :: Nil, initialValue :: Nil, updateFunction :: Nil, result)
+
+      case Sum(expr) =>
+        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
+        val initialValue = Cast(Literal(0L), expr.dataType)
+
+        // Coalesce avoids double calculation...
+        // but really, common sub-expression elimination would be better
+        val updateFunction = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
+        val result = currentSum
+
+        AggregateEvaluation(currentSum :: Nil, initialValue :: Nil, updateFunction :: Nil, result)
+
+      case a @ Average(expr) =>
+        val currentCount = AttributeReference("currentCount", LongType, nullable = false)()
+        val currentSum = AttributeReference("currentSum", expr.dataType, nullable = false)()
+        val initialCount = Literal(0L)
+        val initialSum = Cast(Literal(0L), expr.dataType)
+        val updateCount = If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount)
+        val updateSum = Coalesce(Add(expr, currentSum) :: currentSum :: Nil)
+
+        val result = Divide(Cast(currentSum, DoubleType), Cast(currentCount, DoubleType))
+
+        AggregateEvaluation(
+          currentCount :: currentSum :: Nil,
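The `AggregateEvaluation` quadruple quoted above (buffer schema, initial values, update expressions, final result) is the whole contract: evaluating an aggregate amounts to folding the update projection over the input and then evaluating the result expression against the final buffer. A minimal, Spark-free sketch of that idea for COUNT (all names here are hypothetical stand-ins, not Spark's API):

```scala
// Hypothetical stand-in for Catalyst's AggregateEvaluation: one buffer slot,
// an initial value, an update step, and a final result function.
case class SimpleAggregateEvaluation(
  initialValue: Long,
  update: (Long, Option[Long]) => Long, // (buffer slot, input value) => new slot
  result: Long => Long)

// COUNT(expr): start at 0 and add 1 whenever the input is not null, mirroring
// If(IsNotNull(expr), Add(currentCount, Literal(1L)), currentCount).
val count = SimpleAggregateEvaluation(
  initialValue = 0L,
  update = (cur, in) => if (in.isDefined) cur + 1L else cur,
  result = identity)

val input: Seq[Option[Long]] = Seq(Some(1L), None, Some(3L))
val buffer = input.foldLeft(count.initialValue)(count.update)
println(count.result(buffer)) // prints 2
```

In the real operator the three pieces are Catalyst `Expression` trees rather than Scala functions, which is what allows the projection code generator to compile them.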
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333832

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333860

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333900

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333888

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/GeneratedAggregate.scala ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333936

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -18,22 +18,53 @@
 package org.apache.spark.sql.execution

 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Logging, Row}
+import org.apache.spark.sql.{SQLContext, Row}
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.expressions.GenericRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.BaseRelation
 import org.apache.spark.sql.catalyst.plans.physical._
+
+object SparkPlan {
+  protected[sql] val currentContext = new ThreadLocal[SQLContext]()
+}

 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging {
+abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
   self: Product =>

+  /**
+   * A handle to the SQL Context that was used to create this plan. Since many operators need
+   * access to the sqlContext for RDD operations or configuration this field is automatically
+   * populated by the query planning infrastructure.
+   */
+  @transient
+  protected val sqlContext = SparkPlan.currentContext.get()
+
+  protected def sparkContext = sqlContext.sparkContext
+
+  def logger = log
+
+  val codegenEnabled: Boolean = if(sqlContext != null) {

--- End diff --

when is context null?

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
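One plausible answer to rxin's question follows from the ThreadLocal hand-off quoted above: the planner sets `SparkPlan.currentContext` on the planning thread, and each operator snapshots it into a field at construction time, so an operator built outside that window (for example, one deserialized on an executor, where the `@transient` field is not restored) would likely see `null`. A toy, Spark-free sketch of the capture-at-construction pattern (names are illustrative):

```scala
// The ThreadLocal stands in for SparkPlan.currentContext; a String stands in
// for SQLContext.
object PlanContext {
  val currentContext = new ThreadLocal[String]()
}

class Operator {
  // Snapshot taken exactly once, when the planner constructs the operator.
  val context: String = PlanContext.currentContext.get()
}

PlanContext.currentContext.set("ctx-1")
val inWindow = new Operator    // constructed while the context is set
PlanContext.currentContext.remove()
val outOfWindow = new Operator // constructed after the window closed

println(inWindow.context)            // prints ctx-1
println(outOfWindow.context == null) // prints true
```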
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333966

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -51,8 +82,46 @@
    */
   def executeCollect(): Array[Row] = execute().map(_.copy()).collect()

-  protected def buildRow(values: Seq[Any]): Row =
-    new GenericRow(values.toArray)
+  protected def newProjection(
+      expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
+    log.debug(
+      s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      GenerateProjection(expressions, inputSchema)
+    } else {
+      new InterpretedProjection(expressions, inputSchema)
+    }
+  }
+
+  protected def newMutableProjection(
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): () => MutableProjection = {
+    log.debug(
+      s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
+    if(codegenEnabled) {
+      GenerateMutableProjection(expressions, inputSchema)

--- End diff --

This is another use of apply that is very confusing, because it is not obvious GenerateMutableProjection is a closure.
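rxin's point is that `GenerateMutableProjection(expressions, inputSchema)` is a companion-object `apply` that returns a factory (`() => MutableProjection`) rather than a projection, so the call site reads like construction but a second invocation is still needed per partition. A self-contained illustration of that shape (the types here are simplified stand-ins, not Spark's):

```scala
// A MutableProjection stand-in: a row-to-row function.
type Projection = Seq[Int] => Seq[Int]

object GenerateDoubler {
  // apply returns a closure that builds the projection, not the projection itself.
  def apply(): () => Projection =
    () => (row: Seq[Int]) => row.map(_ * 2)
}

val factory = GenerateDoubler()   // reads like a constructor call...
val projection = factory()        // ...but a second call is required (once per partition)
println(projection(Seq(1, 2, 3))) // prints List(2, 4, 6)
```

Deferring construction this way matters for generated code: the outer call can happen on the driver, while the closure is invoked on each executor to instantiate the (non-serializable) compiled projection locally.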
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15333978

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -192,9 +187,9 @@
         val relation =
           ParquetRelation.create(path, child, sparkContext.hadoopConfiguration)
         // Note: overwrite=false because otherwise the metadata we just created will be deleted
-        InsertIntoParquetTable(relation, planLater(child), overwrite=false)(sqlContext) :: Nil
+        InsertIntoParquetTable(relation, planLater(child), overwrite=false) :: Nil

--- End diff --

nit: space after/before =
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15334051

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -300,8 +298,16 @@
 case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
   def output = left.output ++ right.output

-  def execute() = left.execute().map(_.copy()).cartesian(right.execute().map(_.copy())).map {
-    case (l: Row, r: Row) => buildRow(l ++ r)
+  def execute() = {
+    val leftResults = left.execute().map(_.copy())
+    val rightResults = right.execute().map(_.copy())
+
+    leftResults.cartesian(rightResults).mapPartitions { iter =>
+      val joinedRow = new JoinedRow
+      iter.map {
+        case (l: Row, r: Row) => joinedRow(l, r)

--- End diff --

maybe perf doesn't matter too much here since it is a cartesian product already, but you can remove the pattern matching to improve perf.
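The suggested rewrite replaces the `case (l: Row, r: Row) =>` match with direct tuple field access, skipping the extractor and runtime type checks in the inner loop. A Spark-free sketch of the two forms (this `JoinedRow` is a simplified stand-in for Catalyst's reusable mutable wrapper):

```scala
// Simplified mutable JoinedRow: one instance reused per partition, with the
// joined values copied out via `values`.
final class JoinedRow {
  private var left: List[Int] = Nil
  private var right: List[Int] = Nil
  def apply(l: List[Int], r: List[Int]): JoinedRow = { left = l; right = r; this }
  def values: List[Int] = left ++ right
}

val pairs = List((List(1), List(2)), (List(3), List(4)))
val joinedRow = new JoinedRow

// Pattern-matching form: pairs.iterator.map { case (l, r) => joinedRow(l, r).values }
// Direct-access form avoids the match in the hot loop:
val joined = pairs.iterator.map(p => joinedRow(p._1, p._2).values).toList
println(joined) // prints List(List(1, 2), List(3, 4))
```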
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15334075

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.parquet

+import org.apache.spark.sql.execution.SparkPlan

--- End diff --

import order
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user markhamstra commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15334407

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import com.google.common.cache.{CacheLoader, CacheBuilder}
+
+import scala.language.existentials
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * A base class for generators of byte code to perform expression evaluation.  Includes a set of
+ * helpers for referring to Catalyst types and building trees that perform evaluation of individual
+ * expressions.
+ */
+abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  import scala.tools.reflect.ToolBox
+
+  protected val toolBox = runtimeMirror(getClass.getClassLoader).mkToolBox()
+
+  protected val rowType = typeOf[Row]
+  protected val mutableRowType = typeOf[MutableRow]
+  protected val genericRowType = typeOf[GenericRow]
+  protected val genericMutableRowType = typeOf[GenericMutableRow]
+
+  protected val projectionType = typeOf[Projection]
+  protected val mutableProjectionType = typeOf[MutableProjection]
+
+  private val curId = new java.util.concurrent.atomic.AtomicInteger()
+  private val javaSeparator = "$"
+
+  /**
+   * Generates a class for a given input expression.  Called when there is no cached code
+   * already available.
+   */
+  protected def create(in: InType): OutType
+
+  /**
+   * Canonicalizes an input expression.  Used to avoid double caching expressions that differ only
+   * cosmetically.
+   */
+  protected def canonicalize(in: InType): InType
+
+  /** Binds an input expression to a given input schema. */
+  protected def bind(in: InType, inputSchema: Seq[Attribute]): InType
+
+  protected val cache = CacheBuilder.newBuilder()
+    .maximumSize(1000)
+    .build(
+      new CacheLoader[InType, OutType]() {
+        override def load(in: InType): OutType = globalLock.synchronized {
+          create(in)
+        }
+      })
+
+  def apply(expressions: InType, inputSchema: Seq[Attribute]): OutType =
+    apply(bind(expressions, inputSchema))
+
+  def apply(expressions: InType): OutType = cache.get(canonicalize(expressions))
+
+  /**
+   * Returns a term name that is unique within this instance of a `CodeGenerator`.
+   *
+   * (Since we aren't in a macro context we do not seem to have access to the built in `freshName`
+   * function.)
+   */
+  protected def freshName(prefix: String): TermName = {
+    newTermName(s"$prefix$javaSeparator${curId.getAndIncrement}")
+  }
+
+  /**
+   * Scala ASTs for evaluating an [[Expression]] given a [[Row]] of input.
+   *
+   * @param code The sequence of statements required to evaluate the expression.
+   * @param nullTerm A term that holds a boolean value representing whether the expression evaluated
+   *                 to null.
+   * @param primitiveTerm A term for a possible primitive value of the result of the evaluation.  Not
+   *                      valid if `nullTerm` is set to `true`.
+   * @param objectTerm A possibly boxed version of the result of evaluating this expression.
+   */
+  protected case class EvaluatedExpression(
+      code: Seq[Tree],
+      nullTerm: TermName,
+      primitiveTerm: TermName,
+      objectTerm: TermName)
+
+  /**
+   * Given an expression tree returns an [[EvaluatedExpression]], which contains Scala trees that
+   *
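The generator above builds Scala ASTs and compiles them at runtime. As a minimal standalone sketch of that pattern (names here are mine, not Spark's): construct a tree with the `q"..."` quasiquote interpolator, then compile and evaluate it with a reflection `ToolBox`. This needs `scala-compiler` and `scala-reflect` on the classpath, and on Scala 2.10 quasiquotes additionally require a compiler plugin, which is what the "value q is not a member of StringContext" build error reported earlier in this thread points at.

```scala
import scala.reflect.runtime.currentMirror
import scala.reflect.runtime.universe._
import scala.tools.reflect.ToolBox

// Hypothetical sketch of runtime code generation with a ToolBox.
object ToolBoxSketch {
  private val toolBox = currentMirror.mkToolBox()

  // Compile "(x: Int) => x + n" for a constant n and return the function.
  def compileAdder(n: Int): Int => Int = {
    val lit = Literal(Constant(n))      // a literal tree to splice in
    val tree = q"(x: Int) => x + $lit"  // quasiquote builds the AST
    toolBox.eval(tree).asInstanceOf[Int => Int]
  }

  def main(args: Array[String]): Unit =
    println(compileAdder(3)(39)) // prints 42
}
```

Note that each `toolBox.eval` invokes the full compiler, which is why the real `CodeGenerator` fronts `create` with a Guava cache keyed on canonicalized expressions.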
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-49949147 QA tests have started for PR 993. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17066/consoleFull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/993#issuecomment-49949454 Okay, I think this is getting pretty close to merging in. I've tested it with codegen turned on and it passes all of the tests, and I've built it successfully with both maven and sbt. I've also tried to address all of the previous comments. @pwendell, I'd appreciate it if you could take a quick glance at the build changes, though they ended up being pretty minimal. @rxin @concretevitamin @yhuai @liancheng I think you have all already looked at this, but if there is any other feedback please let me know. I'd like to merge this in the next day or so, as it makes some pretty sweeping changes which might cause conflicts as we get closer to the merge deadline. ---
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15322479

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateProjection.scala ---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.codegen
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
+
+
+/**
+ * Generates bytecode that produces a new [[Row]] object based on a fixed set of input
+ * [[Expression Expressions]] and a given input [[Row]].  The returned [[Row]] object is custom
+ * generated based on the output types of the [[Expression]] to avoid boxing of primitive values.
+ */
+object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
+  import scala.reflect.runtime.{universe => ru}
+  import scala.reflect.runtime.universe._
+
+  protected def canonicalize(in: Seq[Expression]): Seq[Expression] =
+    in.map(ExpressionCanonicalizer(_))
+
+  protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
+    in.map(BindReferences.bindReference(_, inputSchema))
+
+  // Make Mutability optional...
+  protected def create(expressions: Seq[Expression]): Projection = {
+    val tupleLength = ru.Literal(Constant(expressions.length))
+    val lengthDef = q"final val length = $tupleLength"
+
+    /* TODO: Configurable...
+    val nullFunctions =
+      q"""
+        private final val nullSet = new org.apache.spark.util.collection.BitSet(length)
+        final def setNullAt(i: Int) = nullSet.set(i)
--- End diff --

I'd be pretty surprised if the JIT didn't take care of that for a function this small.
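The commented-out TODO in the diff tracks nulls in a single bitset rather than one boolean field per column, and the review exchange is about whether the tiny `setNullAt` accessor is cheap enough for the JIT to inline. A minimal sketch of that null-tracking scheme, using `java.util.BitSet` in place of Spark's internal `BitSet` and with the class name invented for illustration:

```scala
import java.util.BitSet

// Hypothetical fixed-length row that records nulls in one bitset.
// The one-line accessors are the kind of method the JIT routinely inlines.
final class BitSetRow(val length: Int) {
  private val nullSet = new BitSet(length)
  private val values  = new Array[Any](length)

  def setNullAt(i: Int): Unit    = nullSet.set(i)
  def isNullAt(i: Int): Boolean  = nullSet.get(i)
  def update(i: Int, v: Any): Unit = { nullSet.clear(i); values(i) = v }
  def apply(i: Int): Any = if (isNullAt(i)) null else values(i)
}
```

Compared with generating a dedicated `nullBit_i` boolean per column, the bitset keeps the generated class small at the cost of an extra indirection per access, which is presumably why the diff leaves the choice as a configurable TODO.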
[GitHub] spark pull request: [SPARK-2054][SQL] Code Generation for Expressi...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/993#discussion_r15330980

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala ---
@@ -28,61 +30,27 @@ import org.apache.spark.sql.Logging
  * to be retrieved more efficiently. However, since operations like column pruning can change
  * the layout of intermediate tuples, BindReferences should be run after all such transformations.
  */
-case class BoundReference(ordinal: Int, baseReference: Attribute)
-  extends Attribute with trees.LeafNode[Expression] {
+case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
+  extends Expression with trees.LeafNode[Expression] {

   type EvaluatedType = Any

-  override def nullable = baseReference.nullable
-  override def dataType = baseReference.dataType
-  override def exprId = baseReference.exprId
-  override def qualifiers = baseReference.qualifiers
-  override def name = baseReference.name
+  def references = Set.empty
--- End diff --

override here
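The diff above replaces a `BoundReference` that wrapped a full `Attribute` with one that carries only the ordinal, `dataType`, and `nullable`, so evaluation is a plain index into the input row. A toy model of that shape, with all types invented here for illustration rather than taken from Catalyst:

```scala
// Hypothetical miniature of the refactored BoundReference.
sealed trait DataType
case object IntType    extends DataType
case object StringType extends DataType

trait Expression {
  def dataType: DataType
  def nullable: Boolean
  def eval(input: Seq[Any]): Any
}

// Storing dataType/nullable directly (instead of delegating to a wrapped
// Attribute) means evaluation needs no name or exprId lookup, only an index.
case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)
    extends Expression {
  override def eval(input: Seq[Any]): Any = input(ordinal)
}
```

This also makes the node trivially easy for a code generator to compile: the only runtime work is `input(ordinal)` plus an optional null check driven by `nullable`.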