Yes, that’s what I meant. Sure, the numbers might not actually be sorted, but semantically the order of rows is kept throughout non-shuffling transforms. I’m on board with you on union as well.
Back to the original question, then: why is it important to coalesce to a single partition? When you union two RDDs, for example rdd1 = [“a, b, c”] and rdd2 = [“1, 2, 3”, “4, 5, 6”], then rdd1.union(rdd2).saveAsTextFile(…) should result in a file with three lines, “a, b, c”, “1, 2, 3”, and “4, 5, 6”, because the partitions from the two RDDs are concatenated.

Mingyu

On 4/29/14, 10:55 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:

>If you call map() on an RDD it will retain the ordering it had before,
>but that is not necessarily a correct sort order for the new RDD.
>
>var rdd = sc.parallelize([2, 1, 3]);
>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>
>Note that mapped is no longer sorted.
>
>When you union two RDD's together it will effectively concatenate the
>two orderings, which is also not a valid sorted order on the new RDD:
>
>rdd1 = [1,2,3]
>rdd2 = [1,4,5]
>
>rdd1.union(rdd2) = [1,2,3,1,4,5]
>
>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim <m...@palantir.com> wrote:
>> Thanks for the quick response!
>>
>> To better understand it, the reason sorted RDD has a well-defined ordering
>> is because sortedRDD.getPartitions() returns the partitions in the right
>> order and each partition internally is properly sorted. So, if you have
>>
>> var rdd = sc.parallelize([2, 1, 3]);
>> var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>> var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]
>>
>> Since mapValues doesn’t change the order of partitions not change the
>> order of rows within the partitions, I think “mapped” should have the
>> exact same order as “sorted”. Sure, if a transform involves shuffling, the
>> order will change. Am I mistaken? Is there an extra detail in sortedRDD
>> that guarantees a well-defined ordering?
>>
>> If it’s true that the order of partitions returned by RDD.getPartitions()
>> and the row orders within the partitions determine the row order, I’m not
>> sure why union doesn’t respect the order because union operation simply
>> concatenates the two lists of partitions from the two RDDs.
>>
>> Mingyu
>>
>>
>>
>>
>> On 4/29/14, 10:25 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>
>>>You are right, once you sort() the RDD, then yes it has a well defined
>>>ordering.
>>>
>>>But that ordering is lost as soon as you transform the RDD, including
>>>if you union it with another RDD.
>>>
>>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <m...@palantir.com> wrote:
>>>> Hi Patrick,
>>>>
>>>> I’m a little confused about your comment that RDDs are not ordered. As far
>>>> as I know, RDDs keep list of partitions that are ordered and this is why I
>>>> can call RDD.take() and get the same first k rows every time I call it and
>>>> RDD.take() returns the same entries as RDD.map(…).take() because map
>>>> preserves the partition order. RDD order is also what allows me to get the
>>>> top k out of RDD by doing RDD.sort().take().
>>>>
>>>> Am I misunderstanding it? Or, is it just when RDD is written to disk that
>>>> the order is not well preserved? Thanks in advance!
>>>>
>>>> Mingyu
>>>>
>>>>
>>>>
>>>>
>>>> On 1/22/14, 4:46 PM, "Patrick Wendell" <pwend...@gmail.com> wrote:
>>>>
>>>>>Ah somehow after all this time I've never seen that!
>>>>>
>>>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia <buendia...@gmail.com>
>>>>>wrote:
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell <pwend...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> What is the ++ operator here? Is this something you defined?
>>>>>>
>>>>>>
>>>>>> No, it's an alias for union defined in RDD.scala:
>>>>>>
>>>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Another issue is that RDD's are not ordered, so when you union two
>>>>>>> together it doesn't have a well defined ordering.
>>>>>>>
>>>>>>> If you do want to do this you could coalesce into one partition, then
>>>>>>> call MapPartitions and return an iterator that first adds your header
>>>>>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>>>>>> this will only work if you coalesce into a single partition.
>>>>>>
>>>>>>
>>>>>> Thanks! I'll give this a try.
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> myRdd.coalesce(1)
>>>>>>> .map(_.mkString(","))
>>>>>>> .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
>>>>>>> .saveAsTextFile("out.csv")
>>>>>>>
>>>>>>> - Patrick
>>>>>>>
>>>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
>>>>>>> <buendia...@gmail.com> wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I'm trying to find a way to create a csv header when using
>>>>>>> > saveAsTextFile,
>>>>>>> > and I came up with this:
>>>>>>> >
>>>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>>>> > myRdd.coalesce(1).map(_.mkString(",")))
>>>>>>> > .saveAsTextFile("out.csv")
>>>>>>> >
>>>>>>> > But it only saves the header part. Why is that the union method does not
>>>>>>> > return both RDD's?
>>>>>>
>>>>>>
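
P.S. To make the mental model behind my question concrete, here is a minimal sketch in plain Scala, with no Spark involved. MiniRdd is a made-up stand-in for illustration, not Spark's RDD class. It only encodes the premise argued in this thread (an RDD is an ordered list of partitions, and union concatenates the two lists); it is not a confirmed guarantee of how Spark behaves.

```scala
// Toy model only: MiniRdd is a hypothetical stand-in, not Spark's RDD class.
// An "RDD" here is an ordered list of partitions, each an ordered list of rows.
final case class MiniRdd[T](partitions: Vector[Vector[T]]) {
  // Non-shuffling transform: rows are rewritten in place, partition by partition,
  // so neither the partition order nor the row order inside a partition changes.
  def map[U](f: T => U): MiniRdd[U] = MiniRdd(partitions.map(_.map(f)))

  // Union modeled as plain concatenation of the two partition lists (assumption).
  def union(other: MiniRdd[T]): MiniRdd[T] = MiniRdd(partitions ++ other.partitions)

  // Rows in the order a partition-by-partition read would see them.
  def rows: Vector[T] = partitions.flatten
}

object UnionOrderSketch {
  // Same example values as in the message above.
  val rdd1: MiniRdd[String] = MiniRdd(Vector(Vector("a, b, c")))
  val rdd2: MiniRdd[String] = MiniRdd(Vector(Vector("1, 2, 3"), Vector("4, 5, 6")))
  val unioned: MiniRdd[String] = rdd1.union(rdd2)

  def main(args: Array[String]): Unit =
    unioned.rows.foreach(println)
}
```

Under this model the header-first layout falls out of union alone, which is why I don't yet see what coalescing to a single partition adds. If the real behavior differs from the model, that difference is the detail I'm asking about.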