To implement setup/cleanup functions in Spark, we follow the pattern below, as discussed here: <http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-td10968.html#a11009>
rdd.mapPartitions { partition =>
  if (!partition.isEmpty) {
    // Some setup code here
    partition.map(item => {
      val output = yourfunction(item)
      if (!partition.hasNext) {
        // Some cleanup code here
      }
      output
    })
  } else {
    // Return an empty Iterator of your return type
    Iterator.empty
  }
}

In my case the RDD is a pair RDD loaded from Accumulo using an InputFormat, and our map function changes only the values (the keys are untouched). However, the iterator returned from mapPartitions somehow ends up with incorrect keys. I even tried preservesPartitioning = true, but no luck.

Debugging the code shows that the keys get changed after calling partition.hasNext. If I remove partition.hasNext from the code, then everything works fine! Any ideas?

Thanks,
Mohammad
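For reference, here is a minimal, self-contained sketch of the same setup/cleanup-per-partition pattern applied to a plain Scala Iterator (no Spark required), so the control flow can be tried outside the cluster. The doubling function and the println calls are stand-ins for yourfunction and the real setup/cleanup code:

```scala
object SetupCleanupDemo {
  // Mimics the body passed to rdd.mapPartitions: setup runs once per
  // partition, cleanup runs after the last element is produced.
  def process(partition: Iterator[Int]): Iterator[Int] = {
    if (partition.hasNext) {
      println("setup")                 // per-partition setup
      partition.map { item =>
        val output = item * 2          // stand-in for yourfunction(item)
        if (!partition.hasNext)        // no elements left: run cleanup
          println("cleanup")
        output
      }
    } else {
      Iterator.empty                   // empty partition: nothing to do
    }
  }

  def main(args: Array[String]): Unit = {
    println(process(Iterator(1, 2, 3)).toList)  // List(2, 4, 6)
  }
}
```

Note that this only works because Iterator.map is lazy: the closure re-inspects the original partition iterator after each element is pulled, which is exactly the partition.hasNext call that seems to trigger the key corruption in the Accumulo case.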