Repository: spark
Updated Branches:
  refs/heads/branch-1.5 71460b889 -> 767ee1884


http://git-wip-us.apache.org/repos/asf/spark/blob/767ee188/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 953284c..7383d3f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -25,15 +25,24 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql._
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.{SQLTestUtils, TestSQLContext}
 import org.apache.spark.util.Utils
 
+class SQLMetricsSuite extends SparkFunSuite with SQLTestUtils {
 
-class SQLMetricsSuite extends SparkFunSuite {
+  override val sqlContext = TestSQLContext
+
+  import sqlContext.implicits._
 
   test("LongSQLMetric should not box Long") {
     val l = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "long")
-    val f = () => { l += 1L }
+    val f = () => {
+      l += 1L
+      l.add(1L)
+    }
     BoxingFinder.getClassReader(f.getClass).foreach { cl =>
       val boxingFinder = new BoxingFinder()
       cl.accept(boxingFinder, 0)
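The added `l.add(1L)` call above puts the second update path under the same ASM-based boxing check. As a rough illustration of what that check guards against (hypothetical classes, not Spark's actual implementation): a metric backed by a primitive Long field updates without allocation, while a generic container erases its type parameter to Object and boxes every Long passed to it.

  // Hypothetical sketch only: primitive-backed metric, no java.lang.Long allocated on update.
  class PrimitiveLongMetric {
    private var _value: Long = 0L
    def add(v: Long): Unit = _value += v
    def +=(v: Long): Unit = add(v)
    def value: Long = _value
  }

  // Generic container: T erases to Object, so passing a Long here boxes it.
  class GenericMetric[T](private var _value: T) {
    def set(v: T): Unit = _value = v
    def value: T = _value
  }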
@@ -51,6 +60,441 @@ class SQLMetricsSuite extends SparkFunSuite {
      assert(boxingFinder.boxingInvokes.nonEmpty, "Found find boxing in this test")
     }
   }
+
+  /**
+   * Call `df.collect()` and verify whether the collected metrics are the same as
+   * "expectedMetrics".
+   *
+   * @param df `DataFrame` to run
+   * @param expectedNumOfJobs number of jobs that will run
+   * @param expectedMetrics the expected metrics. The format is
+   *                        `nodeId -> (operatorName, metric name -> metric value)`.
+   */
+  private def testSparkPlanMetrics(
+      df: DataFrame,
+      expectedNumOfJobs: Int,
+      expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
+    val previousExecutionIds = TestSQLContext.listener.executionIdToData.keySet
+    df.collect()
+    TestSQLContext.sparkContext.listenerBus.waitUntilEmpty(10000)
+    val executionIds = TestSQLContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
+    assert(executionIds.size === 1)
+    val executionId = executionIds.head
+    val jobs = TestSQLContext.listener.getExecution(executionId).get.jobs
+    // Use "<=" because there is a race condition where we may miss some jobs
+    // TODO Change it to "=" once we fix the race condition that drops the JobStarted event.
+    assert(jobs.size <= expectedNumOfJobs)
+    if (jobs.size == expectedNumOfJobs) {
+      // If we can track all jobs, check the metric values
+      val metricValues = TestSQLContext.listener.getExecutionMetrics(executionId)
+      val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
+        expectedMetrics.contains(node.id)
+      }.map { node =>
+        val nodeMetrics = node.metrics.map { metric =>
+          val metricValue = metricValues(metric.accumulatorId)
+          (metric.name, metricValue)
+        }.toMap
+        (node.id, node.name -> nodeMetrics)
+      }.toMap
+      assert(expectedMetrics === actualMetrics)
+    } else {
+      // TODO Remove this "else" once we fix the race condition that drops the JobStarted event.
+      // Since we cannot track all jobs, the metric values could be wrong and we should not
+      // check them.
+      logWarning("Due to a race condition, we miss some jobs and cannot verify the metric values")
+    }
+  }
+
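The `nodeId` values asserted in the tests below come from the `SparkPlanGraph` built for the executed plan, which is exactly what the helper above walks. A minimal sketch (reusing only calls already present in this file; the chosen DataFrame is just an example) for printing each node's id, name, and metric names, e.g. to double-check the "Assume the execution plan is ..." comments:

  val dfToInspect = TestData.person.select('name)
  SparkPlanGraph(dfToInspect.queryExecution.executedPlan).nodes.foreach { node =>
    val metricNames = node.metrics.map(_.name).mkString(", ")
    println(s"nodeId=${node.id} name=${node.name} metrics=$metricNames")
  }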
+  test("Project metrics") {
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "false",
+      SQLConf.CODEGEN_ENABLED.key -> "false",
+      SQLConf.TUNGSTEN_ENABLED.key -> "false") {
+      // Assume the execution plan is
+      // PhysicalRDD(nodeId = 1) -> Project(nodeId = 0)
+      val df = TestData.person.select('name)
+      testSparkPlanMetrics(df, 1, Map(
+        0L ->("Project", Map(
+          "number of rows" -> 2L)))
+      )
+    }
+  }
+
+  test("TungstenProject metrics") {
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "true",
+      SQLConf.CODEGEN_ENABLED.key -> "true",
+      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+      // Assume the execution plan is
+      // PhysicalRDD(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df = TestData.person.select('name)
+      testSparkPlanMetrics(df, 1, Map(
+        0L ->("TungstenProject", Map(
+          "number of rows" -> 2L)))
+      )
+    }
+  }
+
+  test("Filter metrics") {
+    // Assume the execution plan is
+    // PhysicalRDD(nodeId = 1) -> Filter(nodeId = 0)
+    val df = TestData.person.filter('age < 25)
+    testSparkPlanMetrics(df, 1, Map(
+      0L -> ("Filter", Map(
+        "number of input rows" -> 2L,
+        "number of output rows" -> 1L)))
+    )
+  }
+
+  test("Aggregate metrics") {
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "false",
+      SQLConf.CODEGEN_ENABLED.key -> "false",
+      SQLConf.TUNGSTEN_ENABLED.key -> "false") {
+      // Assume the execution plan is
+      // ... -> Aggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) -> Aggregate(nodeId = 0)
+      val df = TestData.testData2.groupBy().count() // 2 partitions
+      testSparkPlanMetrics(df, 1, Map(
+        2L -> ("Aggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 2L)),
+        0L -> ("Aggregate", Map(
+          "number of input rows" -> 2L,
+          "number of output rows" -> 1L)))
+      )
+
+      // 2 partitions and each partition contains 2 keys
+      val df2 = TestData.testData2.groupBy('a).count()
+      testSparkPlanMetrics(df2, 1, Map(
+        2L -> ("Aggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 4L)),
+        0L -> ("Aggregate", Map(
+          "number of input rows" -> 4L,
+          "number of output rows" -> 3L)))
+      )
+    }
+  }
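The counts above follow from two-phase aggregation: the partial aggregate emits one row per distinct key per partition, and the final aggregate emits one row per distinct key overall. A back-of-the-envelope check, assuming testData2 is the usual six-row (a, b) fixture with a in {1, 2, 3} split into two partitions of three rows each (an assumption about the fixture, not something shown in this diff):

  val partitions = Seq(Seq((1, 1), (1, 2), (2, 1)), Seq((2, 2), (3, 1), (3, 2)))
  // groupBy('a): one partial row per distinct key per partition, then one final row per key.
  val partialRows = partitions.map(_.map(_._1).distinct.size).sum   // 2 + 2 = 4
  val finalRows = partitions.flatten.map(_._1).distinct.size        // 3
  // groupBy() with no key degenerates to one partial row per partition (2) and one final row (1).
  println(s"partial = $partialRows, final = $finalRows")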
+
+  test("SortBasedAggregate metrics") {
+    // Because SortBasedAggregate may skip different rows if the number of partitions is different,
+    // this test should use a deterministic number of partitions.
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "false",
+      SQLConf.CODEGEN_ENABLED.key -> "true",
+      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+      // Assume the execution plan is
+      // ... -> SortBasedAggregate(nodeId = 2) -> TungstenExchange(nodeId = 1) ->
+      // SortBasedAggregate(nodeId = 0)
+      val df = TestData.testData2.groupBy().count() // 2 partitions
+      testSparkPlanMetrics(df, 1, Map(
+        2L -> ("SortBasedAggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 2L)),
+        0L -> ("SortBasedAggregate", Map(
+          "number of input rows" -> 2L,
+          "number of output rows" -> 1L)))
+      )
+
+      // Assume the execution plan is
+      // ... -> SortBasedAggregate(nodeId = 3) -> TungstenExchange(nodeId = 2)
+      // -> ExternalSort(nodeId = 1) -> SortBasedAggregate(nodeId = 0)
+      // 2 partitions and each partition contains 2 keys
+      val df2 = TestData.testData2.groupBy('a).count()
+      testSparkPlanMetrics(df2, 1, Map(
+        3L -> ("SortBasedAggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 4L)),
+        0L -> ("SortBasedAggregate", Map(
+          "number of input rows" -> 4L,
+          "number of output rows" -> 3L)))
+      )
+    }
+  }
+
+  test("TungstenAggregate metrics") {
+    withSQLConf(
+      SQLConf.UNSAFE_ENABLED.key -> "true",
+      SQLConf.CODEGEN_ENABLED.key -> "true",
+      SQLConf.TUNGSTEN_ENABLED.key -> "true") {
+      // Assume the execution plan is
+      // ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
+      // -> TungstenAggregate(nodeId = 0)
+      val df = TestData.testData2.groupBy().count() // 2 partitions
+      testSparkPlanMetrics(df, 1, Map(
+        2L -> ("TungstenAggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 2L)),
+        0L -> ("TungstenAggregate", Map(
+          "number of input rows" -> 2L,
+          "number of output rows" -> 1L)))
+      )
+
+      // 2 partitions and each partition contains 2 keys
+      val df2 = TestData.testData2.groupBy('a).count()
+      testSparkPlanMetrics(df2, 1, Map(
+        2L -> ("TungstenAggregate", Map(
+          "number of input rows" -> 6L,
+          "number of output rows" -> 4L)),
+        0L -> ("TungstenAggregate", Map(
+          "number of input rows" -> 4L,
+          "number of output rows" -> 3L)))
+      )
+    }
+  }
+
+  test("SortMergeJoin metrics") {
+    // Because SortMergeJoin may skip different rows if the number of partitions is different, this
+    // test should use a deterministic number of partitions.
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+      val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+      testDataForJoin.registerTempTable("testDataForJoin")
+      withTempTable("testDataForJoin") {
+        // Assume the execution plan is
+        // ... -> SortMergeJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+        val df = sqlContext.sql(
+          "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+        testSparkPlanMetrics(df, 1, Map(
+          1L -> ("SortMergeJoin", Map(
+            // It's 4 because we only read 3 rows in the first partition and 1 row in the second one
+            "number of left rows" -> 4L,
+            "number of right rows" -> 2L,
+            "number of output rows" -> 4L)))
+        )
+      }
+    }
+  }
+
+  test("SortMergeOuterJoin metrics") {
+    // Because SortMergeOuterJoin may skip different rows if the number of partitions is different,
+    // this test should use a deterministic number of partitions.
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+      val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+      testDataForJoin.registerTempTable("testDataForJoin")
+      withTempTable("testDataForJoin") {
+        // Assume the execution plan is
+        // ... -> SortMergeOuterJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+        val df = sqlContext.sql(
+          "SELECT * FROM testData2 left JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+        testSparkPlanMetrics(df, 1, Map(
+          1L -> ("SortMergeOuterJoin", Map(
+            // Unlike the inner join above, the outer join has to read all 6 left rows
+            "number of left rows" -> 6L,
+            "number of right rows" -> 2L,
+            "number of output rows" -> 8L)))
+        )
+
+        val df2 = sqlContext.sql(
+          "SELECT * FROM testDataForJoin right JOIN testData2 ON testData2.a = testDataForJoin.a")
+        testSparkPlanMetrics(df2, 1, Map(
+          1L -> ("SortMergeOuterJoin", Map(
+            // For the right outer join, all 6 rows of testData2 (now the right side) are read
+            "number of left rows" -> 2L,
+            "number of right rows" -> 6L,
+            "number of output rows" -> 8L)))
+        )
+      }
+    }
+  }
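Why both outer joins above produce 8 rows: assuming testData2's a column is (1, 1, 2, 2, 3, 3) and the filtered temp table only keeps the two a == 1 rows, the two matching rows on the six-row side each pair with two rows, and the remaining four rows are emitted once with nulls. A quick check of that arithmetic:

  val fullSide = Seq(1, 1, 2, 2, 3, 3)   // assumed 'a' values of testData2
  val filteredSide = Seq(1, 1)           // 'a' values left after filter('a < 2)
  val outerRows = fullSide.map { a =>
    val matches = filteredSide.count(_ == a)
    math.max(matches, 1)                 // an unmatched row still yields one null-padded row
  }.sum
  println(s"outer join output rows = $outerRows")   // 8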
+
+  test("BroadcastHashJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key", "value")
+      // Assume the execution plan is
+      // ... -> BroadcastHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df = df1.join(broadcast(df2), "key")
+      testSparkPlanMetrics(df, 2, Map(
+        1L -> ("BroadcastHashJoin", Map(
+          "number of left rows" -> 2L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 2L)))
+      )
+    }
+  }
+
+  test("ShuffledHashJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+      testDataForJoin.registerTempTable("testDataForJoin")
+      withTempTable("testDataForJoin") {
+        // Assume the execution plan is
+        // ... -> ShuffledHashJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+        val df = sqlContext.sql(
+          "SELECT * FROM testData2 JOIN testDataForJoin ON testData2.a = testDataForJoin.a")
+        testSparkPlanMetrics(df, 1, Map(
+          1L -> ("ShuffledHashJoin", Map(
+            "number of left rows" -> 6L,
+            "number of right rows" -> 2L,
+            "number of output rows" -> 4L)))
+        )
+      }
+    }
+  }
+
+  test("ShuffledHashOuterJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+      val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+      val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+      // Assume the execution plan is
+      // ... -> ShuffledHashOuterJoin(nodeId = 0)
+      val df = df1.join(df2, $"key" === $"key2", "left_outer")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> ("ShuffledHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 5L)))
+      )
+
+      val df3 = df1.join(df2, $"key" === $"key2", "right_outer")
+      testSparkPlanMetrics(df3, 1, Map(
+        0L -> ("ShuffledHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 6L)))
+      )
+
+      val df4 = df1.join(df2, $"key" === $"key2", "outer")
+      testSparkPlanMetrics(df4, 1, Map(
+        0L -> ("ShuffledHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 7L)))
+      )
+    }
+  }
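A quick sanity check of the 5 / 6 / 7 output counts above, using only the rows defined in this test: key 1 appears twice on each side (2 * 2 = 4 matched rows), key 4 exists only on the left, and keys 2 and 3 exist only on the right.

  val matched = 2 * 2   // key 1: two rows on each side
  val leftOnly = 1      // key 4
  val rightOnly = 2     // keys 2 and 3
  println(s"left_outer = ${matched + leftOnly}, " +        // 5
    s"right_outer = ${matched + rightOnly}, " +            // 6
    s"full_outer = ${matched + leftOnly + rightOnly}")     // 7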
+
+  test("BroadcastHashOuterJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
+      val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
+      // Assume the execution plan is
+      // ... -> BroadcastHashOuterJoin(nodeId = 0)
+      val df = df1.join(broadcast(df2), $"key" === $"key2", "left_outer")
+      testSparkPlanMetrics(df, 2, Map(
+        0L -> ("BroadcastHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 5L)))
+      )
+
+      val df3 = df1.join(broadcast(df2), $"key" === $"key2", "right_outer")
+      testSparkPlanMetrics(df3, 2, Map(
+        0L -> ("BroadcastHashOuterJoin", Map(
+          "number of left rows" -> 3L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 6L)))
+      )
+    }
+  }
+
+  test("BroadcastNestedLoopJoin metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true") {
+      val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+      testDataForJoin.registerTempTable("testDataForJoin")
+      withTempTable("testDataForJoin") {
+        // Assume the execution plan is
+        // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
+        val df = sqlContext.sql(
+          "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+            "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a")
+        testSparkPlanMetrics(df, 3, Map(
+          1L -> ("BroadcastNestedLoopJoin", Map(
+            "number of left rows" -> 12L, // left needs to be scanned twice
+            "number of right rows" -> 2L,
+            "number of output rows" -> 12L)))
+        )
+      }
+    }
+  }
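The 12 output rows above are easy to reproduce: the broadcast side only contains rows with a == 1, and l * 1 != l + 1 holds for every left value l, so each left row matches both right rows. A small check, again assuming testData2's a column is (1, 1, 2, 2, 3, 3):

  val leftA = Seq(1, 1, 2, 2, 3, 3)   // assumed 'a' values of testData2
  val rightA = Seq(1, 1)              // 'a' values of testDataForJoin
  val outputRows = leftA.map(l => rightA.count(r => l * r != l + r)).sum
  println(s"BroadcastNestedLoopJoin output rows = $outputRows")   // 12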
+
+  test("BroadcastLeftSemiJoinHash metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+      // Assume the execution plan is
+      // ... -> BroadcastLeftSemiJoinHash(nodeId = 0)
+      val df = df1.join(broadcast(df2), $"key" === $"key2", "leftsemi")
+      testSparkPlanMetrics(df, 2, Map(
+        0L -> ("BroadcastLeftSemiJoinHash", Map(
+          "number of left rows" -> 2L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 2L)))
+      )
+    }
+  }
+
+  test("LeftSemiJoinHash metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+      // Assume the execution plan is
+      // ... -> LeftSemiJoinHash(nodeId = 0)
+      val df = df1.join(df2, $"key" === $"key2", "leftsemi")
+      testSparkPlanMetrics(df, 1, Map(
+        0L -> ("LeftSemiJoinHash", Map(
+          "number of left rows" -> 2L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 2L)))
+      )
+    }
+  }
+
+  test("LeftSemiJoinBNL metrics") {
+    withSQLConf(SQLConf.SORTMERGE_JOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+      // Assume the execution plan is
+      // ... -> LeftSemiJoinBNL(nodeId = 0)
+      val df = df1.join(df2, $"key" < $"key2", "leftsemi")
+      testSparkPlanMetrics(df, 2, Map(
+        0L -> ("LeftSemiJoinBNL", Map(
+          "number of left rows" -> 2L,
+          "number of right rows" -> 4L,
+          "number of output rows" -> 2L)))
+      )
+    }
+  }
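The semi-join counts above hinge on each left row being emitted at most once, no matter how many right rows satisfy the predicate. A minimal check for the key < key2 case, using the literal rows from this test:

  val leftKeys = Seq(1, 2)
  val rightKeys = Seq(1, 2, 3, 4)
  val semiJoinRows = leftKeys.count(l => rightKeys.exists(r => l < r))
  println(s"LeftSemiJoinBNL output rows = $semiJoinRows")   // 2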
+
+  test("CartesianProduct metrics") {
+    val testDataForJoin = TestData.testData2.filter('a < 2) // TestData2(1, 1) :: TestData2(1, 2)
+    testDataForJoin.registerTempTable("testDataForJoin")
+    withTempTable("testDataForJoin") {
+      // Assume the execution plan is
+      // ... -> CartesianProduct(nodeId = 1) -> TungstenProject(nodeId = 0)
+      val df = sqlContext.sql(
+        "SELECT * FROM testData2 JOIN testDataForJoin")
+      testSparkPlanMetrics(df, 1, Map(
+        1L -> ("CartesianProduct", Map(
+          "number of left rows" -> 12L, // left needs to be scanned twice
+          "number of right rows" -> 12L, // right is read 6 times
+          "number of output rows" -> 12L)))
+      )
+    }
+  }
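The arithmetic behind the three 12s above, taking the inline comments at face value (a six-row testData2 and a two-row temp table): the product emits 6 * 2 = 12 rows, and the per-side metrics count every rescan rather than distinct rows.

  val leftSize = 6
  val rightSize = 2
  val outputRows = leftSize * rightSize        // 12 output rows
  val leftRowsMetric = leftSize * 2            // left scanned twice  -> 12
  val rightRowsMetric = rightSize * leftSize   // right read 6 times  -> 12
  println(s"output = $outputRows, left = $leftRowsMetric, right = $rightRowsMetric")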
+
+  test("save metrics") {
+    withTempPath { file =>
+      val previousExecutionIds = TestSQLContext.listener.executionIdToData.keySet
+      // Assume the execution plan is
+      // PhysicalRDD(nodeId = 0)
+      TestData.person.select('name).write.format("json").save(file.getAbsolutePath)
+      TestSQLContext.sparkContext.listenerBus.waitUntilEmpty(10000)
+      val executionIds = TestSQLContext.listener.executionIdToData.keySet.diff(previousExecutionIds)
+      assert(executionIds.size === 1)
+      val executionId = executionIds.head
+      val jobs = TestSQLContext.listener.getExecution(executionId).get.jobs
+      // Use "<=" because there is a race condition where we may miss some jobs
+      // TODO Change "<=" to "=" once we fix the race condition that drops the JobStarted event.
+      assert(jobs.size <= 1)
+      val metricValues = TestSQLContext.listener.getExecutionMetrics(executionId)
+      // Because "save" will create a new DataFrame internally, we cannot get the real metric id.
+      // However, we still can check the value.
+      assert(metricValues.values.toSeq === Seq(2L))
+    }
+  }
+
 }
 
 private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String)

