And duh, of course, you can do the setup in that new RDD as well :)
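
Something like this, as a rough untested sketch (setup()/cleanup()
standing in for your own functions):

val prev: RDD[V] = rdd  // no separate mapPartitions step needed
new RDD[V](prev) {
  override protected def getPartitions = prev.partitions

  override def compute(split: Partition, context: TaskContext) = {
    setup()  // runs here instead, once per partition
    context.addOnCompleteCallback(() => cleanup())
    firstParent[V].iterator(split, context)
  }
}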

On Wed, Aug 20, 2014 at 1:59 AM, Victor Tso-Guillen <v...@paxata.com> wrote:

> How about this:
>
> val prev: RDD[V] = rdd.mapPartitions { partition => setup(); partition }
>
> new RDD[V](prev) {
>   override protected def getPartitions = prev.partitions
>
>   override def compute(split: Partition, context: TaskContext) = {
>     // run cleanup() when the task completes, i.e. after the
>     // iterator has been fully consumed
>     context.addOnCompleteCallback(() => cleanup())
>     firstParent[V].iterator(split, context)
>   }
> }
>
>
> On Tue, Aug 19, 2014 at 11:56 AM, Sean Owen <so...@cloudera.com> wrote:
>
>> I think you're looking for foreachPartition(). You've kinda hacked it
>> out of mapPartitions(). Your case has a simple solution, yes. After
>> saving to the DB, you know you can close the connection, since you
>> know the use of the connection has definitely just finished. But it's
>> not a simpler solution for mapPartitions() since that's not really
>> what you are using :)
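>>
>> Roughly, with createConnection()/save() as stand-ins for whatever
>> your DB client provides:
>>
>> rdd.foreachPartition { partition =>
>>   val conn = createConnection()  // setup, once per partition
>>   try {
>>     partition.foreach(record => conn.save(record))  // write to the DB
>>   } finally {
>>     conn.close()  // cleanup runs even if the writes fail
>>   }
>> }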
>>
>> In general, mapPartitions creates an Iterator from another Iterator.
>> Of course you could consume the input iterator, open the connection,
>> perform operations, close the connection and return an iterator over
>> the result. That works, but it requires reading the entire input no
>> matter what, and reading it all into memory. That may not be OK in
>> all cases.
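>>
>> That consume-it-all variant would look something like this (a
>> sketch, with transform() as a placeholder for the per-record work):
>>
>> rdd.mapPartitions { partition =>
>>   val conn = createConnection()
>>   val results = try {
>>     // toList forces the whole partition into memory before returning
>>     partition.map(record => transform(conn, record)).toList
>>   } finally {
>>     conn.close()
>>   }
>>   results.iterator
>> }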
>>
>> Where possible, it's nicest to return an Iterator that accesses the
>> source Iterator only as needed to produce elements. This means
>> returning that Iterator before any work has been done. So you have to
>> close the connection later when the Iterator has been exhausted.
>> Really, Tobias's method is trying to shim a "cleanup()" lifecycle
>> method into the Iterator. I suppose it could be done a little more
>> cleanly with Guava's Iterators utilities, which would give you a more
>> explicit way to execute something when done.
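>>
>> A hand-rolled version of that wrapper might look like this (untested
>> sketch, same placeholders as above):
>>
>> rdd.mapPartitions { partition =>
>>   val conn = createConnection()
>>   new Iterator[V] {
>>     private var closed = false
>>     def hasNext = {
>>       val more = partition.hasNext
>>       if (!more && !closed) { conn.close(); closed = true }  // cleanup
>>       more
>>     }
>>     def next() = transform(conn, partition.next())
>>   }
>> }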
>>
>>
>> On Tue, Aug 19, 2014 at 7:36 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
>> wrote:
>> > Sean, would this work --
>> >
>> > rdd.mapPartitions { partition => Iterator(partition) }.foreach { partition =>
>> >   // Some setup code here
>> >   // save partition to DB
>> >   // Some cleanup code here
>> > }
>> >
>> >
>> > I tried a pretty simple example ... I can see that the setup and
>> > cleanup are executed on the executor node, once per partition (I used
>> > mapPartitionsWithIndex instead of mapPartitions to track this a little
>> > better). Seems like an easier solution than Tobias's, but I'm
>> > wondering if it's perhaps incorrect.
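>> >
>> > Roughly, the indexed version I mean (println as a stand-in for real
>> > tracking):
>> >
>> > rdd.mapPartitionsWithIndex { (index, partition) =>
>> >   Iterator((index, partition))
>> > }.foreach { case (index, partition) =>
>> >   println(s"setup for partition $index")    // setup
>> >   // save partition to DB
>> >   println(s"cleanup for partition $index")  // cleanup
>> > }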
>>
>
