Repository: spark
Updated Branches:
  refs/heads/branch-1.5 71460b889 -> 767ee1884
diff --git 
index 953284c..7383d3f 100644
@@ -25,15 +25,24 @@ import
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext}
 import org.apache.spark.util.Utils
+class SQLMetricsSuite extends SparkFunSuite with SQLTestUtils {
-class SQLMetricsSuite extends SparkFunSuite {
+  override val sqlContext = TestSQLContext
+  import sqlContext.implicits._
   test("LongSQLMetric should not box Long") {
     val l = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "long")
-    val f = () => { l += 1L }
+    val f = () => {
+      l += 1L
+      l.add(1L)
+    }
     BoxingFinder.getClassReader(f.getClass).foreach { cl =>
       val boxingFinder = new BoxingFinder()
       cl.accept(boxingFinder, 0)
@@ -51,6 +60,441 @@ class SQLMetricsSuite extends SparkFunSuite {
       assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this 
+  /**
+   * Call `df.collect()` and verify if the collected metrics are same as 
+   *
+   * @param df `DataFrame` to run
+   * @param expectedNumOfJobs number of jobs that will run
+   * @param expectedMetrics the expected metrics. The format is
+   *                        `nodeId -> (operatorName, metric name -> metric 
+   */
+  private def testSparkPlanMetrics(
+      df: DataFrame,
+      expectedNumOfJobs: Int,
+      expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
+    val previousExecutionIds = TestSQLContext.listener.executionIdToData.keySet
+    df.collect()
+    TestSQLContext.sparkContext.listenerBus.waitUntilEmpty(10000)
+    val executionIds = 
+    assert(executionIds.size === 1)
+    val executionId = executionIds.head
+    val jobs = TestSQLContext.listener.getExecution(executionId)
+    // Use "<=" because there is a race condition that we may miss some jobs
+    // TODO Change it to "=" once we fix the race condition that missing the 
JobStarted event.
+    assert(jobs.size <= expectedNumOfJobs)
+    if (jobs.size == expectedNumOfJobs) {
+      // If we can track all jobs, check the metric values
+      val metricValues = 
+      val actualMetrics = 
SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
+        expectedMetrics.contains(
+      }.map { node =>
+        val nodeMetrics = { metric =>
+          val metricValue = metricValues(metric.accumulatorId)
+          (, metricValue)
+        }.toMap
+        (, -> nodeMetrics)
+      }.toMap
+      assert(expectedMetrics === actualMetrics)
+    } else {
+      // TODO Remove this "else" once we fix the race condition that missing 
the JobStarted event.
+      // Since we cannot track all jobs, the metric values could be wrong and 
we should not check
+      // them.
+      logWarning("Due to a race condition, we miss some jobs and cannot verify 
the metric values")
+    }
+  }
+  test("Project metrics") {
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "false",
+      SQLConf.CODEGEN_ENABLED.key -> "false",
+      SQLConf.TUNGSTEN_ENABLED.key -> "false") {
+      // Assume the execution plan is
+      // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
+      val df ='name)
+      testSparkPlanMetrics(df, 1, Map(
+        0L ->("Project", Map(
+          "number of rows" -> 2L)))
+      )
+    }
+  }
+  test("TungstenProject metrics") {
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "true",
+      SQLConf.CODEGEN_ENABLED.key -> "true",
+      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+      // Assume the execution plan is
+      // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df ='name)
+      testSparkPlanMetrics(df, 1, Map(
+        0L ->("TungstenProject", Map(
+          "number of rows" -> 2L)))
+      )
+    }
+  }
+  test("Filter metrics") {
+    // Assume the execution plan is
+    // PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
+    val df = TestData.person.filter('age < 25)
+    testSparkPlanMetrics(df, 1, Map(
+      0L -> ("Filter", Map(
+        "number of input rows" -> 2L,
+        "number of output rows" -> 1L)))
+    )
+  }
+  test("Aggregate metrics") {
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "false",
+      SQLConf.CODEGEN_ENABLED.key -> "false",
+      SQLConf.TUNGSTEN_ENABLED.key -> "false") {
+      // Assume the execution plan is
+      // ... -> Aggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) -> 
Aggregate(nodeId = 0)
+      val df = TestData.testData2.groupBy().count() // 2 partitions
+      testSparkPlanMetrics(df, 1, Map(
+        2L -> ("Aggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 2L)),
+        0L -> ("Aggregate", Map(
+          "number of input rows" -> 2L,
+          "number of output rows" -> 1L)))
+      )
+      // 2 partitions and each partition contains 2 keys
+      val df2 = TestData.testData2.groupBy('a).count()
+      testSparkPlanMetrics(df2, 1, Map(
+        2L -> ("Aggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 4L)),
+        0L -> ("Aggregate", Map(
+          "number of input rows" -> 4L,
+          "number of output rows" -> 3L)))
+      )
+    }
+  }
+  test("SortBasedAggregate metrics") {
+    // Because SortBasedAggregate may skip different rows if the number of 
partitions is different,
+    // this test should use the deterministic number of partitions.
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "false",
+      SQLConf.CODEGEN_ENABLED.key -> "true",
+      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+      // Assume the execution plan is
+      // ... -> SortBasedAggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) 
+      // SortBasedAggregate(nodeId = 0)
+      val df = TestData.testData2.groupBy().count() // 2 partitions
+      testSparkPlanMetrics(df, 1, Map(
+        2L -> ("SortBasedAggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 2L)),
+        0L -> ("SortBasedAggregate", Map(
+          "number of input rows" -> 2L,
+          "number of output rows" -> 1L)))
+      )
+      // Assume the execution plan is
+      // ... -> SortBasedAggregate(nodeId = 3) -> TungstenExchange(nodeId = 2)
+      // -> ExternalSort(nodeId = 1)-> SortBasedAggregate(nodeId = 0)
+      // 2 partitions and each partition contains 2 keys
+      val df2 = TestData.testData2.groupBy('a).count()
+      testSparkPlanMetrics(df2, 1, Map(
+        3L -> ("SortBasedAggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 4L)),
+        0L -> ("SortBasedAggregate", Map(
+          "number of input rows" -> 4L,
+          "number of output rows" -> 3L)))
+      )
+    }
+  }
+  test("TungstenAggregate metrics") {
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "true",
+      SQLConf.CODEGEN_ENABLED.key -> "true",
+      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+      // Assume the execution plan is
+      // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
+      // -> TungstenAggregate(nodeId = 0)
+      val df = TestData.testData2.groupBy().count() // 2 partitions
+      testSparkPlanMetrics(df, 1, Map(
+        2L -> ("TungstenAggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 2L)),
+        0L -> ("TungstenAggregate", Map(
+          "number of input rows" -> 2L,
+          "number of output rows" -> 1L)))
+      )
+      // 2 partitions and each partition contains 2 keys
+      val df2 = TestData.testData2.groupBy('a).count()
+      testSparkPlanMetrics(df2, 1, Map(
+        2L -> ("TungstenAggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 4L)),
+        0L -> ("TungstenAggregate", Map(
+          "number of input rows" -> 4L,
+          "number of output rows" -> 3L)))
+      )
+    }
+  }
+  test("SortMergeJoin metrics") {
+    // Because SortMergeJoin may skip different rows if the number of 
partitions is different, this
+    // test should use the deterministic number of partitions.
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+      val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 
1) :: TestData2(1, 2)
+      testDataForJoin.registerTempTable("testDataForJoin")
+      withTempTable("testDataForJoin") {
+        // Assume the execution plan is
+        // ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+        val df = sqlContext.sql(
+          "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = 
+        testSparkPlanMetrics(df, 1, Map(
+          1L -> ("SortMergeJoin", Map(
+            // It's 4 because we only read 3 rows in the first partition and 1 
row in the second one
+            "number of left rows" -> 4L,
+            "number of right rows" -> 2L,
+            "number of output rows" -> 4L)))
+        )
+      }
+    }
+  }
+  test("SortMergeOuterJoin metrics") {
+    // Because SortMergeOuterJoin may skip different rows if the number of 
partitions is different,
+    // this test should use the deterministic number of partitions.
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+      val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 
1) :: TestData2(1, 2)
+      testDataForJoin.registerTempTable("testDataForJoin")
+      withTempTable("testDataForJoin") {
+        // Assume the execution plan is
+        // ... -> SortMergeOuterJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+        val df = sqlContext.sql(
+          "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = 
+        testSparkPlanMetrics(df, 1, Map(
+          1L -> ("SortMergeOuterJoin", Map(
+            // It's 4 because we only read 3 rows in the first partition and 1 
row in the second one
+            "number of left rows" -> 6L,
+            "number of right rows" -> 2L,
+            "number of output rows" -> 8L)))
+        )
+        val df2 = sqlContext.sql(
+          "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = 
+        testSparkPlanMetrics(df2, 1, Map(
+          1L -> ("SortMergeOuterJoin", Map(
+            // It's 4 because we only read 3 rows in the first partition and 1 
row in the second one
+            "number of left rows" -> 2L,
+            "number of right rows" -> 6L,
+            "number of output rows" -> 8L)))
+        )
+      }
+    }
+  }
+  test("BroadcastHashJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", 
+      // Assume the execution plan is
+      // ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df = df1.join(broadcast(df2), "key")
+      testSparkPlanMetrics(df, 2, Map(
+        1L -> ("BroadcastHashJoin", Map(
+          "number of left rows" -> 2L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 2L)))
+      )
+    }
+  }
+  test("ShuffledHashJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 
1) :: TestData2(1, 2)
+      testDataForJoin.registerTempTable("testDataForJoin")
+      withTempTable("testDataForJoin") {
+        // Assume the execution plan is
+        // ... -> ShuffledHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+        val df = sqlContext.sql(
+          "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = 
+        testSparkPlanMetrics(df, 1, Map(
+          1L -> ("ShuffledHashJoin", Map(
+            "number of left rows" -> 6L,
+            "number of right rows" -> 2L,
+            "number of output rows" -> 4L)))
+        )
+      }
+    }
+  }
+  test("ShuffledHashOuterJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false",
+      val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+      val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", 
+      // Assume the execution plan is
+      // ... -> ShuffledHashOuterJoin(nodeId = 0)
+      val df = df1.join(df2, $"key" === $"key2", "left_outer")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> ("ShuffledHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 5L)))
+      )
+      val df3 = df1.join(df2, $"key" === $"key2", "right_outer")
+      testSparkPlanMetrics(df3, 1, Map(
+        0L -> ("ShuffledHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 6L)))
+      )
+      val df4 = df1.join(df2, $"key" === $"key2", "outer")
+      testSparkPlanMetrics(df4, 1, Map(
+        0L -> ("ShuffledHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 7L)))
+      )
+    }
+  }
+  test("BroadcastHashOuterJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+      val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", 
+      // Assume the execution plan is
+      // ... -> BroadcastHashOuterJoin(nodeId = 0)
+      val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
+      testSparkPlanMetrics(df, 2, Map(
+        0L -> ("BroadcastHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 5L)))
+      )
+      val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
+      testSparkPlanMetrics(df3, 2, Map(
+        0L -> ("BroadcastHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 6L)))
+      )
+    }
+  }
+  test("BroadcastNestedLoopJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+      val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 
1) :: TestData2(1, 2)
+      testDataForJoin.registerTempTable("testDataForJoin")
+      withTempTable("testDataForJoin") {
+        // Assume the execution plan is
+        // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> 
TungstenProject(nodeId = 0)
+        val df = sqlContext.sql(
+          "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+            "testData2.a * testDataForJoin.a != testData2.a + 
+        testSparkPlanMetrics(df, 3, Map(
+          1L -> ("BroadcastNestedLoopJoin", Map(
+            "number of left rows" -> 12L, // left needs to be scanned twice
+            "number of right rows" -> 2L,
+            "number of output rows" -> 12L)))
+        )
+      }
+    }
+  }
+  test("BroadcastLeftSemiJoinHash metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", 
+      // Assume the execution plan is
+      // ... -> BroadcastLeftSemiJoinHash(nodeId = 0)
+      val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
+      testSparkPlanMetrics(df, 2, Map(
+        0L -> ("BroadcastLeftSemiJoinHash", Map(
+          "number of left rows" -> 2L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 2L)))
+      )
+    }
+  }
+  test("LeftSemiJoinHash metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true",
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", 
+      // Assume the execution plan is
+      // ... -> LeftSemiJoinHash(nodeId = 0)
+      val df = df1.join(df2, $"key" === $"key2", "leftsemi")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> ("LeftSemiJoinHash", Map(
+          "number of left rows" -> 2L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 2L)))
+      )
+    }
+  }
+  test("LeftSemiJoinBNL metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", 
+      // Assume the execution plan is
+      // ... -> LeftSemiJoinBNL(nodeId = 0)
+      val df = df1.join(df2, $"key" < $"key2", "leftsemi")
+      testSparkPlanMetrics(df, 2, Map(
+        0L -> ("LeftSemiJoinBNL", Map(
+          "number of left rows" -> 2L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 2L)))
+      )
+    }
+  }
+  test("CartesianProduct metrics") {
+    val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) 
:: TestData2(1, 2)
+    testDataForJoin.registerTempTable("testDataForJoin")
+    withTempTable("testDataForJoin") {
+      // Assume the execution plan is
+      // ... -> CartesianProduct(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df = sqlContext.sql(
+        "SELECT * FROM testData2 JOIN testDataForJoin")
+      testSparkPlanMetrics(df, 1, Map(
+        1L -> ("CartesianProduct", Map(
+          "number of left rows" -> 12L, // left needs to be scanned twice
+          "number of right rows" -> 12L, // right is read 6 times
+          "number of output rows" -> 12L)))
+      )
+    }
+  }
+  test("save metrics") {
+    withTempPath { file =>
+      val previousExecutionIds = 
+      // Assume the execution plan is
+      // PhysicalRDD(nodeId = 0)
+      TestSQLContext.sparkContext.listenerBus.waitUntilEmpty(10000)
+      val executionIds = 
+      assert(executionIds.size === 1)
+      val executionId = executionIds.head
+      val jobs = TestSQLContext.listener.getExecution(executionId)
+      // Use "<=" because there is a race condition that we may miss some jobs
+      // TODO Change "<=" to "=" once we fix the race condition that missing 
the JobStarted event.
+      assert(jobs.size <= 1)
+      val metricValues = 
+      // Because "save" will create a new DataFrame internally, we cannot get 
the real metric id.
+      // However, we still can check the value.
+      assert(metricValues.values.toSeq === Seq(2L))
+    }
+  }
 private case class MethodIdentifier[T](cls: Class[T], name: String, desc: 

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to