[GitHub] [spark] cloud-fan commented on a diff in pull request #39517: [SPARK-41993][SQL] Move RowEncoder to AgnosticEncoders
cloud-fan commented on code in PR #39517: URL: https://github.com/apache/spark/pull/39517#discussion_r1071046887

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala: ##

```diff
@@ -388,11 +388,10 @@ class ScalaReflectionSuite extends SparkFunSuite {
   }

   test("SPARK-15062: Get correct serializer for List[_]") {
-    val list = List(1, 2, 3)
     val serializer = serializerFor[List[Int]]
-    assert(serializer.isInstanceOf[NewInstance])
-    assert(serializer.asInstanceOf[NewInstance]
-      .cls.isAssignableFrom(classOf[org.apache.spark.sql.catalyst.util.GenericArrayData]))
+    assert(serializer.isInstanceOf[MapObjects])
```

Review Comment: is `MapObjects` better than `NewInstance` for creating `List[Int]`?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
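As a rough intuition for the question above: the two strategies differ in how the target collection is produced. Below is a hedged toy model in plain Scala, not the real Catalyst `MapObjects`/`NewInstance` expression classes.

```scala
// Toy model, NOT the Catalyst expressions themselves: a MapObjects-style
// serializer converts every element through the element serializer before
// building the result, whereas a NewInstance-style approach only constructs
// a target container. For List[Int] the per-element path also handles
// element conversion (e.g. to an internal representation), which a bare
// constructor call cannot.
def mapObjectsStyle[A, B](input: Seq[A])(elementSerializer: A => B): Seq[B] =
  input.map(elementSerializer) // per-element conversion, MapObjects-style

// Example: serialize each Int element to a Long "internal" value.
val serialized = mapObjectsStyle(List(1, 2, 3))(_.toLong)
```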
cloud-fan commented on code in PR #39517: URL: https://github.com/apache/spark/pull/39517#discussion_r1071007920

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala: ##

```diff
@@ -82,12 +84,24 @@ object ScalaReflection extends ScalaReflection {
     }
   }

-  // TODO this name is slightly misleading. This returns the input
-  // data type we expect to see during serialization.
-  private[catalyst] def dataTypeFor(enc: AgnosticEncoder[_]): DataType = {
+  /**
+   * Return the data type we expect to see when deserializing a value with encoder `enc`.
+   */
+  private[catalyst] def externalDataTypeFor(enc: AgnosticEncoder[_]): DataType = {
+    externalDataTypeFor(enc, lenientSerialization = false)
+  }
+
+  private[catalyst] def lenientExternalDataTypeFor(enc: AgnosticEncoder[_]): DataType =
```

Review Comment (the suggestion appears to be an indentation-only fix):
```suggestion
  private[catalyst] def lenientExternalDataTypeFor(enc: AgnosticEncoder[_]): DataType =
```
cloud-fan commented on code in PR #39517: URL: https://github.com/apache/spark/pull/39517#discussion_r1068892182

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala: ##

```diff
@@ -46,35 +46,42 @@ object AgnosticEncoders {
     override val clsTag: ClassTag[Option[E]] = ClassTag(classOf[Option[E]])
   }

-  case class ArrayEncoder[E](element: AgnosticEncoder[E])
+  case class ArrayEncoder[E](element: AgnosticEncoder[E], containsNull: Boolean)
     extends AgnosticEncoder[Array[E]] {
     override def isPrimitive: Boolean = false
-    override def dataType: DataType = ArrayType(element.dataType, element.nullable)
+    override def dataType: DataType = ArrayType(element.dataType, containsNull)
     override val clsTag: ClassTag[Array[E]] = element.clsTag.wrap
   }

-  case class IterableEncoder[C <: Iterable[E], E](
+  case class IterableEncoder[C, E](
     override val clsTag: ClassTag[C],
-    element: AgnosticEncoder[E])
+    element: AgnosticEncoder[E],
+    containsNull: Boolean,
+    override val lenientSerialization: Boolean)
```

Review Comment: Can we leave a code comment to mention it? It's not that obvious compared to `DateEncoder`.
cloud-fan commented on code in PR #39517: URL: https://github.com/apache/spark/pull/39517#discussion_r1067668533

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala: ##

```diff
@@ -46,35 +46,42 @@ object AgnosticEncoders {
     override val clsTag: ClassTag[Option[E]] = ClassTag(classOf[Option[E]])
   }

-  case class ArrayEncoder[E](element: AgnosticEncoder[E])
+  case class ArrayEncoder[E](element: AgnosticEncoder[E], containsNull: Boolean)
     extends AgnosticEncoder[Array[E]] {
     override def isPrimitive: Boolean = false
-    override def dataType: DataType = ArrayType(element.dataType, element.nullable)
+    override def dataType: DataType = ArrayType(element.dataType, containsNull)
     override val clsTag: ClassTag[Array[E]] = element.clsTag.wrap
   }

-  case class IterableEncoder[C <: Iterable[E], E](
+  case class IterableEncoder[C, E](
     override val clsTag: ClassTag[C],
-    element: AgnosticEncoder[E])
+    element: AgnosticEncoder[E],
+    containsNull: Boolean,
+    override val lenientSerialization: Boolean)
```

Review Comment: What does "lenient" mean for an `IterableEncoder`?
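For context on the "lenient" question, here is a simplified, hedged sketch of one plausible reading: a lenient iterable serializer accepts several external container types for the same logical array and normalizes them, instead of requiring exactly the declared class. The helper name below is hypothetical, not the Catalyst API.

```scala
// Simplified illustration (hypothetical, not Spark's implementation):
// a strict serializer would accept only the declared container type,
// while a lenient one also tolerates other common containers, such as
// plain JVM arrays or any scala.collection.Seq, and normalizes them.
def lenientToSeq(value: Any): Seq[Any] = value match {
  case s: scala.collection.Seq[_] => s.toSeq // List, Vector, mutable.Buffer, ...
  case a: Array[_]                => a.toSeq // plain JVM arrays
  case other =>
    throw new IllegalArgumentException(s"unsupported external type: $other")
}
```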
cloud-fan commented on code in PR #39517: URL: https://github.com/apache/spark/pull/39517#discussion_r1067657414

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala: ##

```diff
@@ -377,27 +408,96 @@ object ScalaReflection extends ScalaReflection {
         val getter = Invoke(
           KnownNotNull(input),
           field.name,
-          dataTypeFor(field.enc),
-          returnNullable = field.enc.nullable)
+          externalDataTypeFor(field.enc),
+          returnNullable = field.nullable)
         field.name -> serializerFor(field.enc, getter)
       }
       createSerializerForObject(input, serializedFields)
+
+    case RowEncoder(fields) =>
+      val serializedFields = fields.zipWithIndex.map { case (field, index) =>
+        val fieldValue = serializerFor(
+          field.enc,
+          ValidateExternalType(
+            GetExternalRowField(input, index, field.name),
+            field.enc.dataType,
+            lenientExternalDataTypeFor(field.enc)))
+
+        val convertedField = if (field.nullable) {
+          exprs.If(
+            Invoke(input, "isNullAt", BooleanType, exprs.Literal(index) :: Nil),
+            // Because we strip UDTs, `field.dataType` can be different from `fieldValue.dataType`.
+            // We should use `fieldValue.dataType` here.
+            exprs.Literal.create(null, fieldValue.dataType),
+            fieldValue
+          )
+        } else {
+          AssertNotNull(fieldValue)
+        }
+        field.name -> convertedField
+      }
+      createSerializerForObject(input, serializedFields)
   }

   private def serializerForArray(
-      isArray: Boolean,
       elementEnc: AgnosticEncoder[_],
-      input: Expression): Expression = {
-    dataTypeFor(elementEnc) match {
-      case dt: ObjectType =>
-        createSerializerForMapObjects(input, dt, serializerFor(elementEnc, _))
-      case dt if isArray && elementEnc.isPrimitive =>
-        createSerializerForPrimitiveArray(input, dt)
-      case dt =>
-        createSerializerForGenericArray(input, dt, elementEnc.nullable)
+      elementNullable: Boolean,
+      input: Expression,
+      lenientSerialization: Boolean): Expression = {
+    // Default serializer for Seq and generic Arrays. This does not work for primitive arrays.
```

Review Comment: hmm, why is the name `createSerializerForMapObjects`?
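The per-field null handling in the `RowEncoder` serializer diff above can be summarized with a small toy model in plain Scala (a stand-in for the Catalyst expressions, not the real thing): a nullable field becomes an `isNullAt`-guarded if/else, while a non-nullable field gets an assert-not-null check.

```scala
// Toy model of the `convertedField` logic: for a nullable field we emit
// the equivalent of "if (row.isNullAt(i)) null else serialize(value)";
// for a non-nullable field we fail fast when a null sneaks in.
def convertField(nullable: Boolean, isNull: Boolean, serialized: => Any): Any =
  if (nullable) {
    if (isNull) null else serialized
  } else {
    val v = serialized
    require(v != null, "Null value appeared in non-nullable field")
    v
  }
```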
cloud-fan commented on code in PR #39517: URL: https://github.com/apache/spark/pull/39517#discussion_r1067657104

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala: ##

```diff
@@ -155,11 +169,19 @@ object ScalaReflection extends ScalaReflection {
     val walkedTypePath = WalkedTypePath().recordRoot(enc.clsTag.runtimeClass.getName)
     // Assumes we are deserializing the first column of a row.
     val input = GetColumnByOrdinal(0, enc.dataType)
-    val deserializer = deserializerFor(
-      enc,
-      upCastToExpectedType(input, enc.dataType, walkedTypePath),
-      walkedTypePath)
-    expressionWithNullSafety(deserializer, enc.nullable, walkedTypePath)
+    enc match {
+      case RowEncoder(fields) =>
```

Review Comment: What encoder do we create for an inner struct? How is it different from the root `RowEncoder`?
cloud-fan commented on code in PR #39517: URL: https://github.com/apache/spark/pull/39517#discussion_r1067656569

## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala: ##

```diff
@@ -125,7 +125,7 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
       new StructType()
         .add("mapOfIntAndString", MapType(IntegerType, StringType))
         .add("mapOfStringAndArray", MapType(StringType, arrayOfString))
-        .add("mapOfArrayAndInt", MapType(arrayOfString, IntegerType))
```

Review Comment: `arrayOfString` doesn't work anymore inside a map?