juliuszsompolski commented on code in PR #51091:
URL: https://github.com/apache/spark/pull/51091#discussion_r2143214271


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -233,6 +246,7 @@ case class MergeRowsExec(
         }
       }
 
+      longMetric("numTargetRowsCopied") += 1

Review Comment:
   I think this is incorrect.
   E.g. a row passed to `applyInstructions(row, notMatchedInstructions)` that 
does not match any of the notMatchedInstructions falls through and would be 
counted here.
   Could you add a test for this?
   All rows that are copied over should be captured by 
`keepCarryoverRowsInstruction` at the end of matchedInstructions / 
notMatchedBySourceInstructions.
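   To illustrate the concern, here is a minimal sketch with simplified, 
hypothetical stand-ins (rows are plain `Int`s, `Keep` is not the real Catalyst 
class, and the `copiedTargetRow` flag name is only an assumption for the 
sketch). It contrasts counting at the fall-through point with counting only 
when the carryover instruction is the one that matched:

```scala
object CopiedRowsSketch {
  // Hypothetical, simplified stand-in for MergeRows.Keep: a condition plus a
  // flag marking the carryover instruction that copies an unchanged target row.
  final case class Keep(condition: Int => Boolean, copiedTargetRow: Boolean = false)

  // Placement questioned above: count every row that matched no instruction.
  def countAtFallThrough(rows: Seq[Int], instructions: Seq[Keep]): Long =
    rows.count(row => !instructions.exists(_.condition(row))).toLong

  // Alternative: count a row only when the first matching instruction is the
  // flagged carryover instruction.
  def countOnCarryover(rows: Seq[Int], instructions: Seq[Keep]): Long =
    rows.count(row => instructions.find(_.condition(row)).exists(_.copiedTargetRow)).toLong
}
```

   With a NOT MATCHED list that has no carryover instruction, the first 
variant counts every unmatched row as copied, while the second counts none.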
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/MergeRows.scala:
##########
@@ -87,7 +87,13 @@ object MergeRows {
     override def dataType: DataType = NullType
   }
 
-  case class Keep(condition: Expression, output: Seq[Expression]) extends Instruction {
+  case class Keep(
+      condition: Expression,
+      output: Seq[Expression],
+      // flag marking that row should be considered not matching
+      // any user predicate for metric calculations
+      systemPredicate: Boolean = false)

Review Comment:
   nit: I would just name it `copiedTargetRow`, so the name says what it is.
   With the name `systemPredicate` there are two scenarios:
   - it will never be used for anything else, so it is better to name it for 
what it is actually used for.
   - the generic name encourages using it for something else, and then some 
logic somewhere has to decide what `systemPredicate` actually means, which I 
think would not be a good pattern.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala:
##########
@@ -199,7 +199,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
     // as the last MATCHED and NOT MATCHED BY SOURCE instruction
     // this logic is specific to data sources that replace groups of data
     val carryoverRowsOutput = Literal(WRITE_WITH_METADATA_OPERATION) +: targetTable.output
-    val keepCarryoverRowsInstruction = Keep(TrueLiteral, carryoverRowsOutput)
+    val keepCarryoverRowsInstruction = Keep(TrueLiteral, carryoverRowsOutput,
+      systemPredicate = true)

Review Comment:
   nit scalastyle: move the whole `Keep(...)` call to a new line instead of 
breaking before the last argument.
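   A sketch of the suggested layout, using a hypothetical simplified `Keep` 
with string fields (the real one takes Catalyst expressions) so that the 
snippet compiles on its own:

```scala
object KeepFormattingSketch {
  // Hypothetical simplified stand-in for MergeRows.Keep, for illustration only.
  final case class Keep(
      condition: String,
      output: Seq[String],
      systemPredicate: Boolean = false)

  // Placeholder values; the real output is built from targetTable.output.
  val carryoverRowsOutput: Seq[String] = Seq("operation", "col1")

  // The whole call sits on its own continuation line instead of being split
  // before the last argument.
  val keepCarryoverRowsInstruction =
    Keep("true", carryoverRowsOutput, systemPredicate = true)
}
```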



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

