[ https://issues.apache.org/jira/browse/SPARK-26316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16717141#comment-16717141 ]

ASF GitHub Bot commented on SPARK-26316:
----------------------------------------

asfgit closed pull request #23269: [SPARK-26316] Revert hash join metrics in spark 21052 that causes performance degradation
URL: https://github.com/apache/spark/pull/23269
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
index a6f3ea47c8492..fd4a7897c7ad1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.plans.physical.{BroadcastDistribution, Dist
 import org.apache.spark.sql.execution.{BinaryExecNode, CodegenSupport, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.{BooleanType, LongType}
-import org.apache.spark.util.TaskCompletionListener
 
 /**
  * Performs an inner hash join of two child relations.  When the output RDD of this operator is
@@ -48,8 +47,7 @@ case class BroadcastHashJoinExec(
   extends BinaryExecNode with HashJoin with CodegenSupport {
 
   override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash 
probe"))
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
   override def requiredChildDistribution: Seq[Distribution] = {
     val mode = HashedRelationBroadcastMode(buildKeys)
@@ -63,13 +61,12 @@ case class BroadcastHashJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    val avgHashProbe = longMetric("avgHashProbe")
 
     val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
     streamedPlan.execute().mapPartitions { streamedIter =>
       val hashed = broadcastRelation.value.asReadOnlyCopy()
       TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
-      join(streamedIter, hashed, numOutputRows, avgHashProbe)
+      join(streamedIter, hashed, numOutputRows)
     }
   }
 
@@ -111,23 +108,6 @@ case class BroadcastHashJoinExec(
     }
   }
 
-  /**
-   * Returns the codes used to add a task completion listener to update avg hash probe
-   * at the end of the task.
-   */
-  private def genTaskListener(avgHashProbe: String, relationTerm: String): String = {
-    val listenerClass = classOf[TaskCompletionListener].getName
-    val taskContextClass = classOf[TaskContext].getName
-    s"""
-       | $taskContextClass$$.MODULE$$.get().addTaskCompletionListener(new $listenerClass() {
-       |   @Override
-       |   public void onTaskCompletion($taskContextClass context) {
-       |     $avgHashProbe.set($relationTerm.getAverageProbesPerLookup());
-       |   }
-       | });
-     """.stripMargin
-  }
-
   /**
    * Returns a tuple of Broadcast of HashedRelation and the variable name for it.
    */
@@ -137,15 +117,11 @@ case class BroadcastHashJoinExec(
     val broadcast = ctx.addReferenceObj("broadcast", broadcastRelation)
     val clsName = broadcastRelation.value.getClass.getName
 
-    // At the end of the task, we update the avg hash probe.
-    val avgHashProbe = metricTerm(ctx, "avgHashProbe")
-
     // Inline mutable state since not many join operations in a task
     val relationTerm = ctx.addMutableState(clsName, "relation",
       v => s"""
          | $v = (($clsName) $broadcast.value()).asReadOnlyCopy();
          | incPeakExecutionMemory($v.estimatedSize());
-         | ${genTaskListener(avgHashProbe, v)}
        """.stripMargin, forceInline = true)
     (broadcastRelation, relationTerm)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index dab873bf9b9a0..1aef5f6864263 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.joins
 
-import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
@@ -194,8 +193,7 @@ trait HashJoin {
   protected def join(
       streamedIter: Iterator[InternalRow],
       hashed: HashedRelation,
-      numOutputRows: SQLMetric,
-      avgHashProbe: SQLMetric): Iterator[InternalRow] = {
+      numOutputRows: SQLMetric): Iterator[InternalRow] = {
 
     val joinedIter = joinType match {
       case _: InnerLike =>
@@ -213,10 +211,6 @@ trait HashJoin {
           s"BroadcastHashJoin should not take $x as the JoinType")
     }
 
-    // At the end of the task, we update the avg hash probe.
-    TaskContext.get().addTaskCompletionListener[Unit](_ =>
-      avgHashProbe.set(hashed.getAverageProbesPerLookup))
-
     val resultProj = createResultProjection
     joinedIter.map { r =>
       numOutputRows += 1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index b1ff6e83acc24..7c21062c4cec3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -80,11 +80,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
    * Release any used resources.
    */
   def close(): Unit
-
-  /**
-   * Returns the average number of probes per key lookup.
-   */
-  def getAverageProbesPerLookup: Double
 }
 
 private[execution] object HashedRelation {
@@ -279,8 +274,6 @@ private[joins] class UnsafeHashedRelation(
   override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
     read(() => in.readInt(), () => in.readLong(), in.readBytes)
   }
-
-  override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup
 }
 
 private[joins] object UnsafeHashedRelation {
@@ -395,10 +388,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   // The number of unique keys.
   private var numKeys = 0L
 
-  // Tracking average number of probes per key lookup.
-  private var numKeyLookups = 0L
-  private var numProbes = 0L
-
   // needed by serializer
   def this() = {
     this(
@@ -483,8 +472,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -493,14 +480,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       var pos = firstSlot(key)
-      numKeyLookups += 1
-      numProbes += 1
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return getRow(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -528,8 +512,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -538,14 +520,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       var pos = firstSlot(key)
-      numKeyLookups += 1
-      numProbes += 1
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return valueIter(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -585,11 +564,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   private def updateIndex(key: Long, address: Long): Unit = {
     var pos = firstSlot(key)
     assert(numKeys < array.length / 2)
-    numKeyLookups += 1
-    numProbes += 1
     while (array(pos) != key && array(pos + 1) != 0) {
       pos = nextSlot(pos)
-      numProbes += 1
     }
     if (array(pos + 1) == 0) {
       // this is the first value for this key, put the address in array.
@@ -721,8 +697,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     writeLong(maxKey)
     writeLong(numKeys)
     writeLong(numValues)
-    writeLong(numKeyLookups)
-    writeLong(numProbes)
 
     writeLong(array.length)
     writeLongArray(writeBuffer, array, array.length)
@@ -764,8 +738,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     maxKey = readLong()
     numKeys = readLong()
     numValues = readLong()
-    numKeyLookups = readLong()
-    numProbes = readLong()
 
     val length = readLong().toInt
     mask = length - 2
@@ -783,11 +755,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   override def read(kryo: Kryo, in: Input): Unit = {
     read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
   }
-
-  /**
-   * Returns the average number of probes per key lookup.
-   */
-  def getAverageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
 }
 
 private[joins] class LongHashedRelation(
@@ -839,8 +806,6 @@ private[joins] class LongHashedRelation(
     resultRow = new UnsafeRow(nFields)
     map = in.readObject().asInstanceOf[LongToUnsafeRowMap]
   }
-
-  override def getAverageProbesPerLookup: Double = map.getAverageProbesPerLookup
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
index 2b59ed6e4d16b..524804d61e599 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
@@ -42,8 +42,7 @@ case class ShuffledHashJoinExec(
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
     "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of 
build side"),
-    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build 
hash map"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash 
probe"))
+    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build 
hash map"))
 
   override def requiredChildDistribution: Seq[Distribution] =
     HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
@@ -63,10 +62,9 @@ case class ShuffledHashJoinExec(
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
-    val avgHashProbe = longMetric("avgHashProbe")
     streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
       val hashed = buildHashedRelation(buildIter)
-      join(streamIter, hashed, numOutputRows, avgHashProbe)
+      join(streamIter, hashed, numOutputRows)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 2251607e76af8..9ed760281dada 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -261,50 +261,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     )
   }
 
-  test("BroadcastHashJoin metrics: track avg probe") {
-    // The executed plan looks like:
-    // Project [a#210, b#211, b#221]
-    // +- BroadcastHashJoin [a#210], [a#220], Inner, BuildRight
-    //    :- Project [_1#207 AS a#210, _2#208 AS b#211]
-    //    :  +- Filter isnotnull(_1#207)
-    //    :     +- LocalTableScan [_1#207, _2#208]
-    //    +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, binary, true]))
-    //       +- Project [_1#217 AS a#220, _2#218 AS b#221]
-    //          +- Filter isnotnull(_1#217)
-    //             +- LocalTableScan [_1#217, _2#218]
-    //
-    // Assume the execution plan with node id is
-    // WholeStageCodegen disabled:
-    // Project(nodeId = 0)
-    //   BroadcastHashJoin(nodeId = 1)
-    //     ...(ignored)
-    //
-    // WholeStageCodegen enabled:
-    // WholeStageCodegen(nodeId = 0)
-    //   Project(nodeId = 1)
-    //     BroadcastHashJoin(nodeId = 2)
-    //       Project(nodeId = 3)
-    //         Filter(nodeId = 4)
-    //           ...(ignored)
-    Seq(true, false).foreach { enableWholeStage =>
-      val df1 = generateRandomBytesDF()
-      val df2 = generateRandomBytesDF()
-      val df = df1.join(broadcast(df2), "a")
-      val nodeIds = if (enableWholeStage) {
-        Set(2L)
-      } else {
-        Set(1L)
-      }
-      val metrics = getSparkPlanMetrics(df, 2, nodeIds, enableWholeStage).get
-      nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
-        probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
-          assert(probe.toDouble > 1.0)
-        }
-      }
-    }
-  }
-
   test("ShuffledHashJoin metrics") {
     withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "40",
         "spark.sql.shuffle.partitions" -> "2",
@@ -323,8 +279,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       val df = df1.join(df2, "key")
       testSparkPlanMetrics(df, 1, Map(
         1L -> (("ShuffledHashJoin", Map(
-          "number of output rows" -> 2L,
-          "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))),
+          "number of output rows" -> 2L))),
         2L -> (("Exchange", Map(
           "shuffle records written" -> 2L,
           "records read" -> 2L))),
@@ -335,53 +290,6 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     }
   }
 
-  test("ShuffledHashJoin metrics: track avg probe") {
-    // The executed plan looks like:
-    // Project [a#308, b#309, b#319]
-    // +- ShuffledHashJoin [a#308], [a#318], Inner, BuildRight
-    //    :- Exchange hashpartitioning(a#308, 2)
-    //    :  +- Project [_1#305 AS a#308, _2#306 AS b#309]
-    //    :     +- Filter isnotnull(_1#305)
-    //    :        +- LocalTableScan [_1#305, _2#306]
-    //    +- Exchange hashpartitioning(a#318, 2)
-    //       +- Project [_1#315 AS a#318, _2#316 AS b#319]
-    //          +- Filter isnotnull(_1#315)
-    //             +- LocalTableScan [_1#315, _2#316]
-    //
-    // Assume the execution plan with node id is
-    // WholeStageCodegen disabled:
-    // Project(nodeId = 0)
-    //   ShuffledHashJoin(nodeId = 1)
-    //     ...(ignored)
-    //
-    // WholeStageCodegen enabled:
-    // WholeStageCodegen(nodeId = 0)
-    //   Project(nodeId = 1)
-    //     ShuffledHashJoin(nodeId = 2)
-    //       ...(ignored)
-    withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "5000000",
-        "spark.sql.shuffle.partitions" -> "2",
-        "spark.sql.join.preferSortMergeJoin" -> "false") {
-      Seq(true, false).foreach { enableWholeStage =>
-        val df1 = generateRandomBytesDF(65535 * 5)
-        val df2 = generateRandomBytesDF(65535)
-        val df = df1.join(df2, "a")
-        val nodeIds = if (enableWholeStage) {
-          Set(2L)
-        } else {
-          Set(1L)
-        }
-        val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
-        nodeIds.foreach { nodeId =>
-          val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
-          probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
-            assert(probe.toDouble > 1.0)
-          }
-        }
-      }
-    }
-  }
-
   test("BroadcastHashJoin(outer) metrics") {
     val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
     val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")


 



> Because of the performance degradation in TPC-DS, we currently partially revert
> SPARK-21052: Add hash map metrics to join
> ----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26316
>                 URL: https://issues.apache.org/jira/browse/SPARK-26316
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0, 2.3.1, 2.3.2, 2.4.0
>            Reporter: Ke Jia
>            Priority: Major
>             Fix For: 3.0.0
>
>
> The code at [L486|https://github.com/apache/spark/blob/1d3dd58d21400b5652b75af7e7e53aad85a31528/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L486] and [L487|https://github.com/apache/spark/blob/1d3dd58d21400b5652b75af7e7e53aad85a31528/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L487], introduced by SPARK-21052, causes performance degradation in Spark 2.3.
> The results of all TPC-DS queries at the 1TB scale factor are in this [TPC-DS result|https://docs.google.com/spreadsheets/d/18a5BdOlmm8euTaRodyeWum9yu92mbWWu6JbhGXtr7yE/edit#gid=0] spreadsheet.

