[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r417569102 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -365,16 +384,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils withTempView("testDataForJoin") { // Assume the execution plan is // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0) -val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " + +val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " + "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" -Seq(false, true).foreach { enableWholeStage => - val df = spark.sql(query) - testSparkPlanMetrics(df, 2, Map( -0L -> (("BroadcastNestedLoopJoin", Map( - "number of output rows" -> 12L, -enableWholeStage - ) -} +val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " + + "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" +Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true)) + .foreach { case (query, enableWholeStage) => +val df = spark.sql(query) +testSparkPlanMetrics(df, 2, Map( + 0L -> (("BroadcastNestedLoopJoin", Map( +"number of output rows" -> 12L, + enableWholeStage +) + } Review comment: Fixed in latest commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r417357078 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -365,16 +384,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils withTempView("testDataForJoin") { // Assume the execution plan is // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0) -val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " + +val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " + "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" -Seq(false, true).foreach { enableWholeStage => - val df = spark.sql(query) - testSparkPlanMetrics(df, 2, Map( -0L -> (("BroadcastNestedLoopJoin", Map( - "number of output rows" -> 12L, -enableWholeStage - ) -} +val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " + + "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" +Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true)) + .foreach { case (query, enableWholeStage) => +val df = spark.sql(query) +testSparkPlanMetrics(df, 2, Map( + 0L -> (("BroadcastNestedLoopJoin", Map( +"number of output rows" -> 12L, + enableWholeStage +) + } Review comment: I tried reformatting this piece of code using intellij reformat feature and that didn't change anything. Could you please let me know where the indentation is wrong so that I can fix it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r417357078 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -365,16 +384,19 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils withTempView("testDataForJoin") { // Assume the execution plan is // ... -> BroadcastNestedLoopJoin(nodeId = 1) -> TungstenProject(nodeId = 0) -val query = "SELECT * FROM testData2 left JOIN testDataForJoin ON " + +val leftQuery = "SELECT * FROM testData2 LEFT JOIN testDataForJoin ON " + "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" -Seq(false, true).foreach { enableWholeStage => - val df = spark.sql(query) - testSparkPlanMetrics(df, 2, Map( -0L -> (("BroadcastNestedLoopJoin", Map( - "number of output rows" -> 12L, -enableWholeStage - ) -} +val rightQuery = "SELECT * FROM testData2 RIGHT JOIN testDataForJoin ON " + + "testData2.a * testDataForJoin.a != testData2.a + testDataForJoin.a" +Seq((leftQuery, false), (rightQuery, false), (leftQuery, true), (rightQuery, true)) + .foreach { case (query, enableWholeStage) => +val df = spark.sql(query) +testSparkPlanMetrics(df, 2, Map( + 0L -> (("BroadcastNestedLoopJoin", Map( +"number of output rows" -> 12L, + enableWholeStage +) + } Review comment: I tried reformatting this piece of code using intellij reformat feature and that didn't change anything. Could you please let me know where the indentation is wrong so that I can fix it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r416809429 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils // +- LocalTableScan(nodeId = 7) Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach { case (nodeId1, nodeId2, nodeId3, enableWholeStage) => -val df = df1.join(df2, "key") -testSparkPlanMetrics(df, 1, Map( - nodeId1 -> (("ShuffledHashJoin", Map( -"number of output rows" -> 2L))), - nodeId2 -> (("Exchange", Map( -"shuffle records written" -> 2L, -"records read" -> 2L))), - nodeId3 -> (("Exchange", Map( -"shuffle records written" -> 10L, -"records read" -> 10L, - enableWholeStage -) + val df = df1.join(df2, "key") + testSparkPlanMetrics(df, 1, Map( +nodeId1 -> (("ShuffledHashJoin", Map( + "number of output rows" -> 2L))), +nodeId2 -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L))), +nodeId3 -> (("Exchange", Map( + "shuffle records written" -> 10L, + "records read" -> 10L, +enableWholeStage Review comment: I think this indentation is correct. I know it is just a small cosmetic change and probably doesn't need to be included in this PR. I will remove it if you think this should not be there. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r416806910 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils // +- LocalTableScan(nodeId = 7) Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach { case (nodeId1, nodeId2, nodeId3, enableWholeStage) => -val df = df1.join(df2, "key") -testSparkPlanMetrics(df, 1, Map( - nodeId1 -> (("ShuffledHashJoin", Map( -"number of output rows" -> 2L))), - nodeId2 -> (("Exchange", Map( -"shuffle records written" -> 2L, -"records read" -> 2L))), - nodeId3 -> (("Exchange", Map( -"shuffle records written" -> 10L, -"records read" -> 10L, - enableWholeStage -) + val df = df1.join(df2, "key") + testSparkPlanMetrics(df, 1, Map( +nodeId1 -> (("ShuffledHashJoin", Map( + "number of output rows" -> 2L))), +nodeId2 -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L))), +nodeId3 -> (("Exchange", Map( + "shuffle records written" -> 10L, + "records read" -> 10L, +enableWholeStage + ) } } } + test("ShuffledHashJoin(left,outer) metrics") { +withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2", Review comment: You are right. I should have removed these in the last commit itself. Fixed in the latest commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r416807097 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils // +- LocalTableScan(nodeId = 7) Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach { case (nodeId1, nodeId2, nodeId3, enableWholeStage) => -val df = df1.join(df2, "key") -testSparkPlanMetrics(df, 1, Map( - nodeId1 -> (("ShuffledHashJoin", Map( -"number of output rows" -> 2L))), - nodeId2 -> (("Exchange", Map( -"shuffle records written" -> 2L, -"records read" -> 2L))), - nodeId3 -> (("Exchange", Map( -"shuffle records written" -> 10L, -"records read" -> 10L, - enableWholeStage -) + val df = df1.join(df2, "key") + testSparkPlanMetrics(df, 1, Map( +nodeId1 -> (("ShuffledHashJoin", Map( + "number of output rows" -> 2L))), +nodeId2 -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L))), +nodeId3 -> (("Exchange", Map( + "shuffle records written" -> 10L, + "records read" -> 10L, +enableWholeStage + ) } } } + test("ShuffledHashJoin(left,outer) metrics") { Review comment: fixed in the latest commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r416806448 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -325,34 +325,57 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils // +- LocalTableScan(nodeId = 7) Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach { case (nodeId1, nodeId2, nodeId3, enableWholeStage) => -val df = df1.join(df2, "key") -testSparkPlanMetrics(df, 1, Map( - nodeId1 -> (("ShuffledHashJoin", Map( -"number of output rows" -> 2L))), - nodeId2 -> (("Exchange", Map( -"shuffle records written" -> 2L, -"records read" -> 2L))), - nodeId3 -> (("Exchange", Map( -"shuffle records written" -> 10L, -"records read" -> 10L, - enableWholeStage -) + val df = df1.join(df2, "key") + testSparkPlanMetrics(df, 1, Map( +nodeId1 -> (("ShuffledHashJoin", Map( + "number of output rows" -> 2L))), +nodeId2 -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L))), +nodeId3 -> (("Exchange", Map( + "shuffle records written" -> 10L, + "records read" -> 10L, +enableWholeStage + ) } } } + test("ShuffledHashJoin(left,outer) metrics") { +withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") + val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + Seq((0L, "right_outer", leftDf, rightDf, 10L, false), +(0L, "left_outer", rightDf, leftDf, 10L, false), +(0L, "right_outer", leftDf, rightDf, 10L, true), +(0L, "left_outer", rightDf, leftDf, 10L, true), +(2L, "left_anti", rightDf, leftDf, 8L, true), +(2L, "left_semi", rightDf, leftDf, 2L, true), +(1L, "left_anti", rightDf, leftDf, 8L, false), +(1L, "left_semi", rightDf, leftDf, 2L, false)) +.foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => + val df = leftDf.hint("shuffle_hash").join( +rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType) + testSparkPlanMetrics(df, 1, Map( +nodeId -> (("ShuffledHashJoin", Map( + "number of output rows" -> rows, +enableWholeStage + ) +} +} + } + test("BroadcastHashJoin(outer) metrics") { val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value") val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value") // Assume the execution plan is // ... -> BroadcastHashJoin(nodeId = 0) -Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), - ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach { - case (joinType, nodeId, numRows, enableWholeStage) => +Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true), + ("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) => val df = df1.join(broadcast(df2), $"key" === $"key2", joinType) testSparkPlanMetrics(df, 2, Map( -nodeId -> (("BroadcastHashJoin", Map( - "number of output rows" -> numRows, +nodeId -> (("BroadcastHashJoin", Map("number of output rows" -> numRows, Review comment: Fixed in latest commit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r416806570 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -394,6 +420,21 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("BroadcastLeftAntiJoinHash metrics") { +val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") +val df2 = Seq((1, "1"), (2, "2"), (3, "3"), (4, "4")).toDF("key2", "value") +// Assume the execution plan is +// ... -> BroadcastHashJoin(nodeId = 1) Review comment: Fixed in latest commit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r415882589 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("ShuffledHashJoin(outer) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", Review comment: Fixed in the latest commit. ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("ShuffledHashJoin(outer) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", Review comment: Fixed in the latest commit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r415861439 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -547,9 +590,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils test("SPARK-25602: SparkPlan.getByteArrayRdd should not consume the input when not necessary") { def checkFilterAndRangeMetrics( -df: DataFrame, -filterNumOutputs: Int, -rangeNumOutputs: Int): Unit = { +df: DataFrame, +filterNumOutputs: Int, +rangeNumOutputs: Int): Unit = { Review comment: I am sorry about this. Somehow intellj added the extras spaces. Fixed in latest commit. ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -325,36 +325,61 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils // +- LocalTableScan(nodeId = 7) Seq((1L, 2L, 5L, false), (2L, 3L, 7L, true)).foreach { case (nodeId1, nodeId2, nodeId3, enableWholeStage) => -val df = df1.join(df2, "key") -testSparkPlanMetrics(df, 1, Map( - nodeId1 -> (("ShuffledHashJoin", Map( -"number of output rows" -> 2L))), - nodeId2 -> (("Exchange", Map( -"shuffle records written" -> 2L, -"records read" -> 2L))), - nodeId3 -> (("Exchange", Map( -"shuffle records written" -> 10L, -"records read" -> 10L, - enableWholeStage -) + val df = df1.join(df2, "key") + testSparkPlanMetrics(df, 1, Map( +nodeId1 -> (("ShuffledHashJoin", Map( + "number of output rows" -> 2L))), +nodeId2 -> (("Exchange", Map( + "shuffle records written" -> 2L, + "records read" -> 2L))), +nodeId3 -> (("Exchange", Map( + "shuffle records written" -> 10L, + "records read" -> 10L, +enableWholeStage + ) } } } + test("ShuffledHashJoin(left,outer) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val leftDf = Seq((1, "1"), (2, "2")).toDF("key", "value") + val rightDf = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + Seq((0L, "right_outer", leftDf, rightDf, 10L, false), +(0L, "left_outer", rightDf, leftDf, 10L, false), +(0L, "right_outer", leftDf, rightDf, 10L, true), +(0L, "left_outer", rightDf, leftDf, 10L, true), +(2L, "left_anti", rightDf, leftDf, 8L, true), +(2L, "left_semi", rightDf, leftDf, 2L, true), +(1L, "left_anti", rightDf, leftDf, 8L, false), +(1L, "left_semi", rightDf, leftDf, 2L, false)) +.foreach { case (nodeId, joinType, leftDf, rightDf, rows, enableWholeStage) => + val df = leftDf.join(rightDf.hint("shuffle_hash"), $"key" === $"key2", joinType) + testSparkPlanMetrics(df, 1, Map( +nodeId -> (("ShuffledHashJoin", Map( + "number of output rows" -> rows, +enableWholeStage + ) +} +} + } + test("BroadcastHashJoin(outer) metrics") { val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value") val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value") // Assume the execution plan is // ... -> BroadcastHashJoin(nodeId = 0) -Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), - ("left_outer", 1L, 5L, true), ("right_outer", 1L, 6L, true)).foreach { +Seq(("left_outer", 0L, 5L, false), ("right_outer", 0L, 6L, false), ("left_outer", 1L, 5L, true), + ("right_outer", 1L, 6L, true)).foreach { case (joinType, nodeId, numRows, enableWholeStage) => - val df = df1.join(broadcast(df2), $"key" === $"key2", joinType) - testSparkPlanMetrics(df, 2, Map( -nodeId -> (("BroadcastHashJoin", Map( - "number of output rows" -> numRows, -enableWholeStage - ) +val df = df1.join(broadcast(df2), $"key" === $"key2", joinType) +testSparkPlanMetrics(df, 2, Map( + nodeId -> (("BroadcastHashJoin", Map( +"number of output rows" -> numRows, + enableWholeStage +) Review comment: Fixed in latest commit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org ---
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r415370552 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("ShuffledHashJoin(outer) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", Review comment: In my latest commit combined three different tests into one. Not setting this parameter will not trigger the correct join type based on the rules in the Spark Strategies file. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r415370594 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("ShuffledHashJoin(outer) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") + val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq(("right_outer", 0L, df1, df2, false), ("left_outer", 0L, df2, df1, false), +("right_outer", 0L, df1, df2, true), ("left_outer", 0L, df2, df1, true)) +.foreach { case (joinType, nodeId, df1, df2, enableWholeStage) => + val df = df1.join(df2, $"key" === $"key2", joinType) + testSparkPlanMetrics(df, 1, Map( +nodeId -> (("ShuffledHashJoin", Map( + "number of output rows" -> 10L, +enableWholeStage + ) +} +} + } + + test("ShuffledHashJoin(left-anti) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") + val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq((2L, true), (1L, false)).foreach { case (nodeId, enableWholeStage) => +val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_anti") +testSparkPlanMetrics(df, 1, Map( + nodeId -> (("ShuffledHashJoin", Map( +"number of output rows" -> 8L, + enableWholeStage +) + } +} + } + + test("ShuffledHashJoin(left-semi) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") + val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq((1L, false), (2L, true)).foreach { case (nodeId, enableWholeStage) => +val df = df2.join(df1.hint("shuffle_hash"), $"key" === $"key2", "left_semi") +testSparkPlanMetrics(df, 1, Map( + nodeId -> (("ShuffledHashJoin", Map( +"number of output rows" -> 2L, Review comment: fixed in the latest commit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] sririshindra commented on a change in pull request #28330: [SPARK-31377][SQL][TEST] Added unit tests to 'number of output rows metric' for some joins in SQLMetricSuite
sririshindra commented on a change in pull request #28330: URL: https://github.com/apache/spark/pull/28330#discussion_r415369992 ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("ShuffledHashJoin(outer) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", Review comment: Setting it to -1 would trigger SortMergeJoin instead of ShuffledHashJoin based on the rules in Spark Strategies. ## File path: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ## @@ -341,6 +341,62 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils } } + test("ShuffledHashJoin(outer) metrics") { +withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40", + SQLConf.SHUFFLE_PARTITIONS.key -> "2", + SQLConf.PREFER_SORTMERGEJOIN.key -> "false") { + val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") + val df2 = (1 to 10).map(i => (i, i.toString)).toSeq.toDF("key2", "value") + + Seq(("right_outer", 0L, df1, df2, false), ("left_outer", 0L, df2, df1, false), +("right_outer", 0L, df1, df2, true), ("left_outer", 0L, df2, df1, true)) +.foreach { case (joinType, nodeId, df1, df2, enableWholeStage) => Review comment: fixed in the latest commit This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org