To implement setup/cleanup functions in Spark, we follow the pattern below, as discussed here:
<http://apache-spark-user-list.1001560.n3.nabble.com/Keep-state-inside-map-function-td10968.html#a11009>

rdd.mapPartitions { partition =>
  if (!partition.isEmpty) {
    // Some setup code here
    partition.map { item =>
      val output = yourfunction(item)
      if (!partition.hasNext) {
        // Some cleanup code here, reached on the last item
      }
      output
    }
  } else {
    // return an empty Iterator of your return type
    Iterator.empty
  }
}

In my case the RDD is a pair RDD loaded from Accumulo using an InputFormat,
and our map function only changes the values (the keys are left untouched).
However, the iterator returned from mapPartitions somehow ends up with
incorrect keys. I even tried preservesPartitioning = true, but no luck.
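
For concreteness, here is a simplified sketch of what we do (transformValue
stands in for our real value function, and the Accumulo/InputFormat
configuration is omitted):

import org.apache.accumulo.core.data.{Key, Value}
import org.apache.spark.rdd.RDD

def process(rdd: RDD[(Key, Value)],
            transformValue: Value => Value): RDD[(Key, Value)] = {
  rdd.mapPartitions({ partition =>
    if (!partition.isEmpty) {
      // Some setup code here, e.g. open a connection
      partition.map { case (key, value) =>
        val output = (key, transformValue(value)) // keys are not touched
        if (!partition.hasNext) {
          // Some cleanup code here, e.g. close the connection
        }
        output
      }
    } else {
      Iterator.empty
    }
  }, preservesPartitioning = true)
}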

Debugging shows that the keys change right after the call to
partition.hasNext. If I remove partition.hasNext from the code, everything
works fine!
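
In other words, this variant (which gives up the per-partition cleanup)
returns the correct keys, with transformValue as above:

rdd.mapPartitions({ partition =>
  // Some setup code here
  partition.map { case (key, value) =>
    (key, transformValue(value)) // no hasNext call; keys come back correct
  }
}, preservesPartitioning = true)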

Any ideas?

Thanks,
Mohammad
