[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-09-02 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/14573





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-09-02 Thread robert3005
Github user robert3005 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r77336634
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -116,6 +116,14 @@ object SQLConf {
     .longConf
     .createWithDefault(10L * 1024 * 1024)
 
+  val LIMIT_SCALE_UP_FACTOR = SQLConfigBuilder("spark.sql.limit.scaleUpFactor")
+    .internal()
+    .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
+      "on a query. Higher values lead to more partitions read. Lower values might lead to " +
+      "longer execution times as more jobs will be run")
--- End diff --

fewer tasks per job -> potentially more jobs
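
To make that tradeoff concrete, here is a small standalone Scala sketch (not Spark code; `jobsUntilCovered` is a made-up helper) that replays the empty-result ramp-up from the quoted diff: a lower scale-up factor means each job scans fewer partitions (fewer tasks per job), so covering the same data can take more jobs.

```scala
// Sketch: partitions scanned by each successive job when every partition turns
// out to be empty, following the logic in the diff above (first attempt scans
// one partition, then partsScanned * scaleUpFactor per attempt).
def jobsUntilCovered(totalParts: Int, scaleUpFactor: Int): List[Int] = {
  var partsScanned = 0L
  val perJob = scala.collection.mutable.ListBuffer[Int]()
  while (partsScanned < totalParts) {
    val numPartsToTry = if (partsScanned == 0) 1L else partsScanned * scaleUpFactor
    val thisJob = math.min(numPartsToTry, totalParts - partsScanned)
    perJob += thisJob.toInt
    partsScanned += thisJob
  }
  perJob.toList
}

jobsUntilCovered(1000, 4)  // List(1, 4, 20, 100, 500, 375)        -> 6 jobs
jobsUntilCovered(1000, 2)  // List(1, 2, 6, 18, 54, 162, 486, 271) -> 8 smaller jobs
```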





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-29 Thread ash211
Github user ash211 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76717713
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -116,6 +116,14 @@ object SQLConf {
     .longConf
     .createWithDefault(10L * 1024 * 1024)
 
+  val LIMIT_SCALE_UP_FACTOR = SQLConfigBuilder("spark.sql.limit.scaleUpFactor")
+    .internal()
+    .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
+      "on a query. Higher values lead to more partitions read. Lower values might lead to " +
+      "longer execution times as more jobs will be run")
--- End diff --

is this more tasks being run, not more jobs?





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-25 Thread robert3005
Github user robert3005 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76245512
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
    * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = withScope {
+    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
--- End diff --

Any pointers on the best way to do this? I haven't seen any config mechanism 
that is available in both core and SQL. I could skip the SQLConf option and just 
read SparkConf directly, but then you lose the type safety. Thanks!
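
For reference, a minimal sketch of the "share one plain SparkConf key" option under discussion (an illustration, not code from the PR; the key name is the one used in the diff):

```scala
// Core side, inside RDD.take (as in the quoted diff):
val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)

// SQL side, inside SparkPlan.executeTake: read the same key through the
// underlying SparkConf instead of adding a SQLConf entry. This shares the
// config between the two take implementations, but gives up SQLConf's typed
// builder, documentation and default handling.
val takeRampUpRateSql =
  sqlContext.sparkContext.getConf.getInt("spark.rdd.take.rampUpRate", 4)
```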





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76244574
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
    * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = withScope {
+    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
--- End diff --

Hmmm... That is a fair point. Can we share a config?





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-25 Thread robert3005
Github user robert3005 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76230260
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
    * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = withScope {
+    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
--- End diff --

One config is in Spark SQL, the other is in Core. Where should the final config 
live? I thought you'd want symmetry between those two functions, and there 
doesn't seem to be any config mechanism other than SparkConf that is available in both places.





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-25 Thread robert3005
Github user robert3005 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76229774
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -311,30 +311,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val buf = new ArrayBuffer[InternalRow]
     val totalParts = childRDD.partitions.length
     var partsScanned = 0
+
     while (buf.size < n && partsScanned < totalParts) {
       // The number of partitions to try in this iteration. It is ok for this number to be
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1L
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
-        if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+        // If we didn't find any rows after the previous iteration, quadruple and retry.
+        // Otherwise, interpolate the number of partitions we need to try, but overestimate
+        // it by 50%. We also cap the estimation in the end.
+        val takeRampUpRate = sqlContext.conf.takeRampUpRate
+        if (buf.isEmpty) {
+          numPartsToTry = partsScanned * takeRampUpRate
         } else {
-          numPartsToTry = (1.5 * n * partsScanned / buf.size).toInt
+          // the left side of max is >=1 whenever partsScanned >= 2
+          numPartsToTry = Math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
+          numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
         }
       }
-      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
 
       val left = n - buf.size
       val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
       val sc = sqlContext.sparkContext
-      val res = sc.runJob(childRDD,
-        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
+      val res = sc.runJob(childRDD, (it: Iterator[Array[Byte]]) => it.take(left).toArray, p)
--- End diff --

Yes, this is wrong. Running tests on a fixed version now to confirm I 
haven't missed anything.





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-25 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76228058
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
    * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = withScope {
+    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
--- End diff --

Also, would someone really want to set this differently for RDDs vs. other 
things? Trying to avoid a proliferation of config. Would someone reasonably ever 
tune this anyway?





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76227923
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -311,30 +311,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val buf = new ArrayBuffer[InternalRow]
     val totalParts = childRDD.partitions.length
     var partsScanned = 0
+
     while (buf.size < n && partsScanned < totalParts) {
       // The number of partitions to try in this iteration. It is ok for this number to be
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1L
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
-        if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+        // If we didn't find any rows after the previous iteration, quadruple and retry.
+        // Otherwise, interpolate the number of partitions we need to try, but overestimate
+        // it by 50%. We also cap the estimation in the end.
+        val takeRampUpRate = sqlContext.conf.takeRampUpRate
+        if (buf.isEmpty) {
+          numPartsToTry = partsScanned * takeRampUpRate
         } else {
-          numPartsToTry = (1.5 * n * partsScanned / buf.size).toInt
+          // the left side of max is >=1 whenever partsScanned >= 2
+          numPartsToTry = Math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
+          numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
        }
       }
-      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
 
       val left = n - buf.size
       val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
       val sc = sqlContext.sparkContext
-      val res = sc.runJob(childRDD,
-        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
+      val res = sc.runJob(childRDD, (it: Iterator[Array[Byte]]) => it.take(left).toArray, p)
 
       res.foreach { r =>
-        decodeUnsafeRows(r.asInstanceOf[Array[Byte]]).foreach(buf.+=)
+        buf ++= decodeUnsafeRows(r.asInstanceOf[Array[Byte]]).take(n - buf.size)
--- End diff --

This is not really necessary for this change. We already limit the buffer 
size at the end of this method. Why do it here?
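
For what it's worth, a sketch of the two places the cap can live (the tail-of-method trimming is paraphrased, not quoted, so treat it as an assumption about the surrounding code):

```scala
// (a) What the diff adds: cap while accumulating, so buf never holds more than n rows.
buf ++= decodeUnsafeRows(r.asInstanceOf[Array[Byte]]).take(n - buf.size)

// (b) The existing safety net being referred to: accumulate whatever the scanned
// partitions return and trim once after the loop, roughly:
if (buf.size > n) buf.take(n).toArray else buf.toArray
```

Capping per iteration is redundant for correctness if (b) is already there, but it avoids decoding and buffering rows that would be thrown away anyway.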





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76227554
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -311,30 +311,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     val buf = new ArrayBuffer[InternalRow]
     val totalParts = childRDD.partitions.length
     var partsScanned = 0
+
     while (buf.size < n && partsScanned < totalParts) {
       // The number of partitions to try in this iteration. It is ok for this number to be
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1L
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
-        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
-        if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+        // If we didn't find any rows after the previous iteration, quadruple and retry.
+        // Otherwise, interpolate the number of partitions we need to try, but overestimate
+        // it by 50%. We also cap the estimation in the end.
+        val takeRampUpRate = sqlContext.conf.takeRampUpRate
+        if (buf.isEmpty) {
+          numPartsToTry = partsScanned * takeRampUpRate
        } else {
-          numPartsToTry = (1.5 * n * partsScanned / buf.size).toInt
+          // the left side of max is >=1 whenever partsScanned >= 2
+          numPartsToTry = Math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
+          numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
        }
       }
-      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions
 
       val left = n - buf.size
       val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
       val sc = sqlContext.sparkContext
-      val res = sc.runJob(childRDD,
-        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
+      val res = sc.runJob(childRDD, (it: Iterator[Array[Byte]]) => it.take(left).toArray, p)
--- End diff --

Why change this? The childRDD is an `RDD[Array[Byte]]`; each partition returns an 
iterator with exactly one element (a byte array containing all of that partition's encoded rows).
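
Spelling that out (reasoning from the quoted diff only): since each partition's iterator yields a single byte array holding all of its encoded rows, `it.take(left)` limits the number of byte arrays rather than the number of rows, and it also changes the closure's result type from `Array[Byte]` to `Array[Array[Byte]]`, so the later `r.asInstanceOf[Array[Byte]]` cast would fail at runtime. The closure removed in the diff, which a fix would presumably restore, was:

```scala
// Per-partition closure from the removed lines: hand back the single encoded
// byte array if the partition produced one, otherwise an empty array.
val res = sc.runJob(childRDD,
  (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
```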





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-25 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76217738
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
    * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = withScope {
+    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
--- End diff --

Should we protect a user against themselves, and prevent a rampUpRate < 1?
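
A lightweight way to do that on the core side, keeping the plain `SparkConf` read from the diff (a sketch, not the PR's final code):

```scala
// Clamp the configured rate: a value below 1 would make numPartsToTry zero or
// negative while the buffer is still empty, and the scan would stop making progress.
val takeRampUpRate = math.max(conf.getInt("spark.rdd.take.rampUpRate", 4), 1)
```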





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-24 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r76177989
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -116,6 +116,13 @@ object SQLConf {
     .longConf
     .createWithDefault(10L * 1024 * 1024)
 
+  val TAKE_RAMP_UP_RATE = SQLConfigBuilder("spark.sql.take.rampUpRate")
+    .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
+      "on a query. Higher values lead to more partitions read. Lower values might lead to " +
+      "longer execution times as more jobs will be run")
+    .intConf
--- End diff --

I'd make this an internal config.
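
That is what the later revision quoted at the top of this thread does: the builder gains `.internal()` and the key is renamed to `spark.sql.limit.scaleUpFactor`. Roughly as below; the `createWithDefault(4)` is an assumption chosen to match the core-side default of 4, since the quoted diff is cut off before that call:

```scala
val LIMIT_SCALE_UP_FACTOR = SQLConfigBuilder("spark.sql.limit.scaleUpFactor")
  .internal()
  .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
    "on a query. Higher values lead to more partitions read. Lower values might lead to " +
    "longer execution times as more jobs will be run")
  .intConf
  .createWithDefault(4)
```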





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14573#discussion_r74239358
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -333,15 +333,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         // If we didn't find any rows after the first iteration, just try all partitions next.
         // Otherwise, interpolate the number of partitions we need to try, but overestimate it
         // by 50%.
-        if (buf.size == 0) {
-          numPartsToTry = totalParts - 1
+        if (buf.isEmpty) {
--- End diff --

Yeah, this is actually taken from `RDD.take()`. Your approach is closer to 
what that does, but I think we should actually make them operate as similarly 
as possible unless there's a reason not to. Could you compare and sync them up 
further?
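
For comparison, the ramp-up inside `RDD.take()` at the time looked roughly like this (paraphrased from memory of the Spark 2.0 sources, so treat the exact wording as approximate; the revised `SparkPlan` code quoted further up this thread mirrors it almost line for line, with the hard-coded 4 being what the proposed config replaces):

```scala
// Inside RDD.take's scan loop (sketch): quadruple when the previous pass found
// nothing, otherwise interpolate with a 50% overestimate and cap the growth.
if (partsScanned > 0) {
  if (buf.isEmpty) {
    numPartsToTry = partsScanned * 4
  } else {
    // the left side of max is >=1 whenever partsScanned >= 2
    numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
    numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
  }
}
```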





[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...

2016-08-09 Thread robert3005
GitHub user robert3005 opened a pull request:

https://github.com/apache/spark/pull/14573

[SPARK-16984][SQL] don't try whole dataset immediately when first partition 
doesn't have…

## What changes were proposed in this pull request?

Incrementally increase the number of partitions to try so that we don't immediately fall back to scanning all of them.
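
As a worked example of the behaviour change (illustrative numbers, not from the patch): with 10,000 partitions whose leading partitions happen to be empty, the old code falls back to scanning the remaining 9,999 partitions in its second job, while the ramped-up version scans roughly 1, 4, 20, 100, ... partitions per job (with a scale-up factor of 4) and stops as soon as enough rows have been found.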

## How was this patch tested?

Empirically. This is a common-case optimization.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robert3005/spark robertk/execute-take-backoff

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14573.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14573


commit e00e4d731f1ab98fae05ca608cbde3153dcc3418
Author: Robert Kruszewski 
Date:   2016-08-08T17:09:14Z

don't try whole dataset immediately when first partition doesn't have any 
data



