rdblue commented on code in PR #9556:
URL: https://github.com/apache/iceberg/pull/9556#discussion_r1468925515
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -214,6 +214,8 @@ object RewriteMergeIntoTable extends
RewriteRowLevelIcebergCommand with Predicat
val rowFromSourceAttr = resolveAttrRef(ROW_FROM_SOURCE_REF, joinPlan)
val rowFromTargetAttr = resolveAttrRef(ROW_FROM_TARGET_REF, joinPlan)
+    // The output expression should retain read attributes for correctly determining nullability
+    val matchedOutputsWithAttrs = matchedActions.map(matchedActionOutput(_, metadataAttrs) :+ readAttrs)
Review Comment:
This doesn't make sense to me. While this does have the intended effect of
correctly setting the nullability in `buildMergeRowsOutput`, it seems to me that
there is a better way than creating a different (fake) version of
`matchedOutputs`. I think this works because it includes the original
attributes as fake outputs (_for every action output list_). A more direct way
of doing the same thing is to make an attribute nullable if it is already
nullable OR if any output is nullable.
In other words, I think the actual problem is in
`RewriteRowLevelIcebergCommand.buildMergingOutput`:
```scala
  protected def buildMergingOutput(
      outputs: Seq[Seq[Expression]],
      attrs: Seq[Attribute]): Seq[Attribute] = {
    // build a correct nullability map for output attributes
    // an attribute is nullable if at least one output may produce null
    val nullabilityMap = attrs.indices.map { index =>
      index -> outputs.exists(output => output(index).nullable)
    }.toMap

    attrs.zipWithIndex.map { case (attr, index) =>
      AttributeReference(attr.name, attr.dataType, nullabilityMap(index))()
    }
  }
```
The change in the current PR updates the `outputs` arg to that method to
include a copy of the original `attrs` for every matched action's output list.
What ends up happening is that `outputs.exists` also checks whether the
corresponding original attr is nullable. You could do the same without this
PR's change like this:
```scala
  protected def buildMergingOutput(
      outputs: Seq[Seq[Expression]],
      attrs: Seq[Attribute]): Seq[Attribute] = {
    // build a correct nullability map for output attributes
    // an attribute is nullable if at least one output may produce null
    val nullabilityMap = attrs.indices.map { index =>
      index -> outputs.exists(output => output(index).nullable)
    }.toMap

    attrs.zipWithIndex.map { case (attr, index) =>
      AttributeReference(attr.name, attr.dataType, attr.nullable || nullabilityMap(index))()
    }
  }
```
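To see why the two are equivalent, here is a minimal, Spark-free sketch using plain booleans to stand in for expression nullability (the `outputs` and `attrNullable` values are made up for illustration — they are not from the PR):

```scala
// Boolean stand-ins for nullability: outputs(i)(j) says whether action i's
// output expression for column j is nullable.
val outputs: Seq[Seq[Boolean]] = Seq(
  Seq(false, false, true),  // action 0
  Seq(false, false, false)  // action 1
)
// Nullability of the original read attributes (hypothetical values).
val attrNullable: Seq[Boolean] = Seq(true, false, false)

// The PR's approach: append the original attrs as an extra "fake" output list,
// then let exists pick up their nullability.
val withFakeOutputs = outputs :+ attrNullable
val viaFakeOutputs = attrNullable.indices.map { i =>
  withFakeOutputs.exists(output => output(i))
}

// The more direct equivalent: OR in the attribute's own nullability.
val viaDirectOr = attrNullable.indices.map { i =>
  attrNullable(i) || outputs.exists(output => output(i))
}

assert(viaFakeOutputs == viaDirectOr)
println(viaFakeOutputs)  // Vector(true, false, true)
```

Since `exists` over the appended list is just a disjunction that includes the attr's own nullability, the two formulations always agree.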
Looking at this code also raises a larger question: what is
`buildMergingOutput` trying to do? It basically copies the input attrs to be
the `MergeRows` output attrs, but then bases their nullability on whether
unrelated expressions are nullable. The problem is that an expression in an
output comes from an `Assignment` and has nothing to do with the position in
the input attrs.

@aokolnychyi do you know what's happening here? Why would the output of
assignment expressions affect the nullability of the incoming data?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]