Re: Pairwise Processing of a List

2015-01-26 Thread Sean Owen
AFAIK ordering is not strictly guaranteed unless the RDD is the
product of a sort. I think that in practice, you'll never find
elements of a file read in some random order, for example (although
see the recent issue about partition ordering potentially depending on
how the local file system lists them).

Likewise I can't imagine you'd encounter elements from one Kafka
partition out of order. One receiver hears one partition and creates
one block per block interval. What I'm not 100% clear on is whether
you get undefined ordering when you have multiple threads listening in
one receiver.

You can always sort RDDs by a timestamp of some sort to be sure,
although that has overheads; a sketch of such a sort follows below. I'm
also curious about what, if anything, is guaranteed here without a sort.
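
For illustration (hedged: events here is a hypothetical RDD of
(timestamp, value) pairs, not something from this thread):

// events: a hypothetical RDD[(Long, String)] of (timestamp, value) pairs.
// sortBy imposes a total order before any order-sensitive processing.
val ordered = events.sortBy { case (ts, _) => ts }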

On Mon, Jan 26, 2015 at 1:33 AM, Tobias Pfeiffer t...@preferred.jp wrote:
 Sean,

 On Mon, Jan 26, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Note that RDDs don't really guarantee anything about ordering though,
 so this only makes sense if you've already sorted some upstream RDD by
 a timestamp or sequence number.


 Speaking of order, is there some reading on guarantees and non-guarantees
 about order in RDDs? For example, when reading a file and doing
 zipWithIndex, can I assume that the lines are numbered in order? Does this
 hold for receiving data from Kafka, too?

 Tobias





Re: Pairwise Processing of a List

2015-01-25 Thread Joseph Lust
So you’ve got a point A and you want the sum of distances between it and all 
other points? Or am I misunderstanding you?

// target point; could be a Broadcast variable sent to all workers
val tarPt = (10, 20)
val pts = Seq((2, 2), (3, 3), (2, 3), (10, 2))
val rdd = sc.parallelize(pts)
rdd.map( pt => Math.sqrt(Math.pow(tarPt._1 - pt._1, 2) + Math.pow(tarPt._2 - pt._2, 2)) )
  .reduce( (d1, d2) => d1 + d2 )

-Joe

From: Steve Nunez snu...@hortonworks.com
Date: Sunday, January 25, 2015 at 7:32 PM
To: user@spark.apache.org
Subject: Pairwise Processing of a List

Spark Experts,

I’ve got a list of points: List[(Float, Float)] that represent (x,y)
coordinate pairs and need to sum the distance. It’s easy enough to compute the 
distance:

import scala.math.{pow, sqrt}

case class Point(x: Float, y: Float) {
  def distance(other: Point): Float =
    sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat
}

(in this case I create a ‘Point’ class, but the maths are the same).

What I can’t figure out is the ‘right’ way to sum distances between all the 
points. I can make this work by traversing the list with a for loop and using 
indices, but this doesn’t seem right.

Anyone know a clever way to process List[(Float, Float)] in a pairwise fashion?

Regards,
- Steve




Re: Pairwise Processing of a List

2015-01-25 Thread Tobias Pfeiffer
Hi,

On Mon, Jan 26, 2015 at 9:32 AM, Steve Nunez snu...@hortonworks.com wrote:

  I’ve got a list of points: List[(Float, Float)] that represent (x,y)
 coordinate pairs and need to sum the distance. It’s easy enough to compute
 the distance:


Are you saying you want all combinations (N^2) of distances? That should be
possible with rdd.cartesian():

val points = sc.parallelize(List((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)))
points.cartesian(points).collect
// Array[((Double, Double), (Double, Double))] =
//   Array(((1.0,2.0),(1.0,2.0)), ((1.0,2.0),(3.0,4.0)), ((1.0,2.0),(5.0,6.0)),
//         ((3.0,4.0),(1.0,2.0)), ((3.0,4.0),(3.0,4.0)), ((3.0,4.0),(5.0,6.0)),
//         ((5.0,6.0),(1.0,2.0)), ((5.0,6.0),(3.0,4.0)), ((5.0,6.0),(5.0,6.0)))

I guess this is a very expensive operation, though.

Tobias


Re: Pairwise Processing of a List

2015-01-25 Thread Sean Owen
If this is really about just Scala Lists, then a simple answer (using
tuples of doubles) is:

val points: List[(Double,Double)] = ...
val distances = for (p1 <- points; p2 <- points) yield {
  val dx = p1._1 - p2._1
  val dy = p1._2 - p2._2
  math.sqrt(dx*dx + dy*dy)
}
distances.sum / 2

It's / 2 since this counts every pair twice. You could double the
speed with a slightly more complex formulation using indices that
avoids comparing points to themselves and makes each comparison just
once, as sketched below.

If you really need the sum of all pairwise distances, I don't think
you can do better than that (modulo dealing with duplicates
intelligently).

If we're talking RDDs, then the simple answer is similar:

val pointsRDD: RDD[(Double,Double)] = ...
val distancesRDD = pointsRDD.cartesian(pointsRDD).map { case (p1, p2) => ... }
distancesRDD.sum / 2

It takes more work to make the same optimization, and it involves
zipWithIndex, but it is possible; a sketch follows.
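
One possible (hedged) formulation of that zipWithIndex variant:

// Tag each point with its index, then keep only pairs with i < j so each
// unordered pair is computed exactly once (no self-distances, no / 2).
val indexed = pointsRDD.zipWithIndex()   // RDD[((Double, Double), Long)]
indexed.cartesian(indexed)
  .filter { case ((_, i), (_, j)) => i < j }
  .map { case ((p1, _), (p2, _)) =>
    val dx = p1._1 - p2._1
    val dy = p1._2 - p2._2
    math.sqrt(dx * dx + dy * dy)
  }
  .sum()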

If the reason we're talking about Lists is that the set of points is
still fairly small, but big enough that all-pairs deserves distributed
computation, then I'd parallelize the List into an RDD, and also
broadcast it, and then implement a hybrid of these two approaches,
sketched below. You'd have the outer loop over points happening in
parallel via the RDD, and the inner loop happening locally over the
broadcast copy in memory.
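
Something like this, assuming the SparkContext sc and the points list
above (names illustrative):

// Outer loop distributed: each point, with its index, is a record in the RDD.
// Inner loop local: each task scans the broadcast copy of the full list,
// starting past its own index so each pair is counted exactly once.
val bcPoints = sc.broadcast(points.toVector)
sc.parallelize(points.zipWithIndex)
  .map { case ((x1, y1), i) =>
    bcPoints.value.drop(i + 1).map { case (x2, y2) =>
      val dx = x1 - x2
      val dy = y1 - y2
      math.sqrt(dx * dx + dy * dy)
    }.sum
  }
  .sum()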

... and if the use case isn't really to find all-pairs distances and
their sum, maybe there are faster ways still to do what you need to.

On Mon, Jan 26, 2015 at 12:32 AM, Steve Nunez snu...@hortonworks.com wrote:
 Spark Experts,

 I’ve got a list of points: List[(Float, Float)] that represent (x,y)
 coordinate pairs and need to sum the distance. It’s easy enough to compute
 the distance:

 case class Point(x: Float, y: Float) {
   def distance(other: Point): Float =
     sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat
 }

 (in this case I create a ‘Point’ class, but the maths are the same).

 What I can’t figure out is the ‘right’ way to sum distances between all the
 points. I can make this work by traversing the list with a for loop and
 using indices, but this doesn’t seem right.

 Anyone know a clever way to process List[(Float, Float)] in a pairwise
 fashion?

 Regards,
 - Steve






Re: Pairwise Processing of a List

2015-01-25 Thread Steve Nunez
Not combinations, linear distances, e.g., given: List[ (x1,y1), (x2,y2),
(x3,y3) ], compute the sum of:

the distance between (x1,y1) and (x2,y2), and
the distance between (x2,y2) and (x3,y3).

Imagine that the list of coordinate points comes from a GPS and describes a trip.

- Steve

From: Joseph Lust jl...@mc10inc.com
Date: Sunday, January 25, 2015 at 17:17
To: Steve Nunez snu...@hortonworks.com, user@spark.apache.org
Subject: Re: Pairwise Processing of a List

So you've got a point A and you want the sum of distances between it and all 
other points? Or am I misunderstanding you?

// target point; could be a Broadcast variable sent to all workers
val tarPt = (10, 20)
val pts = Seq((2, 2), (3, 3), (2, 3), (10, 2))
val rdd = sc.parallelize(pts)
rdd.map( pt => Math.sqrt(Math.pow(tarPt._1 - pt._1, 2) + Math.pow(tarPt._2 - pt._2, 2)) )
  .reduce( (d1, d2) => d1 + d2 )

-Joe

From: Steve Nunez snu...@hortonworks.com
Date: Sunday, January 25, 2015 at 7:32 PM
To: user@spark.apache.org
Subject: Pairwise Processing of a List

Spark Experts,

I've got a list of points: List[(Float, Float)] that represent (x,y)
coordinate pairs and need to sum the distance. It's easy enough to compute the 
distance:

case class Point(x: Float, y: Float) {
  def distance(other: Point): Float =
    sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat
}

(in this case I create a 'Point' class, but the maths are the same).

What I can't figure out is the 'right' way to sum distances between all the 
points. I can make this work by traversing the list with a for loop and using 
indices, but this doesn't seem right.

Anyone know a clever way to process List[(Float, Float)] in a pairwise fashion?

Regards,
- Steve





Re: Pairwise Processing of a List

2015-01-25 Thread Sean Owen
(PS the Scala code I posted is a poor way to do it -- it would
materialize the entire cartesian product in memory. You can use
.iterator or .view to fix that; a sketch follows.)
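
For example, my hedged take on that fix, using a lazy view so the N^2
distances are summed one at a time instead of being materialized:

// The view makes the pair generation lazy; .sum forces it incrementally.
val total = (for (p1 <- points.view; p2 <- points.view) yield {
  val dx = p1._1 - p2._1
  val dy = p1._2 - p2._2
  math.sqrt(dx * dx + dy * dy)
}).sum / 2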

Ah, so you want the sum of distances between successive points.

val points: List[(Double,Double)] = ...
points.sliding(2).map { case List(p1,p2) => distance(p1,p2) }.sum

If you import org.apache.spark.mllib.rdd.RDDFunctions._ you should
have access to something similar in Spark over an RDD. It gives you a
sliding() function that produces Arrays of sequential elements.
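
For instance, a hedged sketch (it assumes an RDD of (Double, Double)
points named pointsRDD, already in trip order):

import org.apache.spark.mllib.rdd.RDDFunctions._

// sliding(2) yields Array(previous, current) windows in RDD order;
// summing the per-window distances gives the trip length.
pointsRDD.sliding(2).map { window =>
  val Array((x1, y1), (x2, y2)) = window
  math.sqrt(math.pow(x1 - x2, 2) + math.pow(y1 - y2, 2))
}.sum()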

Note that RDDs don't really guarantee anything about ordering though,
so this only makes sense if you've already sorted some upstream RDD by
a timestamp or sequence number.

On Mon, Jan 26, 2015 at 1:21 AM, Steve Nunez snu...@hortonworks.com wrote:
 Not combinations, linear distances, e.g., given: List[ (x1,y1), (x2,y2),
 (x3,y3) ], compute the sum of:

 the distance between (x1,y1) and (x2,y2), and
 the distance between (x2,y2) and (x3,y3).

 Imagine that the list of coordinate points comes from a GPS and describes a
 trip.

 - Steve

 From: Joseph Lust jl...@mc10inc.com
 Date: Sunday, January 25, 2015 at 17:17
 To: Steve Nunez snu...@hortonworks.com, user@spark.apache.org
 Subject: Re: Pairwise Processing of a List

 So you’ve got a point A and you want the sum of distances between it and all
 other points? Or am I misunderstanding you?

 // target point; could be a Broadcast variable sent to all workers
 val tarPt = (10, 20)
 val pts = Seq((2, 2), (3, 3), (2, 3), (10, 2))
 val rdd = sc.parallelize(pts)
 rdd.map( pt => Math.sqrt(Math.pow(tarPt._1 - pt._1, 2) + Math.pow(tarPt._2 - pt._2, 2)) )
   .reduce( (d1, d2) => d1 + d2 )

 -Joe

 From: Steve Nunez snu...@hortonworks.com
 Date: Sunday, January 25, 2015 at 7:32 PM
 To: user@spark.apache.org
 Subject: Pairwise Processing of a List

 Spark Experts,

 I’ve got a list of points: List[(Float, Float)] that represent (x,y)
 coordinate pairs and need to sum the distance. It’s easy enough to compute
 the distance:

 case class Point(x: Float, y: Float) {
   def distance(other: Point): Float =
     sqrt(pow(x - other.x, 2) + pow(y - other.y, 2)).toFloat
 }

 (in this case I create a ‘Point’ class, but the maths are the same).

 What I can’t figure out is the ‘right’ way to sum distances between all the
 points. I can make this work by traversing the list with a for loop and
 using indices, but this doesn’t seem right.

 Anyone know a clever way to process List[(Float, Float)] in a pairwise
 fashion?

 Regards,
 - Steve







Re: Pairwise Processing of a List

2015-01-25 Thread Tobias Pfeiffer
Sean,

On Mon, Jan 26, 2015 at 10:28 AM, Sean Owen so...@cloudera.com wrote:

 Note that RDDs don't really guarantee anything about ordering though,
 so this only makes sense if you've already sorted some upstream RDD by
 a timestamp or sequence number.


Speaking of order, is there some reading on guarantees and non-guarantees
about order in RDDs? For example, when reading a file and doing
zipWithIndex, can I assume that the lines are numbered in order? Does this
hold for receiving data from Kafka, too?

Tobias