We have some code written using stateful streaming (mapWithState) that works well for the most part. The stateful streaming job runs, processes the RDD of input data, calls the state spec function for each input record, and does all the proper adding to and removing from the state cache.
However, I need to do some cleanup after stateful streaming processes the input data RDD, and I can't find any place to put that code where it will run when it's supposed to. Essentially our state spec function needs to a) call out to an external service, b) hold some data from that service, and then c) inform the service to clean up the remaining data once the RDD is complete.

I've gotten to the point where the code looks approximately like this:

    val eventStream = incomingStream.transform(...)
    val stateUpdater = new StateUpdater
    val stateUpdateStream =
      eventStream.mapWithState(StateSpec.function(stateUpdater.stateUpdateFunction _))
    stateUpdateStream.foreachRDD(...) { ... }
    stateUpdater.cleanupExternalService // DOES NOT WORK!

    class StateUpdater extends Serializable {
      // called once per input record, on the executors
      def stateUpdateFunction(key, value, state) {
        if (!state.initialized) {
          state.initialize(externalService)
        }
        ...
      }

      def cleanupExternalService {
        externalService.cleanup // writes some data back to the external service
      }

      // not serialized; re-created lazily wherever this object is deserialized
      @transient lazy val externalService = new ExternalService
    }

Note that the ExternalService object holds a small bit of state that it needs to write back to the external service once we have finished running stateUpdateFunction on every record in the RDD.

However, this code doesn't actually work. Because Spark serializes objects on the driver and then deserializes them on the executors, I have no way to get hold of the ExternalService object that is being used on each RDD partition and clean up its leftover data. Those objects seem to be held somewhere deep inside stateful streaming (where it processes an RDD of incoming data and applies it to the state). And back in the main code, where I'm trying to call the cleanup method, I'm actually calling it on a totally different object than the one that ran in the RDD partitions.
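
To illustrate what I believe is happening, here is a minimal sketch with Spark taken out of the picture. It uses plain Java serialization as a stand-in for what happens when the function is shipped to an executor. Everything in it is an assumption made for illustration: the ExternalService here is a hypothetical in-memory stub (with made-up hold and pending members), stateUpdateFunction is reduced to a single argument, and SerializationDemo.roundTrip is a helper invented for this example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for the real external service client.
class ExternalService {
  var pending: List[String] = Nil // leftover data waiting to be written back
  def hold(item: String): Unit = pending = item :: pending
  // Pretend to write the leftovers back; returns how many items were flushed.
  def cleanup(): Int = { val n = pending.size; pending = Nil; n }
}

class StateUpdater extends Serializable {
  // Not serialized: every deserialized copy lazily builds its own instance.
  @transient lazy val externalService = new ExternalService
  def stateUpdateFunction(value: String): Unit = externalService.hold(value)
  def cleanupExternalService(): Int = externalService.cleanup()
}

object SerializationDemo {
  // Serialize and deserialize an object, roughly imitating the
  // driver-to-executor hop (an assumption, not Spark's actual code path).
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverCopy = new StateUpdater
    val executorCopy = roundTrip(driverCopy) // plays the role of the executor-side object

    executorCopy.stateUpdateFunction("a")
    executorCopy.stateUpdateFunction("b")

    // Cleanup on the driver-side object only sees its own, empty service.
    println(driverCopy.cleanupExternalService())       // prints 0
    // The leftovers are still sitting in the other copy.
    println(executorCopy.externalService.pending.size) // prints 2
  }
}
```

In this sketch the two StateUpdater copies never share an ExternalService, which matches the behavior I'm seeing: calling cleanup on the driver-side object does nothing for the data held on the executors.
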
And stateful streaming doesn't give me any opportunity to perform cleanup processing, say by calling some "rddDone" method to notify me that it has just finished state processing on an RDD. It simply calls the state spec function over and over, once for every record, and never notifies me that it has finished processing one RDD or started a new one.

Is there any way out of this conundrum? I've tried to avoid the problem by moving my interactions with the external service outside of the state spec function. But that didn't work: the interaction with the external service really is needed inside the state spec function, and I introduced a bug in our code when I tried to move it.

Any suggestions I might not have thought of for fixing this issue?

Thanks,
DR