Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
Yes, that’s what I meant. Sure, the numbers might not actually be sorted,
but the order of rows is semantically kept throughout non-shuffling
transforms. I’m on board with you on union as well.

Back to the original question, then: why is it important to coalesce to a
single partition? When you union two RDDs, for example, rdd1 = [“a, b,
c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with three
lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from the
two RDDs are concatenated.

Mingyu




On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote:

If you call map() on an RDD it will retain the ordering it had before,
but that is not necessarily a correct sort order for the new RDD.

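import org.apache.spark.SparkContext._ // pair-RDD functions such as sortByKey (needed before Spark 1.3)
val rdd = sc.parallelize(Seq(2, 1, 3))
val sorted = rdd.map(x => (x, x)).sortByKey() // keys: [1, 2, 3]
val mapped = sorted.mapValues(x => 3 - x)     // values: [2, 1, 0]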

Note that mapped is no longer sorted.
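A plain-Scala sketch of the same point, runnable without a cluster. It is a hypothetical toy model, not Spark's API: a dataset is a `Vector` of partitions, row order is the partitions read in index order, and `mapValuesToy` (an illustrative name) rewrites each partition in place, as a non-shuffling transform does. Positions survive the map; sortedness of the new values does not.

```scala
// Hypothetical toy model (not Spark): a dataset is a Vector of partitions.
type Parts[T] = Vector[Vector[T]]

// Row order: partitions in index order, rows within each partition in order.
def rows[T](p: Parts[T]): Vector[T] = p.flatten

// Non-shuffling transform: every row stays in its partition and position.
def mapValuesToy[K, V, W](p: Parts[(K, V)])(f: V => W): Parts[(K, W)] =
  p.map(_.map { case (k, v) => (k, f(v)) })

def isSorted(xs: Vector[Int]): Boolean =
  xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

// Stand-in for the output of a sort by key, split into two partitions.
val sortedParts: Parts[(Int, Int)] = Vector(Vector((1, 1), (2, 2)), Vector((3, 3)))
val mapped = mapValuesToy(sortedParts)(x => 3 - x)

val keys = rows(mapped).map(_._1)   // Vector(1, 2, 3): same positions as before
val values = rows(mapped).map(_._2) // Vector(2, 1, 0): no longer ascending
```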

When you union two RDDs together, it will effectively concatenate the
two orderings, which is also not a valid sorted order on the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]
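A plain-Scala sketch of that concatenation, using a hypothetical toy model rather than Spark: partitions are `Vector`s, and `unionToy` (an illustrative name) appends the second partition list to the first, mirroring the current behavior described here. The result keeps both inputs' partitions and duplicate values, and its row order is not sorted.

```scala
// Hypothetical toy model (not Spark): a dataset is a Vector of partitions.
type Parts[T] = Vector[Vector[T]]

// Mirrors the behavior described above: the partitions of `a`, then those of `b`.
// Nothing is merged, re-sorted, or de-duplicated.
def unionToy[T](a: Parts[T], b: Parts[T]): Parts[T] = a ++ b

val rdd1: Parts[Int] = Vector(Vector(1, 2, 3))
val rdd2: Parts[Int] = Vector(Vector(1, 4, 5))

val unioned = unionToy(rdd1, rdd2)
val rowOrder = unioned.flatten    // Vector(1, 2, 3, 1, 4, 5)
val partitionCount = unioned.size // 2
```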

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote:
 Thanks for the quick response!

 To better understand it: the reason a sorted RDD has a well-defined ordering
 is that sortedRDD.getPartitions() returns the partitions in the right
 order and each partition is internally properly sorted. So, if you have

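 import org.apache.spark.SparkContext._ // pair-RDD functions such as sortByKey (needed before Spark 1.3)
 val rdd = sc.parallelize(Seq(2, 1, 3))
 val sorted = rdd.map(x => (x, x)).sortByKey() // keys: [1, 2, 3]
 val mapped = sorted.mapValues(x => x + 1)     // values: [2, 3, 4]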

 Since mapValues changes neither the order of the partitions nor the
 order of rows within the partitions, I think “mapped” should have the
 exact same order as “sorted”. Sure, if a transform involves shuffling, the
 order will change. Am I mistaken? Is there an extra detail in sortedRDD
 that guarantees a well-defined ordering?

 If it’s true that the order of partitions returned by RDD.getPartitions()
 and the row order within the partitions determine the overall row order,
 I’m not sure why union doesn’t respect the order, because the union
 operation simply concatenates the two lists of partitions from the two RDDs.

 Mingyu




 On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote:

You are right, once you sort() the RDD, then yes it has a well defined
ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com wrote:
 Hi Patrick,

 I’m a little confused about your comment that RDDs are not ordered. As far
 as I know, RDDs keep a list of partitions that are ordered, and this is why I
 can call RDD.take() and get the same first k rows every time I call it, and
 RDD.take() returns the same entries as RDD.map(…).take() because map
 preserves the partition order. RDD order is also what allows me to get the
 top k out of an RDD by doing RDD.sort().take().

 Am I misunderstanding it? Or is it just when an RDD is written to disk that
 the order is not well preserved? Thanks in advance!

 Mingyu




 On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote:

Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia buendia...@gmail.com wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell pwend...@gmail.com wrote:

 What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)



 Another issue is that RDDs are not ordered, so when you union two
 together, the result doesn't have a well-defined ordering.

 If you do want to do this, you could coalesce into one partition, then
 call mapPartitions and return an iterator that first adds your header
 and then the rest of the file, then call saveAsTextFile. Keep in mind
 this will only work if you coalesce into a single partition.
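Why the single partition matters can be checked with a hypothetical plain-Scala toy model (not Spark): the function given to mapPartitions runs once per partition, so prepending the header there puts a header at the top of every partition. `withHeader` and `coalesceToOne` are illustrative names, and the toy `coalesceToOne` merges partitions in index order by construction.

```scala
// Hypothetical toy model (not Spark): a dataset is a Vector of partitions.
type Parts[T] = Vector[Vector[T]]

val header = "col1,col2,col3"

// Like the suggested mapPartitions step: the function runs once per partition,
// so each partition gets its own copy of the header.
def withHeader(p: Parts[String]): Parts[String] =
  p.map(part => header +: part)

// Toy coalesce(1): one partition holding all rows, in partition-index order.
def coalesceToOne[T](p: Parts[T]): Parts[T] = Vector(p.flatten)

val data: Parts[String] = Vector(Vector("1,2,3"), Vector("4,5,6"))

val perPartition = withHeader(data)                   // header repeated in both partitions
val singlePartition = withHeader(coalesceToOne(data)) // header once, at the top
```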


 Thanks! I'll give this a try.



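 myRdd.coalesce(1)                                        // one partition, so one part file
   .map(_.mkString(","))                                  // format each row as a CSV line
   .mapPartitions(it => Iterator("col1,col2,col3") ++ it) // header first; rows stay lazy
   .saveAsTextFile("out.csv")                             // out.csv is written as a directory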

 - Patrick

 On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
 buendia...@gmail.com wrote:
  Hi,
 
  I'm trying to find a way to create a CSV header when using
  saveAsTextFile, and I came up with this:
 
  (sc.makeRDD(Array("col1,col2,col3"), 1) ++
    myRdd.coalesce(1).map(_.mkString(",")))
    .saveAsTextFile("out.csv")
 
  But it only saves the header part. Why doesn't the union method
  return both RDDs?
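One plausible explanation, not confirmed in this thread: `saveAsTextFile` writes a directory containing one `part-NNNNN` file per partition, and the union of two single-partition RDDs has two partitions, so the header would land in `part-00000` and the rows in `part-00001`. The hypothetical plain-Scala sketch below models only that file layout; `saveToy` is an illustrative stand-in and performs no I/O.

```scala
// Hypothetical toy model (not Spark): a dataset is a Vector of partitions.
type Parts[T] = Vector[Vector[T]]

// Models only the output layout: one file per partition, named by index.
def saveToy(p: Parts[String]): Map[String, Vector[String]] =
  p.zipWithIndex.map { case (lines, i) => f"part-$i%05d" -> lines }.toMap

val headerRdd: Parts[String] = Vector(Vector("col1,col2,col3")) // like makeRDD(..., 1)
val dataRdd: Parts[String] = Vector(Vector("1,2,3", "4,5,6"))   // like coalesce(1)

// Union as partition concatenation: two partitions, hence two files.
val files = saveToy(headerRdd ++ dataRdd)
```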






Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
Okay, that makes sense. It’d be great if this can be better documented at
some point, because the only way to find out about the resulting RDD row
order is by looking at the code.

Thanks for the discussion!

Mingyu




On 4/29/14, 11:59 PM, Patrick Wendell pwend...@gmail.com wrote:

I don't think we guarantee anywhere that union(A, B) will behave by
concatenating the partitions; it just happens to be an artifact of the
current implementation.

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this; it wouldn't violate the contract of union

AFAIK the only guarantee is that the resulting RDD will contain all elements.

- Patrick
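A hypothetical plain-Scala sketch of that contract (not Spark code): both orderings listed above keep every element of both inputs, duplicates included, so a check that compares element counts accepts both and ignores order. `unionNow`, `unionSomeday`, and `satisfiesUnionContract` are illustrative names.

```scala
// Two orderings a union could produce under the contract described above.
def unionNow(a: Vector[Int], b: Vector[Int]): Vector[Int] = a ++ b     // current behavior
def unionSomeday(a: Vector[Int], b: Vector[Int]): Vector[Int] = b ++ a // also allowed

// Element -> number of occurrences; comparing these ignores order.
def counts(xs: Vector[Int]): Map[Int, Int] =
  xs.groupBy(identity).map { case (k, v) => k -> v.size }

// Every element of both inputs appears, with multiplicity.
def satisfiesUnionContract(a: Vector[Int], b: Vector[Int], out: Vector[Int]): Boolean =
  counts(out) == counts(a ++ b)

val rdd1 = Vector(1, 2, 3)
val rdd2 = Vector(1, 4, 5)
```

Counting occurrences, rather than comparing sets, reflects that Spark's union keeps duplicates; it does not behave like `distinct`, so an output of `Vector(1, 2, 3, 4, 5)` would fail this check.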


Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
I agree with you in general that as an API user, I shouldn’t be relying on
the implementation code. However, without looking at the code, there is no way
for me to find out even whether map() keeps the row order. Without that
knowledge, I’d need to do a “sort” every time I need things in a certain order
(and sort is really expensive). On the other hand, if I can assume, say, that
“filter” or “map” doesn’t shuffle the rows around, I can do the sort once
and assume that the order is retained throughout such operations, saving a
lot of time on unnecessary sorts.

Mingyu
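A plain-Scala sketch of the sort-once pattern, under the assumption stated here that filter and map keep the relative row order (which, per Patrick and Mark, is observed behavior rather than a documented guarantee). It uses ordinary `Vector`s rather than RDDs. A filter keeps a sorted sequence sorted because it keeps a subsequence; a map keeps it sorted only when the function is non-decreasing, which is Patrick's earlier point about mapValues.

```scala
// Row order of a dataset right after one up-front sort (illustrative values).
val sortedRows = Vector(1, 3, 4, 7, 9, 12)

def isSorted(xs: Vector[Int]): Boolean =
  xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

// Assumption: these transforms keep relative row order, as discussed above.
val filtered = sortedRows.filter(_ % 2 == 1) // Vector(1, 3, 7, 9): a subsequence, still sorted
val shifted = sortedRows.map(_ + 100)        // non-decreasing function: still sorted
val flipped = sortedRows.map(x => 20 - x)    // same positions, values now descending
```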

From:  Mark Hamstra m...@clearstorydata.com
Reply-To:  user@spark.apache.org user@spark.apache.org
Date:  Wednesday, April 30, 2014 at 11:36 AM
To:  user@spark.apache.org user@spark.apache.org
Subject:  Re: Union of 2 RDD's only returns the first one

Which is what you shouldn't be doing as an API user, since that
implementation code might change.  The documentation doesn't mention a row
ordering guarantee, so none should be assumed.

It is hard enough for us to correctly document all of the things that the
API does do.  We really shouldn't be forced into the expectation that we
will also fully document everything that the API doesn't do.


On Wed, Apr 30, 2014 at 11:13 AM, Mingyu Kim m...@palantir.com wrote:
 Okay, that makes sense. It’d be great if this can be better documented at
 some point, because the only way to find out about the resulting RDD row
 order is by looking at the code.
 
 Thanks for the discussion!
 
 Mingyu
 
 
 
 

Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Mingyu Kim
Hi Patrick,

I’m a little confused about your comment that RDDs are not ordered. As far
as I know, RDDs keep a list of partitions that are ordered, and this is why I
can call RDD.take() and get the same first k rows every time I call it, and
RDD.take() returns the same entries as RDD.map(…).take() because map
preserves the partition order. RDD order is also what allows me to get the
top k out of an RDD by doing RDD.sort().take().

Am I misunderstanding it? Or is it just when an RDD is written to disk that
the order is not well preserved? Thanks in advance!

Mingyu
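The take() observation above can be modeled in plain Scala. This is a hypothetical toy (not Spark's implementation): `takeToy` reads partitions in index order and stops after k rows, and `mapToy` rewrites each partition in place. Under that model, take is repeatable, and take after map returns the mapped versions of the same rows.

```scala
// Hypothetical toy model (not Spark): a dataset is a Vector of partitions.
type Parts[T] = Vector[Vector[T]]

// Toy take(k): read partitions in index order and stop after k rows.
def takeToy[T](p: Parts[T], k: Int): Vector[T] =
  p.iterator.flatMap(_.iterator).take(k).toVector

// Non-shuffling map: each partition is rewritten in place.
def mapToy[T, U](p: Parts[T])(f: T => U): Parts[U] = p.map(_.map(f))

val rdd: Parts[Int] = Vector(Vector(5, 1), Vector(4), Vector(2, 3))

val first3 = takeToy(rdd, 3)                       // Vector(5, 1, 4)
val mappedFirst3 = takeToy(mapToy(rdd)(_ * 10), 3) // Vector(50, 10, 40)
```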






Re: Union of 2 RDD's only returns the first one

2014-04-29 Thread Patrick Wendell
You are right, once you sort() the RDD, then yes it has a well defined ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.
