[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya closed the pull request at:

    https://github.com/apache/spark/pull/22524
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r221520801

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         result.rdd.isEmpty
       }

    +  test("SPARK-25497: limit operation within whole stage codegen should not " +
    +    "consume all the inputs") {
    +
    +    val aggDF = spark.range(0, 100, 1, 1)
    +      .groupBy("id")
    +      .count().limit(1).filter('count > 0)
    +    aggDF.collect()
    +    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    // The first hash aggregate node outputs 100 records.
    +    // The second hash aggregate before local limit outputs 1 record.
    +    assert(aggNumRecords == 101)
    +
    +    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
    +      .groupBy()
    +      .count().limit(1).filter('count > 0)
    +    aggNoGroupingDF.collect()
    +    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    assert(aggNoGroupingNumRecords == 2)
    +
    +    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
    +    // be planned as `TakeOrderedAndProject` node.
    +    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
    +      val sortDF = spark.range(0, 100, 1, 1)
    +        .filter('id >= 0)
    +        .limit(10)
    +        .sortWithinPartitions("id")
    +        // use non-deterministic expr to prevent filter be pushed down.
    +        .selectExpr("rand() + id as id2")
    +        .filter('id2 >= 0)
    +        .limit(5)
    +        .selectExpr("1 + id2 as id3")
    +      sortDF.collect()
    +      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
    +        case l@LocalLimitExec(_, f: FilterExec) => f
    +      }.map { filterNode =>
    +        filterNode.metrics("numOutputRows").value
    +      }
    +      assert(sortNumRecords.sorted === Seq(5, 10))
    +    }
    +
    +    val filterDF = spark.range(0, 100, 1, 1).filter('id >= 0)
    +      .selectExpr("id + 1 as id2").limit(1).filter('id > 50)
    +    filterDF.collect()
    +    val filterNumRecords = filterDF.queryExecution.sparkPlan.collect {
    +      case f@FilterExec(_, r: RangeExec) => f
    --- End diff --

    nit: spaces
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r221520772

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         result.rdd.isEmpty
       }

    +  test("SPARK-25497: limit operation within whole stage codegen should not " +
    +    "consume all the inputs") {
    +
    +    val aggDF = spark.range(0, 100, 1, 1)
    +      .groupBy("id")
    +      .count().limit(1).filter('count > 0)
    +    aggDF.collect()
    +    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    // The first hash aggregate node outputs 100 records.
    +    // The second hash aggregate before local limit outputs 1 record.
    +    assert(aggNumRecords == 101)
    +
    +    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
    +      .groupBy()
    +      .count().limit(1).filter('count > 0)
    +    aggNoGroupingDF.collect()
    +    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    assert(aggNoGroupingNumRecords == 2)
    +
    +    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
    +    // be planned as `TakeOrderedAndProject` node.
    +    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
    +      val sortDF = spark.range(0, 100, 1, 1)
    +        .filter('id >= 0)
    +        .limit(10)
    +        .sortWithinPartitions("id")
    +        // use non-deterministic expr to prevent filter be pushed down.
    +        .selectExpr("rand() + id as id2")
    +        .filter('id2 >= 0)
    +        .limit(5)
    +        .selectExpr("1 + id2 as id3")
    +      sortDF.collect()
    +      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
    +        case l@LocalLimitExec(_, f: FilterExec) => f
    --- End diff --

    nit: spaces
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r221520640

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         result.rdd.isEmpty
       }

    +  test("SPARK-25497: limit operation within whole stage codegen should not " +
    +    "consume all the inputs") {
    +
    +    val aggDF = spark.range(0, 100, 1, 1)
    +      .groupBy("id")
    +      .count().limit(1).filter('count > 0)
    +    aggDF.collect()
    +    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    // The first hash aggregate node outputs 100 records.
    +    // The second hash aggregate before local limit outputs 1 record.
    +    assert(aggNumRecords == 101)
    +
    +    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
    +      .groupBy()
    +      .count().limit(1).filter('count > 0)
    +    aggNoGroupingDF.collect()
    +    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
    +      case h: HashAggregateExec => h
    +    }.map { hashNode =>
    +      hashNode.metrics("numOutputRows").value
    +    }.sum
    +    assert(aggNoGroupingNumRecords == 2)
    +
    +    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
    +    // be planned as `TakeOrderedAndProject` node.
    +    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
    +      val sortDF = spark.range(0, 100, 1, 1)
    +        .filter('id >= 0)
    +        .limit(10)
    +        .sortWithinPartitions("id")
    +        // use non-deterministic expr to prevent filter be pushed down.
    +        .selectExpr("rand() + id as id2")
    +        .filter('id2 >= 0)
    +        .limit(5)
    +        .selectExpr("1 + id2 as id3")
    +      sortDF.collect()
    +      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
    +        case l@LocalLimitExec(_, f: FilterExec) => f
    +      }.map { filterNode =>
    +        filterNode.metrics("numOutputRows").value
    +      }
    +      assert(sortNumRecords.sorted === Seq(5, 10))
    +    }
    +
    +    val filterDF = spark.range(0, 100, 1, 1).filter('id >= 0)
    +      .selectExpr("id + 1 as id2").limit(1).filter('id > 50)
    +    filterDF.collect()
    +    val filterNumRecords = filterDF.queryExecution.sparkPlan.collect {
    +      case f@FilterExec(_, r: RangeExec) => f
    +    }.map { case filterNode =>
    +      filterNode.metrics("numOutputRows").value
    +    }.head
    +    assert(filterNumRecords == 1)
    +
    +    val twoLimitsDF = spark.range(0, 100, 1, 1)
    +      .filter('id >= 0)
    +      .limit(1)
    +      .selectExpr("id + 1 as id2")
    +      .limit(2)
    +      .filter('id2 >= 0)
    +    twoLimitsDF.collect()
    +    val twoLimitsDFNumRecords = twoLimitsDF.queryExecution.sparkPlan.collect {
    +      case f@FilterExec(_, _: RangeExec) => f
    --- End diff --

    nit: spaces
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220054697

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }

       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --

    Actually, looking at the query again, there should not be a `stopEarly` check inside `consume` that prevents us from consuming the last record, because the check should be in the outer while loop. The only cases that have a `stopEarly` check inside `consume` are blocking operators like sort and aggregate, and for those we need to reset the flag. But for safety, I think I will also move this after `consume`.
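The ordering concern in this thread can be illustrated with a small simulation. This is only a sketch under stated assumptions: `ConsumeOrderSketch` and `rowsKept` are hypothetical names, and the downstream flag check models the worry raised by the reviewers, not Spark's actual generated code. It shows that if anything on the `consume` path looks at the flag, setting the flag before `consume` loses the last record, while setting it afterwards does not.

```java
// Hypothetical sketch; names are illustrative and not part of Spark.
class ConsumeOrderSketch {
    // Simulates a limit operator over `total` input rows and returns how many
    // rows the downstream operator actually kept.
    static int rowsKept(int total, int limit, boolean setFlagBeforeConsume) {
        boolean stopEarly = false; // shared stop flag, as in the PR
        int count = 0;             // rows accepted by the limit so far
        int kept = 0;              // rows kept by the downstream operator
        // Outer loop: the place where the stop flag is meant to be checked.
        for (int row = 0; row < total && !stopEarly; row++) {
            if (count < limit) {
                count++;
                boolean lastRecord = (count == limit);
                if (lastRecord && setFlagBeforeConsume) {
                    stopEarly = true;
                }
                // Assumed downstream "consume" that checks the flag before
                // accepting the row (the scenario the reviewers worry about).
                if (!stopEarly) {
                    kept++;
                }
                if (lastRecord) {
                    stopEarly = true; // setting it after consume is always safe
                }
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(rowsKept(100, 3, true));  // flag set before consume
        System.out.println(rowsKept(100, 3, false)); // flag set after consume
    }
}
```

With a limit of 3, the first variant keeps only 2 rows and the second keeps all 3, which is why moving `setStopEarly(true)` after `consume` is the defensive choice even if no operator is expected to check the flag there.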
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220048264

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }

       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --

    Oh, I see. And I think `shouldStop` shouldn't be called inside `consume`.
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220046724

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }

       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --

    `if ($countTerm == $limit)` means this is the last record, and we should still consume it?
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220046213

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -73,14 +78,21 @@ public void append(InternalRow row) {
         currentRows.add(row);
       }

    +  /**
    +   * Sets the flag of stopping the query execution early.
    +   */
    +  public void setStopEarly(boolean value) {
    --- End diff --

    Your comment also hints that we should reset the stop-early flag in the sort exec node too. I will add that and a related test.
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220046092

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -73,14 +78,21 @@ public void append(InternalRow row) {
         currentRows.add(row);
       }

    +  /**
    +   * Sets the flag of stopping the query execution early.
    +   */
    +  public void setStopEarly(boolean value) {
    --- End diff --

    Ok. Let me add it.
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220044740

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }

       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --

    Won't we call `shouldStop` inside `consume`? If we do, `stopEarly` will not be set.
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220044584

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    +      | while (true && !stopEarly()) {
           |   long $range = $batchEnd - $number;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
           |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
           |       long $value = ((long)$localIdx * ${step}L) + $number;
    +      |       $numOutput.add(1);
    --- End diff --

    OK. Then I should revert the `numOutput` change, if it is acceptable for the number of records to be a bit inaccurate.
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220044271

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
       }

       override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
    -    val stopEarly =
    -      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
    -
    -    ctx.addNewFunction("stopEarly", s"""
    -      @Override
    -      protected boolean stopEarly() {
    -        return $stopEarly;
    -      }
    -    """, inlineToOuterClass = true)
         val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     setStopEarly(true);
    --- End diff --

    Shall we do this after `consume`?
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220044149

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    +      | while (true && !stopEarly()) {
           |   long $range = $batchEnd - $number;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
           |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
           |       long $value = ((long)$localIdx * ${step}L) + $number;
    +      |       $numOutput.add(1);
    --- End diff --

    This is very likely to cause a perf regression, since it's not a tight loop anymore.

    We want the range operator to stop earlier for better performance, but that doesn't mean the range operator must return exactly the `limit` number of records. Since the range operator already returns data in batches, I think we can stop early at batch granularity.
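The batch-granularity idea can be sketched roughly as follows. This is a simplified simulation and not the code Spark generates: `BatchRangeSketch`, `valuesEmitted`, and the `batchSize` parameter are hypothetical names chosen for illustration. The stop flag is read once per batch and the output counter is bumped once per batch, so the inner loop does no per-row flag check or metric update.

```java
// Hypothetical sketch; class and method names are illustrative, not Spark's.
class BatchRangeSketch {
    // Emits values [0, total) in batches and returns how many values the
    // range loop produced before it stopped.
    static long valuesEmitted(long total, int batchSize, int limit) {
        boolean stopEarly = false; // set by the limit, read by the range loop
        int count = 0;             // rows accepted by the limit so far
        long emitted = 0;          // stands in for the numOutputRows metric
        long next = 0;
        // The stop flag is checked once per batch, not once per row.
        while (!stopEarly && next < total) {
            long batchEnd = Math.min(next + batchSize, total);
            // Inner loop: no flag check and no metric update per row;
            // only the limit's inlined consume logic runs here.
            for (long value = next; value < batchEnd; value++) {
                if (count < limit) {
                    count++;
                    if (count == limit) {
                        stopEarly = true;
                    }
                }
            }
            emitted += batchEnd - next; // metric updated once per batch
            next = batchEnd;
        }
        return emitted;
    }

    public static void main(String[] args) {
        // With a limit of 1 the range stops after its first batch.
        System.out.println(valuesEmitted(100, 10, 1));
    }
}
```

Under these assumptions the range may produce up to one batch more than the limit strictly needs, so its row count is approximate, which is the trade-off described in the comment above.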
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220043421

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -73,14 +78,21 @@ public void append(InternalRow row) {
         currentRows.add(row);
       }

    +  /**
    +   * Sets the flag of stopping the query execution early.
    +   */
    +  public void setStopEarly(boolean value) {
    --- End diff --

    Can we have more documentation about how to use it? For now I see 2 use cases:
    1. the limit operator should call it with `true` when the limit is hit
    2. a blocking operator (sort, agg, etc.) should call it with `false` to reset it
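The two use cases can be sketched with a toy pipeline of the shape source -> limit -> sort -> limit. This is an illustration under assumptions, not Spark's `BufferedRowIterator`: `StopEarlySketch` and `run` are hypothetical names, and only `setStopEarly` and `stopEarly` mirror the method names that appear in the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the iterator that owns the stop flag.
class StopEarlySketch {
    private boolean stopEarly = false;

    void setStopEarly(boolean value) { stopEarly = value; }
    boolean stopEarly() { return stopEarly; }

    // Runs source -> limit(firstLimit) -> sort -> limit(secondLimit).
    // Returns {rows pulled from the source, rows emitted after the second limit}.
    static int[] run(int total, int firstLimit, int secondLimit, boolean resetInSort) {
        StopEarlySketch it = new StopEarlySketch();
        List<Integer> buffer = new ArrayList<>();
        int pulled = 0;
        int count1 = 0;
        // Phase 1: the source loop feeds the first limit, which fills the sort buffer.
        for (int row = 0; row < total && !it.stopEarly(); row++) {
            pulled++;
            if (count1 < firstLimit) {
                count1++;
                buffer.add(row);
                if (count1 == firstLimit) {
                    it.setStopEarly(true); // use case 1: the limit is hit
                }
            }
        }
        // Blocking operator: its input is complete, so it sorts and then
        // starts a fresh output loop.
        Collections.sort(buffer);
        if (resetInSort) {
            it.setStopEarly(false); // use case 2: reset before producing output
        }
        int emitted = 0;
        int count2 = 0;
        // Phase 2: the sort's output loop feeds the second limit.
        for (int i = 0; i < buffer.size() && !it.stopEarly(); i++) {
            if (count2 < secondLimit) {
                count2++;
                emitted++;
                if (count2 == secondLimit) {
                    it.setStopEarly(true);
                }
            }
        }
        return new int[] {pulled, emitted};
    }
}
```

Called as `StopEarlySketch.run(100, 10, 5, true)`, the sketch pulls 10 rows from the source and emits 5. Without the reset, the flag left over from the first limit would stop the sort's output loop before it emits anything.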
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220040370

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -38,6 +38,11 @@

       protected int partitionIndex = -1;

    +  // This indicates whether the query execution should be stopped even the input rows are still
    +  // available. This is used in limit operator. When it reaches the given number of rows to limit,
    +  // this flag is set and the execution should be stopped.
    +  protected boolean isStopEarly = false;
    --- End diff --

    I've added a test for 2 limits. When either of the 2 limits sets `isStopEarly`, I think the execution should be stopped. Is there any case where that does not hold?
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r220039084

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
    @@ -38,6 +38,11 @@

       protected int partitionIndex = -1;

    +  // This indicates whether the query execution should be stopped even the input rows are still
    +  // available. This is used in limit operator. When it reaches the given number of rows to limit,
    +  // this flag is set and the execution should be stopped.
    +  protected boolean isStopEarly = false;
    --- End diff --

    What if there are 2 limits in the query?
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r219667461

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
    @@ -84,9 +84,10 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
         s"""
            | if ($countTerm < $limit) {
            |   $countTerm += 1;
    +       |   if ($countTerm == $limit) {
    +       |     $stopEarly = true;
    +       |   }
            |   ${consume(ctx, input)}
    -       | } else {
    --- End diff --

    Do we need to remove this? Isn't it safer to leave it here?
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22524#discussion_r219667410

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
    @@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
           |   $initRangeFuncName(partitionIndex);
           | }
           |
    -      | while (true) {
    +      | while (true && !stopEarly()) {
           |   long $range = $batchEnd - $number;
           |   if ($range != 0L) {
           |     int $localEnd = (int)($range / ${step}L);
           |     for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
           |       long $value = ((long)$localIdx * ${step}L) + $number;
    +      |       $numOutput.add(1);
    --- End diff --

    Can this introduce a perf regression?
[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...
GitHub user viirya opened a pull request:

    https://github.com/apache/spark/pull/22524

    [SPARK-25497][SQL] Limit operation within whole stage codegen should not consume all the inputs

    ## What changes were proposed in this pull request?

    This issue was discovered during https://github.com/apache/spark/pull/21738.

    It turns out that limit is not whole-stage-codegened correctly and always consumes all the inputs.

    This patch fixes limit's whole-stage codegen. Some nodes, like hash aggregate and range, have a loop structure that doesn't properly check the condition to stop early. They are fixed to stop consuming inputs once the limit is reached.

    ## How was this patch tested?

    Added tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 SPARK-25497

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22524.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22524

----
commit 12703bded143002be417ffa247eef4a970ffd54c
Author: Liang-Chi Hsieh
Date:   2018-09-22T09:34:41Z

    limit operation within whole stage codegen should not consume all the inputs.
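The problem described in the pull request can be reduced to a small simulation. This is a sketch only: `LimitSketch` and `rowsVisited` are hypothetical names, and the loop is a simplified model of a producer loop with an inlined limit, not the code Spark generates. It compares a source loop that ignores the stop flag with one that checks it.

```java
// Hypothetical sketch; names are illustrative and not part of Spark.
class LimitSketch {
    // Runs a source loop over `total` rows with a downstream limit and
    // returns the number of rows the source loop visited.
    static int rowsVisited(int total, int limit, boolean checkStopFlag) {
        boolean stopEarly = false; // set once the limit is reached
        int count = 0;             // rows accepted by the limit
        int visited = 0;           // rows the source loop pulled
        for (int row = 0; row < total; row++) {
            // The fix: the producer loop checks the flag and stops early.
            if (checkStopFlag && stopEarly) {
                break;
            }
            visited++;
            if (count < limit) {
                count++;
                // The downstream consume of this row would happen here.
                if (count == limit) {
                    stopEarly = true;
                }
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(rowsVisited(100, 1, false)); // loop ignores the flag
        System.out.println(rowsVisited(100, 1, true));  // loop checks the flag
    }
}
```

With 100 input rows and a limit of 1, the loop that ignores the flag visits all 100 rows, while the loop that checks it visits only 1. The final result is the same in both cases; only the amount of input consumed differs.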