[GitHub] spark pull request #14573: [SPARK-16984][SQL] don't try whole dataset immedi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/14573 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user robert3005 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r77336634

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -116,6 +116,14 @@ object SQLConf {
         .longConf
         .createWithDefault(10L * 1024 * 1024)

    +  val LIMIT_SCALE_UP_FACTOR = SQLConfigBuilder("spark.sql.limit.scaleUpFactor")
    +    .internal()
    +    .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
    +      "on a query. Higher values lead to more partitions read. Lower values might lead to " +
    +      "longer execution times as more jobs will be run")
    --- End diff --

    fewer tasks per job -> potentially more jobs
Github user ash211 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76717713

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -116,6 +116,14 @@ object SQLConf {
         .longConf
         .createWithDefault(10L * 1024 * 1024)

    +  val LIMIT_SCALE_UP_FACTOR = SQLConfigBuilder("spark.sql.limit.scaleUpFactor")
    +    .internal()
    +    .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
    +      "on a query. Higher values lead to more partitions read. Lower values might lead to " +
    +      "longer execution times as more jobs will be run")
    --- End diff --

    is this more tasks being run, not more jobs?
Github user robert3005 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76245512

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
       * an exception if called on an RDD of `Nothing` or `Null`.
       */
      def take(num: Int): Array[T] = withScope {
    +    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
    --- End diff --

    Any pointers on what's the best way to do this? I haven't seen anything that's in both core and sql. I could introduce a sqlconf option and just read sparkconf, but then you lose the type safety. Thanks!
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76244574

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
       * an exception if called on an RDD of `Nothing` or `Null`.
       */
      def take(num: Int): Array[T] = withScope {
    +    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
    --- End diff --

    Hmmm... That is a fair point. Can we share a config?
Github user robert3005 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76230260

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
       * an exception if called on an RDD of `Nothing` or `Null`.
       */
      def take(num: Int): Array[T] = withScope {
    +    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
    --- End diff --

    One thing is in Spark SQL, the other is in Core. Where should the final config live? I thought you'd want symmetry between those two functions, and there doesn't seem to be a config outside of sparkconf that is available in both places.
Github user robert3005 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76229774

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -311,30 +311,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         val buf = new ArrayBuffer[InternalRow]
         val totalParts = childRDD.partitions.length
         var partsScanned = 0
    +
         while (buf.size < n && partsScanned < totalParts) {
           // The number of partitions to try in this iteration. It is ok for this number to be
           // greater than totalParts because we actually cap it at totalParts in runJob.
           var numPartsToTry = 1L
           if (partsScanned > 0) {
    -        // If we didn't find any rows after the first iteration, just try all partitions next.
    -        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
    -        // by 50%.
    -        if (buf.size == 0) {
    -          numPartsToTry = totalParts - 1
    +        // If we didn't find any rows after the previous iteration, quadruple and retry.
    +        // Otherwise, interpolate the number of partitions we need to try, but overestimate
    +        // it by 50%. We also cap the estimation in the end.
    +        val takeRampUpRate = sqlContext.conf.takeRampUpRate
    +        if (buf.isEmpty) {
    +          numPartsToTry = partsScanned * takeRampUpRate
             } else {
    -          numPartsToTry = (1.5 * n * partsScanned / buf.size).toInt
    +          // the left side of max is >=1 whenever partsScanned >= 2
    +          numPartsToTry = Math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
    +          numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
             }
           }
    -      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions

           val left = n - buf.size
           val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
           val sc = sqlContext.sparkContext
    -      val res = sc.runJob(childRDD,
    -        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
    +      val res = sc.runJob(childRDD, (it: Iterator[Array[Byte]]) => it.take(left).toArray, p)
    --- End diff --

    Yes, this is wrong. Running tests on a fixed version now to confirm I haven't missed anything.
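The ramp-up strategy quoted in this hunk can be modeled standalone. The sketch below is illustrative, not Spark code: `partitionsPerIteration` and its uniform `rowsPerPart` density are hypothetical, and `scaleUpFactor` plays the role of the `takeRampUpRate` / `spark.sql.limit.scaleUpFactor` value from the diff. It shows the scan growing geometrically instead of jumping straight to all partitions.

```scala
// Standalone model of the take() ramp-up under discussion. Assumption: every
// scanned partition yields exactly `rowsPerPart` rows.
object TakeRampUpDemo {
  /** Partition counts probed per iteration until `n` rows are collected. */
  def partitionsPerIteration(
      n: Int,
      totalParts: Int,
      rowsPerPart: Int,
      scaleUpFactor: Int = 4): Seq[Int] = {
    var bufSize = 0
    var partsScanned = 0
    val tried = scala.collection.mutable.ArrayBuffer.empty[Int]
    while (bufSize < n && partsScanned < totalParts) {
      var numPartsToTry = 1L
      if (partsScanned > 0) {
        if (bufSize == 0) {
          // No rows yet: grow geometrically instead of trying all partitions.
          numPartsToTry = partsScanned.toLong * scaleUpFactor
        } else {
          // Interpolate from observed density, overestimate by 50%, cap growth.
          numPartsToTry = Math.max((1.5 * n * partsScanned / bufSize).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned.toLong * scaleUpFactor)
        }
      }
      val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
      tried += p.size
      bufSize += p.size * rowsPerPart
      partsScanned += p.size
    }
    tried.toSeq
  }

  def main(args: Array[String]): Unit = {
    // All partitions empty: probe 1, 4, 20, 100, 500, 375 rather than 1 then all 999.
    println(partitionsPerIteration(10, 1000, rowsPerPart = 0))
    // Sparse data: the density interpolation kicks in but the cap still applies.
    println(partitionsPerIteration(100, 1000, rowsPerPart = 1))
  }
}
```

With a scale-up factor of 4, an all-empty dataset of 1000 partitions costs six small jobs instead of one job over 999 partitions, which is the common-case win the PR description claims.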
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76228058

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
       * an exception if called on an RDD of `Nothing` or `Null`.
       */
      def take(num: Int): Array[T] = withScope {
    +    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
    --- End diff --

    Also, would someone really want to set this differently for RDD vs other things? Trying to avoid proliferation of config. Would someone reasonably ever tune this any way?
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76227923

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -311,30 +311,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         val buf = new ArrayBuffer[InternalRow]
         val totalParts = childRDD.partitions.length
         var partsScanned = 0
    +
         while (buf.size < n && partsScanned < totalParts) {
           // The number of partitions to try in this iteration. It is ok for this number to be
           // greater than totalParts because we actually cap it at totalParts in runJob.
           var numPartsToTry = 1L
           if (partsScanned > 0) {
    -        // If we didn't find any rows after the first iteration, just try all partitions next.
    -        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
    -        // by 50%.
    -        if (buf.size == 0) {
    -          numPartsToTry = totalParts - 1
    +        // If we didn't find any rows after the previous iteration, quadruple and retry.
    +        // Otherwise, interpolate the number of partitions we need to try, but overestimate
    +        // it by 50%. We also cap the estimation in the end.
    +        val takeRampUpRate = sqlContext.conf.takeRampUpRate
    +        if (buf.isEmpty) {
    +          numPartsToTry = partsScanned * takeRampUpRate
             } else {
    -          numPartsToTry = (1.5 * n * partsScanned / buf.size).toInt
    +          // the left side of max is >=1 whenever partsScanned >= 2
    +          numPartsToTry = Math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
    +          numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
             }
           }
    -      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions

           val left = n - buf.size
           val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
           val sc = sqlContext.sparkContext
    -      val res = sc.runJob(childRDD,
    -        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
    +      val res = sc.runJob(childRDD, (it: Iterator[Array[Byte]]) => it.take(left).toArray, p)

           res.foreach { r =>
    -        decodeUnsafeRows(r.asInstanceOf[Array[Byte]]).foreach(buf.+=)
    +        buf ++= decodeUnsafeRows(r.asInstanceOf[Array[Byte]]).take(n - buf.size)
    --- End diff --

    This is not really necessary for this change. We already limit the buffer size at the end of this method. Why do it here?
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76227554

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -311,30 +311,32 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         val buf = new ArrayBuffer[InternalRow]
         val totalParts = childRDD.partitions.length
         var partsScanned = 0
    +
         while (buf.size < n && partsScanned < totalParts) {
           // The number of partitions to try in this iteration. It is ok for this number to be
           // greater than totalParts because we actually cap it at totalParts in runJob.
           var numPartsToTry = 1L
           if (partsScanned > 0) {
    -        // If we didn't find any rows after the first iteration, just try all partitions next.
    -        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
    -        // by 50%.
    -        if (buf.size == 0) {
    -          numPartsToTry = totalParts - 1
    +        // If we didn't find any rows after the previous iteration, quadruple and retry.
    +        // Otherwise, interpolate the number of partitions we need to try, but overestimate
    +        // it by 50%. We also cap the estimation in the end.
    +        val takeRampUpRate = sqlContext.conf.takeRampUpRate
    +        if (buf.isEmpty) {
    +          numPartsToTry = partsScanned * takeRampUpRate
             } else {
    -          numPartsToTry = (1.5 * n * partsScanned / buf.size).toInt
    +          // the left side of max is >=1 whenever partsScanned >= 2
    +          numPartsToTry = Math.max((1.5 * n * partsScanned / buf.size).toInt - partsScanned, 1)
    +          numPartsToTry = Math.min(numPartsToTry, partsScanned * takeRampUpRate)
             }
           }
    -      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions

           val left = n - buf.size
           val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
           val sc = sqlContext.sparkContext
    -      val res = sc.runJob(childRDD,
    -        (it: Iterator[Array[Byte]]) => if (it.hasNext) it.next() else Array.empty, p)
    +      val res = sc.runJob(childRDD, (it: Iterator[Array[Byte]]) => it.take(left).toArray, p)
    --- End diff --

    Why change this? The childRDD is an `RDD[Array[Byte]]`; it returns an iterator with exactly one element (a byte array containing the entire array).
Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76217738

    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1296,6 +1296,7 @@ abstract class RDD[T: ClassTag](
       * an exception if called on an RDD of `Nothing` or `Null`.
       */
      def take(num: Int): Array[T] = withScope {
    +    val takeRampUpRate = conf.getInt("spark.rdd.take.rampUpRate", 4)
    --- End diff --

    Should we protect a user against themselves, and prevent a rampUpRate < 1?
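One way to realize the guard suggested here is to clamp the value when the config is read. This is a hypothetical sketch, not what the PR necessarily adopted: the helper name `safeRampUpRate` is invented for illustration, and the important property is that an `Int` rate below 1 (i.e. zero or negative) would yield an empty partition range and a loop that never makes progress.

```scala
// Illustrative clamp for the ramp-up rate: a rate <= 0 makes
// partsScanned * rate non-positive, so the scan would never advance.
object RampUpRateGuard {
  def safeRampUpRate(configured: Int): Int = math.max(configured, 1)

  def main(args: Array[String]): Unit = {
    // e.g. the value read via conf.getInt(..., 4) could be anything the user set
    println(safeRampUpRate(0))
    println(safeRampUpRate(4))
  }
}
```

An alternative, closer to how SQLConf options work, would be to validate the value at definition time and fail fast on an out-of-range setting instead of silently clamping.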
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r76177989

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -116,6 +116,13 @@ object SQLConf {
         .longConf
         .createWithDefault(10L * 1024 * 1024)

    +  val TAKE_RAMP_UP_RATE = SQLConfigBuilder("spark.sql.take.rampUpRate")
    +    .doc("Minimal increase rate in number of partitions between attempts when executing a take " +
    +      "on a query. Higher values lead to more partitions read. Lower values might lead to " +
    +      "longer execution times as more jobs will be run")
    +    .intConf
    --- End diff --

    i'd make this an internal config
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14573#discussion_r74239358

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
    @@ -333,15 +333,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
             // If we didn't find any rows after the first iteration, just try all partitions next.
             // Otherwise, interpolate the number of partitions we need to try, but overestimate it
             // by 50%.
    -        if (buf.size == 0) {
    -          numPartsToTry = totalParts - 1
    +        if (buf.isEmpty) {
    --- End diff --

    Yeah, this is actually taken from `RDD.take()`. Your approach is closer to what that does, but I think we should actually make them operate as similarly as possible unless there's a reason not to. Could you compare and sync them up further?
GitHub user robert3005 opened a pull request:

    https://github.com/apache/spark/pull/14573

    [SPARK-16984][SQL] don't try whole dataset immediately when first partition doesn't have…

## What changes were proposed in this pull request?

Increase the number of partitions to try gradually so we don't revert to scanning them all at once.

## How was this patch tested?

Empirically. This is a common-case optimization.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/robert3005/spark robertk/execute-take-backoff

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/14573.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #14573

commit e00e4d731f1ab98fae05ca608cbde3153dcc3418
Author: Robert Kruszewski
Date: 2016-08-08T17:09:14Z

    don't try whole dataset immediately when first partition doesn't have any data