And duh, of course, you can do the setup in that new RDD as well :)
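For example, a minimal sketch building on Victor's snippet below; setup() and
cleanup() are hypothetical placeholders for whatever per-partition work you
need (e.g. opening and closing a DB connection):

    new RDD[V](prev) {
      protected def getPartitions = prev.partitions

      def compute(split: Partition, context: TaskContext) = {
        setup()                                         // runs once per partition, before iteration
        context.addOnCompleteCallback(() => cleanup())  // runs when the task completes
        firstParent[V].iterator(split, context)
      }
    }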
On Wed, Aug 20, 2014 at 1:59 AM, Victor Tso-Guillen <v...@paxata.com> wrote:
> How about this:
>
>     val prev: RDD[V] = rdd.mapPartitions(partition => { /* setup() */; partition })
>
>     new RDD[V](prev) {
>       protected def getPartitions = prev.partitions
>
>       def compute(split: Partition, context: TaskContext) = {
>         context.addOnCompleteCallback(() => { /* cleanup() */ })
>         firstParent[V].iterator(split, context)
>       }
>     }
>
> On Tue, Aug 19, 2014 at 11:56 AM, Sean Owen <so...@cloudera.com> wrote:
>> I think you're looking for foreachPartition(). You've kinda hacked it
>> out of mapPartitions(). Your case has a simple solution, yes. After
>> saving to the DB, you know you can close the connection, since you
>> know the use of the connection has definitely just finished. But it's
>> not a simpler solution for mapPartitions(), since that's not really
>> what you are using :)
>>
>> In general, mapPartitions creates an Iterator from another Iterator.
>> Of course you could consume the input iterator, open the connection,
>> perform operations, close the connection, and return an iterator over
>> the result. That works, but it requires reading the entire input no
>> matter what, and reading it all into memory. These may not be OK in
>> all cases.
>>
>> Where possible, it's nicest to return an Iterator that accesses the
>> source Iterator only as needed to produce elements. This means
>> returning that Iterator before any work has been done, so you have to
>> close the connection later, when the Iterator has been exhausted.
>> Really, Tobias's method is trying to shim a "cleanup()" lifecycle
>> method into the Iterator. I suppose it could be done a little more
>> cleanly using Guava's Iterators library, which would give you a more
>> explicit way to execute something when done.
>>
>> On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:
>>> Sean, would this work --
>>>
>>>     rdd.mapPartitions { partition => Iterator(partition) }.foreach { partition =>
>>>       // Some setup code here
>>>       // save partition to DB
>>>       // Some cleanup code here
>>>     }
>>>
>>> I tried a pretty simple example ... I can see that the setup and
>>> cleanup are executed on the executor node, once per partition (I used
>>> mapPartitionsWithIndex instead of mapPartitions to track this a
>>> little better). It seems like an easier solution than Tobias's, but
>>> I'm wondering if it's perhaps incorrect.
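For reference, here is a minimal sketch of the lazy wrapping Sean describes
above, again with setup() and cleanup() as hypothetical placeholders:

    rdd.mapPartitions { partition =>
      setup()                          // e.g. open a DB connection for this partition
      new Iterator[V] {
        private var closed = false
        def hasNext: Boolean = {
          val more = partition.hasNext
          if (!more && !closed) {      // source exhausted: run cleanup exactly once
            closed = true
            cleanup()
          }
          more
        }
        def next(): V = partition.next()
      }
    }

Note that this only covers the happy path: if the task fails before the
iterator is exhausted, cleanup() never runs, which is exactly what the
TaskContext callback in Victor's version guards against.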