maropu commented on a change in pull request #31477:
URL: https://github.com/apache/spark/pull/31477#discussion_r570624874



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
   }
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of 
matched pairs"))

Review comment:
       How about renaming `numMatchedPairs` to `numJoinedRows`?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
##########
@@ -415,24 +431,26 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
       ctx: CodegenContext,
       input: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
     val matched = ctx.freshName("matched")
+    val numMatched = metricTerm(ctx, "numMatchedPairs")
     val buildVars = genBuildSideVars(ctx, matched)
-    val checkCondition = if (condition.isDefined) {
-      val expr = condition.get
-      // evaluate the variables from build side that used by condition
-      val eval = evaluateRequiredVariables(buildPlan.output, buildVars, 
expr.references)
-      // filter the output via condition
-      ctx.currentVars = input ++ buildVars
-      val ev =
-        BindReferences.bindReference(expr, streamedPlan.output ++ 
buildPlan.output).genCode(ctx)
-      val skipRow = s"${ev.isNull} || !${ev.value}"
-      s"""
-         |$eval
-         |${ev.code}
-         |if (!($skipRow))
-       """.stripMargin
-    } else {
-      ""
-    }
+    val checkCondition = s"$numMatched.add(1);\n" +
+      (if (condition.isDefined) {
+        val expr = condition.get
+        // evaluate the variables from build side that used by condition
+        val eval = evaluateRequiredVariables(buildPlan.output, buildVars, 
expr.references)
+        // filter the output via condition
+        ctx.currentVars = input ++ buildVars
+        val ev =
+          BindReferences.bindReference(expr, streamedPlan.output ++ 
buildPlan.output).genCode(ctx)
+        val skipRow = s"${ev.isNull} || !${ev.value}"
+        s"""
+           |$eval
+           |${ev.code}
+           |if (!($skipRow))
+         """.stripMargin
+      } else {
+        ""
+      })

Review comment:
       Could you avoid unnecessary changes? For example:
   ```
       val checkCondition = if (condition.isDefined) {
         val expr = condition.get
         ...
       } else {
           ""
       }
       val some_val = s"$numMatched.add(1);\n$checkCondition"
       (matched, some_val, buildVars)
   ```

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
##########
@@ -89,13 +90,20 @@ case class BroadcastNestedLoopJoinExec(
     }
   }
 
-  @transient private lazy val boundCondition = {
+  private val numMatchedPairs = longMetric("numMatchedPairs")
+
+  @transient private lazy val boundCondition: InternalRow => Boolean =
     if (condition.isDefined) {

Review comment:
       If `condition` is empty, is `numOutputRows` == `numMatchedPairs`? If so, 
could we show this new metric in the web UI only when `condition` is defined?

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
   }
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of 
matched pairs"))

Review comment:
       >  I feel that number of matched pairs might be more precise and less 
confusing to the user.
   
   Any reason? At first, I was not sure what `pairs` meant.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala
##########
@@ -89,13 +90,20 @@ case class BroadcastNestedLoopJoinExec(
     }
   }
 
-  @transient private lazy val boundCondition = {
+  private val numMatchedPairs = longMetric("numMatchedPairs")
+
+  @transient private lazy val boundCondition: InternalRow => Boolean =
     if (condition.isDefined) {

Review comment:
       Ah, I see. What is the benefit to a user of being able to see this 
number? I read the PR description and, at first, I thought this PR mainly targets 
the case where a `condition` is given.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
##########
@@ -57,7 +57,8 @@ case class BroadcastHashJoinExec(
   }
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
+    "numMatchedPairs" -> SQLMetrics.createMetric(sparkContext, "number of 
matched pairs"))

Review comment:
       sgtm




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to