If you are joining successive lines together based on a predicate, then you are doing a "flatMap", not an "aggregate". You are on the right track with a multi-pass solution. I had the same challenge when I needed a sliding window over an RDD (see below).

[I had suggested that the sliding window API be moved to spark-core. Not sure if that happened.]

http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions
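For concreteness, a minimal sketch of what using that API looks like. The timestamps and the 5-second threshold are made up for illustration; it assumes spark-mllib is on the classpath and sc is a live SparkContext:

import org.apache.spark.mllib.rdd.RDDFunctions._   // implicit adds .sliding() to RDDs

// Hypothetical data: event timestamps in milliseconds.
val times = sc.parallelize(Seq(1000L, 1002L, 9000L, 9001L, 9003L))

// sliding(2) yields an RDD[Array[Long]] of adjacent pairs and handles
// partition boundaries for you.
val pairs = times.sliding(2)
val gaps  = pairs.map { case Array(a, b) => (a, b, b - a) }
gaps.collect().foreach(println)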
----- previous posts -----

On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi <mohitja...@gmail.com> wrote:

> http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
>
> You can use the MLlib function, or do the following (which is what I had done):
>
> - In the first pass over the data, using mapPartitionsWithIndex, gather the first item in each partition. You can use collect (or an aggregator) for this. Key them by the partition index. At the end, you will have a map: (partition index) --> first item.
> - In the second pass over the data, using mapPartitionsWithIndex again, look at two items at a time (or N items in the general case; you can use Scala's sliding iterator) and check the time difference (or do any other sliding-window computation). Pass the map created in the previous step to this mapPartitions call; you will need it to check the last item in each partition.
>
> If you can tolerate a few inaccuracies, you can do just the second step. You will miss the "boundaries" of the partitions, but that might be acceptable for your use case.

On Tue, Jun 30, 2015 at 12:21 PM, RJ Nowling <rnowl...@gmail.com> wrote:

> That's an interesting idea! I hadn't considered that. However, looking at the Partitioner interface, I would need to decide based on a single key, which doesn't fit my case, unfortunately. For my case, I need to compare successive pairs of keys. (I'm trying to re-join lines that were split prematurely.)
>
> On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh <abhis...@tetrationanalytics.com> wrote:
>
>> Could you use a custom partitioner to preserve boundaries, such that all related tuples end up on the same partition?
>>
>> On Jun 30, 2015, at 12:00 PM, RJ Nowling <rnowl...@gmail.com> wrote:
>>
>> Thanks, Reynold. I still need to handle incomplete groups that fall between partition boundaries, so I need a two-pass approach. I came up with a somewhat hacky way to handle those, using the partition indices and key-value pairs in a second pass after the first.
>>
>> OCaml's standard library provides a function called group() that takes a break function operating on pairs of successive elements. It seems a similar approach could be used in Spark, and it would be more efficient than my approach with key-value pairs, since you know the ordering of the partitions.
>>
>> Has this need been expressed by others?
>>
>> On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin <r...@databricks.com> wrote:
>>
>>> Try mapPartitions, which gives you an iterator, and you can produce an iterator back.
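To make that suggestion concrete, a within-partition grouping might look like the sketch below. Here lines is an assumed RDD[String] and breakBetween is a placeholder predicate; as discussed above, groups that cross partition boundaries still need a second repair pass.

// Placeholder predicate, purely for illustration: pretend an indented
// line continues the previous group.
def breakBetween(prev: String, next: String): Boolean =
  !next.startsWith(" ")

// Group runs within each partition, lazily, so no List of Lists is
// ever materialized all at once.
val grouped = lines.mapPartitions { iter =>
  new Iterator[List[String]] {
    private val in = iter.buffered
    def hasNext: Boolean = in.hasNext
    def next(): List[String] = {
      val group = scala.collection.mutable.ListBuffer(in.next())
      while (in.hasNext && !breakBetween(group.last, in.head))
        group += in.next()
      group.toList
    }
  }
}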
>>> On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling <rnowl...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a problem where I have an RDD of elements:
>>>>
>>>> Item1 Item2 Item3 Item4 Item5 Item6 ...
>>>>
>>>> and I want to run a function over them to decide which runs of elements to group together:
>>>>
>>>> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>>>>
>>>> Technically, I could use aggregate to do this, but I would have to use a List of Lists of T, which would produce a very large collection in memory.
>>>>
>>>> Is there an easy way to accomplish this? E.g., it would be nice to have a version of aggregate where the combination function can return a complete group, which is added to the new RDD, and an incomplete group, which is passed to the next call of the reduce function.
>>>>
>>>> Thanks,
>>>> RJ
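Putting the pieces together, here is a sketch of the two-pass scheme described above, applied to the original sliding-window use case of comparing successive timestamps across partition boundaries. All names are illustrative, and the boundary handoff assumes no empty partitions:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def successiveDiffs(times: RDD[Long], sc: SparkContext): RDD[Long] = {
  // Pass 1: the first element of every partition, keyed by partition index.
  val firstByPartition: Map[Int, Long] = times
    .mapPartitionsWithIndex { (i, it) =>
      if (it.hasNext) Iterator((i, it.next())) else Iterator.empty
    }
    .collect().toMap
  val bcFirsts = sc.broadcast(firstByPartition)

  // Pass 2: slide over each partition, appending the next partition's
  // first element so the pair that straddles the boundary is not lost.
  times.mapPartitionsWithIndex { (i, it) =>
    val carry = bcFirsts.value.get(i + 1).iterator  // empty for the last partition
    (it ++ carry).sliding(2).collect { case Seq(a, b) => b - a }
  }
}

The same handoff would apply to the line re-joining use case: an open group at the end of partition i gets completed against the carried-over head of partition i+1.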