[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226844557
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -198,7 +196,7 @@ object RowEncoder {
 
     if (inputObject.nullable) {
       If(IsNull(inputObject),
-        Literal.create(null, inputType),
+        Literal.create(null, nonNullOutput.dataType),
--- End diff --

Created a separate PR at #22785.


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-20 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226828791
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---
@@ -198,7 +196,7 @@ object RowEncoder {
 
     if (inputObject.nullable) {
       If(IsNull(inputObject),
-        Literal.create(null, inputType),
+        Literal.create(null, nonNullOutput.dataType),
--- End diff --

This might be worth a separate PR. I'm considering creating one for it.


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226519284
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -43,10 +44,11 @@ import org.apache.spark.util.Utils
  *    to the name `value`.
  */
 object ExpressionEncoder {
+
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
-    // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = ScalaReflection.mirror
-    val tpe = typeTag[T].in(mirror).tpe
+    val tpe = ScalaReflection.localTypeOf[T]
--- End diff --

`localTypeOf` has a `dealias` at the end.
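
A minimal sketch of what a trailing `dealias` changes, using plain scala-reflect
and a hypothetical `Name` alias (no Spark involved):

    import scala.reflect.runtime.universe._

    object DealiasDemo {
      type Name = String  // hypothetical alias, only for illustration

      def main(args: Array[String]): Unit = {
        println(typeOf[Name])          // DealiasDemo.Name (alias kept)
        println(typeOf[Name].dealias)  // String (alias resolved)
      }
    }

So switching to `localTypeOf` also resolves type aliases, which the old
`typeTag[T].in(mirror).tpe` did not.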


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226506566
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -43,10 +44,11 @@ import org.apache.spark.util.Utils
  *    to the name `value`.
  */
 object ExpressionEncoder {
+
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
-    // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = ScalaReflection.mirror
-    val tpe = typeTag[T].in(mirror).tpe
+    val tpe = ScalaReflection.localTypeOf[T]
--- End diff --

`localTypeOf` actually does the same thing. I think it is better to use 
`ScalaReflection` for this.
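
Per this thread, `localTypeOf` is roughly the old expression plus the trailing
`dealias` noted above; a sketch, not a verbatim copy of the Spark source:

    import scala.reflect.runtime.universe._

    object LocalTypeOfSketch {
      private val mirror = runtimeMirror(getClass.getClassLoader)

      // The old typeTag[T].in(mirror).tpe, plus the trailing dealias.
      def localTypeOf[T: TypeTag]: Type = typeTag[T].in(mirror).tpe.dealias
    }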


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226506718
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -103,75 +88,61 @@ object ExpressionEncoder {
    * name/positional binding is preserved.
    */
   def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
+    if (encoders.length > 22) {
+      throw new RuntimeException("Can't construct a tuple encoder for more than 22 encoders.")
+    }
+
     encoders.foreach(_.assertUnresolved())
 
     val schema = StructType(encoders.zipWithIndex.map {
       case (e, i) =>
-        val (dataType, nullable) = if (e.flat) {
-          e.schema.head.dataType -> e.schema.head.nullable
-        } else {
-          e.schema -> true
-        }
-        StructField(s"_${i + 1}", dataType, nullable)
+        StructField(s"_${i + 1}", e.objSerializer.dataType, e.objSerializer.nullable)
     })
 
     val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
 
-    val serializer = encoders.zipWithIndex.map { case (enc, index) =>
-      val originalInputObject = enc.serializer.head.collect { case b: BoundReference => b }.head
+    val serializers = encoders.zipWithIndex.map { case (enc, index) =>
+      val boundRefs = enc.objSerializer.collect { case b: BoundReference => b }.distinct
+      assert(boundRefs.size == 1, "object serializer should have only one bound reference but " +
+        s"there are ${boundRefs.size}")
+
+      val originalInputObject = boundRefs.head
       val newInputObject = Invoke(
         BoundReference(0, ObjectType(cls), nullable = true),
         s"_${index + 1}",
-        originalInputObject.dataType)
+        originalInputObject.dataType,
+        returnNullable = originalInputObject.nullable)
 
-      val newSerializer = enc.serializer.map(_.transformUp {
+      val newSerializer = enc.objSerializer.transformUp {
         case b: BoundReference if b == originalInputObject => newInputObject
--- End diff --

yes, right.


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226507104
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +183,88 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A set of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
+   */
+  val serializer: Seq[NamedExpression] = {
+    val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType]
+    val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+    if (serializedAsStruct) {
+      val nullSafeSerializer = objSerializer.transformUp {
+        case r: BoundReference =>
+          // For input object of Product type, we can't encode it to row if it's null, as Spark SQL
+          // doesn't allow top-level row to be null, only its columns can be null.
+          AssertNotNull(r, Seq("top level Product or row object"))
+      }
+      nullSafeSerializer match {
+        case If(_, _, s: CreateNamedStruct) => s
--- End diff --

ok.


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226506593
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -103,75 +88,61 @@ object ExpressionEncoder {
    * name/positional binding is preserved.
    */
   def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
+    if (encoders.length > 22) {
--- End diff --

sure. ok.


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226301402
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +183,88 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A set of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
+   */
+  val serializer: Seq[NamedExpression] = {
+    val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType]
+    val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+    if (serializedAsStruct) {
+      val nullSafeSerializer = objSerializer.transformUp {
+        case r: BoundReference =>
+          // For input object of Product type, we can't encode it to row if it's null, as Spark SQL
+          // doesn't allow top-level row to be null, only its columns can be null.
+          AssertNotNull(r, Seq("top level Product or row object"))
+      }
+      nullSafeSerializer match {
+        case If(_, _, s: CreateNamedStruct) => s
+        case s: CreateNamedStruct => s
+        case _ =>
+          throw new RuntimeException(s"class $clsName has unexpected serializer: $objSerializer")
+      }
+    } else {
+      // For other input objects like primitive, array, map, etc., we construct a struct to wrap
+      // the serializer, which is a column of a row.
+      CreateNamedStruct(Literal("value") :: objSerializer :: Nil)
+    }
+  }.flatten
+
+  /**
+   * Returns an expression that can be used to deserialize an input row to an object of type `T`
+   * with a compatible schema. Fields of the row will be extracted using `UnresolvedAttribute`
+   * of the same name as the constructor arguments.
+   *
+   * For complex objects that are encoded to structs, fields of the struct will be extracted using
+   * `GetColumnByOrdinal` with corresponding ordinal.
+   */
+  val deserializer: Expression = {
+    val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType]
+
+    if (serializedAsStruct) {
+      // We serialize this kind of object to a root-level row. The input of the general
+      // deserializer is a `GetColumnByOrdinal(0)` expression that extracts the first column
+      // of a row. We need to transform the attribute accessors.
+      objDeserializer.transform {
+        case UnresolvedExtractValue(GetColumnByOrdinal(0, _),
+            Literal(part: UTF8String, StringType)) =>
+          UnresolvedAttribute.quoted(part.toString)
+        case GetStructField(GetColumnByOrdinal(0, dt), ordinal, _) =>
+          GetColumnByOrdinal(ordinal, dt)
+        case If(IsNull(GetColumnByOrdinal(0, _)), _, n: NewInstance) => n
+        case If(IsNull(GetColumnByOrdinal(0, _)), _, i: InitializeJavaBean) => i
+      }
+    } else {
+      // For other input objects like primitive, array, map, etc., we deserialize the first
+      // column of a row to the object.
+      objDeserializer
+    }
+  }
+


[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226299441
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +183,88 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A set of expressions, one for each top-level field that can be used to
--- End diff --

set -> sequence


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226298803
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -212,21 +183,88 @@ object ExpressionEncoder {
  * A generic encoder for JVM objects that uses Catalyst Expressions for a `serializer`
  * and a `deserializer`.
  *
- * @param schema The schema after converting `T` to a Spark SQL row.
- * @param serializer A set of expressions, one for each top-level field that can be used to
- *                   extract the values from a raw object into an [[InternalRow]].
- * @param deserializer An expression that will construct an object given an [[InternalRow]].
+ * @param objSerializer An expression that can be used to encode a raw object to corresponding
+ *                      Spark SQL representation that can be a primitive column, array, map or a
+ *                      struct. This represents how Spark SQL generally serializes an object of
+ *                      type `T`.
+ * @param objDeserializer An expression that will construct an object given a Spark SQL
+ *                        representation. This represents how Spark SQL generally deserializes
+ *                        a serialized value in Spark SQL representation back to an object of
+ *                        type `T`.
  * @param clsTag A classtag for `T`.
  */
 case class ExpressionEncoder[T](
-    schema: StructType,
-    flat: Boolean,
-    serializer: Seq[Expression],
-    deserializer: Expression,
+    objSerializer: Expression,
+    objDeserializer: Expression,
     clsTag: ClassTag[T])
   extends Encoder[T] {
 
-  if (flat) require(serializer.size == 1)
+  /**
+   * A set of expressions, one for each top-level field that can be used to
+   * extract the values from a raw object into an [[InternalRow]]:
+   * 1. If `serializer` encodes a raw object to a struct, we directly use the `serializer`.
+   * 2. For other cases, we create a struct to wrap the `serializer`.
+   */
+  val serializer: Seq[NamedExpression] = {
+    val serializedAsStruct = objSerializer.dataType.isInstanceOf[StructType]
+    val clsName = Utils.getSimpleName(clsTag.runtimeClass)
+
+    if (serializedAsStruct) {
+      val nullSafeSerializer = objSerializer.transformUp {
+        case r: BoundReference =>
+          // For input object of Product type, we can't encode it to row if it's null, as Spark SQL
+          // doesn't allow top-level row to be null, only its columns can be null.
+          AssertNotNull(r, Seq("top level Product or row object"))
+      }
+      nullSafeSerializer match {
+        case If(_, _, s: CreateNamedStruct) => s
--- End diff --

let's also make sure the if condition is `IsNull`, which better explains 
why we strip it (it can't be null).
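
A sketch of the stricter match being suggested, assuming the guard wrapped
around the struct serializer really is an `IsNull` check (names taken from
the diff above):

    nullSafeSerializer match {
      // Only strip the If when its predicate is an IsNull check; the input
      // was already asserted non-null, so the null branch is unreachable.
      case If(_: IsNull, _, s: CreateNamedStruct) => s
      case s: CreateNamedStruct => s
      case _ =>
        throw new RuntimeException(s"class $clsName has unexpected serializer: $objSerializer")
    }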


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226296369
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -103,75 +88,61 @@ object ExpressionEncoder {
    * name/positional binding is preserved.
    */
   def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
--- End diff --

cool, this method is simplified a lot with the new abstraction.


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226295859
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -103,75 +88,61 @@ object ExpressionEncoder {
    * name/positional binding is preserved.
    */
   def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
+    if (encoders.length > 22) {
+      throw new RuntimeException("Can't construct a tuple encoder for more than 22 encoders.")
+    }
+
     encoders.foreach(_.assertUnresolved())
 
     val schema = StructType(encoders.zipWithIndex.map {
       case (e, i) =>
-        val (dataType, nullable) = if (e.flat) {
-          e.schema.head.dataType -> e.schema.head.nullable
-        } else {
-          e.schema -> true
-        }
-        StructField(s"_${i + 1}", dataType, nullable)
+        StructField(s"_${i + 1}", e.objSerializer.dataType, e.objSerializer.nullable)
     })
 
     val cls = Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}")
 
-    val serializer = encoders.zipWithIndex.map { case (enc, index) =>
-      val originalInputObject = enc.serializer.head.collect { case b: BoundReference => b }.head
+    val serializers = encoders.zipWithIndex.map { case (enc, index) =>
+      val boundRefs = enc.objSerializer.collect { case b: BoundReference => b }.distinct
+      assert(boundRefs.size == 1, "object serializer should have only one bound reference but " +
+        s"there are ${boundRefs.size}")
+
+      val originalInputObject = boundRefs.head
       val newInputObject = Invoke(
         BoundReference(0, ObjectType(cls), nullable = true),
         s"_${index + 1}",
-        originalInputObject.dataType)
+        originalInputObject.dataType,
+        returnNullable = originalInputObject.nullable)
 
-      val newSerializer = enc.serializer.map(_.transformUp {
+      val newSerializer = enc.objSerializer.transformUp {
         case b: BoundReference if b == originalInputObject => newInputObject
--- End diff --

Since there is only one distinct `BoundReference`, we can just write 
`case b: BoundReference => newInputObject`.
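
I.e., a sketch of the simplified rewrite (names from the diff above); the
assert on `boundRefs.size` makes the equality guard redundant:

    val newSerializer = enc.objSerializer.transformUp {
      // The single distinct BoundReference is rebound unconditionally.
      case _: BoundReference => newInputObject
    }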


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226294255
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -103,75 +88,61 @@ object ExpressionEncoder {
    * name/positional binding is preserved.
    */
   def tuple(encoders: Seq[ExpressionEncoder[_]]): ExpressionEncoder[_] = {
+    if (encoders.length > 22) {
--- End diff --

can we do it in a separate PR with a test?
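
A hypothetical ScalaTest-style test for the limit might look like this
(test name and assertions assumed, not part of this PR):

    test("tuple encoder is limited to 22 encoders") {
      val encoders = Seq.fill(23)(ExpressionEncoder[Int]())
      val e = intercept[RuntimeException] {
        ExpressionEncoder.tuple(encoders)
      }
      assert(e.getMessage.contains("more than 22 encoders"))
    }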


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-18 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22749#discussion_r226294017
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala ---
@@ -43,10 +44,11 @@ import org.apache.spark.util.Utils
  *    to the name `value`.
  */
 object ExpressionEncoder {
+
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
-    // We convert the not-serializable TypeTag into StructType and ClassTag.
     val mirror = ScalaReflection.mirror
-    val tpe = typeTag[T].in(mirror).tpe
+    val tpe = ScalaReflection.localTypeOf[T]
--- End diff --

why change it from `typeTag[T].in(mirror).tpe`?


---




[GitHub] spark pull request #22749: [WIP][SPARK-25746][SQL] Refactoring ExpressionEnc...

2018-10-16 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/22749

[WIP][SPARK-25746][SQL] Refactoring ExpressionEncoder to get rid of flat flag

## What changes were proposed in this pull request?

This was inspired while implementing #21732. Currently `ScalaReflection` 
needs to consider how `ExpressionEncoder` uses the generated serializers and 
deserializers, and `ExpressionEncoder` carries an awkward `flat` flag. After 
discussion with @cloud-fan, it seems better to refactor `ExpressionEncoder`; 
that should also make SPARK-24762 easier to do. The core idea is sketched below.
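
To illustrate, a sketch using names from this PR's diff (not code in the
patch): with a single `objSerializer`, the old `flat` distinction becomes
derivable instead of being a stored flag.

    // "flat" used to mean "T serializes to a single non-struct column".
    def isFlat(encoder: ExpressionEncoder[_]): Boolean =
      !encoder.objSerializer.dataType.isInstanceOf[StructType]

    // e.g. ExpressionEncoder[Int]           -> IntegerType   (was flat = true)
    //      ExpressionEncoder[(Int, String)] -> StructType(_) (was flat = false)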

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-24762-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22749.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22749


commit e1b5deebe715479125c8878f0c90a55dc9ab3e85
Author: Liang-Chi Hsieh 
Date:   2018-07-09T03:42:04Z

Aggregator should be able to use Option of Product encoder.

commit 80506f4e98184ccd66dbaac14ec52d69c358020d
Author: Liang-Chi Hsieh 
Date:   2018-07-13T04:40:55Z

Enable top-level Option of Product encoders.

commit ed3d5cb697b10af2e2cf4c78ab521d4d0b2f3c9b
Author: Liang-Chi Hsieh 
Date:   2018-08-24T04:26:28Z

Remove topLevel parameter.

commit 9fc3f6165156051142a8366a32726badaaa16bb7
Author: Liang-Chi Hsieh 
Date:   2018-08-24T04:37:39Z

Merge remote-tracking branch 'upstream/master' into SPARK-24762

commit 5f95bd0cf1bd308c7df55c41caef7a9f19368f5d
Author: Liang-Chi Hsieh 
Date:   2018-08-24T04:42:33Z

Remove useless change.

commit a4f04055b2ba22f371663565710328791942855a
Author: Liang-Chi Hsieh 
Date:   2018-08-24T14:38:16Z

Add more tests.

commit c1f798f7e9cba0d04223eed06f1b1f547ec29dc5
Author: Liang-Chi Hsieh 
Date:   2018-08-25T01:52:01Z

Add test.

commit 80e11d289d7775863cb9c28b2c1d4364292048a4
Author: Liang-Chi Hsieh 
Date:   2018-10-06T04:06:57Z

Merge remote-tracking branch 'upstream/master' into SPARK-24762

commit 0f029b0a28700334dc6334f1ad89b3124f235a51
Author: Liang-Chi Hsieh 
Date:   2018-10-06T04:40:07Z

Improve code comments.

commit d755e8406f06117ccc96b8f19debab6b2a736e10
Author: Liang-Chi Hsieh 
Date:   2018-10-15T09:55:03Z

Refactoring ExpressionEncoder.




---
