[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238894837 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( 
+StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +val mapResultRow = convertBackToInternalRow(mapRow, fields4).toSeq(fields4) +val mapExpectedRow = mapRow.toSeq(fields4) +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, --- End diff -- Aha, thanks. I remember that it's related to SPARK-18134. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user hvanhovell commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238683833 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( 
+StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +val mapResultRow = convertBackToInternalRow(mapRow, fields4).toSeq(fields4) +val mapExpectedRow = mapRow.toSeq(fields4) +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, --- End diff -- `ArrayBasedMapData`/`UnsafeMapData` does not have `equals()` or `hashCode()` implemented because we do not have a good story around map equality. Implementing equals/hashcode for map is only half of the solution, we would also need a comparable binary format. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22468 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238543369 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( 
+StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +val mapResultRow = convertBackToInternalRow(mapRow, fields4).toSeq(fields4) +val mapExpectedRow = mapRow.toSeq(fields4) +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, --- End diff -- I fixed the code to use `ExpressionEvalHelper.checkResult`. I don't remember exactly, but there might be historical reasons why `ArrayBasedMapData` has no `hashCode` and `equals`. Perhaps somebody knows... cc: @hvanhovell @viirya --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238534101 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( 
+StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +val mapResultRow = convertBackToInternalRow(mapRow, fields4).toSeq(fields4) +val mapExpectedRow = mapRow.toSeq(fields4) +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, --- End diff -- Or we can use `ExpressionEvalHelper.checkResult` here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238533700 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,98 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( 
+StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +val mapResultRow = convertBackToInternalRow(mapRow, fields4).toSeq(fields4) +val mapExpectedRow = mapRow.toSeq(fields4) +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, --- End diff -- maybe we should implement `equals` and `hashCode` in `ArrayBasedMapData` and `UnsafeMapData`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238530264 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,100 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( 
+StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, +// we need to take care of it to compare rows. +def toComparable(d: Any): Any = d match { --- End diff -- Since we cannot compare `ArrayBasedMapData`s directly (that is, `assert(mapResultRow === mapExpectedRow)` fails), I just converted them into the `Seq`s of keys/values by this method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238520267 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -535,4 +535,100 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestB assert(unsafeRow.getSizeInBytes == 8 + 8 * 2 + roundedSize(field1.getSizeInBytes) + roundedSize(field2.getSizeInBytes)) } + + testBothCodegenAndInterpreted("SPARK-25374 converts back into safe representation") { +def convertBackToInternalRow(inputRow: InternalRow, fields: Array[DataType]): InternalRow = { + val unsafeProj = UnsafeProjection.create(fields) + val unsafeRow = unsafeProj(inputRow) + val safeProj = SafeProjection.create(fields) + safeProj(unsafeRow) +} + +// Simple tests +val inputRow = InternalRow.fromSeq(Seq( + false, 3.toByte, 15.toShort, -83, 129L, 1.0f, 8.0, UTF8String.fromString("test"), + Decimal(255), CalendarInterval.fromString("interval 1 day"), Array[Byte](1, 2) +)) +val fields1 = Array( + BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, + DoubleType, StringType, DecimalType.defaultConcreteType, CalendarIntervalType, + BinaryType) + +assert(convertBackToInternalRow(inputRow, fields1) === inputRow) + +// Array tests +val arrayRow = InternalRow.fromSeq(Seq( + createArray(1, 2, 3), + createArray( +createArray(Seq("a", "b", "c").map(UTF8String.fromString): _*), +createArray(Seq("d").map(UTF8String.fromString): _*)) +)) +val fields2 = Array[DataType]( + ArrayType(IntegerType), + ArrayType(ArrayType(StringType))) + +assert(convertBackToInternalRow(arrayRow, fields2) === arrayRow) + +// Struct tests +val structRow = InternalRow.fromSeq(Seq( + InternalRow.fromSeq(Seq[Any](1, 4.0)), + InternalRow.fromSeq(Seq( +UTF8String.fromString("test"), +InternalRow.fromSeq(Seq( + 1, + createArray(Seq("2", "3").map(UTF8String.fromString): _*) +)) + )) +)) +val fields3 = Array[DataType]( + StructType( 
+StructField("c0", IntegerType) :: +StructField("c1", DoubleType) :: +Nil), + StructType( +StructField("c2", StringType) :: +StructField("c3", StructType( + StructField("c4", IntegerType) :: + StructField("c5", ArrayType(StringType)) :: + Nil)) :: +Nil)) + +assert(convertBackToInternalRow(structRow, fields3) === structRow) + +// Map tests +val mapRow = InternalRow.fromSeq(Seq( + createMap(Seq("k1", "k2").map(UTF8String.fromString): _*)(1, 2), + createMap( +createMap(3, 5)(Seq("v1", "v2").map(UTF8String.fromString): _*), +createMap(7, 9)(Seq("v3", "v4").map(UTF8String.fromString): _*) + )( +createMap(Seq("k3", "k4").map(UTF8String.fromString): _*)(3.toShort, 4.toShort), +createMap(Seq("k5", "k6").map(UTF8String.fromString): _*)(5.toShort, 6.toShort) + ))) +val fields4 = Array[DataType]( + MapType(StringType, IntegerType), + MapType(MapType(IntegerType, StringType), MapType(StringType, ShortType))) + +// Since `ArrayBasedMapData` does not override `equals` and `hashCode`, +// we need to take care of it to compare rows. +def toComparable(d: Any): Any = d match { --- End diff -- this does nothing, does it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238515544 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala --- @@ -106,4 +106,19 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected) } } + + test("SPARK-25374 Correctly handles NoOp in SafeProjection") { +val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), Literal.create(1)), NoOp) +val input = InternalRow.fromSeq(1 :: 1 :: Nil) +val expected = 2 :: null :: Nil +withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { --- End diff -- nvm, this is the code style in this test suite --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238515444 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala --- @@ -106,4 +106,19 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected) } } + + test("SPARK-25374 Correctly handles NoOp in SafeProjection") { +val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), Literal.create(1)), NoOp) +val input = InternalRow.fromSeq(1 :: 1 :: Nil) +val expected = 2 :: null :: Nil +withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { --- End diff -- can we use `testBothCodegenAndInterpreted`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238515227 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala --- @@ -166,29 +166,40 @@ object UnsafeProjection } } -/** - * A projection that could turn UnsafeRow into GenericInternalRow --- End diff -- can we keep this comment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238490121 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala --- @@ -157,4 +157,22 @@ object InternalRow { getValueNullSafe } } + + /** + * Returns a writer for an `InternalRow` with given data type. + */ + def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r238330712 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala --- @@ -157,4 +157,22 @@ object InternalRow { getValueNullSafe } } + + /** + * Returns a writer for an `InternalRow` with given data type. + */ + def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match { --- End diff -- We can rebase now --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r227185139 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala --- @@ -31,48 +30,7 @@ import org.apache.spark.sql.types._ @BeanInfo private[sql] case class MyLabeledPoint( @BeanProperty label: Double, - @BeanProperty features: UDT.MyDenseVector) - -// Wrapped in an object to check Scala compatibility. See SPARK-13929 -object UDT { --- End diff -- That's because we use `MyDenseVectorUDT` in `UnsafeRowConverterSuite.scala` for unit tests (`MyDenseVectorUDT` is located in the `core` module, but `UnsafeRowConverterSuite` is located in the `catalyst` module). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r227184697 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. 
+ */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { +case BooleanType => true +case ByteType => true +case ShortType => true +case IntegerType => true +case LongType => true +case FloatType => true +case DoubleType => true +case _ => false + } + + private def generateSafeValueConverter(dt: DataType): Any => Any = dt match { +case ArrayType(elemType, _) => + if (isPrimitive(elemType)) { +v => { + val arrayValue = v.asInstanceOf[ArrayData] + new GenericArrayData(arrayValue.toArray[Any](elemType)) +} + } else { +val elementConverter = generateSafeValueConverter(elemType) +v => { + val arrayValue = v.asInstanceOf[ArrayData] + val result = new Array[Any](arrayValue.numElements()) + arrayValue.foreach(elemType, (i, e) => { +result(i) = elementConverter(e) + }) + new GenericArrayData(result) +} + } + +case st: StructType => + val fieldTypes = st.fields.map(_.dataType) + val fieldConverters = fieldTypes.map(generateSafeValueConverter) + v => { +val row = v.asInstanceOf[InternalRow] +val ar = new Array[Any](row.numFields) +var idx = 0 +while (idx < row.numFields) { + ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx))) + idx += 1 +} +new GenericInternalRow(ar) + } + +case MapType(keyType, valueType, _) => + lazy val keyConverter = generateSafeValueConverter(keyType) + lazy val valueConverter = generateSafeValueConverter(valueType) + v => { +val 
mapValue = v.asInstanceOf[MapData] +val keys = mapValue.keyArray().toArray[Any](keyType) +val values = mapValue.valueArray().toArray[Any](valueType) +val convertedKeys = + if (isPrimitive(keyType)) keys else keys.map(keyConverter) +val convertedValues = + if (isPrimitive(valueType)) values else values.map(valueConverter) + +ArrayBasedMapData(convertedKeys, convertedValues) + } + +case udt: UserDefinedType[_] => +
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r227184308 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. 
+ */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { +case BooleanType => true +case ByteType => true +case ShortType => true +case IntegerType => true +case LongType => true +case FloatType => true +case DoubleType => true +case _ => false + } + + private def generateSafeValueConverter(dt: DataType): Any => Any = dt match { +case ArrayType(elemType, _) => + if (isPrimitive(elemType)) { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r227183344 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. 
+ */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r227183303 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false --- End diff -- IIUC the input expressions in `UnsafeProjection` possibly have `NoOp`s passed from aggregate expressions? 
So, if I understand correctly, `GenerateSafeProjection` also handles `NoOp`s here: https://github.com/apache/spark/blob/3b4556745e90a13f4ae7ebae4ab682617de25c38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateSafeProjection.scala#L153 However, I'm not 100% sure about this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225522164 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala --- @@ -31,48 +30,7 @@ import org.apache.spark.sql.types._ @BeanInfo private[sql] case class MyLabeledPoint( @BeanProperty label: Double, - @BeanProperty features: UDT.MyDenseVector) - -// Wrapped in an object to check Scala compatibility. See SPARK-13929 -object UDT { --- End diff -- why do we change it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225521786 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. 
+ */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { +case BooleanType => true +case ByteType => true +case ShortType => true +case IntegerType => true +case LongType => true +case FloatType => true +case DoubleType => true +case _ => false + } + + private def generateSafeValueConverter(dt: DataType): Any => Any = dt match { +case ArrayType(elemType, _) => + if (isPrimitive(elemType)) { +v => { + val arrayValue = v.asInstanceOf[ArrayData] + new GenericArrayData(arrayValue.toArray[Any](elemType)) +} + } else { +val elementConverter = generateSafeValueConverter(elemType) +v => { + val arrayValue = v.asInstanceOf[ArrayData] + val result = new Array[Any](arrayValue.numElements()) + arrayValue.foreach(elemType, (i, e) => { +result(i) = elementConverter(e) + }) + new GenericArrayData(result) +} + } + +case st: StructType => + val fieldTypes = st.fields.map(_.dataType) + val fieldConverters = fieldTypes.map(generateSafeValueConverter) + v => { +val row = v.asInstanceOf[InternalRow] +val ar = new Array[Any](row.numFields) +var idx = 0 +while (idx < row.numFields) { + ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx))) + idx += 1 +} +new GenericInternalRow(ar) + } + +case MapType(keyType, valueType, _) => + lazy val keyConverter = generateSafeValueConverter(keyType) + lazy val valueConverter = generateSafeValueConverter(valueType) + v => { +val 
mapValue = v.asInstanceOf[MapData] +val keys = mapValue.keyArray().toArray[Any](keyType) +val values = mapValue.valueArray().toArray[Any](valueType) +val convertedKeys = + if (isPrimitive(keyType)) keys else keys.map(keyConverter) +val convertedValues = + if (isPrimitive(valueType)) values else values.map(valueConverter) + +ArrayBasedMapData(convertedKeys, convertedValues) + } + +case udt: UserDefinedType[_] => +
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225521397 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. 
+ */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { +case BooleanType => true +case ByteType => true +case ShortType => true +case IntegerType => true +case LongType => true +case FloatType => true +case DoubleType => true +case _ => false + } + + private def generateSafeValueConverter(dt: DataType): Any => Any = dt match { +case ArrayType(elemType, _) => + if (isPrimitive(elemType)) { +v => { + val arrayValue = v.asInstanceOf[ArrayData] + new GenericArrayData(arrayValue.toArray[Any](elemType)) +} + } else { +val elementConverter = generateSafeValueConverter(elemType) +v => { + val arrayValue = v.asInstanceOf[ArrayData] + val result = new Array[Any](arrayValue.numElements()) + arrayValue.foreach(elemType, (i, e) => { +result(i) = elementConverter(e) + }) + new GenericArrayData(result) +} + } + +case st: StructType => + val fieldTypes = st.fields.map(_.dataType) + val fieldConverters = fieldTypes.map(generateSafeValueConverter) + v => { +val row = v.asInstanceOf[InternalRow] +val ar = new Array[Any](row.numFields) +var idx = 0 +while (idx < row.numFields) { + ar(idx) = fieldConverters(idx)(row.get(idx, fieldTypes(idx))) + idx += 1 +} +new GenericInternalRow(ar) + } + +case MapType(keyType, valueType, _) => + lazy val keyConverter = generateSafeValueConverter(keyType) + lazy val valueConverter = generateSafeValueConverter(valueType) + v => { +val 
mapValue = v.asInstanceOf[MapData] +val keys = mapValue.keyArray().toArray[Any](keyType) +val values = mapValue.valueArray().toArray[Any](valueType) +val convertedKeys = + if (isPrimitive(keyType)) keys else keys.map(keyConverter) --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225521206 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. 
+ */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { +case BooleanType => true +case ByteType => true +case ShortType => true +case IntegerType => true +case LongType => true +case FloatType => true +case DoubleType => true +case _ => false + } + + private def generateSafeValueConverter(dt: DataType): Any => Any = dt match { +case ArrayType(elemType, _) => + if (isPrimitive(elemType)) { --- End diff -- Let's not add this optimization at the beginning. We can add it later with a benchmark. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225520812 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. 
+ */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false +case _ => true + }.map { case (e, i) => +val converter = generateSafeValueConverter(e.dataType) +val writer = generateRowWriter(i, e.dataType) +val f = if (!e.nullable) { + (v: Any) => writer(converter(v)) +} else { + (v: Any) => { +if (v == null) { + mutableRow.setNullAt(i) +} else { + writer(converter(v)) +} + } +} +(e, f) + } + + private def isPrimitive(dataType: DataType): Boolean = dataType match { --- End diff -- `CodeGenerator.isPrimitiveType` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22468#discussion_r225520290 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData} +import org.apache.spark.sql.types._ + + +/** + * An interpreted version of a safe projection. + * + * @param expressions that produces the resulting fields. These expressions must be bound + *to a schema. + */ +class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection { + + private[this] val mutableRow = new SpecificInternalRow(expressions.map(_.dataType)) + + private[this] val exprsWithWriters = expressions.zipWithIndex.filter { +case (NoOp, _) => false --- End diff -- does `SafeProjection` need to handle `NoOp`? It's only used with `MutableProjection` in aggregate. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22468: [SPARK-25374][SQL] SafeProjection supports fallba...
GitHub user maropu opened a pull request: https://github.com/apache/spark/pull/22468 [SPARK-25374][SQL] SafeProjection supports fallback to an interpreted mode ## What changes were proposed in this pull request? In SPARK-23711, we implemented logic that lets expression evaluation fall back to an interpreted mode. This PR adds the same fallback mode to `SafeProjection`, based on `CodeGeneratorWithInterpretedFallback`. ## How was this patch tested? Added tests in `CodeGeneratorWithInterpretedFallbackSuite` and `UnsafeRowConverterSuite`. You can merge this pull request into a Git repository by running: $ git pull https://github.com/maropu/spark SPARK-25374-3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22468.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22468 commit 96b0f62758e91dcee520ce9e4a003cac172d5b2c Author: Takeshi Yamamuro Date: 2018-09-06T14:48:04Z Implement InterpretedSafeProjection --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org