[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-10-05 Thread viirya
Github user viirya closed the pull request at:

https://github.com/apache/spark/pull/22524


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-10-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r221520801
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     result.rdd.isEmpty
   }
 
+  test("SPARK-25497: limit operation within whole stage codegen should not " +
+    "consume all the inputs") {
+
+    val aggDF = spark.range(0, 100, 1, 1)
+      .groupBy("id")
+      .count().limit(1).filter('count > 0)
+    aggDF.collect()
+    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
+      case h: HashAggregateExec => h
+    }.map { hashNode =>
+      hashNode.metrics("numOutputRows").value
+    }.sum
+    // The first hash aggregate node outputs 100 records.
+    // The second hash aggregate before local limit outputs 1 record.
+    assert(aggNumRecords == 101)
+
+    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
+      .groupBy()
+      .count().limit(1).filter('count > 0)
+    aggNoGroupingDF.collect()
+    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
+      case h: HashAggregateExec => h
+    }.map { hashNode =>
+      hashNode.metrics("numOutputRows").value
+    }.sum
+    assert(aggNoGroupingNumRecords == 2)
+
+    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
+    // be planned as `TakeOrderedAndProject` node.
+    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
+      val sortDF = spark.range(0, 100, 1, 1)
+        .filter('id >= 0)
+        .limit(10)
+        .sortWithinPartitions("id")
+        // use non-deterministic expr to prevent filter be pushed down.
+        .selectExpr("rand() + id as id2")
+        .filter('id2 >= 0)
+        .limit(5)
+        .selectExpr("1 + id2 as id3")
+      sortDF.collect()
+      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
+        case l@LocalLimitExec(_, f: FilterExec) => f
+      }.map { filterNode =>
+        filterNode.metrics("numOutputRows").value
+      }
+      assert(sortNumRecords.sorted === Seq(5, 10))
+    }
+
+    val filterDF = spark.range(0, 100, 1, 1).filter('id >= 0)
+      .selectExpr("id + 1 as id2").limit(1).filter('id > 50)
+    filterDF.collect()
+    val filterNumRecords = filterDF.queryExecution.sparkPlan.collect {
+      case f@FilterExec(_, r: RangeExec) => f
--- End diff --

nit: spaces


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-10-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r221520772
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     result.rdd.isEmpty
   }
 
+  test("SPARK-25497: limit operation within whole stage codegen should not " +
+    "consume all the inputs") {
+
+    val aggDF = spark.range(0, 100, 1, 1)
+      .groupBy("id")
+      .count().limit(1).filter('count > 0)
+    aggDF.collect()
+    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
+      case h: HashAggregateExec => h
+    }.map { hashNode =>
+      hashNode.metrics("numOutputRows").value
+    }.sum
+    // The first hash aggregate node outputs 100 records.
+    // The second hash aggregate before local limit outputs 1 record.
+    assert(aggNumRecords == 101)
+
+    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
+      .groupBy()
+      .count().limit(1).filter('count > 0)
+    aggNoGroupingDF.collect()
+    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
+      case h: HashAggregateExec => h
+    }.map { hashNode =>
+      hashNode.metrics("numOutputRows").value
+    }.sum
+    assert(aggNoGroupingNumRecords == 2)
+
+    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
+    // be planned as `TakeOrderedAndProject` node.
+    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
+      val sortDF = spark.range(0, 100, 1, 1)
+        .filter('id >= 0)
+        .limit(10)
+        .sortWithinPartitions("id")
+        // use non-deterministic expr to prevent filter be pushed down.
+        .selectExpr("rand() + id as id2")
+        .filter('id2 >= 0)
+        .limit(5)
+        .selectExpr("1 + id2 as id3")
+      sortDF.collect()
+      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
+        case l@LocalLimitExec(_, f: FilterExec) => f
--- End diff --

nit: spaces


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-10-01 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r221520640
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -2850,6 +2849,80 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     result.rdd.isEmpty
   }
 
+  test("SPARK-25497: limit operation within whole stage codegen should not " +
+    "consume all the inputs") {
+
+    val aggDF = spark.range(0, 100, 1, 1)
+      .groupBy("id")
+      .count().limit(1).filter('count > 0)
+    aggDF.collect()
+    val aggNumRecords = aggDF.queryExecution.sparkPlan.collect {
+      case h: HashAggregateExec => h
+    }.map { hashNode =>
+      hashNode.metrics("numOutputRows").value
+    }.sum
+    // The first hash aggregate node outputs 100 records.
+    // The second hash aggregate before local limit outputs 1 record.
+    assert(aggNumRecords == 101)
+
+    val aggNoGroupingDF = spark.range(0, 100, 1, 1)
+      .groupBy()
+      .count().limit(1).filter('count > 0)
+    aggNoGroupingDF.collect()
+    val aggNoGroupingNumRecords = aggNoGroupingDF.queryExecution.sparkPlan.collect {
+      case h: HashAggregateExec => h
+    }.map { hashNode =>
+      hashNode.metrics("numOutputRows").value
+    }.sum
+    assert(aggNoGroupingNumRecords == 2)
+
+    // Sets `TOP_K_SORT_FALLBACK_THRESHOLD` to a low value because we don't want sort + limit
+    // be planned as `TakeOrderedAndProject` node.
+    withSQLConf(SQLConf.TOP_K_SORT_FALLBACK_THRESHOLD.key -> "1") {
+      val sortDF = spark.range(0, 100, 1, 1)
+        .filter('id >= 0)
+        .limit(10)
+        .sortWithinPartitions("id")
+        // use non-deterministic expr to prevent filter be pushed down.
+        .selectExpr("rand() + id as id2")
+        .filter('id2 >= 0)
+        .limit(5)
+        .selectExpr("1 + id2 as id3")
+      sortDF.collect()
+      val sortNumRecords = sortDF.queryExecution.sparkPlan.collect {
+        case l@LocalLimitExec(_, f: FilterExec) => f
+      }.map { filterNode =>
+        filterNode.metrics("numOutputRows").value
+      }
+      assert(sortNumRecords.sorted === Seq(5, 10))
+    }
+
+    val filterDF = spark.range(0, 100, 1, 1).filter('id >= 0)
+      .selectExpr("id + 1 as id2").limit(1).filter('id > 50)
+    filterDF.collect()
+    val filterNumRecords = filterDF.queryExecution.sparkPlan.collect {
+      case f@FilterExec(_, r: RangeExec) => f
+    }.map { case filterNode =>
+      filterNode.metrics("numOutputRows").value
+    }.head
+    assert(filterNumRecords == 1)
+
+    val twoLimitsDF = spark.range(0, 100, 1, 1)
+      .filter('id >= 0)
+      .limit(1)
+      .selectExpr("id + 1 as id2")
+      .limit(2)
+      .filter('id2 >= 0)
+    twoLimitsDF.collect()
+    val twoLimitsDFNumRecords = twoLimitsDF.queryExecution.sparkPlan.collect {
+      case f@FilterExec(_, _: RangeExec) => f
--- End diff --

nit: spaces


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220054697
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val stopEarly =
-      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
-
-    ctx.addNewFunction("stopEarly", s"""
-      @Override
-      protected boolean stopEarly() {
-        return $stopEarly;
-      }
-    """, inlineToOuterClass = true)
     val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
     s"""
        | if ($countTerm < $limit) {
        |   $countTerm += 1;
+       |   if ($countTerm == $limit) {
+       |     setStopEarly(true);
--- End diff --

Actually, looking at the query again, there should not be a `stopEarly` check inside `consume` that prevents us from consuming the last record, because that check happens in the outer while loop.

The cases that do have a `stopEarly` check inside `consume` are blocking operators like sort and aggregate; for those we need to reset the flag.

But to be safe, I will also move this call after `consume`.
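For reference, a rough sketch of the per-row Java a limit operator would emit with the flag flipped only after the current row has been handed to the parent. `count` and `limit` stand for the generated count variable and the limit value; this is illustrative, not the literal codegen output:

```java
// Sketch of the generated per-row limit code (illustrative fragment, assumed context).
if (count < limit) {
  count += 1;
  // ... code appended by consume(ctx, input): the parent operators process this row ...
  if (count == limit) {
    // Set the flag only after the last allowed row has been consumed, so the
    // producing loop stops before pulling the next input row.
    setStopEarly(true);
  }
}
```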



---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220048264
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val stopEarly =
-      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
-
-    ctx.addNewFunction("stopEarly", s"""
-      @Override
-      protected boolean stopEarly() {
-        return $stopEarly;
-      }
-    """, inlineToOuterClass = true)
     val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
     s"""
        | if ($countTerm < $limit) {
        |   $countTerm += 1;
+       |   if ($countTerm == $limit) {
+       |     setStopEarly(true);
--- End diff --

Oh, I see. And I think `shouldStop` shouldn't be called inside `consume`.


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220046724
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val stopEarly =
-      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
-
-    ctx.addNewFunction("stopEarly", s"""
-      @Override
-      protected boolean stopEarly() {
-        return $stopEarly;
-      }
-    """, inlineToOuterClass = true)
     val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
     s"""
        | if ($countTerm < $limit) {
        |   $countTerm += 1;
+       |   if ($countTerm == $limit) {
+       |     setStopEarly(true);
--- End diff --

`if ($countTerm == $limit)` means this is the last record, and we should 
still consume it?


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220046213
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
@@ -73,14 +78,21 @@ public void append(InternalRow row) {
 currentRows.add(row);
   }
 
+  /**
+   * Sets the flag of stopping the query execution early.
+   */
+  public void setStopEarly(boolean value) {
--- End diff --

You also hinted that we should reset the stop-early flag in the sort exec node too. I will add it along with a related test.


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220046092
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
@@ -73,14 +78,21 @@ public void append(InternalRow row) {
 currentRows.add(row);
   }
 
+  /**
+   * Sets the flag of stopping the query execution early.
+   */
+  public void setStopEarly(boolean value) {
--- End diff --

Ok. Let me add it.


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220044740
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val stopEarly =
-      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
-
-    ctx.addNewFunction("stopEarly", s"""
-      @Override
-      protected boolean stopEarly() {
-        return $stopEarly;
-      }
-    """, inlineToOuterClass = true)
     val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
     s"""
        | if ($countTerm < $limit) {
        |   $countTerm += 1;
+       |   if ($countTerm == $limit) {
+       |     setStopEarly(true);
--- End diff --

Won't `shouldStop` be called inside `consume`? If it is, `stopEarly` will not be set.


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220044584
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   |   $initRangeFuncName(partitionIndex);
   | }
   |
-  | while (true) {
+  | while (true && !stopEarly()) {
   |   long $range = $batchEnd - $number;
   |   if ($range != 0L) {
   | int $localEnd = (int)($range / ${step}L);
   | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
   |   long $value = ((long)$localIdx * ${step}L) + $number;
+  |   $numOutput.add(1);
--- End diff --

OK, then I should revert the `numOutput` change if the number of records is allowed to be a bit inaccurate.


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220044271
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -71,22 +71,14 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
-    val stopEarly =
-      ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false
-
-    ctx.addNewFunction("stopEarly", s"""
-      @Override
-      protected boolean stopEarly() {
-        return $stopEarly;
-      }
-    """, inlineToOuterClass = true)
     val countTerm = ctx.addMutableState(CodeGenerator.JAVA_INT, "count") // init as count = 0
     s"""
       | if ($countTerm < $limit) {
       |   $countTerm += 1;
+       |   if ($countTerm == $limit) {
+       |     setStopEarly(true);
--- End diff --

shall we do this after `consume`?


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220044149
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   |   $initRangeFuncName(partitionIndex);
   | }
   |
-  | while (true) {
+  | while (true && !stopEarly()) {
   |   long $range = $batchEnd - $number;
   |   if ($range != 0L) {
   | int $localEnd = (int)($range / ${step}L);
   | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
   |   long $value = ((long)$localIdx * ${step}L) + $number;
+  |   $numOutput.add(1);
--- End diff --

This is very likely to cause a perf regression, since it's no longer a tight loop.

We want the range operator to stop earlier for better performance, but that doesn't mean the range operator must return exactly `limit` records. Since the range operator already returns data in batches, I think we can stop early at batch granularity.
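A rough sketch of what batch-granularity stopping could look like for the generated range loop; the names (`batchEnd`, `number`, `step`, `localEnd`, `localIdx`, `numOutput`) follow the quoted template, and updating the metric once per batch is only an illustrative assumption, not something decided in this thread:

```java
// Illustrative fragment of the generated loop (assumed surrounding codegen context):
// test the flag once per batch so the per-row loop stays tight. A downstream limit
// may then see a few extra rows, which is fine because the limit operator itself
// still cuts the output off at the requested count.
while (!stopEarly()) {
  long range = batchEnd - number;
  if (range != 0L) {
    int localEnd = (int) (range / step);
    for (int localIdx = 0; localIdx < localEnd; localIdx++) {
      long value = ((long) localIdx * step) + number;
      // ... consume(value): hand the row to the parent operators ...
    }
    number = batchEnd;
    numOutput.add(localEnd);  // hypothetical: bump the metric once per batch, not per row
  }
  // ... compute the next batchEnd, or break when the range is exhausted ...
}
```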


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220043421
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
@@ -73,14 +78,21 @@ public void append(InternalRow row) {
 currentRows.add(row);
   }
 
+  /**
+   * Sets the flag of stopping the query execution early.
+   */
+  public void setStopEarly(boolean value) {
--- End diff --

Can we have more documentation about how to use it? For now I see 2 use cases (see the sketch below):
1. the limit operator should call it with `true` when the limit is hit
2. a blocking operator (sort, agg, etc.) should call it with `false` to reset it
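Something along these lines could go on the setter itself (a sketch of possible wording, not the documentation that actually landed in the patch):

```java
/**
 * Sets whether this pipeline should stop processing its input early.
 *
 * Expected usage:
 *  1. A limit operator calls setStopEarly(true) once it has consumed the
 *     allowed number of rows, so upstream operators stop producing input.
 *  2. A blocking operator (sort, aggregate, etc.) calls setStopEarly(false)
 *     before emitting its own output, so a limit below it does not also stop
 *     the operators above it.
 */
public void setStopEarly(boolean value) {
  isStopEarly = value;
}
```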


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220040370
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
@@ -38,6 +38,11 @@
 
   protected int partitionIndex = -1;
 
+  // This indicates whether the query execution should be stopped even the input rows are still
+  // available. This is used in limit operator. When it reaches the given number of rows to limit,
+  // this flag is set and the execution should be stopped.
+  protected boolean isStopEarly = false;
--- End diff --

I've added a test for 2 limits.

When either of the 2 limits sets `isStopEarly`, I think the execution should be stopped. Is there any case where the opposite holds?




---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-24 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r220039084
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java ---
@@ -38,6 +38,11 @@
 
   protected int partitionIndex = -1;
 
+  // This indicates whether the query execution should be stopped even the input rows are still
+  // available. This is used in limit operator. When it reaches the given number of rows to limit,
+  // this flag is set and the execution should be stopped.
+  protected boolean isStopEarly = false;
--- End diff --

what if there are 2 limits in the query? 


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r219667461
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala ---
@@ -84,9 +84,10 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
 s"""
| if ($countTerm < $limit) {
|   $countTerm += 1;
+   |   if ($countTerm == $limit) {
+   | $stopEarly = true;
+   |   }
|   ${consume(ctx, input)}
-   | } else {
--- End diff --

Do we need to remove this? Isn't it safer to leave it here?


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-22 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22524#discussion_r219667410
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---
@@ -465,13 +465,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   |   $initRangeFuncName(partitionIndex);
   | }
   |
-  | while (true) {
+  | while (true && !stopEarly()) {
   |   long $range = $batchEnd - $number;
   |   if ($range != 0L) {
   | int $localEnd = (int)($range / ${step}L);
   | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
   |   long $value = ((long)$localIdx * ${step}L) + $number;
+  |   $numOutput.add(1);
--- End diff --

can this introduce a perf regression?


---




[GitHub] spark pull request #22524: [SPARK-25497][SQL] Limit operation within whole s...

2018-09-22 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/22524

[SPARK-25497][SQL] Limit operation within whole stage codegen should not 
consume all the inputs

## What changes were proposed in this pull request?

This issue was discovered during https://github.com/apache/spark/pull/21738.

It turns out that limit is not whole-stage-codegened correctly and always consumes all the inputs.

This patch fixes limit's whole-stage codegen. Some nodes, like hash aggregate and range, have loop structures that don't properly check the condition to stop early; they are fixed to stop consuming inputs once the limit number is reached.
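In other words, the loop that each whole-stage-codegen pipeline produces is expected to consult the flag before pulling another input row. A simplified sketch of the intended shape (not the literal generated code; `input` stands for the generated input iterator):

```java
// Simplified shape of a generated processNext(): stop pulling rows from the
// input once a limit further down the pipeline has set the stop-early flag.
protected void processNext() throws java.io.IOException {
  while (!stopEarly() && input.hasNext()) {
    InternalRow row = (InternalRow) input.next();
    // ... the fused operator pipeline (filter, project, limit, ...) consumes the row ...
    if (shouldStop()) return;
  }
}
```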

## How was this patch tested?

Added tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-25497

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22524.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22524


commit 12703bded143002be417ffa247eef4a970ffd54c
Author: Liang-Chi Hsieh 
Date:   2018-09-22T09:34:41Z

limit operation within whole stage codegen should not consume all the inputs.




---
