[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r238489997

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
    @@ -148,12 +156,25 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
           })
           // When we are regenerating the golden files we don't need to run all the configs as they
           // all need to return the same result
    -      if (regenerateGoldenFiles && configs.nonEmpty) {
    -        configs.take(1)
    +      if (regenerateGoldenFiles) {
    +        if (configs.nonEmpty) {
    +          configs.take(1)
    +        } else {
    +          Array.empty[Array[(String, String)]]
    --- End diff --

    ok

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22512

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r238328405

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
    @@ -148,12 +156,25 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
           })
           // When we are regenerating the golden files we don't need to run all the configs as they
           // all need to return the same result
    -      if (regenerateGoldenFiles && configs.nonEmpty) {
    -        configs.take(1)
    +      if (regenerateGoldenFiles) {
    +        if (configs.nonEmpty) {
    +          configs.take(1)
    +        } else {
    +          Array.empty[Array[(String, String)]]
    --- End diff --

    nit: since the configs don't matter when generating results, I think we can just return empty configs here. We can clean it up in a follow-up PR.

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r238177033

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
    @@ -148,12 +156,21 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
           })
           // When we are regenerating the golden files we don't need to run all the configs as they
           // all need to return the same result
    -      if (regenerateGoldenFiles && configs.nonEmpty) {
    +      if (regenerateGoldenFiles) {
             configs.take(1)
    --- End diff --

    For better readability, fixed in https://github.com/apache/spark/pull/22512/commits/4cdc5040feb3da1e4cf9efcf434138d5873fae04

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r238176184

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
    @@ -64,7 +85,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
         i = 0
         while (i < validExprs.length) {
           val (_, ordinal) = validExprs(i)
    -      mutableRow(ordinal) = buffer(ordinal)
    +      fieldWriters(i)(buffer(ordinal))
    --- End diff --

    fixed in https://github.com/apache/spark/pull/22512/commits/95411c8b8b76503dff93756482642083a694b0b7

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r238175630

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
    @@ -148,12 +156,21 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
           })
           // When we are regenerating the golden files we don't need to run all the configs as they
           // all need to return the same result
    -      if (regenerateGoldenFiles && configs.nonEmpty) {
    +      if (regenerateGoldenFiles) {
             configs.take(1)
    --- End diff --

    Actually, it returns an empty array?
    ```
    scala> Array.empty.take(1)
    res0: Array[Nothing] = Array()

    scala> Seq.empty.take(1)
    res1: Seq[Nothing] = List()
    ```

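The REPL result above can also be checked in a standalone program. This is a minimal sketch, not code from the PR: `selectConfigs` and `ConfigSet` are hypothetical names that mirror the simplified branch under review, and the config values are made up.

```scala
object TakeOnEmpty {
  // Hypothetical stand-in for the config sets in the test suite:
  // each config set is an array of (key, value) pairs.
  type ConfigSet = Array[(String, String)]

  // Mirrors the simplified branch: when regenerating golden files,
  // keep at most one config set. `take(1)` does not throw on an
  // empty array; it simply returns an empty array.
  def selectConfigs(regenerate: Boolean, configs: Array[ConfigSet]): Array[ConfigSet] =
    if (regenerate) configs.take(1) else configs

  def main(args: Array[String]): Unit = {
    val none = Array.empty[ConfigSet]
    val two: Array[ConfigSet] = Array(Array("k" -> "a"), Array("k" -> "b"))
    println(selectConfigs(regenerate = true, none).length)  // 0
    println(selectConfigs(regenerate = true, two).length)   // 1
    println(selectConfigs(regenerate = false, two).length)  // 2
  }
}
```

So the `configs.nonEmpty` guard is not needed for safety; whether to keep it is a readability choice.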
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r238172470

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
    @@ -148,12 +156,21 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
           })
           // When we are regenerating the golden files we don't need to run all the configs as they
           // all need to return the same result
    -      if (regenerateGoldenFiles && configs.nonEmpty) {
    +      if (regenerateGoldenFiles) {
             configs.take(1)
    --- End diff --

    what if `configs` is empty? `take(1)` will fail

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r238151565

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
    @@ -64,7 +85,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
         i = 0
         while (i < validExprs.length) {
           val (_, ordinal) = validExprs(i)
    -      mutableRow(ordinal) = buffer(ordinal)
    +      fieldWriters(i)(buffer(ordinal))
    --- End diff --

    ah, sounds reasonable. I'll update later.

Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r238146834

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
    @@ -64,7 +85,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
         i = 0
         while (i < validExprs.length) {
           val (_, ordinal) = validExprs(i)
    -      mutableRow(ordinal) = buffer(ordinal)
    +      fieldWriters(i)(buffer(ordinal))
    --- End diff --

    Since `fieldWriters` is accessed via index, we should use `IndexedSeq` or `Array` explicitly?

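A small sketch of the concern, using plain Scala collections rather than the PR's actual types: a general `Seq` may be backed by a `List`, where `apply(i)` is linear in `i`, so converting once to an `Array` keeps the indexed `while` loop cheap. `buildWriters` and `writeAll` are hypothetical helpers, not names from the PR.

```scala
object IndexedWriters {
  // Hypothetical writers: each one appends "index=value;" to a log.
  // The result is deliberately a List to model a Seq with slow indexing.
  def buildWriters(n: Int, log: StringBuilder): Seq[Any => Unit] =
    List.tabulate(n)(i => (v: Any) => { log.append(s"$i=$v;"); () })

  // Convert once to an Array so that `indexed(i)` inside the loop is a
  // constant-time lookup, whatever Seq implementation was passed in.
  def writeAll(writers: Seq[Any => Unit], values: Array[Any]): Unit = {
    val indexed: Array[Any => Unit] = writers.toArray
    var i = 0
    while (i < indexed.length) {
      indexed(i)(values(i))
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val log = new StringBuilder
    writeAll(buildWriters(3, log), Array[Any](1, "a", true))
    println(log)  // 0=1;1=a;2=true;
  }
}
```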
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227626505

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MutableProjectionSuite.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.Row
    +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    +import org.apache.spark.sql.types._
    +import org.apache.spark.unsafe.types.CalendarInterval
    +
    +class MutableProjectionSuite extends SparkFunSuite with ExpressionEvalHelper {
    +
    +  private def createMutableProjection(dataTypes: Array[DataType]): MutableProjection = {
    +    MutableProjection.create(dataTypes.zipWithIndex.map(x => BoundReference(x._2, x._1, true)))
    +  }
    +
    +  testBothCodegenAndInterpreted("fixed-length types") {
    +    val fixedLengthTypes = Array[DataType](
    +      BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType,
    +      DateType, TimestampType)
    +    val proj = createMutableProjection(fixedLengthTypes)
    +    val inputRow = InternalRow.fromSeq(
    +      Seq(false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 5.0, 100, 200L))
    +    assert(proj(inputRow) === inputRow)
    +
    +    // Use UnsafeRow as buffer
    +    val numBytes = UnsafeRow.calculateBitSetWidthInBytes(fixedLengthTypes.length)
    +    val unsafeBuffer = UnsafeRow.createFromByteArray(numBytes, fixedLengthTypes.length)
    +    val projUnsafeRow = proj.target(unsafeBuffer)(inputRow)
    +    assert(FromUnsafeProjection(fixedLengthTypes)(projUnsafeRow) === inputRow)
    +  }
    +
    +  testBothCodegenAndInterpreted("variable-length types") {
    +    val variableLengthTypes = Array(
    +      StringType, DecimalType.defaultConcreteType, CalendarIntervalType, BinaryType,
    +      ArrayType(StringType), MapType(IntegerType, StringType),
    +      StructType.fromDDL("a INT, b STRING"), ObjectType(classOf[java.lang.Integer]))
    +    val proj = createMutableProjection(variableLengthTypes)
    --- End diff --

    sure.

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227626456

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
    @@ -143,4 +144,25 @@ object InternalRow {
         case u: UserDefinedType[_] => getAccessor(u.sqlType)
         case _ => (input, ordinal) => input.get(ordinal, dataType)
       }
    +
    +  /**
    +   * Returns a writer for an `InternalRow` with given data type.
    +   */
    +  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
    +    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
    +    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
    +    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
    +    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
    +    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
    +    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
    +    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
    +    case DecimalType.Fixed(precision, _) =>
    +      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
    +    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
    +        _: MapType | _: ObjectType =>
    +      (input, v) => input.update(ordinal, v)
    +    case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
    +    case NullType => (input, _) => input.setNullAt(ordinal)
    +    case _ => throw new SparkException(s"Unsupported data type $dt")
    --- End diff --

    ok

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227618778

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MutableProjectionSuite.scala ---
    @@ -0,0 +1,66 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.Row
    +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    +import org.apache.spark.sql.types._
    +import org.apache.spark.unsafe.types.CalendarInterval
    +
    +class MutableProjectionSuite extends SparkFunSuite with ExpressionEvalHelper {
    +
    +  private def createMutableProjection(dataTypes: Array[DataType]): MutableProjection = {
    +    MutableProjection.create(dataTypes.zipWithIndex.map(x => BoundReference(x._2, x._1, true)))
    +  }
    +
    +  testBothCodegenAndInterpreted("fixed-length types") {
    +    val fixedLengthTypes = Array[DataType](
    +      BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType,
    +      DateType, TimestampType)
    +    val proj = createMutableProjection(fixedLengthTypes)
    +    val inputRow = InternalRow.fromSeq(
    +      Seq(false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 5.0, 100, 200L))
    +    assert(proj(inputRow) === inputRow)
    +
    +    // Use UnsafeRow as buffer
    +    val numBytes = UnsafeRow.calculateBitSetWidthInBytes(fixedLengthTypes.length)
    +    val unsafeBuffer = UnsafeRow.createFromByteArray(numBytes, fixedLengthTypes.length)
    +    val projUnsafeRow = proj.target(unsafeBuffer)(inputRow)
    +    assert(FromUnsafeProjection(fixedLengthTypes)(projUnsafeRow) === inputRow)
    +  }
    +
    +  testBothCodegenAndInterpreted("variable-length types") {
    +    val variableLengthTypes = Array(
    +      StringType, DecimalType.defaultConcreteType, CalendarIntervalType, BinaryType,
    +      ArrayType(StringType), MapType(IntegerType, StringType),
    +      StructType.fromDDL("a INT, b STRING"), ObjectType(classOf[java.lang.Integer]))
    +    val proj = createMutableProjection(variableLengthTypes)
    --- End diff --

    Shall we also test that we fail if the target row is an unsafe row?

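The condition such a test would exercise is the one asserted in `target`: an unsafe target row is acceptable only if every expression type is fixed-length. Below is a minimal sketch of that implication in plain Scala. The `DataType` hierarchy, `fixedLength`, and `canTarget` are hypothetical stand-ins for illustration, not Spark's classes.

```scala
object TargetCheck {
  // Hypothetical, simplified data types that only record whether
  // their values have a fixed width.
  sealed trait DataType { def fixedLength: Boolean }
  case object IntType extends DataType { val fixedLength = true }
  case object StringType extends DataType { val fixedLength = false }

  // Logical implication "unsafe target => all types are fixed-length",
  // written as `!a || b`, the same shape as the assert in the diff.
  def canTarget(targetIsUnsafe: Boolean, types: Seq[DataType]): Boolean =
    !targetIsUnsafe || types.forall(_.fixedLength)

  def main(args: Array[String]): Unit = {
    println(canTarget(targetIsUnsafe = true, Seq(IntType)))              // true
    println(canTarget(targetIsUnsafe = true, Seq(IntType, StringType)))  // false
    println(canTarget(targetIsUnsafe = false, Seq(StringType)))          // true
  }
}
```

The second case is the one the requested test would cover: a variable-length type combined with an unsafe target should be rejected.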
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227618313

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
    @@ -143,4 +144,25 @@ object InternalRow {
         case u: UserDefinedType[_] => getAccessor(u.sqlType)
         case _ => (input, ordinal) => input.get(ordinal, dataType)
       }
    +
    +  /**
    +   * Returns a writer for an `InternalRow` with given data type.
    +   */
    +  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
    +    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
    +    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
    +    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
    +    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
    +    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
    +    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
    +    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
    +    case DecimalType.Fixed(precision, _) =>
    +      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
    +    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
    +        _: MapType | _: ObjectType =>
    +      (input, v) => input.update(ordinal, v)
    +    case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
    +    case NullType => (input, _) => input.setNullAt(ordinal)
    +    case _ => throw new SparkException(s"Unsupported data type $dt")
    --- End diff --

    One minor point: the codegen version just calls `row.update` for uncaught types, which means it supports object type as well. Shall we follow?

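The suggestion amounts to replacing the final `throw` with a generic `update` fallback. A minimal sketch of that dispatch shape follows, using hypothetical simplified types (`Row`, `DataType`, `UserDefined`, and the `calls` log are illustration-only stand-ins, not Spark's `InternalRow` API). It also shows a user-defined type delegating to the writer of its underlying SQL type.

```scala
import scala.collection.mutable.ArrayBuffer

object WriterDispatch {
  // Hypothetical, simplified stand-ins for a data type hierarchy.
  sealed trait DataType
  case object IntType extends DataType
  case object LongType extends DataType
  final case class ObjectType(className: String) extends DataType
  final case class UserDefined(sqlType: DataType) extends DataType

  // Hypothetical mutable row that records which setter was used.
  final class Row(n: Int) {
    private val values = new Array[Any](n)
    val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]
    def setInt(i: Int, v: Int): Unit = { calls += "setInt"; values(i) = v }
    def setLong(i: Int, v: Long): Unit = { calls += "setLong"; values(i) = v }
    def update(i: Int, v: Any): Unit = { calls += "update"; values(i) = v }
    def get(i: Int): Any = values(i)
  }

  def getWriter(ordinal: Int, dt: DataType): (Row, Any) => Unit = dt match {
    case IntType => (row, v) => row.setInt(ordinal, v.asInstanceOf[Int])
    case LongType => (row, v) => row.setLong(ordinal, v.asInstanceOf[Long])
    // A user-defined type reuses the writer of its underlying type.
    case UserDefined(sqlType) => getWriter(ordinal, sqlType)
    // Fallback: any type without a specialized setter goes through `update`.
    case _ => (row, v) => row.update(ordinal, v)
  }

  def main(args: Array[String]): Unit = {
    val row = new Row(2)
    getWriter(0, UserDefined(IntType))(row, 7)
    getWriter(1, ObjectType("java.lang.String"))(row, "x")
    println(row.calls.mkString(","))  // setInt,update
  }
}
```

The trade-off of a catch-all branch is that a genuinely unsupported type is no longer rejected up front; it is written through `update` instead.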
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227609747

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
    @@ -143,4 +144,24 @@ object InternalRow {
         case u: UserDefinedType[_] => getAccessor(u.sqlType)
         case _ => (input, ordinal) => input.get(ordinal, dataType)
       }
    +
    +  /**
    +   * Returns a writer for an `InternalRow` with given data type.
    +   */
    +  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
    +    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
    +    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
    +    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
    +    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
    +    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
    +    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
    +    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
    +    case DecimalType.Fixed(precision, _) =>
    +      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
    +    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
    +        _: MapType | _: ObjectType | _: UserDefinedType[_] =>
    +      (input, v) => input.update(ordinal, v)
    +    case NullType => (input, v) => {}
    --- End diff --

    ok

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227609600

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
    @@ -143,4 +144,24 @@ object InternalRow {
         case u: UserDefinedType[_] => getAccessor(u.sqlType)
         case _ => (input, ordinal) => input.get(ordinal, dataType)
       }
    +
    +  /**
    +   * Returns a writer for an `InternalRow` with given data type.
    +   */
    +  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
    +    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
    +    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
    +    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
    +    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
    +    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
    +    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
    +    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
    +    case DecimalType.Fixed(precision, _) =>
    +      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
    +    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
    +        _: MapType | _: ObjectType | _: UserDefinedType[_] =>
    --- End diff --

    ok

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227433044

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
    @@ -143,4 +144,24 @@ object InternalRow {
         case u: UserDefinedType[_] => getAccessor(u.sqlType)
         case _ => (input, ordinal) => input.get(ordinal, dataType)
       }
    +
    +  /**
    +   * Returns a writer for an `InternalRow` with given data type.
    +   */
    +  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
    +    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
    +    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
    +    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
    +    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
    +    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
    +    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
    +    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
    +    case DecimalType.Fixed(precision, _) =>
    +      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
    +    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
    +        _: MapType | _: ObjectType | _: UserDefinedType[_] =>
    --- End diff --

    We should recurse into the UDT.

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227432603

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
    @@ -143,4 +144,24 @@ object InternalRow {
         case u: UserDefinedType[_] => getAccessor(u.sqlType)
         case _ => (input, ordinal) => input.get(ordinal, dataType)
       }
    +
    +  /**
    +   * Returns a writer for an `InternalRow` with given data type.
    +   */
    +  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
    +    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
    +    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
    +    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
    +    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
    +    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
    +    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
    +    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
    +    case DecimalType.Fixed(precision, _) =>
    +      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
    +    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
    +        _: MapType | _: ObjectType | _: UserDefinedType[_] =>
    +      (input, v) => input.update(ordinal, v)
    +    case NullType => (input, v) => {}
    --- End diff --

    In the codegen version, `CodeGenerator.setColumn`, we don't match `NullType` and eventually call `row.update(i, null)`. Shall we follow and call `row.setNullAt` here?

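One way to see why an explicit `setNullAt` may be preferable to a no-op writer, assuming the target row is a buffer reused across input rows: a no-op leaves whatever the previous row wrote in that slot. The sketch below uses a hypothetical `Buffer` class with explicit null tracking. It is not Spark's `InternalRow`, and both writer names are made up.

```scala
object NullTypeWriter {
  // Hypothetical reusable buffer that tracks nulls per slot.
  final class Buffer(n: Int) {
    private val values = new Array[Any](n)
    private val nulls = new Array[Boolean](n)
    def update(i: Int, v: Any): Unit = { values(i) = v; nulls(i) = v == null }
    def setNullAt(i: Int): Unit = { values(i) = null; nulls(i) = true }
    def isNullAt(i: Int): Boolean = nulls(i)
  }

  // Writer that does nothing, like `(input, v) => {}` in the diff.
  val noOpWriter: (Buffer, Int) => Unit = (_, _) => ()
  // Writer that explicitly marks the slot as null.
  val nullingWriter: (Buffer, Int) => Unit = (buf, i) => buf.setNullAt(i)

  def main(args: Array[String]): Unit = {
    val buf = new Buffer(1)
    buf.update(0, 42)          // value left over from a previous row
    noOpWriter(buf, 0)
    println(buf.isNullAt(0))   // false: the stale 42 is still visible
    nullingWriter(buf, 0)
    println(buf.isNullAt(0))   // true
  }
}
```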
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r22723

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
    @@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
       def currentValue: InternalRow = mutableRow

       override def target(row: InternalRow): MutableProjection = {
    +    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
    +    assert(!row.isInstanceOf[UnsafeRow] ||
    +      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
         mutableRow = row
         this
       }

    +  private[this] val fieldWriters = validExprs.map { case (e, i) =>
    +    val writer = generateRowWriter(i, e.dataType)
    +    if (!e.nullable) {
    +      (v: Any) => writer(v)
    +    } else {
    +      (v: Any) => {
    +        if (v == null) {
    +          mutableRow.setNullAt(i)
    +        } else {
    +          writer(v)
    +        }
    +      }
    +    }
    +  }
    +
    +  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
    +    case BooleanType =>
    +      v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
    +    case ByteType =>
    +      v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
    +    case ShortType =>
    +      v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
    +    case IntegerType | DateType =>
    +      v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
    +    case LongType | TimestampType =>
    +      v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
    +    case FloatType =>
    +      v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
    +    case DoubleType =>
    +      v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
    +    case DecimalType.Fixed(precision, _) =>
    +      v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
    +    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
    +        _: MapType | _: UserDefinedType[_] =>
    +      v => mutableRow.update(ordinal, v)
    +    case NullType =>
    +      v => {}
    --- End diff --

    The corresponding logic in the codegen version simply calls `row.update(i, null)`.

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227226902

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
    @@ -140,6 +141,14 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
         val input = fileToString(new File(testCase.inputFile))

         val (comments, code) = input.split("\n").partition(_.startsWith("--"))
    +
    +    // Runs all the tests on both codegen-only and interpreter modes. Since explain results differ
    +    // when `WHOLESTAGE_CODEGEN_ENABLED` disabled, we don't run these tests now.
    +    val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", "CODEGEN_ONLY")).map {
    +      case (wholeStageCodegenEnabled, codegenFactoryMode) =>
    +        Array( // SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegenEnabled,
    --- End diff --

    ok

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227224209

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
    @@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
       def currentValue: InternalRow = mutableRow

       override def target(row: InternalRow): MutableProjection = {
    +    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
    +    assert(!row.isInstanceOf[UnsafeRow] ||
    +      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
         mutableRow = row
         this
       }

    +  private[this] val fieldWriters = validExprs.map { case (e, i) =>
    +    val writer = generateRowWriter(i, e.dataType)
    +    if (!e.nullable) {
    +      (v: Any) => writer(v)
    +    } else {
    +      (v: Any) => {
    +        if (v == null) {
    +          mutableRow.setNullAt(i)
    +        } else {
    +          writer(v)
    +        }
    +      }
    +    }
    +  }
    +
    +  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
    --- End diff --

    oh, yes! yea, I will.

Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227224030

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
    @@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
       def currentValue: InternalRow = mutableRow

       override def target(row: InternalRow): MutableProjection = {
    +    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
    +    assert(!row.isInstanceOf[UnsafeRow] ||
    +      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
         mutableRow = row
         this
       }

    +  private[this] val fieldWriters = validExprs.map { case (e, i) =>
    +    val writer = generateRowWriter(i, e.dataType)
    +    if (!e.nullable) {
    +      (v: Any) => writer(v)
    +    } else {
    +      (v: Any) => {
    +        if (v == null) {
    +          mutableRow.setNullAt(i)
    +        } else {
    +          writer(v)
    +        }
    +      }
    +    }
    +  }
    +
    +  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
    +    case BooleanType =>
    +      v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
    +    case ByteType =>
    +      v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
    +    case ShortType =>
    +      v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
    +    case IntegerType | DateType =>
    +      v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
    +    case LongType | TimestampType =>
    +      v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
    +    case FloatType =>
    +      v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
    +    case DoubleType =>
    +      v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
    +    case DecimalType.Fixed(precision, _) =>
    +      v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
    +    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
    +        _: MapType | _: UserDefinedType[_] =>
    +      v => mutableRow.update(ordinal, v)
    +    case NullType =>
    +      v => {}
    --- End diff --

    Do we need to take care of `e.nullable && e.dataType == NullType` here?

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227202171

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
    @@ -140,6 +141,14 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
         val input = fileToString(new File(testCase.inputFile))

         val (comments, code) = input.split("\n").partition(_.startsWith("--"))
    +
    +    // Runs all the tests on both codegen-only and interpreter modes. Since explain results differ
    +    // when `WHOLESTAGE_CODEGEN_ENABLED` disabled, we don't run these tests now.
    +    val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", "CODEGEN_ONLY")).map {
    +      case (wholeStageCodegenEnabled, codegenFactoryMode) =>
    +        Array( // SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegenEnabled,
    --- End diff --

    If `wholeStageCodegenEnabled` is not used, let's not complicate the code now.

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22512#discussion_r227200656

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
    @@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
       def currentValue: InternalRow = mutableRow

       override def target(row: InternalRow): MutableProjection = {
    +    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
    +    assert(!row.isInstanceOf[UnsafeRow] ||
    +      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
         mutableRow = row
         this
       }

    +  private[this] val fieldWriters = validExprs.map { case (e, i) =>
    +    val writer = generateRowWriter(i, e.dataType)
    +    if (!e.nullable) {
    +      (v: Any) => writer(v)
    +    } else {
    +      (v: Any) => {
    +        if (v == null) {
    +          mutableRow.setNullAt(i)
    +        } else {
    +          writer(v)
    +        }
    +      }
    +    }
    +  }
    +
    +  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
    --- End diff --

    We already have `InternalRow.getAccessor`; shall we move this method there too?

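Moving the writer factory out of the projection class implies that a writer can no longer close over `mutableRow`; it has to take the row as an argument, matching the `(InternalRow, Any) => Unit` signature that appears in the later revisions of the diff. A minimal sketch of that shape follows, with a hypothetical `Row` class and hypothetical helper names (`getWriter`, `nullSafeWriter`) that do not correspond to Spark's actual API.

```scala
object RowIndependentWriters {
  // Hypothetical mutable row with per-slot null tracking.
  final class Row(n: Int) {
    private val values = new Array[Any](n)
    private val nulls = Array.fill(n)(true)
    def update(i: Int, v: Any): Unit = { values(i) = v; nulls(i) = false }
    def setNullAt(i: Int): Unit = { values(i) = null; nulls(i) = true }
    def isNullAt(i: Int): Boolean = nulls(i)
    def get(i: Int): Any = values(i)
  }

  // The writer receives the target row on every call, so it holds no
  // reference to any particular row and can live in a shared object.
  def getWriter(ordinal: Int): (Row, Any) => Unit =
    (row: Row, v: Any) => row.update(ordinal, v)

  // Null handling is layered on top, as the `fieldWriters` wrapper does.
  def nullSafeWriter(ordinal: Int, nullable: Boolean): (Row, Any) => Unit = {
    val writer = getWriter(ordinal)
    if (!nullable) writer
    else { (row: Row, v: Any) =>
      if (v == null) row.setNullAt(ordinal) else writer(row, v)
    }
  }

  def main(args: Array[String]): Unit = {
    val write = nullSafeWriter(0, nullable = true)
    val a = new Row(1)
    val b = new Row(1)
    write(a, "x")   // the same writer instance targets different rows
    write(b, null)
    println(a.get(0))       // x
    println(b.isNullAt(0))  // true
  }
}
```

With this shape, switching the target row does not require rebuilding the writers.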
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r227200458 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable def currentValue: InternalRow = mutableRow override def target(row: InternalRow): MutableProjection = { +// If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only +assert(!row.isInstanceOf[UnsafeRow] || + validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) }) mutableRow = row this } + private[this] val fieldWriters = validExprs.map { case (e, i) => +val writer = generateRowWriter(i, e.dataType) +if (!e.nullable) { + (v: Any) => writer(v) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(v) +} + } +} + } + + private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match { +case BooleanType => + v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean]) +case ByteType => + v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte]) +case ShortType => + v => mutableRow.setShort(ordinal, v.asInstanceOf[Short]) +case IntegerType | DateType => + v => mutableRow.setInt(ordinal, v.asInstanceOf[Int]) +case LongType | TimestampType => + v => mutableRow.setLong(ordinal, v.asInstanceOf[Long]) +case FloatType => + v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float]) +case DoubleType => + v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double]) +case DecimalType.Fixed(precision, _) => + v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision) +case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType | + _: MapType | _: UserDefinedType[_] => + v => mutableRow.update(ordinal, v) +case NullType => + v => {} --- End diff -- shall we call `mutableRow.setNullAt`? 
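To make the `NullType` question concrete, the sketch below contrasts a no-op writer with one that calls `setNullAt` on a row that tracks nulls separately from values. `MaskedRow` and both helper names are hypothetical stand-ins, not Spark classes. Whether the no-op branch is ever reached in the PR depends on the nullable wrapper shown in the diff, so the sketch only shows how the two bodies differ when the target row already holds a value.

```scala
// Stand-in row with a per-field null mask; not a Spark class.
final class MaskedRow(numFields: Int) {
  private val values = new Array[Any](numFields)
  private val nulls = Array.fill(numFields)(false)
  def update(i: Int, v: Any): Unit = { values(i) = v; nulls(i) = false }
  def setNullAt(i: Int): Unit = { values(i) = null; nulls(i) = true }
  def isNullAt(i: Int): Boolean = nulls(i)
}

object NullTypeWriters {
  // Writer body as in the diff: ignores the value and leaves the row untouched.
  def noOpWriter(row: MaskedRow, ordinal: Int): Any => Unit = _ => ()

  // Alternative raised in the review: always mark the field as null.
  def nullWriter(row: MaskedRow, ordinal: Int): Any => Unit = _ => row.setNullAt(ordinal)

  def main(args: Array[String]): Unit = {
    val row = new MaskedRow(1)
    row.update(0, "stale")        // the row is reused and already holds a value
    noOpWriter(row, 0)(null)
    println(row.isNullAt(0))      // prints "false": the old value is still visible
    nullWriter(row, 0)(null)
    println(row.isNullAt(0))      // prints "true"
  }
}
```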
[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22512#discussion_r225140616 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala --- @@ -53,6 +55,47 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable this } + private[this] val fieldWriters = validExprs.map { case (e, i) => +val writer = generateRowWriter(i, e.dataType) +if (!e.nullable) { + (v: Any) => writer(v) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(v) +} + } +} + } + + private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match { +case BooleanType => + v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean]) +case ByteType => + v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte]) +case ShortType => + v => mutableRow.setShort(ordinal, v.asInstanceOf[Short]) +case IntegerType | DateType => + v => mutableRow.setInt(ordinal, v.asInstanceOf[Int]) +case LongType | TimestampType => + v => mutableRow.setLong(ordinal, v.asInstanceOf[Long]) +case FloatType => + v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float]) +case DoubleType => + v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double]) +case DecimalType.Fixed(precision, _) => + v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision) +case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType | + _: MapType | _: UserDefinedType[_] => + v => mutableRow.update(ordinal, v) --- End diff -- This match should only accept the generic internal rows, so I added code to verify types for the `UnsafeRow` case; https://github.com/apache/spark/pull/22512/files#diff-3ed819282d4e4941571dd3b08fc03e37R55 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
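The guard described above can be sketched on stand-in types: a generic row accepts any field type, while a fixed-width row (standing in for `UnsafeRow`) is accepted only when every projected field has a fixed-length type. The names `Row`, `GenericRow`, `FixedWidthRow`, and `canTarget` are hypothetical, and the `isFixedLength` below is a simplified local function, not `UnsafeRow.isFixedLength`.

```scala
// Stand-in data types for illustration only; these are not Spark's classes.
sealed trait DataType
case object IntegerType extends DataType
case object LongType extends DataType
case object StringType extends DataType

sealed trait Row
final class GenericRow extends Row     // can hold values of any type
final class FixedWidthRow extends Row  // stand-in for UnsafeRow

object TargetCheck {
  // Simplified rule: primitive-like types are fixed length, strings are not.
  def isFixedLength(dt: DataType): Boolean = dt match {
    case IntegerType | LongType => true
    case StringType             => false
  }

  // Same shape as the assertion in the diff:
  // either the row is not fixed-width, or all field types are fixed length.
  def canTarget(row: Row, fieldTypes: Seq[DataType]): Boolean =
    !row.isInstanceOf[FixedWidthRow] || fieldTypes.forall(isFixedLength)
}
```

With such a check in place, the generic `update` branch would only ever be reached for rows that can store arbitrary objects.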