[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22010


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220674969
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.{Partition, Partitioner, TaskContext}
--- End diff --

Thanks! I'll fix that.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220674846
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -42,7 +42,8 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
-import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
+  Utils => collectionUtils}
--- End diff --

Yeah, but we generally break lines anyway, following the rest of the code base.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220674552
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
+  test("distinct with known partitioner preserves partitioning") {
+    val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey()
+    val initialPartitioner = rdd.partitioner
+    val distinctRdd = rdd.distinct()
+    val resultingPartitioner = distinctRdd.partitioner
+    assert(initialPartitioner === resultingPartitioner)
+    val distinctRddDifferent = rdd.distinct(5)
+    val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner
+    assert(initialPartitioner != distinctRddDifferentPartitioner)
+    assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted)
--- End diff --

We could, but we don't need to.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r22067
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
+      // Create an instance of external append only map which ignores values.
+      val map = new ExternalAppendOnlyMap[T, Null, Null](
+        createCombiner = value => null,
+        mergeValue = (a, b) => a,
+        mergeCombiners = (a, b) => a)
--- End diff --

scratch that - does not matter
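The quoted diff cuts off mid-helper; a minimal sketch of how the complete method plausibly continues inside RDD[T] (the insertAll/iterator lines are assumed from context, not quoted from the PR):

    import org.apache.spark.util.collection.ExternalAppendOnlyMap

    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
      // Spillable map keyed by the element itself; the Null values are
      // ignored, so duplicate keys collapse to one entry without risking
      // OOM on large partitions.
      val map = new ExternalAppendOnlyMap[T, Null, Null](
        createCombiner = value => null,
        mergeValue = (a, b) => a,
        mergeCombiners = (a, b) => a)
      map.insertAll(partition.map(_ -> null))
      map.iterator.map(_._1)
    }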


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220672896
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
+  test("distinct with known partitioner preserves partitioning") {
+    val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey()
+    val initialPartitioner = rdd.partitioner
+    val distinctRdd = rdd.distinct()
+    val resultingPartitioner = distinctRdd.partitioner
+    assert(initialPartitioner === resultingPartitioner)
+    val distinctRddDifferent = rdd.distinct(5)
+    val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner
+    assert(initialPartitioner != distinctRddDifferentPartitioner)
+    assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted)
--- End diff --

We could also check if the number of stages is what we expect.
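A minimal sketch of such a stage-count check, assuming a SparkListener-based counter (the listener, the waitUntilEmpty flush, and the expected count of 2 are illustrative, not part of the PR):

    import java.util.concurrent.atomic.AtomicInteger
    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    val completedStages = new AtomicInteger(0)
    sc.addSparkListener(new SparkListener {
      override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
        completedStages.incrementAndGet()
      }
    })
    distinctRdd.collect()
    // Flush pending listener events before asserting (listenerBus is
    // accessible from Spark's own test suites).
    sc.listenerBus.waitUntilEmpty(10000)
    // One shuffle stage for sortByKey plus one result stage; distinct()
    // itself should add none when the partitioner is known.
    assert(completedStages.get() === 2)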


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-26 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220672579
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +397,20 @@ abstract class RDD[T: ClassTag](
* Return a new RDD containing the distinct elements in this RDD.
*/
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): 
RDD[T] = withScope {
-map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = 
{
+  // Create an instance of external append only map which ignores 
values.
+  val map = new ExternalAppendOnlyMap[T, Null, Null](
+createCombiner = value => null,
+mergeValue = (a, b) => a,
+mergeCombiners = (a, b) => a)
--- End diff --

nit: clean them?


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220399074
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.{Partition, Partitioner, TaskContext}
--- End diff --

nit: unnecessary change


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r220399123
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -42,7 +42,8 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
-import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
+  Utils => collectionUtils}
--- End diff --

nit: AFAIK we don't have a line-length limit for imports


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-19 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r218946996
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input
  *                         order is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --

yes


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-19 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r218946895
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --

Yes, it would use the right partitioner in this case.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-19 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r218918701
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --

So I _think_ it is the partitioner of the input RDD if there is a known partitioner, otherwise a hash partitioner with the default parallelism. Yes?
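A rough sketch of that resolution, simplified from the usual Partitioner.defaultPartitioner behavior (the real logic also weighs partition counts across the RDDs involved):

    import org.apache.spark.{HashPartitioner, Partitioner}
    import org.apache.spark.rdd.RDD

    // Roughly what reduceByKey((x, y) => x) falls back to when no partitioner
    // or partition count is passed: reuse the parent's partitioner if one
    // exists, otherwise hash-partition with the default parallelism.
    def resolve(parent: RDD[_]): Partitioner =
      parent.partitioner.getOrElse(new HashPartitioner(parent.context.defaultParallelism))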


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-19 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r218917483
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input
  *                         order is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --

I mean yes we can sub-class just as easily -- is that what you mean?


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217901185
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --

Ah yes, no partitioner specified => use parent's partitioner.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-15 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217901179
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input
  *                         order is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --

Since we are already creating a `MapPartitionsRDD` in distinct, overriding `partitioner` should be trivial.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-15 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217900574
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input
  *                         order is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --

`MapPartitionsRDD` is already private. But yes the other option is sub-classing.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-15 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217900563
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --

No, reduceByKey on a known partitioner is fine.
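Paraphrasing the check inside PairRDDFunctions.combineByKeyWithClassTag that makes this safe (names refer to that method's scope; this is not the exact source):

    // When the chosen partitioner equals the RDD's own, Spark combines
    // map-side and never builds a ShuffledRDD, so no shuffle is added.
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => aggregator.combineValuesByKey(iter, TaskContext.get()),
        preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
    }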


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-14 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217876215
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
--- End diff --

Don't you need to specify the partitioner here?


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-14 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r217876184
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala ---
@@ -35,16 +35,24 @@ import org.apache.spark.{Partition, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input
  *                         order is changed. Mostly stateful functions are order-sensitive.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false)
+    isOrderSensitive: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
--- End diff --

We don't need to modify the public API to add support for this. Create a subclass of MapPartitionsRDD which has the partitioner method overridden to specify what you need.
Did I miss something here?
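A sketch of that subclass approach, with a hypothetical class name (assuming MapPartitionsRDD stays unmodified and the subclass lives next to it in org.apache.spark.rdd):

    import scala.reflect.ClassTag
    import org.apache.spark.{Partitioner, TaskContext}

    // Hypothetical private subclass that pins the resulting partitioner
    // without touching MapPartitionsRDD's constructor.
    private[spark] class MapPartitionsWithPartitionerRDD[U: ClassTag, T: ClassTag](
        prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],
        knownPartitioner: Option[Partitioner])
      extends MapPartitionsRDD[U, T](prev, f) {

      override val partitioner: Option[Partitioner] = knownPartitioner
    }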


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-09 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r216170355
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
--- End diff --

No we can't. Since the original partitioner function takes in the key of the RDD, and the key is now changing, we can't guarantee that the previous partitioner preserves partitioning when we do `x => (x, null)`. In fact, with the hash-based partitioner this is not the case (if you want, I explored this in my live stream - https://youtu.be/NDGM501uUrE?t=19m17s - and you can see the challenge with that approach).
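A tiny illustration of the issue with HashPartitioner (hypothetical snippet, not from the PR):

    import org.apache.spark.HashPartitioner

    // The parent placed an element x by hash(x), but after map(x => (x, null))
    // a shuffle would place it by hash((x, null)), generally a different bucket.
    val p = new HashPartitioner(10)
    val x = 42
    val before = p.getPartition(x)         // bucket of the original element
    val after = p.getPartition((x, null))  // bucket of the keyed pair
    // before and after need not agree, so the old placement is not preserved.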


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-08 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r216145892
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,26 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    partitioner match {
--- End diff --

you can just create a new MapPartitionsRDD with preservesPartitioning set to true, can't you?


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-09-02 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r214575455
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
--- End diff --

So to reuse `reduceByKey` I'd write a custom partitioner which uses the existing partitioner as its base, but takes the combined key type as input and drops it down to the original key.

Sound right?
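A sketch of that wrapper, along the lines of the WrappedPartitioner the later diffs reference (details assumed from this discussion, not quoted from the PR):

    import org.apache.spark.Partitioner

    // Delegate to the parent partitioner, but accept the combined (T, Null)
    // key and peel off the original key before asking for a bucket.
    private[spark] class WrappedPartitioner(parent: Partitioner) extends Partitioner {
      override def numPartitions: Int = parent.numPartitions
      override def getPartition(key: Any): Int = key match {
        case (k, _) => parent.getPartition(k)
        case other => parent.getPartition(other)
      }
    }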


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-30 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r214103667
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
--- End diff --

This is a bad implementation and could OOM. You should reuse reduceByKey.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-30 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r214103223
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
--- End diff --

This is a bad implementation and could OOM. You should reuse reduceByKey.



---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-30 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r214057591
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
--- End diff --

Yes, the hash has to be computed anyway; this is just a nit, and I am not sure the perf gain would even be noticeable. That is why I already gave my LGTM despite this comment.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-30 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r214046905
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
--- End diff --

But we need to compute the hash of the key anyway to check if it exists.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-12 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r209450199
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
--- End diff --

Yes, it is not a big deal, but if you check the implementation in the Scala library you can see that the hash and the index for the key are computed even though they are not needed (since `addElem` is called anyway). It probably doesn't change much, but we could save this computation...


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-11 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r209440037
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
--- End diff --

So a HashSet can only contain one instance of each element, meaning we don't need to worry about adding multiple copies of the elements.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r209351478
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
--- End diff --

Not a big deal, but although this is really compact and elegant, it also adds to the set the elements which are already there, which is not needed. We could probably check whether the key is there and add it only in that case; that is probably a bit faster.


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-10 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r209340344
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
+  test("distinct with known partioner does not cause shuffle") {
--- End diff --

Thanks!


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-10 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r209340321
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partioned with a known partioner we can work locally.
--- End diff --

Thanks!


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-10 Thread holdenk
Github user holdenk commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r209338809
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partioned with a known partioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]() ++= itr
--- End diff --

I like this suggestion, thanks!


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r209230417
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
+  test("distinct with known partioner does not cause shuffle") {
--- End diff --

nit: partioner -> partitioner


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r209230438
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partioned with a known partioner we can work locally.
--- End diff --

nit: partioned -> partitioned, partioner -> partitioner


---




[GitHub] spark pull request #22010: [SPARK-21436][CORE] Take advantage of known parti...

2018-08-07 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22010#discussion_r208180371
  
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
   * Return a new RDD containing the distinct elements in this RDD.
   */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partioned with a known partioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]() ++= itr
--- End diff --

I think here we could return a new iterator which wraps `itr` and uses this set as state in order to filter out the records we have already seen. In this way we would make only one pass over the data, instead of the two of the current solution. What do you think?
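A minimal sketch of that one-pass variant (essentially the itr.filter(set.add(_)) shape the later diffs adopt):

    import scala.collection.mutable

    // The set fills lazily as the returned iterator is consumed, so the
    // partition is traversed exactly once; add returns false for duplicates.
    def removeDuplicatesInPartition[T](itr: Iterator[T]): Iterator[T] = {
      val seen = new mutable.HashSet[T]()
      itr.filter(seen.add)
    }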


---
