juliuszsompolski commented on code in PR #51091:
URL: https://github.com/apache/spark/pull/51091#discussion_r2156633944


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -203,23 +225,36 @@ case class MergeRowsExec(
 
       if (isTargetRowPresent && isSourceRowPresent) {
         cardinalityValidator.validate(row)
-        applyInstructions(row, matchedInstructions)
+        applyInstructions(row, matchedInstructions, sourcePresent = true, 
targetPresent = true)
       } else if (isSourceRowPresent) {
-        applyInstructions(row, notMatchedInstructions)
+        applyInstructions(row, notMatchedInstructions, sourcePresent = true)
       } else if (isTargetRowPresent) {
-        applyInstructions(row, notMatchedBySourceInstructions)
+        applyInstructions(row, notMatchedBySourceInstructions, targetPresent = 
true)
       } else {
         null
       }
     }
 
     private def applyInstructions(
         row: InternalRow,
-        instructions: Seq[InstructionExec]): InternalRow = {
+        instructions: Seq[InstructionExec],
+        sourcePresent: Boolean = false,
+        targetPresent: Boolean = false): InternalRow = {
 
       for (instruction <- instructions) {
         if (instruction.condition.eval(row)) {
           instruction match {
+            case copy: CopyExec =>
+              // For GroupBased Merge, Spark inserts a Copy predicate
+              // for source + target present rows
+              // to retain the row if no other case matches
+              longMetric("numTargetRowsCopied") += 1
+              longMetric("numTargetRowsUnused") += 1
+              if (sourcePresent) {
+                longMetric("numSourceRowsUnused") += 1
+              }

Review Comment:
   hm, my idea was to not increment this metric here. Do not count rows that 
were copied as unused. That's why I proposed the name "unused" and now I see 
the confusion in your last comment.
   My idea, for these metrics is as follows:
   * numTargetRowsCopied - rows that were copied into the output unmodified. 
This metric is useful to show you the amount of write amplification.
   * num{Source|Target}RowsCopied - rows that have passed through the join in 
merge, but then were not used at all (MergeRows dropped it from output, 
returned null) because it didn't pass any of the instruction conditions. This 
metric is useful because it shows that there may be some filter pushdown 
potential to drop these rows earlier, in or before the join. For example in 
Delta, in some situations an OR of all matched contitions is pushed down, so 
that rows that would not pass any of them are pruned earlier: 
https://github.com/delta-io/delta/blob/5bbe7c81f65e2b136ba2211bc2d789c9d0206636/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/ClassicMergeExecutor.scala#L104
 - a small optimization that as far as I see currently is not done in DSv2.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to