Re: Grouping runs of elements in a RDD
Thanks, Mohit. It sounds like we're on the same page -- I used a similar
approach.

On Thu, Jul 2, 2015 at 12:27 PM, Mohit Jaggi wrote:

> If you are joining successive lines together based on a predicate, then
> you are doing a "flatMap", not an "aggregate". You are on the right track
> with a multi-pass solution. I had the same challenge when I needed a
> sliding window over an RDD.
Re: Grouping runs of elements in a RDD
If you are joining successive lines together based on a predicate, then
you are doing a "flatMap", not an "aggregate". You are on the right track
with a multi-pass solution. I had the same challenge when I needed a
sliding window over an RDD (see below).

[ I had suggested that the sliding window API be moved to spark-core. Not
sure if that happened. ]

--- previous posts ---

http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions

> On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi wrote:
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
>
> You can use the MLlib function or do the following (which is what I had
> done):
>
> - In the first pass over the data, using mapPartitionsWithIndex, gather
>   the first item in each partition. You can use collect (or an
>   aggregator) for this. “Key” them by the partition index. At the end,
>   you will have a map (partition index) --> first item.
> - In the second pass over the data, using mapPartitionsWithIndex again,
>   look at two items at a time (or, in the general case, N items at a
>   time; you can use Scala’s sliding iterator) and check the time
>   difference (or any sliding window computation). To this
>   mapPartitionsWithIndex call, pass the map created in the previous
>   step. You will need it to check the last item in this partition
>   against the first item of the next partition.
>
> If you can tolerate a few inaccuracies then you can just do the second
> step. You will miss the “boundaries” of the partitions but it might be
> acceptable for your use case.

On Tue, Jun 30, 2015 at 12:21 PM, RJ Nowling wrote:

> That's an interesting idea! I hadn't considered that. However, looking
> at the Partitioner interface, I would need to determine the partition
> from a single key, which doesn't fit my case, unfortunately. For my
> case, I need to compare successive pairs of keys. (I'm trying to re-join
> lines that were split prematurely.)
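
The two passes described in the quoted January post can be sketched in plain Python, with a list of lists standing in for the RDD's partitions. This is only an illustration under stated assumptions, not the code from the original post: `first_items` and `pairs_in_partition` are hypothetical names, and in Spark each function body would run inside a mapPartitionsWithIndex call, with the first-item map collected on the driver in between.

```python
def first_items(partitions):
    """Pass 1: map partition index -> first item, skipping empty partitions."""
    return {i: part[0] for i, part in enumerate(partitions) if part}


def pairs_in_partition(index, part, firsts):
    """Pass 2: yield successive pairs, including the pair that crosses into
    the next non-empty partition."""
    for a, b in zip(part, part[1:]):
        yield (a, b)
    # Indices of non-empty partitions that come after this one.
    later = [i for i in sorted(firsts) if i > index]
    if part and later:
        yield (part[-1], firsts[later[0]])


partitions = [[1, 2, 3], [], [4, 5], [6]]   # stand-in for an RDD's partitions
firsts = first_items(partitions)
pairs = [p for i, part in enumerate(partitions)
         for p in pairs_in_partition(i, part, firsts)]
print(pairs)   # [(1, 2), (2, 3), (3, 4), (4, 5), (5, 6)]
```

Dropping the last two lines of `pairs_in_partition` gives the "second step only" variant, which misses the pairs (3, 4) and (5, 6) at the partition boundaries.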
Re: Grouping runs of elements in a RDD
That's an interesting idea! I hadn't considered that. However, looking at
the Partitioner interface, I would need to determine the partition from a
single key, which doesn't fit my case, unfortunately. For my case, I need
to compare successive pairs of keys. (I'm trying to re-join lines that
were split prematurely.)

On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh <abhis...@tetrationanalytics.com> wrote:

> Could you use a custom partitioner to preserve boundaries such that all
> related tuples end up on the same partition?
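
To illustrate the limitation, here is a small plain-Python sketch of what a partitioner amounts to: a function from one key to a partition index, analogous to Spark's `getPartition(key)`. The name `range_partition`, the bounds, and the use of line numbers as keys are assumptions made up for this example.

```python
import bisect


def range_partition(key, upper_bounds):
    """Return the partition index for a key, using only that key.

    upper_bounds: sorted inclusive upper bounds, one per partition except
    the last.
    """
    return bisect.bisect_left(upper_bounds, key)


# Hypothetical line numbers as keys, split into three partitions.
bounds = [99, 199]
print([range_partition(k, bounds) for k in (0, 99, 100, 199, 200)])
# [0, 0, 1, 1, 2]
```

Keys 99 and 100 land in different partitions whether or not the lines they identify belong to the same group, because the function never sees a neighbouring key.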
Re: Grouping runs of elements in a RDD
Could you use a custom partitioner to preserve boundaries such that all
related tuples end up on the same partition?

On Jun 30, 2015, at 12:00 PM, RJ Nowling wrote:

> Thanks, Reynold. I still need to handle incomplete groups that span
> partition boundaries, so I need a two-pass approach. I came up with a
> somewhat hacky way to handle those using the partition indices and
> key-value pairs as a second pass after the first.
>
> OCaml's std library provides a function called group() that takes a
> break function that operates on pairs of successive elements. It seems a
> similar approach could be used in Spark and would be more efficient than
> my approach with key-value pairs, since you know the ordering of the
> partitions.
>
> Has this need been expressed by others?
Re: Grouping runs of elements in a RDD
Thanks, Reynold. I still need to handle incomplete groups that span
partition boundaries, so I need a two-pass approach. I came up with a
somewhat hacky way to handle those using the partition indices and
key-value pairs as a second pass after the first.

OCaml's std library provides a function called group() that takes a break
function that operates on pairs of successive elements. It seems a similar
approach could be used in Spark and would be more efficient than my
approach with key-value pairs, since you know the ordering of the
partitions.

Has this need been expressed by others?

On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin wrote:

> Try mapPartitions, which gives you an iterator, and you can produce an
> iterator back.
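
One way to picture the second pass is the plain-Python sketch below. It is not the implementation mentioned in this message: `stitch_partitions` and `should_break` are hypothetical names, and the sketch assumes the per-partition groups are available in partition order and that every group is non-empty.

```python
def stitch_partitions(grouped_partitions, should_break):
    """Merge groups that were split at partition boundaries.

    grouped_partitions: one list of groups per partition, in partition order.
    should_break(a, b): True if a group boundary belongs between a and b.
    """
    result = []
    for groups in grouped_partitions:
        for i, group in enumerate(groups):
            # Only the first group of a partition can continue an earlier run.
            if i == 0 and result and not should_break(result[-1][-1], group[0]):
                result[-1] = result[-1] + group
            else:
                result.append(list(group))
    return result


# Hypothetical break function: break unless the values are consecutive.
grouped = [[[1, 2], [4]], [[5, 6], [9]], [[10]]]
print(stitch_partitions(grouped, lambda a, b: b != a + 1))
# [[1, 2], [4, 5, 6], [9, 10]]
```

In a real job only the first and last group of each partition would need to be examined this way; the sketch walks everything to keep the logic short.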
Re: Grouping runs of elements in a RDD
Try mapPartitions, which gives you an iterator, and you can produce an
iterator back.

On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling wrote:

> Hi all,
>
> I have a problem where I have an RDD of elements:
>
> Item1 Item2 Item3 Item4 Item5 Item6 ...
>
> and I want to run a function over them to decide which runs of elements
> to group together:
>
> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>
> Technically, I could use aggregate to do this, but I would have to use a
> List of List of T, which would produce a very large collection in memory.
>
> Is there an easy way to accomplish this? E.g., it would be nice to have
> a version of aggregate where the combination function can return a
> complete group that is added to the new RDD and an incomplete group that
> is passed to the next call of the reduce function.
>
> Thanks,
> RJ
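
A minimal sketch of that idea in plain Python: a generator that consumes an iterator and lazily yields one complete group at a time, which is the iterator-in, iterator-out shape that mapPartitions works with. `group_runs`, `same_run`, and the trailing-comma predicate are hypothetical; passing it as `rdd.mapPartitions(lambda it: group_runs(it, same_run))` is an assumption about usage, and a run that crosses a partition boundary would still come out split in two.

```python
def group_runs(items, same_run):
    """Lazily yield lists of consecutive items for which same_run(prev, cur) holds."""
    current = []
    for item in items:
        if current and not same_run(current[-1], item):
            yield current          # the run is complete, emit it
            current = []
        current.append(item)
    if current:
        yield current              # emit the trailing run


# Hypothetical predicate: a trailing comma means the next item continues the run.
items = iter(["Item1,", "Item2", "Item3", "Item4,", "Item5,", "Item6"])
print(list(group_runs(items, lambda prev, cur: prev.endswith(","))))
# [['Item1,', 'Item2'], ['Item3'], ['Item4,', 'Item5,', 'Item6']]
```

Only the current run is held in memory, so the full list of groups is never materialized at once.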
Grouping runs of elements in a RDD
Hi all,

I have a problem where I have an RDD of elements:

Item1 Item2 Item3 Item4 Item5 Item6 ...

and I want to run a function over them to decide which runs of elements to
group together:

[Item1 Item2] [Item3] [Item4 Item5 Item6] ...

Technically, I could use aggregate to do this, but I would have to use a
List of List of T, which would produce a very large collection in memory.

Is there an easy way to accomplish this? E.g., it would be nice to have a
version of aggregate where the combination function can return a complete
group that is added to the new RDD and an incomplete group that is passed
to the next call of the reduce function.

Thanks,
RJ
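
To make the memory concern concrete, here is a plain-Python sketch (no Spark) of the fold-style approach, in which the accumulator is a list of lists holding every group built so far. `fold_into_groups`, `same_run`, and the sample predicate are hypothetical names chosen for illustration.

```python
from functools import reduce


def fold_into_groups(items, same_run):
    """Group runs with a fold; the accumulator holds every group built so far."""

    def step(groups, item):
        if groups and same_run(groups[-1][-1], item):
            groups[-1].append(item)   # extend the current (incomplete) run
        else:
            groups.append([item])     # start a new run
        return groups

    return reduce(step, items, [])


# Hypothetical predicate: consecutive integers belong to the same run.
print(fold_into_groups([1, 2, 4, 5, 6, 9], lambda a, b: b == a + 1))
# [[1, 2], [4, 5, 6], [9]]
```

The result is correct, but the accumulator grows to the size of the whole input before anything is emitted.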