Doing cleanup inside an iterator like that assumes the iterator always gets fully read, which is not necessarily the case (for example, RDD.take does not read it fully).
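To make that concrete, here is a minimal sketch using plain Scala iterators as a stand-in for a Spark partition (no Spark involved). The names LazyCleanupDemo and withCleanup are hypothetical and only for illustration, and the "cleanup" is just a callback that flips a flag:

```scala
// Plain Scala iterators stand in for a Spark partition here (assumption: no Spark needed
// to show the laziness problem).
object LazyCleanupDemo {
  // Hypothetical helper: maps each item and calls onCleanup once the
  // underlying iterator has no more elements.
  def withCleanup(partition: Iterator[Int], onCleanup: () => Unit): Iterator[Int] =
    partition.map { item =>
      val output = item * 2
      if (!partition.hasNext) onCleanup() // only reached when the last element is mapped
      output
    }

  def main(args: Array[String]): Unit = {
    var cleanedUp = false

    // Consumer stops early, the way a take-style operation would.
    val partial = withCleanup(Iterator(1, 2, 3, 4, 5), () => cleanedUp = true).take(2).toList
    println(s"partial = $partial, cleanup ran: $cleanedUp")

    // Consumer reads everything.
    cleanedUp = false
    val full = withCleanup(Iterator(1, 2, 3, 4, 5), () => cleanedUp = true).toList
    println(s"full = $full, cleanup ran: $cleanedUp")
  }
}
```

With the early stop the cleanup callback never fires, because the mapped iterator is lazy and the last element is never requested. It only fires when the consumer drains the whole iterator.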
Instead I would use mapPartitionsWithContext, in which case you can write a function of the form

    f: (TaskContext, Iterator[T]) => Iterator[U]

Now you can register a cleanup with the task context, like this:

    context.addTaskCompletionListener(new TaskCompletionListener {
      override def onTaskCompletion(context: TaskContext): Unit = dosomething
    })

and after that proceed with an iterator transformation as usual.

On Thu, Jul 31, 2014 at 4:35 AM, Sean Owen <so...@cloudera.com> wrote:

> On Thu, Jul 31, 2014 at 2:11 AM, Tobias Pfeiffer <t...@preferred.jp> wrote:
> > rdd.mapPartitions { partition =>
> >   // Some setup code here
> >   val result = partition.map(yourfunction)
> >
> >   // Some cleanup code here
> >   result
> > }
>
> Yes, I realized that after I hit send. You definitely have to store
> and return the result from the mapping!
>
>
> > rdd.mapPartitions { partition =>
> >   if (!partition.isEmpty) {
> >
> >     // Some setup code here
> >     partition.map(item => {
> >       val output = yourfunction(item)
> >       if (!partition.hasNext) {
> >         // Some cleanup code here
> >       }
> >       output
> >     })
> >   } else {
> >     // return an empty Iterator of your return type
> >   }
> > }
>
> Great point, yeah. If you knew the number of values were small you
> could collect them and process locally, but this is the right general
> way to do it.
>