Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22010#discussion_r209351478
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
        * Return a new RDD containing the distinct elements in this RDD.
        */
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    -    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
    +    // If the data is already appropriately partitioned with a known partitioner we can work locally.
    +    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
    +      val set = new mutable.HashSet[T]()
    +      itr.filter(set.add(_))
    --- End diff --
    
    Not a big deal, and this is really compact and elegant, but it also calls `add` for elements which are already in the set, which is not needed. We could check whether the element is already present and add it only when it is not; that is probably a bit faster.
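    To make the comparison concrete, here is a small hedged sketch (outside of Spark, on plain iterators) of the two variants: the compact `filter(set.add(_))` form from the diff, and the contains-then-add form suggested above. The object and method names are illustrative, not part of the PR.

    ```scala
    import scala.collection.mutable

    object DistinctSketch {
      // Compact form from the diff: HashSet.add returns true only when the
      // element was not yet present, so filter keeps the first occurrence.
      def removeDuplicatesInPartition[T](itr: Iterator[T]): Iterator[T] = {
        val set = new mutable.HashSet[T]()
        itr.filter(set.add(_))
      }

      // Variant suggested in the comment: test membership first and insert
      // only when the element is absent.
      def removeDuplicatesExplicit[T](itr: Iterator[T]): Iterator[T] = {
        val set = new mutable.HashSet[T]()
        itr.filter { x =>
          if (set.contains(x)) {
            false
          } else {
            set.add(x)
            true
          }
        }
      }
    }
    ```

    Both variants produce the same output; whether the explicit check is actually faster depends on the hash-lookup cost, since it performs two lookups for each new element.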


---
