fred-db commented on code in PR #38941:
URL: https://github.com/apache/spark/pull/38941#discussion_r1052485421


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -449,22 +449,54 @@ case class Union(
       AttributeSet.fromAttributeSets(children.map(_.outputSet)).size
   }
 
-  // updating nullability to make all the children consistent
-  override def output: Seq[Attribute] = {
-    children.map(_.output).transpose.map { attrs =>
-      val firstAttr = attrs.head
-      val nullable = attrs.exists(_.nullable)
-      val newDt = attrs.map(_.dataType).reduce(StructType.unionLikeMerge)
-      if (firstAttr.dataType == newDt) {
-        firstAttr.withNullability(nullable)
-      } else {
-        AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
-          firstAttr.exprId, firstAttr.qualifier)
-      }
+  /**
+   * Merges a sequence of attributes to have a common datatype and updates the
+   * nullability to be consistent with the attributes being merged.
+   */
+  private def mergeAttributes(attributes: Seq[Attribute]): Attribute = {
+    val firstAttr = attributes.head
+    val nullable = attributes.exists(_.nullable)
+    val newDt = attributes.map(_.dataType).reduce(StructType.unionLikeMerge)
+    if (firstAttr.dataType == newDt) {
+      firstAttr.withNullability(nullable)
+    } else {
+      AttributeReference(firstAttr.name, newDt, nullable, firstAttr.metadata)(
+        firstAttr.exprId, firstAttr.qualifier)
     }
   }
 
-  override def metadataOutput: Seq[Attribute] = Nil
+  override def output: Seq[Attribute] = children.map(_.output).transpose.map(mergeAttributes)
+
+  override def metadataOutput: Seq[Attribute] = {
+    val childrenMetadataOutput = children.map(_.metadataOutput)
+    // This follows similar code in `CheckAnalysis` to check if the output of a Union is correct,
+    // but just silently doesn't return an output instead of throwing an error. It also ensures
+    // that the attribute and data type names are the same.
+    val refDataTypes = childrenMetadataOutput.head.map(_.dataType)
+    val refAttrNames = childrenMetadataOutput.head.map(_.name)
+    childrenMetadataOutput.tail.foreach { childMetadataOutput =>
+      // We can only propagate the metadata output correctly if every child has the same
+      // number of columns
+      if (childMetadataOutput.length != refDataTypes.length) return Nil
+      // Check if the data types match by name and type
+      val childDataTypes = childMetadataOutput.map(_.dataType)
+      childDataTypes.zip(refDataTypes).foreach { case (dt1, dt2) =>
+        if (!DataType.equalsStructurally(dt1, dt2, true) ||
+           !DataType.equalsStructurallyByName(dt1, dt2, conf.resolver)) {

Review Comment:
   Yes; otherwise, a data type mismatch would go undetected.
   E.g., `attr -> int` and `attr -> string` wouldn't be detected as a mismatch. See
   `Metadata column is not propagated when children of Union have a type mismatch in a metadata column`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to