[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-29 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r417569102



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -365,16 +384,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     withTempView("testDataForJoin") {
       // Assume the execution plan is
       // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-      val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+      val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " +
         "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
-      Seq(false, true).foreach { enableWholeStage =>
-        val df = spark.sql(query)
-        testSparkPlanMetrics(df, 2, Map(
-          0L -> (("BroadcastNestedLoopJoin", Map(
-            "number of output rows" -> 12L)))),
-          enableWholeStage
-        )
-      }
+      val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " +
+        "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
+      Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
+        .foreach { case (query, enableWholeStage) =>
+          val df = spark.sql(query)
+          testSparkPlanMetrics(df, 2, Map(
+            0L -> (("BroadcastNestedLoopJoin", Map(
+              "number of output rows" -> 12L)))),
+            enableWholeStage
+          )
+        }

Review comment:
   Fixed in the latest commit.








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-29 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r417357078



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -365,16 +384,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     withTempView("testDataForJoin") {
       // Assume the execution plan is
       // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0)
-      val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " +
+      val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " +
         "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
-      Seq(false, true).foreach { enableWholeStage =>
-        val df = spark.sql(query)
-        testSparkPlanMetrics(df, 2, Map(
-          0L -> (("BroadcastNestedLoopJoin", Map(
-            "number of output rows" -> 12L)))),
-          enableWholeStage
-        )
-      }
+      val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " +
+        "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a"
+      Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true))
+        .foreach { case (query, enableWholeStage) =>
+          val df = spark.sql(query)
+          testSparkPlanMetrics(df, 2, Map(
+            0L -> (("BroadcastNestedLoopJoin", Map(
+              "number of output rows" -> 12L)))),
+            enableWholeStage
+          )
+        }

Review comment:
   I tried reformatting this piece of code with IntelliJ's reformat feature, and it didn't change anything. Could you please point out where the indentation is wrong so that I can fix it?








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-28 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416809429



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       // +- LocalTableScan(nodeId = 7)
       Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
         case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-        val df = df1.join(df2, "key")
-        testSparkPlanMetrics(df, 1, Map(
-          nodeId1 -> (("ShuffledHashJoin", Map(
-            "number of output rows" -> 2L))),
-          nodeId2 -> (("Exchange", Map(
-            "shuffle records written" -> 2L,
-            "records read" -> 2L))),
-          nodeId3 -> (("Exchange", Map(
-            "shuffle records written" -> 10L,
-            "records read" -> 10L)))),
-          enableWholeStage
-        )
+          val df = df1.join(df2, "key")
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId1 -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> 2L))),
+            nodeId2 -> (("Exchange", Map(
+              "shuffle records written" -> 2L,
+              "records read" -> 2L))),
+            nodeId3 -> (("Exchange", Map(
+              "shuffle records written" -> 10L,
+              "records read" -> 10L)))),
+            enableWholeStage

Review comment:
   I think this indentation is correct. I know it is just a small cosmetic change that probably doesn't need to be in this PR; I will remove it if you think it should not be there.








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-28 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416806910



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       // +- LocalTableScan(nodeId = 7)
       Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
         case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-        val df = df1.join(df2, "key")
-        testSparkPlanMetrics(df, 1, Map(
-          nodeId1 -> (("ShuffledHashJoin", Map(
-            "number of output rows" -> 2L))),
-          nodeId2 -> (("Exchange", Map(
-            "shuffle records written" -> 2L,
-            "records read" -> 2L))),
-          nodeId3 -> (("Exchange", Map(
-            "shuffle records written" -> 10L,
-            "records read" -> 10L)))),
-          enableWholeStage
-        )
+          val df = df1.join(df2, "key")
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId1 -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> 2L))),
+            nodeId2 -> (("Exchange", Map(
+              "shuffle records written" -> 2L,
+              "records read" -> 2L))),
+            nodeId3 -> (("Exchange", Map(
+              "shuffle records written" -> 10L,
+              "records read" -> 10L)))),
+            enableWholeStage
+          )
       }
     }
   }
 
+  test("ShuffledHashJoin(left,outer) metrics") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",

Review comment:
   You are right. I should have removed these in the last commit itself. 
Fixed in the latest commit.








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-28 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416807097



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       // +- LocalTableScan(nodeId = 7)
       Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
         case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-        val df = df1.join(df2, "key")
-        testSparkPlanMetrics(df, 1, Map(
-          nodeId1 -> (("ShuffledHashJoin", Map(
-            "number of output rows" -> 2L))),
-          nodeId2 -> (("Exchange", Map(
-            "shuffle records written" -> 2L,
-            "records read" -> 2L))),
-          nodeId3 -> (("Exchange", Map(
-            "shuffle records written" -> 10L,
-            "records read" -> 10L)))),
-          enableWholeStage
-        )
+          val df = df1.join(df2, "key")
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId1 -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> 2L))),
+            nodeId2 -> (("Exchange", Map(
+              "shuffle records written" -> 2L,
+              "records read" -> 2L))),
+            nodeId3 -> (("Exchange", Map(
+              "shuffle records written" -> 10L,
+              "records read" -> 10L)))),
+            enableWholeStage
+          )
       }
     }
   }
 
+  test("ShuffledHashJoin(left,outer) metrics") {

Review comment:
   Fixed in the latest commit.








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-28 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416806448



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       // +- LocalTableScan(nodeId = 7)
       Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
         case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-        val df = df1.join(df2, "key")
-        testSparkPlanMetrics(df, 1, Map(
-          nodeId1 -> (("ShuffledHashJoin", Map(
-            "number of output rows" -> 2L))),
-          nodeId2 -> (("Exchange", Map(
-            "shuffle records written" -> 2L,
-            "records read" -> 2L))),
-          nodeId3 -> (("Exchange", Map(
-            "shuffle records written" -> 10L,
-            "records read" -> 10L)))),
-          enableWholeStage
-        )
+          val df = df1.join(df2, "key")
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId1 -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> 2L))),
+            nodeId2 -> (("Exchange", Map(
+              "shuffle records written" -> 2L,
+              "records read" -> 2L))),
+            nodeId3 -> (("Exchange", Map(
+              "shuffle records written" -> 10L,
+              "records read" -> 10L)))),
+            enableWholeStage
+          )
       }
     }
   }
 
+  test("ShuffledHashJoin(left,outer) metrics") {
+    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+      Seq((0L, "right_outer", leftDf, rightDf, 10L, false),
+        (0L, "left_outer", rightDf, leftDf, 10L, false),
+        (0L, "right_outer", leftDf, rightDf, 10L, true),
+        (0L, "left_outer", rightDf, leftDf, 10L, true),
+        (2L, "left_anti", rightDf, leftDf, 8L, true),
+        (2L, "left_semi", rightDf, leftDf, 2L, true),
+        (1L, "left_anti", rightDf, leftDf, 8L, false),
+        (1L, "left_semi", rightDf, leftDf, 2L, false))
+        .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) =>
+          val df = leftDf.hint("shuffle_hash").join(
+            rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> rows)))),
+            enableWholeStage
+          )
+        }
+    }
+  }
+
   test("BroadcastHashJoin(outer) metrics") {
     val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
     val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
     // Assume the execution plan is
     // ... -> BroadcastHashJoin(nodeId = 0)
-    Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false),
-      ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach {
-      case (joinType, nodeId, numRows, enableWholeStage) =>
+    Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true),
+      ("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) =>
       val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
       testSparkPlanMetrics(df, 2, Map(
-        nodeId -> (("BroadcastHashJoin", Map(
-          "number of output rows" -> numRows)))),
+        nodeId -> (("BroadcastHashJoin", Map("number of output rows" -> numRows)))),

Review comment:
   Fixed in the latest commit.








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-28 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r416806570



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -394,6 +420,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("BroadcastLeftAntiJoinHash metrics") {
+    val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+    val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value")
+    // Assume the execution plan is
+    // ... -> BroadcastHashJoin(nodeId = 1)

Review comment:
   Fixed in the latest commit.








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-27 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415882589



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",

Review comment:
   Fixed in the latest commit.

##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",

Review comment:
   Fixed in the latest commit.








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-27 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415861439



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -547,9 +590,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
 
   test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") {
     def checkFilterAndRangeMetrics(
-        df: DataFrame,
-        filterNumOutputs: Int,
-        rangeNumOutputs: Int): Unit = {
+          df: DataFrame,
+          filterNumOutputs: Int,
+          rangeNumOutputs: Int): Unit = {

Review comment:
   I am sorry about this. Somehow IntelliJ added the extra spaces. Fixed in the latest commit.

##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -325,36 +325,61 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
       // +- LocalTableScan(nodeId = 7)
       Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach {
         case (nodeId1, nodeId2, nodeId3, enableWholeStage) =>
-        val df = df1.join(df2, "key")
-        testSparkPlanMetrics(df, 1, Map(
-          nodeId1 -> (("ShuffledHashJoin", Map(
-            "number of output rows" -> 2L))),
-          nodeId2 -> (("Exchange", Map(
-            "shuffle records written" -> 2L,
-            "records read" -> 2L))),
-          nodeId3 -> (("Exchange", Map(
-            "shuffle records written" -> 10L,
-            "records read" -> 10L)))),
-          enableWholeStage
-        )
+          val df = df1.join(df2, "key")
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId1 -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> 2L))),
+            nodeId2 -> (("Exchange", Map(
+              "shuffle records written" -> 2L,
+              "records read" -> 2L))),
+            nodeId3 -> (("Exchange", Map(
+              "shuffle records written" -> 10L,
+              "records read" -> 10L)))),
+            enableWholeStage
+          )
       }
     }
   }
 
+  test("ShuffledHashJoin(left,outer) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+      Seq((0L, "right_outer", leftDf, rightDf, 10L, false),
+        (0L, "left_outer", rightDf, leftDf, 10L, false),
+        (0L, "right_outer", leftDf, rightDf, 10L, true),
+        (0L, "left_outer", rightDf, leftDf, 10L, true),
+        (2L, "left_anti", rightDf, leftDf, 8L, true),
+        (2L, "left_semi", rightDf, leftDf, 2L, true),
+        (1L, "left_anti", rightDf, leftDf, 8L, false),
+        (1L, "left_semi", rightDf, leftDf, 2L, false))
+        .foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) =>
+          val df = leftDf.join(rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType)
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> rows)))),
+            enableWholeStage
+          )
+        }
+    }
+  }
+
   test("BroadcastHashJoin(outer) metrics") {
     val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
     val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
     // Assume the execution plan is
     // ... -> BroadcastHashJoin(nodeId = 0)
-    Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false),
-      ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach {
+    Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true),
+      ("right_outer", 1L, 6L, true)).foreach {
       case (joinType, nodeId, numRows, enableWholeStage) =>
-      val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
-      testSparkPlanMetrics(df, 2, Map(
-        nodeId -> (("BroadcastHashJoin", Map(
-          "number of output rows" -> numRows)))),
-        enableWholeStage
-      )
+        val df = df1.join(broadcast(df2), $"key" === $"key2", joinType)
+        testSparkPlanMetrics(df, 2, Map(
+          nodeId -> (("BroadcastHashJoin", Map(
+            "number of output rows" -> numRows)))),
+          enableWholeStage
+        )

Review comment:
   Fixed in the latest commit.






[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-26 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415370552



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",

Review comment:
   In my latest commit I combined the three separate tests into one. Without setting this parameter, the planner would not pick the intended join type, given the join-selection rules in the SparkStrategies file; see the sketch below.
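
   To make that dependency concrete, here is a minimal sketch of how the planner's choice can be verified. This is a sketch only, assuming it runs inside SQLMetricsSuite (so withSQLConf, spark, and the testImplicits conversions are in scope); ShuffledHashJoinExec is in org.apache.spark.sql.execution.joins, and the size-check reading below is my paraphrase of JoinSelection in SparkStrategies, not a quote of it.

import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.internal.SQLConf

// Sketch: the conf values mirror the test above. With the broadcast
// threshold at 40 bytes neither side is small enough to broadcast, and the
// build side must still fit a per-partition hash map of roughly
// threshold * numShufflePartitions bytes, which is why SHUFFLE_PARTITIONS
// is pinned to 2 here.
withSQLConf(
  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
  SQLConf.SHUFFLE_PARTITIONS.key -> "2",
  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  import testImplicits._
  val small = Seq((1, "1"), (2, "2")).toDF("key", "value")
  val big = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
  val joined = big.join(small, $"key" === $"key2", "left_outer")
  // Confirm the planner actually picked ShuffledHashJoin for this plan.
  assert(joined.queryExecution.executedPlan.collect {
    case j: ShuffledHashJoinExec => j
  }.nonEmpty)
}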








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-26 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415370594



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+
+      Seq(("right_outer", 0L, df1, df2, false), ("left_outer", 0L, df2, df1, false),
+        ("right_outer", 0L, df1, df2, true), ("left_outer", 0L, df2, df1, true))
+        .foreach { case (joinType, nodeId, df1, df2, enableWholeStage) =>
+          val df = df1.join(df2, $"key" === $"key2", joinType)
+          testSparkPlanMetrics(df, 1, Map(
+            nodeId -> (("ShuffledHashJoin", Map(
+              "number of output rows" -> 10L)))),
+            enableWholeStage
+          )
+        }
+    }
+  }
+
+  test("ShuffledHashJoin(left-anti) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+
+      Seq((2L, true), (1L, false)).foreach { case (nodeId, enableWholeStage) =>
+        val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_anti")
+        testSparkPlanMetrics(df, 1, Map(
+          nodeId -> (("ShuffledHashJoin", Map(
+            "number of output rows" -> 8L)))),
+          enableWholeStage
+        )
+      }
+    }
+  }
+
+  test("ShuffledHashJoin(left-semi) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+
+      Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) =>
+        val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_semi")
+        testSparkPlanMetrics(df, 1, Map(
+          nodeId -> (("ShuffledHashJoin", Map(
+            "number of output rows" -> 2L)))),

Review comment:
   Fixed in the latest commit.








[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite

2020-04-26 Thread GitBox


sririshindra commented on a change in pull request #28330:
URL: https://github.com/apache/spark/pull/28330#discussion_r415369992



##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",

Review comment:
   Setting it to -1 would make the planner choose SortMergeJoin instead of ShuffledHashJoin, per the selection rules in SparkStrategies; see the sketch below.
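
   For illustration, a hedged sketch of that fallback (same assumptions as the earlier sketch in this thread: it runs inside SQLMetricsSuite with testImplicits in scope, and both join exec classes live in org.apache.spark.sql.execution.joins):

import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf

// With the broadcast threshold disabled (-1), my reading of JoinSelection is
// that the local-hash-map size check (roughly threshold * numShufflePartitions)
// can never pass, so the planner falls back to SortMergeJoin even though
// PREFER_SORTMERGEJOIN is false.
withSQLConf(
  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
  SQLConf.SHUFFLE_PARTITIONS.key -> "2",
  SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
  import testImplicits._
  val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
  val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
  val plan = df2.join(df1, $"key" === $"key2", "left_outer")
    .queryExecution.executedPlan
  assert(plan.collect { case j: SortMergeJoinExec => j }.nonEmpty)
  assert(plan.collect { case j: ShuffledHashJoinExec => j }.isEmpty)
}

   This is also why the -1 variants of these tests add the shuffle_hash hint: the hint forces ShuffledHashJoin regardless of the threshold.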

##
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
##
@@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     }
   }
 
+  test("ShuffledHashJoin(outer) metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
+      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
+      val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+      val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value")
+
+      Seq(("right_outer", 0L, df1, df2, false), ("left_outer", 0L, df2, df1, false),
+        ("right_outer", 0L, df1, df2, true), ("left_outer", 0L, df2, df1, true))
+        .foreach { case (joinType, nodeId, df1, df2, enableWholeStage) =>

Review comment:
   Fixed in the latest commit.




