We have some code we've written using stateful streaming (mapWithState)
which works well for the most part.  The stateful streaming runs, processes
the RDD of input data, calls the state spec function for each input record,
and correctly adds entries to and removes them from the state cache.

However, I need to do some cleanup after stateful streaming has processed the
input data RDD, and I can't find any place to put that code where it will run
at the right time.

Essentially our state spec function needs to a) call out to an external
service, b) hold some data from that service, and then c) inform the
service to clean up the remaining data once the RDD is complete.

I've gotten to the point where the code looks approximately like this:


val eventStream = incomingStream.transform(...)

val stateUpdater = new StateUpdater
val stateUpdateStream =
  eventStream.mapWithState(StateSpec.function(stateUpdater.stateUpdateFunction _))

stateUpdateStream.foreachRDD { rdd =>
  ...
}
stateUpdater.cleanupExternalService()    // DOES NOT WORK!


class StateUpdater extends Serializable {

  def stateUpdateFunction(key, value, state) {
    if (!state.initialized) {
      state.initialize(externalService)
    }
    ...
  }

  def cleanupExternalService(): Unit = {
    externalService.cleanup()  // writes some data back to the external service
  }

  // re-created lazily, as a fresh instance, wherever this object is deserialized
  @transient lazy val externalService = new ExternalService
}


Note that the ExternalService object holds onto a small bit of state that it
needs to write back to the external service once we have finished running
stateUpdateFunction on every record in the RDD.  However, this code doesn't
actually work.  Because Spark serializes objects on the driver and then
deserializes copies of them onto the executors, there's no way for me to get
hold of the ExternalService objects that are actually used on each RDD
partition and clean up their leftover data.  Those objects seem to be held
internally somewhere in the bowels of stateful streaming (where it processes
an RDD of incoming data and applies it to the state).  Back in the main code,
where I'm trying to call the cleanup method, I'm actually calling it on a
totally different object than the ones that ran in the RDD partitions.  And
stateful streaming doesn't give me any opportunity to do cleanup processing -
say by calling some "rddDone" method to notify me that it just finished doing
state processing on an RDD.  It just calls the state spec function over and
over, once per record, and never notifies me that we've finished processing
one RDD or started a new one.
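
To make the object-identity problem concrete, here is a rough sketch of what
I believe is happening (the hashCode printing is purely illustrative, not
code from our job):

stateUpdateStream.foreachRDD { rdd =>
  rdd.foreachPartition { _ =>
    // runs on an executor against a deserialized copy of stateUpdater, so this
    // externalService is not the driver's instance (and may not even be the
    // same copy that mapWithState used for this partition)
    println(stateUpdater.externalService.hashCode)
  }
  // runs on the driver against the original stateUpdater instance, whose
  // externalService never saw any of the per-record data
  println(stateUpdater.externalService.hashCode)
}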


Is there any way out of this conundrum?  I've tried to avoid the problem by
moving my interactions with the external service outside of the state spec
function.  But that didn't work:  the interaction with the external service
is really needed inside of the state spec function, and I caused a bug in
our code when I tried to move it.
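
The closest thing to a workaround I can come up with (untested, and the names
below are hypothetical) would be to stop hanging the service off the
StateUpdater instance and instead keep it in a per-executor singleton, so that
a later foreachPartition in the same batch could reach the same instance and
clean it up:

object ExternalServiceHolder {
  // one instance per executor JVM, shared by the state spec function and by
  // any later foreachPartition task running in that JVM
  lazy val externalService = new ExternalService
}

stateUpdateStream.foreachRDD { rdd =>
  rdd.foreachPartition { _ =>
    // the hope is that mapWithState has already processed this batch by the
    // time this runs, so the cleanup happens "after" the state work
    ExternalServiceHolder.externalService.cleanup()
  }
}

But I'm not confident that these cleanup tasks would line up with the
executors that mapWithState actually used for the batch, so I haven't pursued
it.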

Any suggestions that I might not have thought of on how to fix this issue?

Thanks,

DR
