[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238489997
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -148,12 +156,25 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
       })
       // When we are regenerating the golden files we don't need to run all the configs as they
       // all need to return the same result
-      if (regenerateGoldenFiles && configs.nonEmpty) {
-        configs.take(1)
+      if (regenerateGoldenFiles) {
+        if (configs.nonEmpty) {
+          configs.take(1)
+        } else {
+          Array.empty[Array[(String, String)]]
--- End diff --

ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22512


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238328405
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -148,12 +156,25 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
       })
       // When we are regenerating the golden files we don't need to run all the configs as they
       // all need to return the same result
-      if (regenerateGoldenFiles && configs.nonEmpty) {
-        configs.take(1)
+      if (regenerateGoldenFiles) {
+        if (configs.nonEmpty) {
+          configs.take(1)
+        } else {
+          Array.empty[Array[(String, String)]]
--- End diff --

nit: since the configs don't matter when generating the results, I think we
can just return empty configs here. We can clean it up in a follow-up PR.
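
A minimal sketch of what that follow-up might look like, assuming the caller runs the queries once with default settings when it is handed no config sets. The `ConfigSet` alias and `selectConfigSets` are hypothetical names for illustration, not the suite's actual API:

```scala
object GoldenFileConfigs {
  // Hypothetical alias: one config set is a list of (key, value) pairs.
  type ConfigSet = Array[(String, String)]

  // When regenerating golden files every config set must yield the same
  // result, so none of them needs to be applied; otherwise run them all.
  def selectConfigSets(
      regenerateGoldenFiles: Boolean,
      configs: Array[ConfigSet]): Array[ConfigSet] = {
    if (regenerateGoldenFiles) Array.empty[ConfigSet] else configs
  }
}
```

This drops the nested `if` entirely, at the cost of relying on the assumed "empty means defaults" behaviour of the caller.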


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238177033
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -148,12 +156,21 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
       })
       // When we are regenerating the golden files we don't need to run all the configs as they
       // all need to return the same result
-      if (regenerateGoldenFiles && configs.nonEmpty) {
+      if (regenerateGoldenFiles) {
         configs.take(1)
--- End diff --

For better readability, fixed in 
https://github.com/apache/spark/pull/22512/commits/4cdc5040feb3da1e4cf9efcf434138d5873fae04


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238176184
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -64,7 +85,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
     i = 0
     while (i < validExprs.length) {
       val (_, ordinal) = validExprs(i)
-      mutableRow(ordinal) = buffer(ordinal)
+      fieldWriters(i)(buffer(ordinal))
--- End diff --

fixed in 
https://github.com/apache/spark/pull/22512/commits/95411c8b8b76503dff93756482642083a694b0b7


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238175630
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -148,12 +156,21 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
       })
       // When we are regenerating the golden files we don't need to run all the configs as they
       // all need to return the same result
-      if (regenerateGoldenFiles && configs.nonEmpty) {
+      if (regenerateGoldenFiles) {
         configs.take(1)
--- End diff --

Actually, it returns an empty array?
```
scala> Array.empty.take(1)
res0: Array[Nothing] = Array()

scala> Seq.empty.take(1)
res1: Seq[Nothing] = List()
```
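
The distinction behind this exchange is between `take`, which is total, and `head`, which throws on an empty collection. A small self-contained check, with illustrative object and method names that are not part of the PR:

```scala
import scala.util.Try

object TakeVsHead {
  // take(1) never throws: it returns at most one element.
  def firstAtMost[A](xs: Seq[A]): Seq[A] = xs.take(1)

  // head is partial: it throws NoSuchElementException on an empty collection,
  // which Try turns into a Failure.
  def headFails[A](xs: Seq[A]): Boolean = Try(xs.head).isFailure
}
```

So dropping the `configs.nonEmpty` guard is safe as far as `take(1)` itself is concerned; the guard only matters if an empty result needs different handling.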


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238172470
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -148,12 +156,21 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
       })
       // When we are regenerating the golden files we don't need to run all the configs as they
       // all need to return the same result
-      if (regenerateGoldenFiles && configs.nonEmpty) {
+      if (regenerateGoldenFiles) {
         configs.take(1)
--- End diff --

what if `configs` is empty? `take(1)` will fail


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-02 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238151565
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -64,7 +85,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
     i = 0
     while (i < validExprs.length) {
       val (_, ordinal) = validExprs(i)
-      mutableRow(ordinal) = buffer(ordinal)
+      fieldWriters(i)(buffer(ordinal))
--- End diff --

ah, sounds reasonable. I'll update later.


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-12-02 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r238146834
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -64,7 +85,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
     i = 0
     while (i < validExprs.length) {
       val (_, ordinal) = validExprs(i)
-      mutableRow(ordinal) = buffer(ordinal)
+      fieldWriters(i)(buffer(ordinal))
--- End diff --

Since `fieldWriters` is accessed via index, we should use `IndexedSeq` or 
`Array` explicitly?
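
For context, a generic `Seq` may be backed by a `List`, where `apply(i)` walks `i` nodes, so indexing it inside a `while` loop can be costly. Materialising the writers as an `Array` makes each lookup constant-time. A rough sketch with a plain `Array[Any]` standing in for the mutable row; `buildWriters` is a hypothetical helper, not code from this PR:

```scala
object IndexedFieldWriters {
  // Hypothetical stand-in for a per-field writer closure.
  type Writer = Any => Unit

  // Build one writer per ordinal and store them in an Array so that
  // writers(i) is a constant-time lookup.
  def buildWriters(row: Array[Any]): Array[Writer] =
    Array.tabulate[Writer](row.length) { i =>
      (v: Any) => { row(i) = v }
    }
}
```

Declaring the field as `Array[...]` or `IndexedSeq[...]` also documents the access pattern for the next reader.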


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227626505
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MutableProjectionSuite.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+class MutableProjectionSuite extends SparkFunSuite with 
ExpressionEvalHelper {
+
+  private def createMutableProjection(dataTypes: Array[DataType]): 
MutableProjection = {
+MutableProjection.create(dataTypes.zipWithIndex.map(x => 
BoundReference(x._2, x._1, true)))
+  }
+
+  testBothCodegenAndInterpreted("fixed-length types") {
+val fixedLengthTypes = Array[DataType](
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, 
DoubleType,
+  DateType, TimestampType)
+val proj = createMutableProjection(fixedLengthTypes)
+val inputRow = InternalRow.fromSeq(
+  Seq(false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 5.0, 100, 200L))
+assert(proj(inputRow) === inputRow)
+
+// Use UnsafeRow as buffer
+val numBytes = 
UnsafeRow.calculateBitSetWidthInBytes(fixedLengthTypes.length)
+val unsafeBuffer = UnsafeRow.createFromByteArray(numBytes, 
fixedLengthTypes.length)
+val projUnsafeRow = proj.target(unsafeBuffer)(inputRow)
+assert(FromUnsafeProjection(fixedLengthTypes)(projUnsafeRow) === 
inputRow)
+  }
+
+  testBothCodegenAndInterpreted("variable-length types") {
+val variableLengthTypes = Array(
+  StringType, DecimalType.defaultConcreteType, CalendarIntervalType, 
BinaryType,
+  ArrayType(StringType), MapType(IntegerType, StringType),
+  StructType.fromDDL("a INT, b STRING"), 
ObjectType(classOf[java.lang.Integer]))
+val proj = createMutableProjection(variableLengthTypes)
--- End diff --

sure.


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227626456
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,25 @@ object InternalRow {
     case u: UserDefinedType[_] => getAccessor(u.sqlType)
     case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
+    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: ObjectType =>
+      (input, v) => input.update(ordinal, v)
+    case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
+    case NullType => (input, _) => input.setNullAt(ordinal)
+    case _ => throw new SparkException(s"Unsupported data type $dt")
--- End diff --

ok


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227618778
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MutableProjectionSuite.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+class MutableProjectionSuite extends SparkFunSuite with 
ExpressionEvalHelper {
+
+  private def createMutableProjection(dataTypes: Array[DataType]): 
MutableProjection = {
+MutableProjection.create(dataTypes.zipWithIndex.map(x => 
BoundReference(x._2, x._1, true)))
+  }
+
+  testBothCodegenAndInterpreted("fixed-length types") {
+val fixedLengthTypes = Array[DataType](
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, 
DoubleType,
+  DateType, TimestampType)
+val proj = createMutableProjection(fixedLengthTypes)
+val inputRow = InternalRow.fromSeq(
+  Seq(false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 5.0, 100, 200L))
+assert(proj(inputRow) === inputRow)
+
+// Use UnsafeRow as buffer
+val numBytes = 
UnsafeRow.calculateBitSetWidthInBytes(fixedLengthTypes.length)
+val unsafeBuffer = UnsafeRow.createFromByteArray(numBytes, 
fixedLengthTypes.length)
+val projUnsafeRow = proj.target(unsafeBuffer)(inputRow)
+assert(FromUnsafeProjection(fixedLengthTypes)(projUnsafeRow) === 
inputRow)
+  }
+
+  testBothCodegenAndInterpreted("variable-length types") {
+val variableLengthTypes = Array(
+  StringType, DecimalType.defaultConcreteType, CalendarIntervalType, 
BinaryType,
+  ArrayType(StringType), MapType(IntegerType, StringType),
+  StructType.fromDDL("a INT, b STRING"), 
ObjectType(classOf[java.lang.Integer]))
+val proj = createMutableProjection(variableLengthTypes)
--- End diff --

Shall we also test that the projection fails when the target row is an `UnsafeRow`?
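
The shape of such a negative test can be sketched without Spark on the classpath. The toy types below are hypothetical stand-ins: `checkTarget` mirrors the assertion added to `target`, and `rejects` plays the role that something like ScalaTest's `intercept` would presumably play in the real suite:

```scala
object TargetGuard {
  // Toy field types (hypothetical), flagged as fixed- or variable-length.
  sealed trait FieldType { def fixedLength: Boolean }
  case object IntField extends FieldType { val fixedLength = true }
  case object StringField extends FieldType { val fixedLength = false }

  // A fixed-width buffer only accepts fixed-length field types.
  def checkTarget(isFixedWidthBuffer: Boolean, fields: Seq[FieldType]): Unit =
    assert(!isFixedWidthBuffer || fields.forall(_.fixedLength))

  // Returns true when checkTarget rejects the combination.
  def rejects(isFixedWidthBuffer: Boolean, fields: Seq[FieldType]): Boolean =
    try { checkTarget(isFixedWidthBuffer, fields); false }
    catch { case _: AssertionError => true }
}
```

Note that this relies on assertions being enabled; a check that must always run would need an explicit exception instead of `assert`.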


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227618313
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,25 @@ object InternalRow {
     case u: UserDefinedType[_] => getAccessor(u.sqlType)
     case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
+    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: ObjectType =>
+      (input, v) => input.update(ordinal, v)
+    case udt: UserDefinedType[_] => getWriter(ordinal, udt.sqlType)
+    case NullType => (input, _) => input.setNullAt(ordinal)
+    case _ => throw new SparkException(s"Unsupported data type $dt")
--- End diff --

One minor point: the codegen version just calls `row.update` for unmatched
types, which means it supports object types as well. Shall we follow that?


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227609747
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,24 @@ object InternalRow {
     case u: UserDefinedType[_] => getAccessor(u.sqlType)
     case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
+    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: ObjectType | _: UserDefinedType[_] =>
+      (input, v) => input.update(ordinal, v)
+    case NullType => (input, v) => {}
--- End diff --

ok


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227609600
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,24 @@ object InternalRow {
     case u: UserDefinedType[_] => getAccessor(u.sqlType)
     case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
+    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: ObjectType | _: UserDefinedType[_] =>
--- End diff --

ok


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227433044
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,24 @@ object InternalRow {
     case u: UserDefinedType[_] => getAccessor(u.sqlType)
     case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
+    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: ObjectType | _: UserDefinedType[_] =>
--- End diff --

We should recurse into the UDT.
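
The point of recursing is that a user-defined type is only a wrapper around an underlying SQL type, so it should get the same specialised writer as that underlying type instead of the generic `update` path. A toy sketch of the dispatch, where the `FieldType` hierarchy is a hypothetical stand-in for Catalyst's `DataType` and the returned strings simply name the setter that would be chosen:

```scala
object UdtRecursion {
  // Toy type hierarchy standing in for Catalyst's DataType (hypothetical).
  sealed trait FieldType
  case object IntField extends FieldType
  case object StringField extends FieldType
  // A user-defined type wraps an underlying storage type.
  final case class UserDefined(sqlType: FieldType) extends FieldType

  // Pick a writer by type; for a user-defined type, recurse into the
  // underlying type so it gets the same specialised writer.
  def writerName(dt: FieldType): String = dt match {
    case IntField => "setInt"
    case StringField => "update"
    case UserDefined(underlying) => writerName(underlying)
  }
}
```

The recursion also handles a UDT whose underlying type is itself a UDT, since each step unwraps one layer.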


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227432603
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -143,4 +144,24 @@ object InternalRow {
     case u: UserDefinedType[_] => getAccessor(u.sqlType)
     case _ => (input, ordinal) => input.get(ordinal, dataType)
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
+    case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      (input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: ObjectType | _: UserDefinedType[_] =>
+      (input, v) => input.update(ordinal, v)
+    case NullType => (input, v) => {}
--- End diff --

In the codegen version, `CodeGenerator.setColumn`, we don't match `NullType`
and eventually call `row.update(i, null)`. Shall we follow that and call
`row.setNullAt` here?
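
One reason a no-op writer is risky: if the target row is reused, a field left untouched could keep a stale non-null value. A hedged sketch of the alternative, using a hypothetical `ToyRow` in place of `InternalRow` (none of these names come from the PR):

```scala
object NullAwareWriter {
  // Toy mutable row (hypothetical): values plus a per-field null flag.
  final class ToyRow(n: Int) {
    val values = new Array[Any](n)
    val isNull = Array.fill(n)(false)
    def setNullAt(i: Int): Unit = { isNull(i) = true; values(i) = null }
    def update(i: Int, v: Any): Unit = { isNull(i) = false; values(i) = v }
  }

  // Writer for a null-typed field: whatever value arrives, mark the field
  // as null rather than silently doing nothing.
  def nullTypeWriter(ordinal: Int): (ToyRow, Any) => Unit =
    (row, _) => row.setNullAt(ordinal)

  // Wrap any writer so that null inputs are routed to setNullAt.
  def nullSafe(ordinal: Int, writer: (ToyRow, Any) => Unit): (ToyRow, Any) => Unit =
    (row, v) => if (v == null) row.setNullAt(ordinal) else writer(row, v)
}
```

With the null-typed writer marking the field explicitly, the `nullSafe` wrapper and the type-specific writer agree on what a null field looks like.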


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r22723
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: 
Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
 
   override def target(row: InternalRow): MutableProjection = {
+// If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts 
fixed-length types only
+assert(!row.isInstanceOf[UnsafeRow] ||
+  validExprs.forall { case (e, _) => 
UnsafeRow.isFixedLength(e.dataType) })
 mutableRow = row
 this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+val writer = generateRowWriter(i, e.dataType)
+if (!e.nullable) {
+  (v: Any) => writer(v)
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(v)
+}
+  }
+}
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = 
dt match {
+case BooleanType =>
+  v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
+case ByteType =>
+  v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
+case ShortType =>
+  v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
+case IntegerType | DateType =>
+  v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
+case LongType | TimestampType =>
+  v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
+case FloatType =>
+  v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
+case DoubleType =>
+  v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
+case DecimalType.Fixed(precision, _) =>
+  v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], 
precision)
+case CalendarIntervalType | BinaryType | _: ArrayType | StringType | 
_: StructType |
+ _: MapType | _: UserDefinedType[_] =>
+  v => mutableRow.update(ordinal, v)
+case NullType =>
+  v => {}
--- End diff --

The corresponding logic in the codegen version simply calls
`row.update(i, null)`.


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227226902
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -140,6 +141,14 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val input = fileToString(new File(testCase.inputFile))
 
     val (comments, code) = input.split("\n").partition(_.startsWith("--"))
+
+    // Runs all the tests on both codegen-only and interpreter modes. Since explain results differ
+    // when `WHOLESTAGE_CODEGEN_ENABLED` disabled, we don't run these tests now.
+    val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", "CODEGEN_ONLY")).map {
+      case (wholeStageCodegenEnabled, codegenFactoryMode) =>
+        Array( // SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegenEnabled,
--- End diff --

ok


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227224209
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: 
Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
 
   override def target(row: InternalRow): MutableProjection = {
+// If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts 
fixed-length types only
+assert(!row.isInstanceOf[UnsafeRow] ||
+  validExprs.forall { case (e, _) => 
UnsafeRow.isFixedLength(e.dataType) })
 mutableRow = row
 this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+val writer = generateRowWriter(i, e.dataType)
+if (!e.nullable) {
+  (v: Any) => writer(v)
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(v)
+}
+  }
+}
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = 
dt match {
--- End diff --

oh, yes! yea, I will.


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227224030
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala
 ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: 
Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
 
   override def target(row: InternalRow): MutableProjection = {
+// If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts 
fixed-length types only
+assert(!row.isInstanceOf[UnsafeRow] ||
+  validExprs.forall { case (e, _) => 
UnsafeRow.isFixedLength(e.dataType) })
 mutableRow = row
 this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+val writer = generateRowWriter(i, e.dataType)
+if (!e.nullable) {
+  (v: Any) => writer(v)
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(v)
+}
+  }
+}
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = 
dt match {
+case BooleanType =>
+  v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
+case ByteType =>
+  v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
+case ShortType =>
+  v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
+case IntegerType | DateType =>
+  v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
+case LongType | TimestampType =>
+  v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
+case FloatType =>
+  v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
+case DoubleType =>
+  v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
+case DecimalType.Fixed(precision, _) =>
+  v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], 
precision)
+case CalendarIntervalType | BinaryType | _: ArrayType | StringType | 
_: StructType |
+ _: MapType | _: UserDefinedType[_] =>
+  v => mutableRow.update(ordinal, v)
+case NullType =>
+  v => {}
--- End diff --

Do we need to take care of `e.nullable && e.dataType == NullType` here?


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227202171
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala ---
@@ -140,6 +141,14 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     val input = fileToString(new File(testCase.inputFile))
 
     val (comments, code) = input.split("\n").partition(_.startsWith("--"))
+
+    // Runs all the tests on both codegen-only and interpreter modes. Since explain results differ
+    // when `WHOLESTAGE_CODEGEN_ENABLED` disabled, we don't run these tests now.
+    val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", "CODEGEN_ONLY")).map {
+      case (wholeStageCodegenEnabled, codegenFactoryMode) =>
+        Array( // SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegenEnabled,
--- End diff --

If `wholeStageCodegenEnabled` is not used, let's not complicate the code 
now.
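
A minimal sketch of what the simplified version could look like, assuming only the codegen factory mode needs to vary. The object name `CodegenConfigSketch` and the constant `CodegenFactoryModeKey` are hypothetical stand-ins for illustration; the real suite would presumably reference the corresponding `SQLConf` entry instead of a string literal.

```scala
object CodegenConfigSketch {
  // Assumed config key, used here as a plain string so the sketch has no Spark dependency.
  val CodegenFactoryModeKey = "spark.sql.codegen.factoryMode"

  // One config set per factory mode; the unused whole-stage flag is not threaded through.
  val codegenConfigSets: Array[Array[(String, String)]] =
    Array("NO_CODEGEN", "CODEGEN_ONLY").map { mode =>
      Array(CodegenFactoryModeKey -> mode)
    }
}
```

If the whole-stage setting is needed later, it can be reintroduced as a second element of each inner array.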


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227200656
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
 
   override def target(row: InternalRow): MutableProjection = {
+    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
+    assert(!row.isInstanceOf[UnsafeRow] ||
+      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
     mutableRow = row
     this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+    val writer = generateRowWriter(i, e.dataType)
+    if (!e.nullable) {
+      (v: Any) => writer(v)
+    } else {
+      (v: Any) => {
+        if (v == null) {
+          mutableRow.setNullAt(i)
+        } else {
+          writer(v)
+        }
+      }
+    }
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
--- End diff --

We already have `InternalRow.getAccessor`; shall we move this method to `InternalRow` as well?
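
A rough sketch of the shape this could take, using toy stand-ins rather than Spark's classes. `ToyRow`, `ToyType`, and the `getWriter` name are all hypothetical; the point is only that a writer factory living next to the accessor factory has to take the row as an argument instead of closing over the projection's `mutableRow` field.

```scala
// Toy stand-ins for a data type hierarchy and a mutable row (illustration only).
sealed trait ToyType
case object ToyInt extends ToyType
case object ToyString extends ToyType

final class ToyRow(size: Int) {
  private val values = new Array[Any](size)
  def get(i: Int): Any = values(i)
  def update(i: Int, v: Any): Unit = { values(i) = v }
  def setInt(i: Int, v: Int): Unit = { values(i) = v }
}

object ToyRow {
  // Reader factory: picks a typed getter once per data type.
  def getAccessor(dt: ToyType): (ToyRow, Int) => Any = dt match {
    case ToyInt => (row, i) => row.get(i).asInstanceOf[Int]
    case ToyString => (row, i) => row.get(i).asInstanceOf[String]
  }

  // Writer factory placed beside the reader. The target row is a parameter,
  // so the returned function does not depend on any projection state.
  def getWriter(ordinal: Int, dt: ToyType): (ToyRow, Any) => Unit = dt match {
    case ToyInt => (row, v) => row.setInt(ordinal, v.asInstanceOf[Int])
    case ToyString => (row, v) => row.update(ordinal, v)
  }
}
```

With this shape, a projection would build its writers once and pass its current target row on each call.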


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r227200458
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -49,10 +51,54 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
   def currentValue: InternalRow = mutableRow
 
   override def target(row: InternalRow): MutableProjection = {
+    // If `mutableRow` is `UnsafeRow`, `MutableProjection` accepts fixed-length types only
+    assert(!row.isInstanceOf[UnsafeRow] ||
+      validExprs.forall { case (e, _) => UnsafeRow.isFixedLength(e.dataType) })
     mutableRow = row
     this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+    val writer = generateRowWriter(i, e.dataType)
+    if (!e.nullable) {
+      (v: Any) => writer(v)
+    } else {
+      (v: Any) => {
+        if (v == null) {
+          mutableRow.setNullAt(i)
+        } else {
+          writer(v)
+        }
+      }
+    }
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
+    case BooleanType =>
+      v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType =>
+      v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType =>
+      v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType =>
+      v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType =>
+      v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType =>
+      v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType =>
+      v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: UserDefinedType[_] =>
+      v => mutableRow.update(ordinal, v)
+    case NullType =>
+      v => {}
--- End diff --

Shall we call `mutableRow.setNullAt(ordinal)` here instead of doing nothing?
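
To illustrate why this might matter, here is a small self-contained sketch with a toy row (`SketchRow` and `NullWriterSketch` are hypothetical names, not Spark classes). It assumes the target row is reused and that the type-specific writer can be invoked directly, without the nullable wrapper getting there first; under those assumptions a no-op writer leaves a stale non-null value in the slot.

```scala
// Toy mutable row that tracks a null bit per slot (illustration only).
final class SketchRow(size: Int) {
  private val values = new Array[Any](size)
  private val nulls = new Array[Boolean](size)
  def setNullAt(i: Int): Unit = { nulls(i) = true; values(i) = null }
  def setInt(i: Int, v: Int): Unit = { nulls(i) = false; values(i) = v }
  def isNullAt(i: Int): Boolean = nulls(i)
}

object NullWriterSketch {
  // Mirrors `v => {}` in the diff: the input is ignored and the row is untouched.
  def noOpWriter(row: SketchRow, ordinal: Int): Any => Unit = _ => ()

  // Suggested alternative: always mark the slot as null.
  def setNullWriter(row: SketchRow, ordinal: Int): Any => Unit = _ => row.setNullAt(ordinal)
}
```

If the nullable wrapper always intercepts `null` before the writer runs, the two variants behave the same in practice; the `setNullAt` form just does not rely on that.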


---




[GitHub] spark pull request #22512: [SPARK-25498][SQL] InterpretedMutableProjection s...

2018-10-15 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22512#discussion_r225140616
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala ---
@@ -53,6 +55,47 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
     this
   }
 
+  private[this] val fieldWriters = validExprs.map { case (e, i) =>
+    val writer = generateRowWriter(i, e.dataType)
+    if (!e.nullable) {
+      (v: Any) => writer(v)
+    } else {
+      (v: Any) => {
+        if (v == null) {
+          mutableRow.setNullAt(i)
+        } else {
+          writer(v)
+        }
+      }
+    }
+  }
+
+  private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
+    case BooleanType =>
+      v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
+    case ByteType =>
+      v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
+    case ShortType =>
+      v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
+    case IntegerType | DateType =>
+      v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
+    case LongType | TimestampType =>
+      v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
+    case FloatType =>
+      v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
+    case DoubleType =>
+      v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
+    case DecimalType.Fixed(precision, _) =>
+      v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
+    case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
+         _: MapType | _: UserDefinedType[_] =>
+      v => mutableRow.update(ordinal, v)
--- End diff --

This match arm (the `mutableRow.update` case) should only be reached for generic internal rows, so I added code to verify the types in the `UnsafeRow` case:

https://github.com/apache/spark/pull/22512/files#diff-3ed819282d4e4941571dd3b08fc03e37R55
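
The idea behind that check can be sketched without Spark as follows. All names here (`SketchType`, `TargetRow`, `UnsafeLikeTarget`, `TargetCheckSketch`) are hypothetical stand-ins, and the set of fixed-length types is deliberately reduced to two for illustration.

```scala
// Toy type hierarchy: two fixed-length types and one variable-length type.
sealed trait SketchType
case object SketchInt extends SketchType
case object SketchLong extends SketchType
case object SketchString extends SketchType

// Toy row kinds: a generic row and a row with a fixed binary layout.
sealed trait TargetRow
final class GenericTarget extends TargetRow
final class UnsafeLikeTarget extends TargetRow

object TargetCheckSketch {
  def isFixedLength(dt: SketchType): Boolean = dt match {
    case SketchInt | SketchLong => true
    case SketchString => false
  }

  // A fixed-layout target is acceptable only if every output type is fixed-length;
  // a generic target accepts any type.
  def canTarget(row: TargetRow, outputTypes: Seq[SketchType]): Boolean =
    !row.isInstanceOf[UnsafeLikeTarget] || outputTypes.forall(isFixedLength)
}
```

The condition has the same form as the assertion in `target`: it is vacuously true for generic rows and only constrains the fixed-layout case.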


---
