aokolnychyi commented on code in PR #51285:
URL: https://github.com/apache/spark/pull/51285#discussion_r2183926134


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -257,4 +272,27 @@ case class MergeRowsExec(
       null
     }
   }
+
+  // For group based merge, copy is inserted if row matches no other case
+  private def incrementCopyMetric(): Unit = longMetric("numTargetRowsCopied") 
+= 1

Review Comment:
   What's the cost of doing this for each row? I know we implement [some tricks for regular writes to update metrics only once per 100 rows](https://github.com/apache/spark/blob/4ec992c773968b78ab2eebfd4bd3a1d55d23dcb1/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala#L26).
   Do we need to worry about it here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -49,7 +48,21 @@ case class MergeRowsExec(
 
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numTargetRowsCopied" -> SQLMetrics.createMetric(sparkContext,
-      "Number of target rows copied unmodified because they did not match any 
action."))
+      "Number of target rows copied unmodified because they did not match any 
action"),

Review Comment:
   Minor: It seems like we call it "action" here and "clause" in all other
metrics. It would be nice to align the wording with the other metric descriptions.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -231,23 +237,32 @@ case class MergeRowsExec(
 
     private def applyInstructions(
         row: InternalRow,
-        instructions: Seq[InstructionExec]): InternalRow = {
+        instructions: Seq[InstructionExec],
+        sourcePresent: Boolean = false): InternalRow = {
 
       for (instruction <- instructions) {
         if (instruction.condition.eval(row)) {
           instruction match {
-            case copy: CopyExec =>
-              // group-based operations copy over target rows that didn't 
match any actions
-              longMetric("numTargetRowsCopied") += 1
-              return copy.apply(row)
-
             case keep: KeepExec =>
+              keep.context match {
+                case Copy => incrementCopyMetric()
+                case Update => incrementUpdateMetric(sourcePresent)
+                case Insert => incrementInsertMetric()
+                case Delete => incrementDeleteMetric(sourcePresent)
+                case _ => throw new IllegalArgumentException(
+                  s"Unexpected context for KeepExec: ${keep.context}")
+              }

Review Comment:
   Minor: Shall we drop the empty line?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -231,23 +237,32 @@ case class MergeRowsExec(
 
     private def applyInstructions(
         row: InternalRow,
-        instructions: Seq[InstructionExec]): InternalRow = {
+        instructions: Seq[InstructionExec],
+        sourcePresent: Boolean = false): InternalRow = {
 
       for (instruction <- instructions) {
         if (instruction.condition.eval(row)) {
           instruction match {
-            case copy: CopyExec =>
-              // group-based operations copy over target rows that didn't 
match any actions
-              longMetric("numTargetRowsCopied") += 1
-              return copy.apply(row)
-
             case keep: KeepExec =>
+              keep.context match {
+                case Copy => incrementCopyMetric()
+                case Update => incrementUpdateMetric(sourcePresent)
+                case Insert => incrementInsertMetric()
+                case Delete => incrementDeleteMetric(sourcePresent)
+                case _ => throw new IllegalArgumentException(
+                  s"Unexpected context for KeepExec: ${keep.context}")
+              }
+
               return keep.apply(row)
 
             case _: DiscardExec =>
+              incrementDeleteMetric(sourcePresent)
+

Review Comment:
   Minor: Shall we drop the empty line?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -231,23 +237,32 @@ case class MergeRowsExec(
 
     private def applyInstructions(
         row: InternalRow,
-        instructions: Seq[InstructionExec]): InternalRow = {
+        instructions: Seq[InstructionExec],
+        sourcePresent: Boolean = false): InternalRow = {
 
       for (instruction <- instructions) {
         if (instruction.condition.eval(row)) {
           instruction match {
-            case copy: CopyExec =>
-              // group-based operations copy over target rows that didn't 
match any actions
-              longMetric("numTargetRowsCopied") += 1
-              return copy.apply(row)
-
             case keep: KeepExec =>
+              keep.context match {
+                case Copy => incrementCopyMetric()
+                case Update => incrementUpdateMetric(sourcePresent)
+                case Insert => incrementInsertMetric()
+                case Delete => incrementDeleteMetric(sourcePresent)
+                case _ => throw new IllegalArgumentException(
+                  s"Unexpected context for KeepExec: ${keep.context}")
+              }
+
               return keep.apply(row)
 
             case _: DiscardExec =>
+              incrementDeleteMetric(sourcePresent)
+
               return null
 
             case split: SplitExec =>
+              incrementUpdateMetric(sourcePresent)
+

Review Comment:
   Minor: Shall we drop the empty line?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to