Re: spark challenge: zip with next???
But isn't foldLeft() overkill for the originally stated use case, the max diff of adjacent pairs? Isn't foldLeft() for recursive, non-commutative, non-associative accumulation, as opposed to an embarrassingly parallel operation such as this one?

This use case reminds me of FIR filtering in DSP. It seems that RDDs could use something that serves the same purpose as scala.collection.Iterator.sliding.
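For what it's worth, something close to this already exists as a developer API in MLlib: org.apache.spark.mllib.rdd.RDDFunctions.sliding. A rough, untested sketch of the max-adjacent-diff use case with it, assuming the timestamp RDD is already sorted:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.rdd.RDDFunctions._ // adds sliding() to RDDs

    // Sketch only: max difference between adjacent timestamps in a
    // sorted RDD, via a size-2 sliding window over partition boundaries.
    def maxAdjacentDiff(timestamps: RDD[Long]): Long =
      timestamps
        .sliding(2)                                    // RDD of Array(prev, next)
        .map { case Array(prev, next) => next - prev }
        .max()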
Re: spark challenge: zip with next???
And if it's a single giant timeseries that is already sorted, then Mohit's solution sounds good to me.
Re: spark challenge: zip with next???
Yeah, I meant foldLeft by key, sorted by date. It is non-commutative because I care about the order in which the values are processed (chronological). I don't see how I can do it with a reduce efficiently, but I would be curious to hear otherwise. I might be biased, since this is such a typical operation in map-reduce.

So, basically, assuming it's server logs as an RDD[(String, Long)], where String is the server name and Long is the timestamp: you keep a state that contains the last observed timestamp (if any) and the list of gaps found so far, so the state type would be (Option[Long], List[Long]). As you process the items in the timeseries chronologically, you always update the last observed timestamp and possibly add to the list of found gaps.

foldLeftByKey on RDD[(K, V)] looks something like this:

    def foldLeftByKey[X](state: X)(update: (X, V) => X)(implicit ord: Ordering[V]): RDD[(K, X)]

and the logic would be (just made this up, didn't test or compile):

    rdd.foldLeftByKey((None: Option[Long], List.empty[Long])) {
      case ((Some(prev), gaps), curr) if (curr - prev > thres) => (Some(curr), curr :: gaps) // gap found
      case ((_, gaps), curr)                                   => (Some(curr), gaps)         // no gap found
    }

The sort required within each timeseries would be done efficiently by Spark in the shuffle (assuming sort-based shuffle is enabled), and foldLeftByKey would never require the entire timeseries for a key to be in memory. However, every timeseries would be processed by a single task, so it might take a while if a timeseries is very large.
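Since foldLeftByKey does not exist yet (that is what SPARK-3655 proposes), here is a rough, untested sketch of how the same sorted fold could be approximated today with repartitionAndSortWithinPartitions (new in Spark 1.2): partition on the server name only, and let the (server, timestamp) key ordering supply the secondary sort.

    import org.apache.spark.Partitioner
    import org.apache.spark.SparkContext._ // ordered-RDD implicits in Spark 1.2
    import org.apache.spark.rdd.RDD

    // Partition on the server name only, so each server's entire
    // timeseries lands in a single partition; the (server, timestamp)
    // key ordering then sorts it chronologically within the partition.
    class ServerPartitioner(val numPartitions: Int) extends Partitioner {
      def getPartition(key: Any): Int = key match {
        case (server: String, _) =>
          (server.hashCode % numPartitions + numPartitions) % numPartitions
      }
    }

    def gapsPerServer(logs: RDD[(String, Long)], thres: Long): RDD[(String, List[Long])] =
      logs
        .map { case (server, ts) => ((server, ts), ()) }
        .repartitionAndSortWithinPartitions(new ServerPartitioner(logs.partitions.length))
        .mapPartitions { iter =>
          // Stream through the sorted partition, folding each server's
          // chronological timestamps; only the gaps get buffered.
          val out = scala.collection.mutable.ListBuffer.empty[(String, List[Long])]
          var curServer: String = null
          var prev: Option[Long] = None
          var gaps: List[Long] = Nil
          for (((server, ts), _) <- iter) {
            if (server != curServer) {
              if (curServer != null) out += ((curServer, gaps))
              curServer = server; prev = None; gaps = Nil
            }
            if (prev.exists(p => ts - p > thres)) gaps = ts :: gaps
            prev = Some(ts)
          }
          if (curServer != null) out += ((curServer, gaps))
          out.iterator
        }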
Re: spark challenge: zip with next???
Koert, thanks for the referral to your current pull request! I found it very thoughtful and thought-provoking.
Re: spark challenge: zip with next???
Assuming the data can be partitioned, you have many timeseries for which you want to detect potential gaps. Also assuming the resulting gap info per timeseries is much smaller than the timeseries data itself, this is a classical example to me of a sorted (streaming) foldLeft, requiring an efficient secondary sort in the Spark shuffle. I am trying to get that into Spark here: https://issues.apache.org/jira/browse/SPARK-3655
RE: spark challenge: zip with next???
Another solution would be to use the reduce action.

Mohammed
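One way this could work (my reading of the suggestion, assuming the RDD is already globally sorted by timestamp): reduce each partition to a (first, last, maxGap) summary, then merge the summaries in partition order, also checking the gap across each partition boundary. A rough, untested sketch:

    import org.apache.spark.rdd.RDD

    // Sketch only: max adjacent gap of a globally sorted RDD of
    // timestamps. Each partition collapses to a boundary-aware summary,
    // and the summaries are merged on the driver in partition order.
    case class Summary(first: Long, last: Long, maxGap: Long)

    def maxGap(sorted: RDD[Long]): Long = {
      val summaries = sorted
        .mapPartitionsWithIndex { (idx, iter) =>
          if (!iter.hasNext) Iterator.empty
          else {
            val first = iter.next()
            var last = first
            var gap = 0L
            for (ts <- iter) { gap = math.max(gap, ts - last); last = ts }
            Iterator((idx, Summary(first, last, gap)))
          }
        }
        .collect()
        .sortBy(_._1) // restore partition order
        .map(_._2)
      summaries.reduceLeft { (a, b) =>
        Summary(a.first, b.last,
          math.max(math.max(a.maxGap, b.maxGap), b.first - a.last))
      }.maxGap
    }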
RE: spark challenge: zip with next???
Make a copy of your RDD with an extra entry in the beginning to offset. Then you can zip the two RDDs and run a map to generate an RDD of differences.

-----Original Message-----
From: derrickburns derrickrbu...@gmail.com
Sent: Thursday, January 29, 2015 02:52 PM Eastern Standard Time
To: user@spark.apache.org
Subject: spark challenge: zip with next???

Here is a Spark challenge for you!

I have a data set where each entry has a date. I would like to identify gaps in the dates larger than a given length. For example, if the data were log entries, then the gaps would tell me when I was missing log data for long periods of time.

What is the most efficient way to achieve this in Spark?
Re: spark challenge: zip with next???
Hi,

On Fri, Jan 30, 2015 at 6:32 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote:

> Make a copy of your RDD with an extra entry in the beginning to offset. Then you can zip the two RDDs and run a map to generate an RDD of differences.

Does that work? I recently tried something to compute differences between each entry and the next, so I did

    val rdd1 = ... // null element + rdd
    val rdd2 = ... // rdd + null element

but got an error message about zip requiring data sizes in each partition to match.

Tobias
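That failure is expected: zip() requires both RDDs to have the same number of elements in every partition, and prepending an element shifts everything by one. A rough, untested sketch of a shift-and-join variant of Ilya's idea that avoids the restriction (at the cost of a shuffle):

    import org.apache.spark.SparkContext._ // pair-RDD implicits in Spark 1.2
    import org.apache.spark.rdd.RDD

    // Pair each element with its global position, then join position
    // i against position i + 1 to form (prev, curr) pairs.
    def adjacentDiffs(sorted: RDD[Long]): RDD[Long] = {
      val indexed = sorted.zipWithIndex().map { case (v, i) => (i, v) }
      val shifted = indexed.map { case (i, v) => (i + 1, v) } // value at i keyed by i + 1
      indexed.join(shifted).map { case (_, (curr, prev)) => curr - prev }
    }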
Re: spark challenge: zip with next???
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E

You can use the MLlib function or do the following (which is what I had done):

- In the first pass over the data, using mapPartitionsWithIndex, gather the first item in each partition. You can use collect (or an aggregator) for this, and “key” the items by partition index. At the end, you will have a map of (partition index) -> (first item).
- In the second pass over the data, using mapPartitionsWithIndex again, look at two items at a time (or, in the general case, N items at a time; you can use Scala's sliding iterator) and check the time difference (or do any other sliding-window computation). Pass the map created in the previous step to this mapPartitions; you will need it to check against the last item in each partition, as sketched below.

If you can tolerate a few inaccuracies, you can do just the second step. You will miss the “boundaries” of the partitions, but that might be acceptable for your use case.
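A rough, untested sketch of the two passes described above (the function and variable names are mine):

    import org.apache.spark.rdd.RDD

    def gaps(sorted: RDD[Long], thres: Long): RDD[(Long, Long)] = {
      // Pass 1: first item of each partition, keyed by partition index.
      val firsts: Map[Int, Long] = sorted
        .mapPartitionsWithIndex { (idx, iter) =>
          if (iter.hasNext) Iterator((idx, iter.next())) else Iterator.empty
        }
        .collect()
        .toMap

      // Pass 2: slide a window of 2 over each partition, appending the
      // first item of the next non-empty partition so that pairs which
      // straddle a partition boundary are not missed.
      val n = sorted.partitions.length
      sorted.mapPartitionsWithIndex { (idx, iter) =>
        val nextFirst = ((idx + 1) until n).flatMap(firsts.get).headOption
        (iter ++ nextFirst.iterator)
          .sliding(2)
          .collect { case Seq(prev, next) if next - prev > thres => (prev, next) }
      }
    }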