[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22355 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r218042119 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala --- @@ -69,11 +85,26 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT test("codegen failures in the CODEGEN_ONLY mode") { val errMsg = intercept[ExecutionException] { val input = Seq(BoundReference(0, IntegerType, nullable = true)) - val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { FailedCodegenProjection.createObject(input) } + val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString --- End diff -- oh.. yes. I'll remove.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r218041490 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala --- @@ -69,11 +85,26 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT test("codegen failures in the CODEGEN_ONLY mode") { val errMsg = intercept[ExecutionException] { val input = Seq(BoundReference(0, IntegerType, nullable = true)) - val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { FailedCodegenProjection.createObject(input) } + val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString --- End diff -- unnecessary line?
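The test above checks how codegen failures behave under the different factory modes. The sketch below is a hedged, Spark-free illustration of how a factory with an interpreted fallback might dispatch on such a mode. `FactoryMode`, `FallbackFactory` and `AlwaysFailingCodegen` are hypothetical names, and this is not Spark's `CodeGeneratorWithInterpretedFallback`. In the real suite, a codegen failure in `CODEGEN_ONLY` mode is expected to surface as an `ExecutionException`.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for CodegenObjectFactoryMode; only the value names mirror the test.
object FactoryMode extends Enumeration {
  val FALLBACK, CODEGEN_ONLY, NO_CODEGEN = Value
}

// Sketch of a factory that prefers generated code but can fall back to interpretation.
abstract class FallbackFactory[IN, OUT] {
  protected def createCodeGeneratedObject(in: IN): OUT
  protected def createInterpretedObject(in: IN): OUT

  def createObject(in: IN, mode: FactoryMode.Value): OUT = mode match {
    case FactoryMode.CODEGEN_ONLY => createCodeGeneratedObject(in) // codegen errors propagate
    case FactoryMode.NO_CODEGEN => createInterpretedObject(in) // codegen is never attempted
    case _ => // FALLBACK: try codegen, interpret on a non-fatal failure
      try createCodeGeneratedObject(in)
      catch { case NonFatal(_) => createInterpretedObject(in) }
  }
}

// A factory whose codegen path always fails, in the spirit of the suite's FailedCodegenProjection.
object AlwaysFailingCodegen extends FallbackFactory[Int, String] {
  protected def createCodeGeneratedObject(in: Int): String =
    throw new IllegalStateException("codegen failed")
  protected def createInterpretedObject(in: Int): String = s"interpreted($in)"
}
```

Only `CODEGEN_ONLY` lets the failure reach the caller, which is what an `intercept` in such a test relies on.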
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217860646 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. 
+ */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(toBoundExprs(expressions, inputSchema)) + + private[this] val buffer = new Array[Any](expressions.size) + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + private[this] val validExprs = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + } + private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size) + def currentValue: InternalRow = mutableRow + + override def target(row: InternalRow): MutableProjection = { +mutableRow = row +this + } + + override def apply(input: InternalRow): InternalRow = { +validExprs.foreach { case (expr, i) => --- End diff -- oh, I forgot that we should do that in performance-sensitive places... Thanks! I'll update.
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217841164 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. 
+ */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(toBoundExprs(expressions, inputSchema)) + + private[this] val buffer = new Array[Any](expressions.size) + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + private[this] val validExprs = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + } + private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size) + def currentValue: InternalRow = mutableRow + + override def target(row: InternalRow): MutableProjection = { +mutableRow = row +this + } + + override def apply(input: InternalRow): InternalRow = { +validExprs.foreach { case (expr, i) => --- End diff -- Can you please use the old code? That should be much more performant than this.
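The request is to go back to the `while`-loop style in the per-row `apply` path. The sketch below is a hedged, Spark-free illustration of that style. The non-`NoOp` expressions and their output ordinals are computed into plain arrays once, at construction. The per-row loop then does no tuple destructuring and no closure calls. `Expr`, `InputRef`, `Lit`, `NoOpExpr` and `ArrayLoopProjection` are hypothetical stand-ins, not Catalyst classes, and rows are modeled as `Array[Any]`.

```scala
// Hypothetical stand-ins for Catalyst expressions; rows are modeled as Array[Any].
trait Expr { def eval(input: Array[Any]): Any }
final case class InputRef(ordinal: Int) extends Expr {
  def eval(input: Array[Any]): Any = input(ordinal)
}
final case class Lit(value: Any) extends Expr {
  def eval(input: Array[Any]): Any = value
}
case object NoOpExpr extends Expr {
  def eval(input: Array[Any]): Any =
    throw new UnsupportedOperationException("NoOp cannot be evaluated")
}

final class ArrayLoopProjection(exprs: Seq[Expr]) {
  // Computed once: the evaluable expressions and the output slot each result goes to.
  private[this] val validExprs: Array[Expr] = exprs.filterNot(_ == NoOpExpr).toArray
  private[this] val ordinals: Array[Int] =
    exprs.zipWithIndex.collect { case (e, i) if e != NoOpExpr => i }.toArray
  private[this] val buffer = new Array[Any](validExprs.length)
  private[this] var mutableRow: Array[Any] = new Array[Any](exprs.length)

  def target(row: Array[Any]): this.type = { mutableRow = row; this }

  def apply(input: Array[Any]): Array[Any] = {
    var i = 0
    while (i < validExprs.length) { // evaluate every result first
      buffer(i) = validExprs(i).eval(input)
      i += 1
    }
    i = 0
    while (i < validExprs.length) { // then write them; NoOp slots are left untouched
      mutableRow(ordinals(i)) = buffer(i)
      i += 1
    }
    mutableRow
  }
}
```

For example, projecting `Seq(InputRef(1), Lit("x"), NoOpExpr)` over input `[10, 20]` into a target `[0, 0, 99]` leaves the target as `[20, "x", 99]`.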
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217242015 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala --- @@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration { */ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging { - def createObject(in: IN): OUT = { + def createObject(in: IN): OUT = +createObject(in, subexpressionEliminationEnabled = false) --- End diff -- ok, I'll do that first in another PR.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217241810 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala --- @@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration { */ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging { - def createObject(in: IN): OUT = { + def createObject(in: IN): OUT = +createObject(in, subexpressionEliminationEnabled = false) --- End diff -- I took another look; it seems it's not an actual config, just a method parameter. Maybe we should remove this config in 3.0. That said, let's keep your PR as it is.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217233868 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala --- @@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration { */ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging { - def createObject(in: IN): OUT = { + def createObject(in: IN): OUT = +createObject(in, subexpressionEliminationEnabled = false) --- End diff -- can we do that first? I think that will be a small change and can also make this PR simpler and easier to review.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217058960 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala --- @@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration { */ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging { - def createObject(in: IN): OUT = { + def createObject(in: IN): OUT = +createObject(in, subexpressionEliminationEnabled = false) --- End diff -- ah, it might be, yes. I'll check in a follow-up.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217054661 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala --- @@ -37,19 +37,22 @@ object CodegenObjectFactoryMode extends Enumeration { */ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging { - def createObject(in: IN): OUT = { + def createObject(in: IN): OUT = +createObject(in, subexpressionEliminationEnabled = false) --- End diff -- Can we eliminate it? I think we can use `SQLConf.get` directly when we need to access the conf. This should be done in a different PR though.
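The suggestion is to stop threading a flag through every `createObject` overload and read it at the point of use, in the spirit of `SQLConf.get`. The sketch below is hedged. `Conf`, `withConf` and `ProjectionFactory` are hypothetical names. The thread-local is a simplification and is not how `SQLConf` is implemented. Elsewhere in the thread, `subexpressionEliminationEnabled` turned out to be a method parameter rather than a real config, so this only illustrates the pattern.

```scala
// Hypothetical ambient config, loosely modeled on the idea behind SQLConf.get.
final case class Conf(subexpressionEliminationEnabled: Boolean)

object Conf {
  private val active = new ThreadLocal[Conf] {
    override protected def initialValue(): Conf = Conf(subexpressionEliminationEnabled = false)
  }

  def get: Conf = active.get()

  // Runs `body` with `conf` active on this thread, restoring the previous conf afterwards.
  def withConf[T](conf: Conf)(body: => T): T = {
    val previous = active.get()
    active.set(conf)
    try body finally active.set(previous)
  }
}

object ProjectionFactory {
  // Explicit-parameter style: every caller has to pass the flag along.
  def createObject(exprs: Seq[String], subexpressionEliminationEnabled: Boolean): String =
    s"projection(${exprs.mkString(",")}, subexprElim=$subexpressionEliminationEnabled)"

  // Ambient style: the flag is read where it is needed.
  def createObject(exprs: Seq[String]): String =
    createObject(exprs, Conf.get.subexpressionEliminationEnabled)
}
```

The trade-off: the one-argument overload no longer needs a default value. The price is that its result now depends on thread state, which is why a scoped `withConf` helper is used instead of a bare setter.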
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217017120 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. + */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(expressions.map(toBoundExprs(_, inputSchema))) --- End diff -- yea, haha. I've already updated!
Github user rednaxelafx commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r217016675 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. 
+ */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(expressions.map(toBoundExprs(_, inputSchema))) --- End diff --
```
[error] /home/jenkins/workspace/SparkPullRequestBuilder@2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala:32: type mismatch;
[error] found : org.apache.spark.sql.catalyst.expressions.Expression
[error] required: Seq[org.apache.spark.sql.catalyst.expressions.Expression]
[error] this(expressions.map(toBoundExprs(_, inputSchema)))
[error] ^
```
It's probably `this(toBoundExprs(expressions, inputSchema))`, right?
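The compile error comes from applying a `Seq`-level helper to each element in turn. The sketch below is a hedged, Spark-free model of it. `Expr`, `Attr`, `BoundRef` and `Binding` are hypothetical. The helper is only assumed to have the shape `(Seq[Expr], Seq[Attr]) => Seq[Expr]`, as the suggested `this(toBoundExprs(expressions, inputSchema))` implies.

```scala
// Hypothetical stand-ins for Catalyst expressions, attributes and bound references.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class BoundRef(ordinal: Int) extends Expr

object Binding {
  // Binds one expression: an attribute becomes a reference to its position in the schema.
  def bindReference(e: Expr, schema: Seq[Attr]): Expr = e match {
    case a: Attr =>
      val i = schema.indexOf(a)
      require(i >= 0, s"Couldn't find $a in ${schema.mkString("[", ",", "]")}")
      BoundRef(i)
    case other => other
  }

  // Seq-level helper: it takes the whole sequence, not a single element.
  def toBoundExprs(exprs: Seq[Expr], schema: Seq[Attr]): Seq[Expr] =
    exprs.map(bindReference(_, schema))
}

final class Projection(val exprs: Seq[Expr]) {
  // Compiles: the helper is applied once to the whole sequence.
  def this(exprs: Seq[Expr], inputSchema: Seq[Attr]) =
    this(Binding.toBoundExprs(exprs, inputSchema))
  // `this(exprs.map(Binding.toBoundExprs(_, inputSchema)))` would not compile: the
  // placeholder stands for a single Expr where a Seq[Expr] is expected (type mismatch).
}
```

For instance, binding `Seq(Attr("b"), Attr("a"))` against the schema `Seq(Attr("a"), Attr("b"))` yields `Seq(BoundRef(1), BoundRef(0))`.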
Github user rednaxelafx commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r216525084 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. + */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(expressions.map(BindReferences.bindReference(_, inputSchema))) --- End diff -- use `toBoundExpr`?
Github user rednaxelafx commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r216524666 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala --- @@ -86,24 +86,12 @@ package object expressions { } /** - * Converts a [[InternalRow]] to another Row given a sequence of expression that define each - * column of the new row. If the schema of the input row is specified, then the given expression - * will be bound to that schema. - * - * In contrast to a normal projection, a MutableProjection reuses the same underlying row object - * each time an input row is added. This significantly reduces the cost of calculating the - * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after - * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call - * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`. + * A helper function to bound given expressions to an input schema. --- End diff -- Spelling nitpick: s/bound/bind/
Github user rednaxelafx commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r216526434 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. 
+ */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(expressions.map(BindReferences.bindReference(_, inputSchema))) + + private[this] val buffer = new Array[Any](expressions.size) + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + private[this] val exprArray = expressions.toArray + private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length) + def currentValue: InternalRow = mutableRow + + override def target(row: InternalRow): MutableProjection = { +mutableRow = row +this + } + + override def apply(input: InternalRow): InternalRow = { +var i = 0 +while (i < exprArray.length) { + // Store the result into buffer first, to make the projection atomic (needed by aggregation) + buffer(i) = exprArray(i).eval(input) + i += 1 +} +i = 0 +while (i < exprArray.length) { + mutableRow(i) = buffer(i) + i += 1 +} +mutableRow + } +} --- End diff -- +1 on the check for `NoOp`s.
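The `apply` method above buffers its results "to make the projection atomic (needed by aggregation)". The reason becomes visible when the target row is also the input row, as when an aggregation buffer is updated in place. Writing each result directly would let later expressions read values that were already overwritten in the same pass. The minimal Spark-free sketch below assumes, for illustration only, that rows are `Array[Any]` and expressions are plain functions. `AtomicProjectionDemo` and its members are hypothetical names.

```scala
object AtomicProjectionDemo {
  type Row = Array[Any]

  // Two "expressions" that swap the columns of the input row.
  val swapColumns: Array[Row => Any] = Array[Row => Any](r => r(1), r => r(0))

  // Writes each result straight into the target row.
  def projectDirect(exprs: Array[Row => Any], input: Row, target: Row): Row = {
    var i = 0
    while (i < exprs.length) {
      target(i) = exprs(i)(input)
      i += 1
    }
    target
  }

  // Evaluates every expression into a buffer first, then copies into the target row.
  def projectBuffered(exprs: Array[Row => Any], input: Row, target: Row): Row = {
    val buffer = new Array[Any](exprs.length) // allocated per call only to keep the sketch short
    var i = 0
    while (i < exprs.length) {
      buffer(i) = exprs(i)(input)
      i += 1
    }
    i = 0
    while (i < exprs.length) {
      target(i) = buffer(i)
      i += 1
    }
    target
  }
}
```

Apply both versions to the row `[1, 2]`, passing it as both input and target. The direct version produces `[2, 2]`, because the second expression reads the already-overwritten column 0. The buffered version produces the intended swap, `[2, 1]`.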
Github user sadhen commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r216116648 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. 
+ */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(expressions.map(BindReferences.bindReference(_, inputSchema))) + + private[this] val buffer = new Array[Any](expressions.size) + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + private[this] val exprArray = expressions.toArray + private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length) + def currentValue: InternalRow = mutableRow + + override def target(row: InternalRow): MutableProjection = { +mutableRow = row +this + } + + override def apply(input: InternalRow): InternalRow = { +var i = 0 +while (i < exprArray.length) { + // Store the result into buffer first, to make the projection atomic (needed by aggregation) + buffer(i) = exprArray(i).eval(input) + i += 1 +} +i = 0 +while (i < exprArray.length) { + mutableRow(i) = buffer(i) + i += 1 +} +mutableRow + } +} --- End diff -- Should be easy to create a test case.
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r216114653 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. 
+ */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(expressions.map(BindReferences.bindReference(_, inputSchema))) + + private[this] val buffer = new Array[Any](expressions.size) + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + private[this] val exprArray = expressions.toArray + private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length) + def currentValue: InternalRow = mutableRow + + override def target(row: InternalRow): MutableProjection = { +mutableRow = row +this + } + + override def apply(input: InternalRow): InternalRow = { +var i = 0 +while (i < exprArray.length) { + // Store the result into buffer first, to make the projection atomic (needed by aggregation) + buffer(i) = exprArray(i).eval(input) + i += 1 +} +i = 0 +while (i < exprArray.length) { + mutableRow(i) = buffer(i) + i += 1 +} +mutableRow + } +} --- End diff -- The change looks good to me, though I'd like to address performance and code quality issues in follow-ups. cc: @gatorsmile
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r215897598 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + *output row. 
+ */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = +this(expressions.map(BindReferences.bindReference(_, inputSchema))) + + private[this] val buffer = new Array[Any](expressions.size) + + override def initialize(partitionIndex: Int): Unit = { +expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => +}) + } + + private[this] val exprArray = expressions.toArray + private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length) + def currentValue: InternalRow = mutableRow + + override def target(row: InternalRow): MutableProjection = { +mutableRow = row +this + } + + override def apply(input: InternalRow): InternalRow = { +var i = 0 +while (i < exprArray.length) { + // Store the result into buffer first, to make the projection atomic (needed by aggregation) + buffer(i) = exprArray(i).eval(input) + i += 1 +} +i = 0 +while (i < exprArray.length) { + mutableRow(i) = buffer(i) + i += 1 +} +mutableRow + } +} --- End diff -- Good point. Do you know whether there is a test case that causes the exception with the interpreted one?
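One way to reproduce the problem with the interpreted path is sketched below, without Spark. It assumes that the `NoOp` placeholder cannot be evaluated and throws when `eval` is called. Under that assumption, a projection that evaluates every expression fails on a `NoOp` slot. A projection that skips `NoOp` slots leaves them holding their previous values. `Expr`, `Col`, `NoOpSlot` and `NoOpCheck` are hypothetical names, not Catalyst classes.

```scala
// Hypothetical stand-ins; input rows are modeled as Seq[Any], target rows as Array[Any].
sealed trait Expr { def eval(input: Seq[Any]): Any }
final case class Col(ordinal: Int) extends Expr {
  def eval(input: Seq[Any]): Any = input(ordinal)
}
// Assumed behaviour: a placeholder that must never be evaluated.
case object NoOpSlot extends Expr {
  def eval(input: Seq[Any]): Any = throw new UnsupportedOperationException("Cannot evaluate NoOp")
}

object NoOpCheck {
  // Evaluates every expression, NoOp placeholders included.
  def evalAll(exprs: Seq[Expr], input: Seq[Any], target: Array[Any]): Array[Any] = {
    exprs.zipWithIndex.foreach { case (e, i) => target(i) = e.eval(input) }
    target
  }

  // Skips NoOp placeholders, so their slots keep whatever the target row already held.
  def skipNoOps(exprs: Seq[Expr], input: Seq[Any], target: Array[Any]): Array[Any] = {
    exprs.zipWithIndex.foreach {
      case (NoOpSlot, _) => // leave the slot as is
      case (e, i) => target(i) = e.eval(input)
    }
    target
  }
}
```

As a test case: with `Seq(Col(0), NoOpSlot)`, input `Seq(7)` and target `[0, 42]`, `evalAll` throws. `skipNoOps` returns `[7, 42]`.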
Github user sadhen commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r215862272 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
```
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+
+/**
+ * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
+ * expressions.
+ *
+ * @param expressions a sequence of expressions that determine the value of each column of the
+ *                    output row.
+ */
+class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
+  def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
+    this(expressions.map(BindReferences.bindReference(_, inputSchema)))
+
+  private[this] val buffer = new Array[Any](expressions.size)
+
+  override def initialize(partitionIndex: Int): Unit = {
+    expressions.foreach(_.foreach {
+      case n: Nondeterministic => n.initialize(partitionIndex)
+      case _ =>
+    })
+  }
+
+  private[this] val exprArray = expressions.toArray
+  private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
+  def currentValue: InternalRow = mutableRow
+
+  override def target(row: InternalRow): MutableProjection = {
+    mutableRow = row
+    this
+  }
+
+  override def apply(input: InternalRow): InternalRow = {
+    var i = 0
+    while (i < exprArray.length) {
+      // Store the result into buffer first, to make the projection atomic (needed by aggregation)
+      buffer(i) = exprArray(i).eval(input)
+      i += 1
+    }
+    i = 0
+    while (i < exprArray.length) {
+      mutableRow(i) = buffer(i)
+      i += 1
+    }
+    mutableRow
+  }
+}
```
--- End diff --

The improvement (written 3 months ago) is based on the generated Java code.
[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...
Github user sadhen commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r215859282 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---

This is my improved version of `InterpretedMutableProjection`:

```
override def apply(input: InternalRow): InternalRow = {
  var i = 0
  while (i < exprArray.length) {
    // Store the result into buffer first, to make the projection atomic (needed by aggregation)
    if (exprArray(i) != NoOp) {
      buffer(i) = exprArray(i).eval(input)
    }
    i += 1
  }
  i = 0
  while (i < exprArray.length) {
    if (exprArray(i) != NoOp) {
      mutableRow(i) = buffer(i)
    }
    i += 1
  }
  mutableRow
}
```

`AggregationIterator` uses `NoOp`. If you replace the codegen projection with the interpreted one, you will encounter an exception from `NoOp.eval`.
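The `NoOp` problem sadhen describes can be reproduced in a small self-contained model. Everything here is hypothetical and only stands in for Spark's classes: `ToyExpr` for `Expression`, `NoOpExpr` for `NoOp` (whose evaluation throws), `Ordinal` for a bound column reference, and `SkippingMutableProjection` for a projection that leaves `NoOp` slots untouched. Unlike the patch above, which tests `exprArray(i) != NoOp` inside each loop, this sketch computes the valid slot indices once up front; that is a design choice made here for illustration, not something proposed in the thread.

```scala
// Hypothetical toy model; not Spark's Expression hierarchy.
trait ToyExpr {
  def eval(input: Array[Any]): Any
}

// Plays the role of Spark's NoOp: a placeholder slot that must never be evaluated.
case object NoOpExpr extends ToyExpr {
  def eval(input: Array[Any]): Any =
    throw new UnsupportedOperationException("NoOpExpr cannot be evaluated")
}

// Reads one column of the input row.
final case class Ordinal(index: Int) extends ToyExpr {
  def eval(input: Array[Any]): Any = input(index)
}

final class SkippingMutableProjection(exprs: Seq[ToyExpr]) {
  private val exprArray = exprs.toArray
  // Indices of the slots that are not NoOpExpr, computed once
  // instead of re-checking `!= NoOpExpr` for every row.
  private val validIdx: Array[Int] =
    exprArray.indices.filter(i => exprArray(i) != NoOpExpr).toArray
  private val buffer = new Array[Any](exprArray.length)
  private var target: Array[Any] = new Array[Any](exprArray.length)

  def setTarget(row: Array[Any]): this.type = { target = row; this }

  def apply(input: Array[Any]): Array[Any] = {
    // Pass 1: evaluate only the valid slots into the buffer.
    var j = 0
    while (j < validIdx.length) {
      val i = validIdx(j)
      buffer(i) = exprArray(i).eval(input)
      j += 1
    }
    // Pass 2: copy them into the target; NoOp slots keep their old value.
    j = 0
    while (j < validIdx.length) {
      val i = validIdx(j)
      target(i) = buffer(i)
      j += 1
    }
    target
  }
}

val targetRow = Array[Any]("untouched", 0)
val proj = new SkippingMutableProjection(Seq(NoOpExpr, Ordinal(0))).setTarget(targetRow)
val out = proj(Array[Any](42))
// out is targetRow, now ["untouched", 42]; NoOpExpr.eval was never called.
```

A projection that evaluated every slot unconditionally would throw on the first slot of this example, which is the failure mode described in the comment.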
[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22355#discussion_r215659345 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala ---
```
@@ -178,15 +236,7 @@ object UnsafeProjection
       exprs: Seq[Expression],
       inputSchema: Seq[Attribute],
       subexpressionEliminationEnabled: Boolean): UnsafeProjection = {
-    val unsafeExprs = toUnsafeExprs(toBoundExprs(exprs, inputSchema))
-    try {
-      GenerateUnsafeProjection.generate(unsafeExprs, subexpressionEliminationEnabled)
-    } catch {
-      case NonFatal(_) =>
-        // We should have already seen the error message in `CodeGenerator`
-        logWarning("Expr codegen error and falling back to interpreter mode")
-        InterpretedUnsafeProjection.createProjection(unsafeExprs)
-    }
+    createObject(toUnsafeExprs(toBoundExprs(exprs, inputSchema)), subexpressionEliminationEnabled)
```
--- End diff --

Removed the duplicated fallback logic here and reused the logic in `CodeGeneratorWithInterpretedFallback`.
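The refactoring above replaces a hand-written try/catch with a single shared `createObject`. A minimal sketch of that pattern follows. It is a simplification under stated assumptions: `FactoryMode`, `CodegenWithFallback`, `createCodeGeneratedObject` and `createInterpretedObject` are hypothetical names standing in for `CodegenObjectFactoryMode` and `CodeGeneratorWithInterpretedFallback`, and the mode is passed in explicitly rather than read from configuration (the suite sets it through `SQLConf.CODEGEN_FACTORY_MODE`).

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for CodegenObjectFactoryMode.
sealed trait FactoryMode
object FactoryMode {
  case object Fallback extends FactoryMode
  case object CodegenOnly extends FactoryMode
  case object NoCodegen extends FactoryMode
}

// Hypothetical base class: subclasses supply the two constructors, and the
// mode-dependent choice (including the fallback) lives in one place.
abstract class CodegenWithFallback[IN, OUT] {
  protected def createCodeGeneratedObject(in: IN): OUT
  protected def createInterpretedObject(in: IN): OUT

  def createObject(in: IN, mode: FactoryMode): OUT = mode match {
    case FactoryMode.CodegenOnly => createCodeGeneratedObject(in)
    case FactoryMode.NoCodegen   => createInterpretedObject(in)
    case FactoryMode.Fallback =>
      try createCodeGeneratedObject(in)
      catch {
        case NonFatal(_) =>
          // The removed UnsafeProjection code logged a warning at this point.
          createInterpretedObject(in)
      }
  }
}

// A factory whose code generation always fails; the "projection" is just a string.
object AlwaysFailingCodegen extends CodegenWithFallback[Seq[String], String] {
  protected def createCodeGeneratedObject(in: Seq[String]): String =
    throw new IllegalStateException("simulated codegen compile failure")
  protected def createInterpretedObject(in: Seq[String]): String =
    s"interpreted(${in.mkString(", ")})"
}

val viaFallback = AlwaysFailingCodegen.createObject(Seq("a", "b"), FactoryMode.Fallback)
// viaFallback == "interpreted(a, b)"
```

With this shape, `UnsafeProjection` and `MutableProjection` would each only implement the two constructors, and the CODEGEN_ONLY mode still surfaces codegen failures instead of hiding them.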
[GitHub] spark pull request #22355: [SPARK-25358][SQL] MutableProjection supports fal...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/22355

[SPARK-25358][SQL] MutableProjection supports fallback to an interpreted mode

## What changes were proposed in this pull request?

In SPARK-23711, `UnsafeProjection` gained support for falling back to an interpreted mode. This PR adds the same fallback mode to `MutableProjection`, based on `CodeGeneratorWithInterpretedFallback`.

## How was this patch tested?

Added tests in `CodeGeneratorWithInterpretedFallbackSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maropu/spark SPARK-25358

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22355.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22355

commit c25c4269f6127b445208c752ddadcc23ae77a578
Author: Takeshi Yamamuro
Date: 2018-09-06T14:48:04Z

Fix