Doing cleanup in an iterator like that assumes the iterator always gets
fully read, which is not necessarily the case (for example, RDD.take stops
reading once it has enough elements).
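To see the problem without a cluster, here is a plain-Scala sketch (no Spark needed). It copies the "clean up when !partition.hasNext" trick from the quoted code below and then reads only the first two results, roughly what a partial read like RDD.take can do to a partition. The Tracked class and its cleanedUp flag are hypothetical names used only for illustration.

```scala
object EarlyStopDemo {
  // Hypothetical wrapper: maps a partition lazily and records whether the
  // "cleanup" branch (taken only on the last element) ever ran.
  final class Tracked(partition: Iterator[Int]) {
    var cleanedUp = false
    val result: Iterator[Int] = partition.map { x =>
      val out = x * 10
      if (!partition.hasNext) cleanedUp = true // reached only after the last element
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val partial = new Tracked(Iterator(1, 2, 3, 4, 5))
    // Consume only two elements, like a caller that stops early.
    println(s"${partial.result.take(2).toList}, cleanedUp = ${partial.cleanedUp}")
  }
}
```

Because the mapped iterator is lazy, the elements after the second one are never pulled, so the cleanup branch never executes.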

Instead I would use mapPartitionsWithContext, in which case you can write a
function of the form:
 f: (TaskContext, Iterator[T]) => Iterator[U]

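Now you can register a cleanup with the task context, like this:
import org.apache.spark.TaskContext
import org.apache.spark.util.TaskCompletionListener
context.addTaskCompletionListener(new TaskCompletionListener {
  override def onTaskCompletion(context: TaskContext): Unit = doSomething() // your cleanup code
})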

After that, proceed with an iterator transformation as usual.
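Running the real Spark version needs Spark on the classpath, so here is a self-contained model of the same idea. MiniTaskContext and runTask are hypothetical stand-ins for Spark's TaskContext and task runner, not Spark's API. The point it shows is that the partition function does its setup, registers cleanup with the context, and returns a lazy iterator as usual. The cleanup then fires when the task ends (modelled here with a finally block), no matter how much of the iterator the consumer actually read.

```scala
import scala.collection.mutable.ArrayBuffer

// Self-contained model of the pattern; NOT Spark's actual classes.
object CompletionListenerModel {
  // Hypothetical stand-in for TaskContext: just a list of callbacks.
  final class MiniTaskContext {
    private val listeners = ArrayBuffer.empty[MiniTaskContext => Unit]
    def addTaskCompletionListener(f: MiniTaskContext => Unit): Unit = listeners += f
    def markCompleted(): Unit = listeners.foreach(listener => listener(this))
  }

  // Hypothetical stand-in for the task runner: the consumer reads at most
  // `limit` elements, but listeners always run once the task is over.
  def runTask[T, U](input: Iterator[T], limit: Int)(
      f: (MiniTaskContext, Iterator[T]) => Iterator[U]): List[U] = {
    val ctx = new MiniTaskContext
    try f(ctx, input).take(limit).toList
    finally ctx.markCompleted()
  }

  def main(args: Array[String]): Unit = {
    val log = ArrayBuffer.empty[String]
    val out = runTask(Iterator(1, 2, 3, 4, 5), limit = 2) { (ctx, it) =>
      log += "setup"                                       // e.g. open a connection
      ctx.addTaskCompletionListener(_ => log += "cleanup") // e.g. close it
      it.map(_ * 10)                                       // lazy transformation as usual
    }
    println(s"out = $out, log = $log")
  }
}
```

The design choice is that the framework, which knows when the task is finished, owns the cleanup; the iterator never has to guess whether it has been fully read.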


On Thu, Jul 31, 2014 at 4:35 AM, Sean Owen <so...@cloudera.com> wrote:

> On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer <t...@preferred.jp> wrote:
> > rdd.mapPartitions { partition =>
> >    // Some setup code here
> >    val result = partition.map(yourfunction)
> >
> >    // Some cleanup code here
> >    result
> > }
>
> Yes, I realized that after I hit send. You definitely have to store
> and return the result from the mapping!
>
>
> > rdd.mapPartitions { partition =>
> >    if (!partition.isEmpty) {
> >
> >      // Some setup code here
> >      partition.map(item => {
> >        val output = yourfunction(item)
> >        if (!partition.hasNext) {
> >          // Some cleanup code here
> >        }
> >        output
> >      })
> >    } else {
> >      Iterator.empty // an empty Iterator of your return type
> >    }
> > }
>
> Great point, yeah. If you knew the number of values were small you
> could collect them and process locally, but this is the right general
> way to do it.
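One reading of "collect them and process locally" for a small partition is to buffer the whole partition eagerly inside mapPartitions, run cleanup, and only then hand back an iterator over the buffered results. The sketch below assumes that reading; EagerPartition and processEagerly are hypothetical names, not Spark API. It could be called as rdd.mapPartitions(p => EagerPartition.processEagerly(p)(...)).

```scala
// Hypothetical helper, not part of Spark: only sensible when a partition
// fits comfortably in memory, since every result is held at once.
object EagerPartition {
  def processEagerly[T, U](partition: Iterator[T])(
      setup: () => Unit, f: T => U, cleanup: () => Unit): Iterator[U] = {
    setup()
    val buffered =
      try partition.map(f).toVector // forces every element through f right now
      finally cleanup()            // runs even if f throws
    buffered.iterator              // consumer may read all, some, or none of it
  }

  def main(args: Array[String]): Unit = {
    val log = scala.collection.mutable.ArrayBuffer.empty[String]
    val it = processEagerly(Iterator(1, 2, 3))(
      () => log += "setup", x => x + 1, () => log += "cleanup")
    println(s"log = $log, first = ${it.take(1).toList}")
  }
}
```

The trade-off is memory: cleanup is guaranteed before the iterator is returned, but the whole partition's output is materialized, which is why the lazy iterator (or the task-completion listener) remains the general approach.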
>
