Re: Grouping runs of elements in an RDD
If you are joining successive lines together based on a predicate, then you are doing a flatMap, not an aggregate. You are on the right track with a multi-pass solution. I had the same challenge when I needed a sliding window over an RDD (see below). [I had suggested that the sliding window API be moved to spark-core; not sure if that happened.]

--- previous posts ---
http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions

On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi mohitja...@gmail.com wrote:
http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3ccalrvtpkn65rolzbetc+ddk4o+yjm+tfaf5dz8eucpl-2yhy...@mail.gmail.com%3E
You can use the MLlib function or do the following (which is what I had done):
- In the first pass over the data, using mapPartitionsWithIndex, gather the first item in each partition. You can use collect (or an aggregator) for this. "Key" them by the partition index. At the end, you will have a map of (partition index) -> (first item).
- In the second pass over the data, using mapPartitionsWithIndex again, look at two items at a time (or, in the general case, N items at a time; you can use Scala's sliding iterator) and check the time difference (or do any other sliding-window computation). Pass the map created in the previous step to this mapPartitions; you will need it to check the last item in each partition.
If you can tolerate a few inaccuracies, you can do just the second step. You will miss the "boundaries" of the partitions, but that might be acceptable for your use case.

On Tue, Jun 30, 2015 at 12:21 PM, RJ Nowling rnowl...@gmail.com wrote: [...]
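For concreteness, here is a minimal sketch of those two passes, assuming an RDD of timestamps and a maximum in-run gap (the names findBreaks and maxGap are made up for the example, not from the thread):

    import org.apache.spark.rdd.RDD

    // Pass 1: collect each partition's first element to the driver, keyed by
    // partition index. Pass 2: slide a window of two over each partition,
    // appending the next partition's first element so that windows spanning
    // partition boundaries are also examined.
    def findBreaks(timestamps: RDD[Long], maxGap: Long): RDD[(Long, Long)] = {
      val firstByPartition: Map[Int, Long] = timestamps
        .mapPartitionsWithIndex { (i, it) =>
          if (it.hasNext) Iterator((i, it.next())) else Iterator.empty
        }
        .collect()
        .toMap

      timestamps.mapPartitionsWithIndex { (i, it) =>
        // If the next partition happens to be empty, this sketch skips that
        // boundary window rather than looking further ahead.
        val extended = firstByPartition.get(i + 1) match {
          case Some(first) => it ++ Iterator(first)
          case None        => it
        }
        extended.sliding(2).collect {
          case Seq(a, b) if b - a > maxGap => (a, b) // a break between two runs
        }
      }
    }

Alternatively, the MLlib RDDFunctions.sliding linked above produces the cross-partition windows for you.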
Re: Grouping runs of elements in an RDD
Thanks, Mohit. It sounds like we're on the same page -- I used a similar approach.

On Thu, Jul 2, 2015 at 12:27 PM, Mohit Jaggi mohitja...@gmail.com wrote: [...]
Grouping runs of elements in an RDD
Hi all,

I have a problem where I have an RDD of elements:

Item1 Item2 Item3 Item4 Item5 Item6 ...

and I want to run a function over them to decide which runs of elements to group together:

[Item1 Item2] [Item3] [Item4 Item5 Item6] ...

Technically, I could use aggregate to do this, but I would have to use a List of Lists of T, which would produce a very large collection in memory. Is there an easy way to accomplish this? E.g., it would be nice to have a version of aggregate where the combination function can return a complete group that is added to the new RDD and an incomplete group that is passed to the next call of the reduce function.

Thanks,
RJ
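To pin down the semantics being asked for: on a plain Scala sequence, the grouping can be written as a fold with a caller-supplied "break" predicate over successive elements (groupRuns is an illustrative name, not an existing API):

    // Group a sequence into runs, starting a new run whenever break(prev, next) is true.
    def groupRuns[T](xs: Seq[T])(break: (T, T) => Boolean): Seq[Seq[T]] =
      xs.foldLeft(Vector.empty[Vector[T]]) {
        case (acc :+ run, x) if !break(run.last, x) => acc :+ (run :+ x)
        case (acc, x)                               => acc :+ Vector(x)
      }

    // groupRuns(Seq(1, 2, 4, 5, 6, 9))((a, b) => b - a > 1)
    // => Vector(Vector(1, 2), Vector(4, 5, 6), Vector(9))

Doing exactly this with aggregate over the whole RDD is what would materialize every group at once, which is the memory problem described above.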
Re: Grouping runs of elements in an RDD
Try mapPartitions, which gives you an iterator, and you can produce an iterator back.

On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling rnowl...@gmail.com wrote: [...]
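One hedged sketch of that shape (the break predicate is assumed, as above): group runs within each partition, streaming iterator-to-iterator so that only the run currently being built is buffered. Runs straddling a partition boundary are still split here; that is the gap the two-pass fix-ups elsewhere in the thread address.

    import org.apache.spark.rdd.RDD

    def groupRunsPerPartition[T](rdd: RDD[T])(break: (T, T) => Boolean): RDD[Vector[T]] =
      rdd.mapPartitions { it =>
        new Iterator[Vector[T]] {
          private val buf = it.buffered
          def hasNext: Boolean = buf.hasNext
          def next(): Vector[T] = {
            // Extend the current run until the predicate signals a break.
            var run = Vector(buf.next())
            while (buf.hasNext && !break(run.last, buf.head)) run :+= buf.next()
            run
          }
        }
      }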
Re: Grouping runs of elements in an RDD
That's an interesting idea! I hadn't considered that. However, looking at the Partitioner interface, I would have to make the decision from a single key in isolation, which doesn't fit my case, unfortunately. For my case, I need to compare successive pairs of keys. (I'm trying to re-join lines that were split prematurely.)

On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh abhis...@tetrationanalytics.com wrote: [...]
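For reference, the interface in question shows why: placement is decided per key, with no view of neighbouring elements.

    // Abridged from org.apache.spark.Partitioner: a partitioner sees one key
    // at a time, so it cannot base its decision on successive pairs.
    abstract class Partitioner extends Serializable {
      def numPartitions: Int
      def getPartition(key: Any): Int
    }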
Re: Grouping runs of elements in an RDD
Thanks, Reynold. I still need to handle incomplete groups that fall between partition boundaries, so I need a two-pass approach. I came up with a somewhat hacky way to handle those, using the partition indices and key-value pairs in a second pass after the first.

OCaml's standard library provides a function called group() that takes a break function that operates on pairs of successive elements. It seems a similar approach could be used in Spark, and it would be more efficient than my approach with key-value pairs since you know the ordering of the partitions. Has this need been expressed by others?

On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin r...@databricks.com wrote: [...]
Re: Grouping runs of elements in an RDD
Could you use a custom partitioner to preserve boundaries, such that all related tuples end up on the same partition?

On Jun 30, 2015, at 12:00 PM, RJ Nowling rnowl...@gmail.com wrote: [...]