[GitHub] spark pull request: [SPARK-14042][CORE] Add custom coalescer suppo...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/11865

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-212139556

Merging in master. Thanks.
Github user nezihyigitbasi commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-212120928

@rxin tests look OK, do you have any other comments?
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-212089666

**[Test build #2829 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2829/consoleFull)** for PR 11865 at commit [`5a12586`](https://github.com/apache/spark/commit/5a125864785bfba8cfe80432fe7c162f62a6b769).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `trait PartitionCoalescer`
  * `class PartitionGroup(val prefLoc: Option[String] = None)`
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-212033947

**[Test build #2829 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2829/consoleFull)** for PR 11865 at commit [`5a12586`](https://github.com/apache/spark/commit/5a125864785bfba8cfe80432fe7c162f62a6b769).
Github user nezihyigitbasi commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211995362

@rxin somehow Jenkins didn't start the tests after my last push, can you please kick it off?
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211711849

Yup, go for it.
Github user nezihyigitbasi commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211703836

The MiMa tests are failing; I guess we can exclude them all. What do you think?

```
[info] spark-core: found 3 potential binary incompatibilities while checking against org.apache.spark:spark-core_2.11:1.6.0 (filtered 1299)
[error]  * method coalesce(Int,Boolean,scala.math.Ordering)org.apache.spark.rdd.RDD in class org.apache.spark.rdd.RDD does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.coalesce")
[error]  * class org.apache.spark.rdd.PartitionCoalescer#LocationIterator does not have a correspondent in current version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.rdd.PartitionCoalescer$LocationIterator")
[error]  * declaration of class org.apache.spark.rdd.PartitionCoalescer is interface org.apache.spark.rdd.PartitionCoalescer in current version; changing class to interface breaks client code
[error]    filter with: ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.rdd.PartitionCoalescer")
```
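For context, exclusions like these normally live in Spark's build file `project/MimaExcludes.scala`. A hedged sketch of what the added entries might look like; the filter strings are copied from the MiMa output above, but the enclosing list structure in that file is assumed, not quoted from the PR:

```scala
// Sketch only: the filters MiMa itself suggests, as they would appear inside
// the exclusion list in project/MimaExcludes.scala (surrounding code assumed).
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.coalesce"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.rdd.PartitionCoalescer$LocationIterator"),
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.rdd.PartitionCoalescer")
```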
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211655206

**[Test build #2818 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2818/consoleFull)** for PR 11865 at commit [`9d91f77`](https://github.com/apache/spark/commit/9d91f7742bac035c5dde1e1d7450adeb2302fa6a).

* This patch **fails MiMa tests**.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `trait PartitionCoalescer`
  * `class PartitionGroup(val prefLoc: Option[String] = None)`
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211651846

**[Test build #2818 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2818/consoleFull)** for PR 11865 at commit [`9d91f77`](https://github.com/apache/spark/commit/9d91f7742bac035c5dde1e1d7450adeb2302fa6a).
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211651557

Thanks - let's wait for Jenkins. Can you update the title / description of the pull request?
Github user nezihyigitbasi commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211643260

@rxin thanks, comments addressed. Renamed that file to use lower-case too.
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211636424

LGTM other than a couple of minor pieces of feedback.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60151689

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDDPublic.scala ---

```diff
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Partition
+
+/**
+ * A PartitionCoalescer defines how to coalesce the partitions of a given RDD.
+ */
+@DeveloperApi
+trait PartitionCoalescer {
+
+  /**
+   * Coalesce the partitions of the given RDD.
+   *
+   * @param maxPartitions the maximum number of partitions to have after coalescing
+   * @param parent the parent RDD whose partitions to coalesce
+   * @return an array of [[PartitionGroup]]s, where each element is itself an array of
+   *         [[Partition]]s and represents a partition after coalescing is performed.
+   */
+  def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup]
+}
+
+/**
+ * A group of [[Partition]]s
+ * @param prefLoc preferred location for the partition group
+ */
+@DeveloperApi
+class PartitionGroup(val prefLoc: Option[String] = None) {
+  var arr = mutable.ArrayBuffer[Partition]()
```

--- End diff --

`arr` -> `partitions`; also, can this be a `val`?
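To make the `coalesce` contract discussed above concrete, here is a self-contained sketch of a trivial coalescer. It uses simplified local stand-ins for Spark's `Partition`, `PartitionGroup`, and `PartitionCoalescer` so it runs without Spark; in particular, the real trait takes the parent `RDD[_]`, not an array of partitions:

```scala
import scala.collection.mutable

// Local stand-ins for illustration only; not Spark's actual classes.
case class Partition(index: Int)

class PartitionGroup(val prefLoc: Option[String] = None) {
  val partitions = mutable.ArrayBuffer[Partition]()
}

trait PartitionCoalescer {
  // Simplified signature: Spark's trait takes the parent RDD itself.
  def coalesce(maxPartitions: Int, parent: Array[Partition]): Array[PartitionGroup]
}

// A trivial strategy: deal parent partitions round-robin into at most
// maxPartitions groups; each group becomes one post-coalescing partition.
object RoundRobinCoalescer extends PartitionCoalescer {
  override def coalesce(maxPartitions: Int, parent: Array[Partition]): Array[PartitionGroup] = {
    val n = math.min(maxPartitions, parent.length)
    val groups = Array.fill(n)(new PartitionGroup())
    parent.zipWithIndex.foreach { case (p, i) => groups(i % n).partitions += p }
    groups
  }
}

val groups = RoundRobinCoalescer.coalesce(3, (0 until 10).map(Partition(_)).toArray)
assert(groups.length == 3)
assert(groups.map(_.partitions.size).sum == 10)
```

The key invariant, as rxin's review comments ask the scaladoc to state explicitly, is that every element of the returned array represents exactly one partition of the coalesced RDD.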
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60151643

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDDPublic.scala ---

```diff
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Partition
+
+/**
+ * A PartitionCoalescer defines how to coalesce the partitions of a given RDD.
+ */
+@DeveloperApi
+trait PartitionCoalescer {
+
+  /**
+   * Coalesce the partitions of the given RDD.
+   *
+   * @param maxPartitions the maximum number of partitions to have after coalescing
+   * @param parent the parent RDD whose partitions to coalesce
+   * @return an array of [[PartitionGroup]]s, where each element is itself an array of
+   *         [[Partition]]s and represents a partition after coalescing is performed.
+   */
+  def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup]
+}
+
+/**
+ * A group of [[Partition]]s
```

--- End diff --

here too
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60151637

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDDPublic.scala ---

```diff
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.Partition
+
+/**
+ * A PartitionCoalescer defines how to coalesce the partitions of a given RDD.
```

--- End diff --

We would need to add the label here, something like `::DeveloperApi`. Look up other classes to confirm.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60151579

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDDPublic.scala ---

```diff
@@ -0,0 +1,47 @@
+/*
```

--- End diff --

Yeah, actually a lot of the naming in Spark core is wrong, but we never bothered to change it. Usually `SomeWord.scala` means there is a class named `SomeWord`. The Scala style guide actually recommends that when a file contains multiple classes that form a coherent group, its name start with lowercase (similar to many C++ naming guides).
Github user nezihyigitbasi commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211609869

@rxin thanks for the comments. Updated.
Github user nezihyigitbasi commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60145840

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDDPublic.scala ---

```diff
@@ -0,0 +1,47 @@
+/*
```

--- End diff --

I tried to follow the naming convention for other classes, let me know if you still want lower-case.
Github user rxin commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211594064

Also can you tag all these APIs as DeveloperApi? Thanks.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60138999

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---

```diff
@@ -121,6 +121,21 @@ private[spark] class CoalescedRDD[T: ClassTag](
 }
 
 /**
+ * A PartitionCoalescer defines how to coalesce the partitions of a given RDD.
+ */
+trait PartitionCoalescer {
+
+  /**
+   * Coalesce the partitions of the given RDD.
+   *
+   * @param maxPartitions the maximum number of partitions to have after coalescing
+   * @param parent the parent RDD whose partitions to coalesce
+   * @return a grouping of the parent RDD's partitions
+   */
+  def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup]
```

--- End diff --

btw i'd move the public apis maybe to a separate file (e.g. coalesce-public.scala)
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60138955

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---

```diff
@@ -121,6 +121,21 @@ private[spark] class CoalescedRDD[T: ClassTag](
 }
 
 /**
+ * A PartitionCoalescer defines how to coalesce the partitions of a given RDD.
+ */
+trait PartitionCoalescer {
+
+  /**
+   * Coalesce the partitions of the given RDD.
+   *
+   * @param maxPartitions the maximum number of partitions to have after coalescing
+   * @param parent the parent RDD whose partitions to coalesce
+   * @return a grouping of the parent RDD's partitions
+   */
+  def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup]
```

--- End diff --

can you document a little bit more the semantics of the return? i think it's better to say explicitly each element in the returned array represents one rdd partition post-coalescing.
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60138638

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---

```diff
@@ -337,19 +353,23 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
    * load balanced and grouped by locality
    * @return array of partition groups
    */
-  def run(): Array[PartitionGroup] = {
-    setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
-    throwBalls() // assign partitions (balls) to each group (bins)
+  def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = {
+    setupGroups(math.min(prev.partitions.length, maxPartitions), prev) // setup the groups (bins)
+    throwBalls(maxPartitions, prev, balanceSlack) // assign partitions (balls) to each group (bins)
     getPartitions
   }
 }
 
-private case class PartitionGroup(prefLoc: Option[String] = None) {
+/**
+ * A group of [[Partition]]s
+ * @param prefLoc preferred location for the partition group
+ */
+case class PartitionGroup(prefLoc: Option[String] = None) {
```

--- End diff --

1. move this closer to PartitionCoalescer
2. make it a normal class (case class is difficult to maintain backward compatibility due to the pattern matching)
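rxin's second point, that case classes are hard to evolve compatibly, comes from pattern matching baking the constructor's exact field list into client code. A minimal illustration with a hypothetical stand-in class (not Spark code):

```scala
// A stand-in for the proposed public class; illustrative only.
case class GroupV1(prefLoc: Option[String])

// Client code that destructures the case class now depends on its arity:
// if GroupV1 later gains a second field, patterns like these no longer
// compile, and already-compiled clients break (the MiMa concern here).
def describe(g: GroupV1): String = g match {
  case GroupV1(Some(loc)) => s"prefers $loc"
  case GroupV1(None)      => "no preference"
}

assert(describe(GroupV1(Some("host1"))) == "prefers host1")
assert(describe(GroupV1(None)) == "no preference")
```

A normal class exposes only methods, so fields can be added later without breaking callers.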
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60138636

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---

```diff
@@ -337,19 +353,23 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack:
    * load balanced and grouped by locality
    * @return array of partition groups
    */
-  def run(): Array[PartitionGroup] = {
-    setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
-    throwBalls() // assign partitions (balls) to each group (bins)
+  def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = {
+    setupGroups(math.min(prev.partitions.length, maxPartitions), prev) // setup the groups (bins)
+    throwBalls(maxPartitions, prev, balanceSlack) // assign partitions (balls) to each group (bins)
     getPartitions
   }
 }
 
-private case class PartitionGroup(prefLoc: Option[String] = None) {
+/**
+ * A group of [[Partition]]s
+ * @param prefLoc preferred location for the partition group
+ */
+case class PartitionGroup(prefLoc: Option[String] = None) {
   var arr = mutable.ArrayBuffer[Partition]()
 
   def size: Int = arr.size
```

--- End diff --

size -> numPartitions?
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60138483

--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---

```diff
@@ -971,5 +1013,60 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assertFails { sc.parallelize(1 to 100) }
     assertFails { sc.textFile("/nonexistent-path") }
   }
+}
+
+/**
+ * Coalesces partitions based on their size assuming that the parent RDD is a [[HadoopRDD]].
+ * Took this class out of the test suite to prevent "Task not serializable" exceptions.
+ */
+class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {
+  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
+    val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
+    val groups = ArrayBuffer[PartitionGroup]()
+    var currentGroup = PartitionGroup()
+    var currentSum = 0L
+    var totalSum = 0L
+    var index = 0
+
+    // sort partitions based on the size of the corresponding input splits
+    partitions.sortWith((partition1, partition2) => {
+      val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+      val partition2Size = partition2.asInstanceOf[HadoopPartition].inputSplit.value.getLength
+      partition1Size < partition2Size
+    })
+
+    def updateGroups: Unit = {
```

--- End diff --

this should have parentheses since it has side effects
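The `SizeBasedCoalescer` in this diff sorts partitions by input-split size and packs them into groups bounded by `maxSize`. (Note that `Array.sortWith` returns a new sorted array, so as quoted the sorted result is discarded unless it is assigned to something.) A Spark-free sketch of the same greedy packing idea, with illustrative names rather than Spark's API:

```scala
import scala.collection.mutable

// Greedy size-based packing: sort sizes ascending, then start a new group
// whenever adding the next item would push the current group past maxSize.
// An item larger than maxSize still gets a group of its own.
def sizeBasedGroups(sizes: Seq[Long], maxSize: Long): Seq[Seq[Long]] = {
  val groups = mutable.ArrayBuffer[mutable.ArrayBuffer[Long]]()
  var current = mutable.ArrayBuffer[Long]()
  var currentSum = 0L
  for (s <- sizes.sorted) {
    if (current.nonEmpty && currentSum + s > maxSize) {
      groups += current
      current = mutable.ArrayBuffer[Long]()
      currentSum = 0L
    }
    current += s
    currentSum += s
  }
  if (current.nonEmpty) groups += current
  groups.map(_.toSeq).toSeq
}

// [5, 1, 2, 8] sorts to [1, 2, 5, 8]; with maxSize = 8 that packs as [1, 2, 5] and [8].
assert(sizeBasedGroups(Seq(5L, 1L, 2L, 8L), 8L) == Seq(Seq(1L, 2L, 5L), Seq(8L)))
```

Each resulting group would correspond to one post-coalescing partition, matching the `PartitionGroup` contract discussed earlier in the thread.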
Github user nezihyigitbasi commented on the pull request: https://github.com/apache/spark/pull/11865#issuecomment-211592037

@rxin rebased & addressed comments.
Github user nezihyigitbasi commented on a diff in the pull request: https://github.com/apache/spark/pull/11865#discussion_r60137892

--- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---

```diff
@@ -70,21 +70,24 @@ private[spark] case class CoalescedRDDPartition(
  * parent partitions
  * @param prev RDD to be coalesced
  * @param maxPartitions number of desired partitions in the coalesced RDD (must be positive)
- * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
+ * @param partitionCoalescer [[PartitionCoalescer]] implementation to use for coalescing
  */
 private[spark] class CoalescedRDD[T: ClassTag](
     @transient var prev: RDD[T],
     maxPartitions: Int,
-    balanceSlack: Double = 0.10)
+    partitionCoalescer: Option[PartitionCoalescer] = None)
   extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
 
   require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
     s"Number of partitions ($maxPartitions) must be positive.")
+
+  // used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
+  val balanceSlack: Double = 0.10
```

--- End diff --

no we don't.
Github user nezihyigitbasi commented on the pull request:

    https://github.com/apache/spark/pull/11865#issuecomment-211568597

    If the API changes look OK to you, then I don't have anything else to do before taking this out of WIP. I only need to resolve conflicts with master.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/11865#issuecomment-211566058

    The API change looks alright. I'd separate the dataset changes from this one. Are there other things you want to do before this is no longer WIP?
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11865#discussion_r60129541

    --- Diff: core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---
    @@ -70,21 +70,24 @@ private[spark] case class CoalescedRDDPartition(
      *   parent partitions
      * @param prev RDD to be coalesced
      * @param maxPartitions number of desired partitions in the coalesced RDD (must be positive)
    - * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
    + * @param partitionCoalescer [[PartitionCoalescer]] implementation to use for coalescing
      */
     private[spark] class CoalescedRDD[T: ClassTag](
         @transient var prev: RDD[T],
         maxPartitions: Int,
    -    balanceSlack: Double = 0.10)
    +    partitionCoalescer: Option[PartitionCoalescer] = None)
       extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies

       require(maxPartitions > 0 || maxPartitions == prev.partitions.length,
         s"Number of partitions ($maxPartitions) must be positive.")

    +  // used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
    +  val balanceSlack: Double = 0.10
    --- End diff --

    we don't need a val here do we?
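The design point of the diff above is that the new `partitionCoalescer: Option[PartitionCoalescer] = None` parameter leaves existing callers untouched: when no coalescer is supplied, the default policy is used. The following is a simplified, self-contained sketch of that pattern. `CoalescedRDDSketch`, `DefaultPartitionCoalescer`, and the `Array[Array[Int]]` group representation are hypothetical stand-ins for illustration; the real classes live in `org.apache.spark.rdd` and operate on `RDD`s and `PartitionGroup`s.

```scala
// Hypothetical stand-in for Spark's PartitionCoalescer: here a partition group
// is just an array of parent-partition indices, one group per output partition.
trait PartitionCoalescer {
  def coalesce(maxPartitions: Int, parentPartitions: Int): Array[Array[Int]]
}

// Default policy sketch: chunk parent partitions into contiguous groups.
class DefaultPartitionCoalescer extends PartitionCoalescer {
  override def coalesce(maxPartitions: Int, parentPartitions: Int): Array[Array[Int]] = {
    val groupSize = math.ceil(parentPartitions.toDouble / maxPartitions).toInt
    (0 until parentPartitions).grouped(groupSize).map(_.toArray).toArray
  }
}

// Mirrors the new constructor: a None coalescer falls back to the default,
// so the API change is invisible to callers that never pass one.
class CoalescedRDDSketch(
    parentPartitions: Int,
    maxPartitions: Int,
    partitionCoalescer: Option[PartitionCoalescer] = None) {
  require(maxPartitions > 0, s"Number of partitions ($maxPartitions) must be positive.")

  def partitionGroups: Array[Array[Int]] =
    partitionCoalescer
      .getOrElse(new DefaultPartitionCoalescer)
      .coalesce(maxPartitions, parentPartitions)
}
```

With this sketch, `new CoalescedRDDSketch(10, 4).partitionGroups` groups ten parent partitions into four output partitions, while `Some(customCoalescer)` swaps in a different policy without changing any other call site.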
Github user nezihyigitbasi commented on the pull request:

    https://github.com/apache/spark/pull/11865#issuecomment-211558139

    @rxin any plans to review this?
Github user nezihyigitbasi commented on the pull request:

    https://github.com/apache/spark/pull/11865#issuecomment-205390556

    @hbhanawat once we figure out the details I think it makes sense to do that.
Github user hbhanawat commented on the pull request:

    https://github.com/apache/spark/pull/11865#issuecomment-205145273

    @nezihyigitbasi, do you plan to add something similar for DF/DS API?
Github user nezihyigitbasi commented on the pull request:

    https://github.com/apache/spark/pull/11865#issuecomment-204082776

    @rxin any comments?
Github user nezihyigitbasi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11865#discussion_r56891755

    --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
    @@ -971,5 +1013,60 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
         assertFails { sc.parallelize(1 to 100) }
         assertFails { sc.textFile("/nonexistent-path") }
       }
    +}
    +
    +/**
    + * Coalesces partitions based on their size assuming that the parent RDD is a [[HadoopRDD]].
    + * Took this class out of the test suite to prevent "Task not serializable" exceptions.
    + */
    +class SizeBasedCoalescer(val maxSize: Int) extends PartitionCoalescer with Serializable {
    +  override def coalesce(maxPartitions: Int, parent: RDD[_]): Array[PartitionGroup] = {
    +    val partitions: Array[Partition] = parent.asInstanceOf[HadoopRDD[Any, Any]].getPartitions
    +    val groups = ArrayBuffer[PartitionGroup]()
    +    var currentGroup = PartitionGroup()
    +    var currentSum = 0L
    +    var totalSum = 0L
    +    var index = 0
    +
    +    // sort partitions based on the size of the corresponding input splits
    +    partitions.sortWith((partition1, partition2) => {
    +      val partition1Size = partition1.asInstanceOf[HadoopPartition].inputSplit.value.getLength
    --- End diff --

    Since `HadoopPartition` is not public, a user who wants to implement this outside of Spark can have some trouble.
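The core of the `SizeBasedCoalescer` discussed above is a greedy bin-packing pass: sort parent partitions by input-split size, then accumulate them into groups until a size budget is exceeded. The following self-contained sketch isolates that logic; plain `Long` sizes stand in for `HadoopPartition` input-split lengths (which, as the comment notes, are not reachable through a public API), and `SizeBasedGrouping` is a hypothetical name for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the size-based grouping idea behind SizeBasedCoalescer, with plain
// sizes in place of HadoopPartition input-split lengths.
object SizeBasedGrouping {
  // Greedily packs partition indices into groups whose total size stays at or
  // under maxSize, visiting partitions from smallest to largest. A single
  // partition larger than maxSize still gets a group of its own.
  def groupBySize(sizes: Array[Long], maxSize: Long): Array[Array[Int]] = {
    val groups = ArrayBuffer(ArrayBuffer.empty[Int])
    var currentSum = 0L
    // sort (size, index) pairs by size, mirroring the sortWith in the diff above
    for ((size, index) <- sizes.zipWithIndex.sortBy(_._1)) {
      if (currentSum + size > maxSize && groups.last.nonEmpty) {
        groups += ArrayBuffer.empty[Int]  // budget exceeded: start a new group
        currentSum = 0L
      }
      groups.last += index
      currentSum += size
    }
    groups.map(_.toArray).toArray
  }
}
```

For example, sizes of 100, 400, 300, and 200 bytes with a 500-byte budget pack into three groups, the smallest two partitions sharing one group.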
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/11865#issuecomment-199463105

    Can one of the admins verify this patch?
GitHub user nezihyigitbasi opened a pull request:

    https://github.com/apache/spark/pull/11865

    [SPARK-14042][CORE] Add custom coalescer support [WIP]

    ## What changes were proposed in this pull request?

    This is a work-in-progress PR (to gather feedback) about a new feature that adds support for specifying a custom coalescer to the `coalesce()` method. Currently I have only added this feature to the `RDD` interface, and once we sort out the details we can proceed with adding this feature to the other APIs (`Dataset` etc.)

    ## How was this patch tested?

    Added a unit test for this functionality.

    \cc @rxin (per our discussion on the mailing list)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/nezihyigitbasi/spark custom_coalesce_policy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11865.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #11865

commit adc12e6ae144b7c33b7512a28bd88c99412ad683
Author: Nezih Yigitbasi
Date: 2016-03-09T18:38:00Z

    Add custom coalescer support