[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-04 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238894837
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+val mapResultRow = convertBackToInternalRow(mapRow, 
fields4).toSeq(fields4)
+val mapExpectedRow = mapRow.toSeq(fields4)
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

Aha, thanks. I remember that its related to SPARK-18134.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-04 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238683833
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+val mapResultRow = convertBackToInternalRow(mapRow, 
fields4).toSeq(fields4)
+val mapExpectedRow = mapRow.toSeq(fields4)
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

`ArrayBasedMapData`/`UnsafeMapData` does not have `equals()` or 
`hashCode()` implemented because we do not have a good story around map 
equality. Implementing equals/hashcode for map is only half of the solution, we 
would also need a comparable binary format.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22468


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238543369
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+val mapResultRow = convertBackToInternalRow(mapRow, 
fields4).toSeq(fields4)
+val mapExpectedRow = mapRow.toSeq(fields4)
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

fixed code to use `ExpressionEvalHelper.checkResult`.

I don't remember correctly though, we might have some historical reasons 
about that; `ArrayBasedMapData` has no `hashCode` and `equals`. Probably, 
somebody might know this... cc: @hvanhovell @viirya


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238534101
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+val mapResultRow = convertBackToInternalRow(mapRow, 
fields4).toSeq(fields4)
+val mapExpectedRow = mapRow.toSeq(fields4)
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

Or we can use `ExpressionEvalHelper.checkResult` here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238533700
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+val mapResultRow = convertBackToInternalRow(mapRow, 
fields4).toSeq(fields4)
+val mapExpectedRow = mapRow.toSeq(fields4)
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
--- End diff --

maybe we should implement `equals` and `hashCode` in `ArrayBasedMapData` 
and `UnsafeMapData`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238530264
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,100 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
+// we need to take care of it to compare rows.
+def toComparable(d: Any): Any = d match {
--- End diff --

Since we cannot compare `ArrayBasedMapData`s directly (that is, 
`assert(mapResultRow === mapExpectedRow)` fails), I just converted them into 
the `Seq`s of keys/values by this method.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238520267
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -535,4 +535,100 @@ class UnsafeRowConverterSuite extends SparkFunSuite 
with Matchers with PlanTestB
 assert(unsafeRow.getSizeInBytes ==
   8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + 
roundedSize(field2.getSizeInBytes))
   }
+
+  testBothCodegenAndInterpreted("SPARK-25374 converts back into safe 
representation") {
+def convertBackToInternalRow(inputRow: InternalRow, fields: 
Array[DataType]): InternalRow = {
+  val unsafeProj = UnsafeProjection.create(fields)
+  val unsafeRow = unsafeProj(inputRow)
+  val safeProj = SafeProjection.create(fields)
+  safeProj(unsafeRow)
+}
+
+// Simple tests
+val inputRow = InternalRow.fromSeq(Seq(
+  false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, 
UTF8String.fromString("test"),
+  Decimal(255), CalendarInterval.fromString("interval 1 day"), 
Array[Byte](1, 2)
+))
+val fields1 = Array(
+  BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType,
+  DoubleType, StringType, DecimalType.defaultConcreteType, 
CalendarIntervalType,
+  BinaryType)
+
+assert(convertBackToInternalRow(inputRow, fields1) === inputRow)
+
+// Array tests
+val arrayRow = InternalRow.fromSeq(Seq(
+  createArray(1, 2, 3),
+  createArray(
+createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*),
+createArray(Seq("d").map(UTF8String.fromString): _*))
+))
+val fields2 = Array[DataType](
+  ArrayType(IntegerType),
+  ArrayType(ArrayType(StringType)))
+
+assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow)
+
+// Struct tests
+val structRow = InternalRow.fromSeq(Seq(
+  InternalRow.fromSeq(Seq[Any](1, 4.0)),
+  InternalRow.fromSeq(Seq(
+UTF8String.fromString("test"),
+InternalRow.fromSeq(Seq(
+  1,
+  createArray(Seq("2", "3").map(UTF8String.fromString): _*)
+))
+  ))
+))
+val fields3 = Array[DataType](
+  StructType(
+StructField("c0", IntegerType) ::
+StructField("c1", DoubleType) ::
+Nil),
+  StructType(
+StructField("c2", StringType) ::
+StructField("c3", StructType(
+  StructField("c4", IntegerType) ::
+  StructField("c5", ArrayType(StringType)) ::
+  Nil)) ::
+Nil))
+
+assert(convertBackToInternalRow(structRow, fields3) === structRow)
+
+// Map tests
+val mapRow = InternalRow.fromSeq(Seq(
+  createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2),
+  createMap(
+createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*),
+createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*)
+  )(
+createMap(Seq("k3", "k4").map(UTF8String.fromString): 
_*)(3.toShort, 4.toShort),
+createMap(Seq("k5", "k6").map(UTF8String.fromString): 
_*)(5.toShort, 6.toShort)
+  )))
+val fields4 = Array[DataType](
+  MapType(StringType, IntegerType),
+  MapType(MapType(IntegerType, StringType), MapType(StringType, 
ShortType)))
+
+// Since `ArrayBasedMapData` does not override `equals` and `hashCode`,
+// we need to take care of it to compare rows.
+def toComparable(d: Any): Any = d match {
--- End diff --

this does nothing, isn't it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238515544
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
 ---
@@ -106,4 +106,19 @@ class CodeGeneratorWithInterpretedFallbackSuite 
extends SparkFunSuite with PlanT
   assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === 
expected)
 }
   }
+
+  test("SPARK-25374 Correctly handles NoOp in SafeProjection") {
+val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), 
Literal.create(1)), NoOp)
+val input = InternalRow.fromSeq(1 :: 1 :: Nil)
+val expected = 2 :: null :: Nil
+withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
--- End diff --

nvm, this is the code style in this test suite


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238515444
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
 ---
@@ -106,4 +106,19 @@ class CodeGeneratorWithInterpretedFallbackSuite 
extends SparkFunSuite with PlanT
   assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === 
expected)
 }
   }
+
+  test("SPARK-25374 Correctly handles NoOp in SafeProjection") {
+val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), 
Literal.create(1)), NoOp)
+val input = InternalRow.fromSeq(1 :: 1 :: Nil)
+val expected = 2 :: null :: Nil
+withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
--- End diff --

can we use `testWithBothCodegenAndIntepreted`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238515227
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
 ---
@@ -166,29 +166,40 @@ object UnsafeProjection
   }
 }
 
-/**
- * A projection that could turn UnsafeRow into GenericInternalRow
--- End diff --

can we keep this comment?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238490121
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -157,4 +157,22 @@ object InternalRow {
   getValueNullSafe
 }
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = 
dt match {
--- End diff --

ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-12-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r238330712
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -157,4 +157,22 @@ object InternalRow {
   getValueNullSafe
 }
   }
+
+  /**
+   * Returns a writer for an `InternalRow` with given data type.
+   */
+  def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = 
dt match {
--- End diff --

We can rebase now


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r227185139
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala ---
@@ -31,48 +30,7 @@ import org.apache.spark.sql.types._
 @BeanInfo
 private[sql] case class MyLabeledPoint(
   @BeanProperty label: Double,
-  @BeanProperty features: UDT.MyDenseVector)
-
-// Wrapped in an object to check Scala compatibility. See SPARK-13929
-object UDT {
--- End diff --

That's because we use `MyDenseVectorUDT` in `UnsafeRowConverterSuite.scala` 
for unit tests. (`MyDenseVectorUDT` is located in the `core`, but 
`UnsafeRowConverterSuite` located in the `catalyst`).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r227184697
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
+case BooleanType => true
+case ByteType => true
+case ShortType => true
+case IntegerType => true
+case LongType => true
+case FloatType => true
+case DoubleType => true
+case _ => false
+  }
+
+  private def generateSafeValueConverter(dt: DataType): Any => Any = dt 
match {
+case ArrayType(elemType, _) =>
+  if (isPrimitive(elemType)) {
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  new GenericArrayData(arrayValue.toArray[Any](elemType))
+}
+  } else {
+val elementConverter = generateSafeValueConverter(elemType)
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  val result = new Array[Any](arrayValue.numElements())
+  arrayValue.foreach(elemType, (i, e) => {
+result(i) = elementConverter(e)
+  })
+  new GenericArrayData(result)
+}
+  }
+
+case st: StructType =>
+  val fieldTypes = st.fields.map(_.dataType)
+  val fieldConverters = fieldTypes.map(generateSafeValueConverter)
+  v => {
+val row = v.asInstanceOf[InternalRow]
+val ar = new Array[Any](row.numFields)
+var idx = 0
+while (idx < row.numFields) {
+  ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx)))
+  idx += 1
+}
+new GenericInternalRow(ar)
+  }
+
+case MapType(keyType, valueType, _) =>
+  lazy val keyConverter = generateSafeValueConverter(keyType)
+  lazy val valueConverter = generateSafeValueConverter(valueType)
+  v => {
+val mapValue = v.asInstanceOf[MapData]
+val keys = mapValue.keyArray().toArray[Any](keyType)
+val values = mapValue.valueArray().toArray[Any](valueType)
+val convertedKeys =
+  if (isPrimitive(keyType)) keys else keys.map(keyConverter)
+val convertedValues =
+  if (isPrimitive(valueType)) values else 
values.map(valueConverter)
+
+ArrayBasedMapData(convertedKeys, convertedValues)
+  }
+
+case udt: UserDefinedType[_] =>
+  

[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r227184308
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
+case BooleanType => true
+case ByteType => true
+case ShortType => true
+case IntegerType => true
+case LongType => true
+case FloatType => true
+case DoubleType => true
+case _ => false
+  }
+
+  private def generateSafeValueConverter(dt: DataType): Any => Any = dt 
match {
+case ArrayType(elemType, _) =>
+  if (isPrimitive(elemType)) {
--- End diff --

ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r227183344
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
--- End diff --

ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-22 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r227183303
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
--- End diff --

IIUC the input expressions in `UnsafeProjection` possibly have `NoOp`s 
passed from aggregate expressions? So, IIUC `GenerateSafeProjection` handles 
`NoOp`s here:

https://github.com/apache/spark/blob/3b4556745e90a13f4ae7ebae4ab682617de25c38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala#L153
I'm not 100% sure though... 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225522164
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala ---
@@ -31,48 +30,7 @@ import org.apache.spark.sql.types._
 @BeanInfo
 private[sql] case class MyLabeledPoint(
   @BeanProperty label: Double,
-  @BeanProperty features: UDT.MyDenseVector)
-
-// Wrapped in an object to check Scala compatibility. See SPARK-13929
-object UDT {
--- End diff --

why do we change it?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225521786
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
+case BooleanType => true
+case ByteType => true
+case ShortType => true
+case IntegerType => true
+case LongType => true
+case FloatType => true
+case DoubleType => true
+case _ => false
+  }
+
+  private def generateSafeValueConverter(dt: DataType): Any => Any = dt 
match {
+case ArrayType(elemType, _) =>
+  if (isPrimitive(elemType)) {
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  new GenericArrayData(arrayValue.toArray[Any](elemType))
+}
+  } else {
+val elementConverter = generateSafeValueConverter(elemType)
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  val result = new Array[Any](arrayValue.numElements())
+  arrayValue.foreach(elemType, (i, e) => {
+result(i) = elementConverter(e)
+  })
+  new GenericArrayData(result)
+}
+  }
+
+case st: StructType =>
+  val fieldTypes = st.fields.map(_.dataType)
+  val fieldConverters = fieldTypes.map(generateSafeValueConverter)
+  v => {
+val row = v.asInstanceOf[InternalRow]
+val ar = new Array[Any](row.numFields)
+var idx = 0
+while (idx < row.numFields) {
+  ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx)))
+  idx += 1
+}
+new GenericInternalRow(ar)
+  }
+
+case MapType(keyType, valueType, _) =>
+  lazy val keyConverter = generateSafeValueConverter(keyType)
+  lazy val valueConverter = generateSafeValueConverter(valueType)
+  v => {
+val mapValue = v.asInstanceOf[MapData]
+val keys = mapValue.keyArray().toArray[Any](keyType)
+val values = mapValue.valueArray().toArray[Any](valueType)
+val convertedKeys =
+  if (isPrimitive(keyType)) keys else keys.map(keyConverter)
+val convertedValues =
+  if (isPrimitive(valueType)) values else 
values.map(valueConverter)
+
+ArrayBasedMapData(convertedKeys, convertedValues)
+  }
+
+case udt: UserDefinedType[_] =>
+  

[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225521397
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
+case BooleanType => true
+case ByteType => true
+case ShortType => true
+case IntegerType => true
+case LongType => true
+case FloatType => true
+case DoubleType => true
+case _ => false
+  }
+
+  private def generateSafeValueConverter(dt: DataType): Any => Any = dt 
match {
+case ArrayType(elemType, _) =>
+  if (isPrimitive(elemType)) {
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  new GenericArrayData(arrayValue.toArray[Any](elemType))
+}
+  } else {
+val elementConverter = generateSafeValueConverter(elemType)
+v => {
+  val arrayValue = v.asInstanceOf[ArrayData]
+  val result = new Array[Any](arrayValue.numElements())
+  arrayValue.foreach(elemType, (i, e) => {
+result(i) = elementConverter(e)
+  })
+  new GenericArrayData(result)
+}
+  }
+
+case st: StructType =>
+  val fieldTypes = st.fields.map(_.dataType)
+  val fieldConverters = fieldTypes.map(generateSafeValueConverter)
+  v => {
+val row = v.asInstanceOf[InternalRow]
+val ar = new Array[Any](row.numFields)
+var idx = 0
+while (idx < row.numFields) {
+  ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx)))
+  idx += 1
+}
+new GenericInternalRow(ar)
+  }
+
+case MapType(keyType, valueType, _) =>
+  lazy val keyConverter = generateSafeValueConverter(keyType)
+  lazy val valueConverter = generateSafeValueConverter(valueType)
+  v => {
+val mapValue = v.asInstanceOf[MapData]
+val keys = mapValue.keyArray().toArray[Any](keyType)
+val values = mapValue.valueArray().toArray[Any](valueType)
+val convertedKeys =
+  if (isPrimitive(keyType)) keys else keys.map(keyConverter)
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225521206
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
+case BooleanType => true
+case ByteType => true
+case ShortType => true
+case IntegerType => true
+case LongType => true
+case FloatType => true
+case DoubleType => true
+case _ => false
+  }
+
+  private def generateSafeValueConverter(dt: DataType): Any => Any = dt 
match {
+case ArrayType(elemType, _) =>
+  if (isPrimitive(elemType)) {
--- End diff --

Let's not add this optimization at the beginning. We can add it later with 
a benchmark.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225520812
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
+case _ => true
+  }.map { case (e, i) =>
+val converter = generateSafeValueConverter(e.dataType)
+val writer = generateRowWriter(i, e.dataType)
+val f = if (!e.nullable) {
+  (v: Any) => writer(converter(v))
+} else {
+  (v: Any) => {
+if (v == null) {
+  mutableRow.setNullAt(i)
+} else {
+  writer(converter(v))
+}
+  }
+}
+(e, f)
+  }
+
+  private def isPrimitive(dataType: DataType): Boolean = dataType match {
--- End diff --

`CodeGenerator.isPrimitiveType`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-10-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22468#discussion_r225520290
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.expressions
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, 
GenericArrayData, MapData}
+import org.apache.spark.sql.types._
+
+
+/**
+ * An interpreted version of a safe projection.
+ *
+ * @param expressions that produces the resulting fields. These 
expressions must be bound
+ *to a schema.
+ */
+class InterpretedSafeProjection(expressions: Seq[Expression]) extends 
Projection {
+
+  private[this] val mutableRow = new 
SpecificInternalRow(expressions.map(_.dataType))
+
+  private[this] val exprsWithWriters = expressions.zipWithIndex.filter {
+case (NoOp, _) => false
--- End diff --

does `SafeProjection` need to handle `NoOp`? It's only used with 
`MutableProjection` in aggregate.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...

2018-09-19 Thread maropu
GitHub user maropu opened a pull request:

https://github.com/apache/spark/pull/22468

[SPARK-25374][SQL] SafeProjection supports fallback to an interpreted mode

## What changes were proposed in this pull request?
In SPARK-23711, we have implemented the expression fallback logic to an 
interpreted mode. So, this pr fixed code to support the same fallback mode in 
`SafeProjection` based on `CodeGeneratorWithInterpretedFallback`.

## How was this patch tested?
Add tests in `CodeGeneratorWithInterpretedFallbackSuite` and 
`UnsafeRowConverterSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maropu/spark SPARK-25374-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22468.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22468


commit 96b0f62758e91dcee520ce9e4a003cac172d5b2c
Author: Takeshi Yamamuro 
Date:   2018-09-06T14:48:04Z

Implement InterpretedSafeProjection




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org