Re: spark challenge: zip with next???

2015-01-30 Thread Michael Malak
But isn't foldLeft() overkill for the originally stated use case of max diff of 
adjacent pairs? Isn't foldLeft() for recursive non-commutative non-associative 
accumulation as opposed to an embarrassingly parallel operation such as this 
one?
This use case reminds me of FIR filtering in DSP. It seems that RDDs could use 
something that serves the same purpose as scala.collection.Iterator.sliding.
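
A minimal sketch of that idea, assuming the experimental sliding() that MLlib's RDDFunctions exposes as a developer API, and a hypothetical dates: RDD[Long] of timestamps that is already globally sorted:

  import org.apache.spark.mllib.rdd.RDDFunctions._

  // dates: RDD[Long], assumed sorted ascending (hypothetical name)
  val maxDiff = dates.sliding(2)                      // RDD of adjacent pairs
    .map { case Array(prev, next) => next - prev }    // difference per pair
    .max()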
  From: Koert Kuipers ko...@tresata.com
 To: Mohit Jaggi mohitja...@gmail.com
 Cc: Tobias Pfeiffer t...@preferred.jp; Ganelin, Ilya ilya.gane...@capitalone.com; derrickburns derrickrbu...@gmail.com; user@spark.apache.org
 Sent: Friday, January 30, 2015 7:11 AM
 Subject: Re: spark challenge: zip with next???
assuming the data can be partitioned, then you have many timeseries for which 
you want to detect potential gaps. also assuming the resulting gaps info per 
timeseries is much smaller than the timeseries data itself, then this is a 
classical example to me of a sorted (streaming) foldLeft, requiring an 
efficient secondary sort in the spark shuffle. i am trying to get that into 
spark here:
https://issues.apache.org/jira/browse/SPARK-3655




Re: spark challenge: zip with next???

2015-01-30 Thread Koert Kuipers
and if it's a single giant timeseries that is already sorted then Mohit's
solution sounds good to me.

On Fri, Jan 30, 2015 at 11:05 AM, Michael Malak michaelma...@yahoo.com
wrote:

 But isn't foldLeft() overkill for the originally stated use case of max
 diff of adjacent pairs? Isn't foldLeft() for recursive non-commutative
 non-associative accumulation as opposed to an embarrassingly parallel
 operation such as this one?

 This use case reminds me of FIR filtering in DSP. It seems that RDDs could
 use something that serves the same purpose as
 scala.collection.Iterator.sliding.



Re: spark challenge: zip with next???

2015-01-30 Thread Koert Kuipers
yeah i meant foldLeft by key, sorted by date.
it is non-commutative because i care about the order of processing the
values (chronological). i don't see how i can do it with a reduce
efficiently, but i would be curious to hear otherwise. i might be biased
since this is such a typical operation in map-reduce.

so basically, assuming it's logs of servers being RDD[(String, Long)] where
String is the server name and Long is the timestamp, you keep a state that
contains the last observed timestamp (if any) and the list of found gaps.
so the state type would be (Option[Long], List[Long]). as you process items in
the timeseries chronologically you always update the last observed
timestamp and possibly add to the list of found gaps.

foldLeftByKey on RDD[(K, V)] looks something like this:
def foldLeftByKey(state: X)(update: (X, V) => X)(implicit ord:
Ordering[V]): RDD[(K, X)]

and the logic would be (just made this up, didn't test or compile):

rdd.foldLeftByKey((None: Option[Long], List.empty[Long])) {
  case ((Some(prev), gaps), curr) if curr - prev > thres => (Some(curr), curr :: gaps) // gap found
  case ((_, gaps), curr) => (Some(curr), gaps) // no gap found
}

the sort required within each timeseries would be done efficiently by spark in
the shuffle (assuming the sort-based shuffle is enabled). the foldLeftByKey
would never require the entire timeseries per key to be in memory. however,
every timeseries would be processed by a single task, so it might take a
while if a timeseries is very large.
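
for reference, a rough sketch of how such a sorted foldLeft per key could be approximated today with repartitionAndSortWithinPartitions (available since Spark 1.2). this is not the SPARK-3655 API, just an illustration under those assumptions; the threshold, partition count, and the logs RDD below are made-up names:

  import scala.reflect.ClassTag
  import org.apache.spark.{HashPartitioner, Partitioner}
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  // partition by K only, but sort each partition by (K, V) so the values of a key
  // arrive contiguously and in order; fold each contiguous run as it streams by
  def foldLeftByKey[K: ClassTag, V: ClassTag, X](rdd: RDD[(K, V)], parts: Int, zero: X)
      (update: (X, V) => X)(implicit ordK: Ordering[K], ordV: Ordering[V]): RDD[(K, X)] = {
    val byKey = new Partitioner {
      private val h = new HashPartitioner(parts)
      def numPartitions: Int = h.numPartitions
      def getPartition(key: Any): Int = h.getPartition(key.asInstanceOf[(K, V)]._1)
    }
    implicit val ordKV: Ordering[(K, V)] = Ordering.Tuple2(ordK, ordV)
    rdd.map { case (k, v) => ((k, v), ()) }
      .repartitionAndSortWithinPartitions(byKey)   // secondary sort happens in the shuffle
      .mapPartitions { it =>
        val buf = it.buffered
        new Iterator[(K, X)] {
          def hasNext = buf.hasNext
          def next() = {                           // fold one contiguous run of equal keys
            val k = buf.head._1._1
            var acc = zero
            while (buf.hasNext && buf.head._1._1 == k) acc = update(acc, buf.next()._1._2)
            (k, acc)
          }
        }
      }
  }

  // gap detection as above, with logs: RDD[(String, Long)] and made-up settings
  val thres = 3600L
  val gapsByServer = foldLeftByKey(logs, 200, (None: Option[Long], List.empty[Long])) {
    case ((Some(prev), gaps), curr) if curr - prev > thres => (Some(curr), curr :: gaps)
    case ((_, gaps), curr) => (Some(curr), gaps)
  }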

On Fri, Jan 30, 2015 at 11:05 AM, Michael Malak michaelma...@yahoo.com
wrote:

 But isn't foldLeft() overkill for the originally stated use case of max
 diff of adjacent pairs? Isn't foldLeft() for recursive non-commutative
 non-associative accumulation as opposed to an embarrassingly parallel
 operation such as this one?

 This use case reminds me of FIR filtering in DSP. It seems that RDDs could
 use something that serves the same purpose as
 scala.collection.Iterator.sliding.



Re: spark challenge: zip with next???

2015-01-30 Thread Derrick Burns
Koert, thanks for the referral to your current pull request!  I found it
very thoughtful and thought-provoking.



On Fri, Jan 30, 2015 at 9:19 AM, Koert Kuipers ko...@tresata.com wrote:

 and if it's a single giant timeseries that is already sorted then Mohit's
 solution sounds good to me.



Re: spark challenge: zip with next???

2015-01-30 Thread Koert Kuipers
assuming the data can be partitioned, then you have many timeseries for
which you want to detect potential gaps. also assuming the resulting gaps
info per timeseries is much smaller than the timeseries data itself,
then this is a classical example to me of a sorted (streaming) foldLeft,
requiring an efficient secondary sort in the spark shuffle. i am trying to
get that into spark here:
https://issues.apache.org/jira/browse/SPARK-3655


RE: spark challenge: zip with next???

2015-01-29 Thread Mohammed Guller
Another solution would be to use the reduce action.

Mohammed

From: Ganelin, Ilya [mailto:ilya.gane...@capitalone.com]
Sent: Thursday, January 29, 2015 1:32 PM
To: 'derrickburns'; 'user@spark.apache.org'
Subject: RE: spark challenge: zip with next???

Make a copy of your RDD with an extra entry in the beginning to offset. Then you 
can zip the two RDDs and run a map to generate an RDD of differences.





RE: spark challenge: zip with next???

2015-01-29 Thread Ganelin, Ilya
Make a copy of your RDD with an extra entry in the beginning to offset. Then you 
can zip the two RDDs and run a map to generate an RDD of differences.



Sent with Good (www.good.com)


-Original Message-
From: derrickburns [derrickrbu...@gmail.com]
Sent: Thursday, January 29, 2015 02:52 PM Eastern Standard Time
To: user@spark.apache.org
Subject: spark challenge: zip with next???


Here is a spark challenge for you!

I have a data set where each entry has a date.  I would like to identify
gaps in the dates larger than a given length.  For example, if the data
were log entries, then the gaps would tell me when I was missing log data
for long periods of time. What is the most efficient way to achieve this in
Spark?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-challenge-zip-with-next-tp21423.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.






Re: spark challenge: zip with next???

2015-01-29 Thread Tobias Pfeiffer
Hi,

On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya ilya.gane...@capitalone.com
wrote:

  Make a copy of your RDD with an extra entry in the beginning to offset.
 Then you can zip the two RDDs and run a map to generate an RDD of
 differences.


Does that work? I recently tried something to compute differences between
each entry and the next, so I did
  val rdd1 = ... // null element + rdd
  val rdd2 = ... // rdd + null element
but got an error message about zip requiring data sizes in each partition
to match.

Tobias
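
for what it's worth, a sketch that sidesteps the partition-size restriction by pairing each element with its successor through an index join instead of zipping the two RDDs directly (assuming the RDD is already in the desired order; names below are illustrative, and zipWithIndex triggers an extra job):

  import org.apache.spark.SparkContext._

  // rdd: RDD[Long], assumed already sorted
  val indexed = rdd.zipWithIndex().map { case (v, i) => (i, v) }
  val shifted = indexed.map { case (i, v) => (i - 1, v) }   // element i+1 re-keyed as i
  val diffs   = indexed.join(shifted)                       // (i, (current, next))
    .map { case (_, (current, next)) => next - current }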


Re: spark challenge: zip with next???

2015-01-29 Thread Mohit Jaggi
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E

you can use the MLlib function or do the following (which is what I had done):

- in the first pass over the data, using mapPartitionsWithIndex, gather the first 
item in each partition. you can use collect (or an aggregator) for this. “key” 
them by the partition index. at the end, you will have a map
   (partition index) -> first item
- in the second pass over the data, using mapPartitionsWithIndex again, look at 
two items at a time (or, in the general case, N items at a time; you can use 
scala’s sliding iterator) and check the time difference (or any sliding window 
computation). To this mapPartitions, pass the map created in the previous step. 
You will need it to check the last item in this partition.

If you can tolerate a few inaccuracies then you can just do the second step. 
You will miss the “boundaries” of the partitions but it might be acceptable for 
your use case.
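
a rough sketch of those two passes, assuming an already-sorted rdd: RDD[Long] of timestamps and a made-up gap threshold; broadcasting the map from step one is one way to pass it into the second mapPartitionsWithIndex:

  val thres = 3600L   // hypothetical gap threshold

  // pass 1: first timestamp of each partition, keyed by partition index
  val firstByPartition: Map[Int, Long] = rdd
    .mapPartitionsWithIndex { (idx, it) =>
      if (it.hasNext) Iterator((idx, it.next())) else Iterator.empty
    }
    .collect()
    .toMap
  val bcFirst = rdd.sparkContext.broadcast(firstByPartition)

  // pass 2: sliding window within each partition, appending the first item of the
  // next partition so the boundary pair is not missed (empty partitions are ignored)
  val gaps = rdd.mapPartitionsWithIndex { (idx, it) =>
    val nextHead = bcFirst.value.get(idx + 1).iterator
    (it ++ nextHead).sliding(2).collect {
      case Seq(prev, curr) if curr - prev > thres => (prev, curr)   // a gap
    }
  }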


