It looks like the cleanup should go into the foreachRDD function:

stateUpdateStream.foreachRDD { rdd =>
  // do stuff with the rdd

  stateUpdater.cleanupExternalService    // should work in this position
}

Code within the foreachRDD(*) executes on the driver, so you can keep the
state of the object there.

What will not work is updating the stateUpdater's state as a side effect
of the stateUpdateFunction used in the mapWithState transformation and
expecting those changes to be visible at the call site sketched above.

kr, Gerard.

(*) a typical construct found in the wild is:

dstream.foreachRDD { rdd =>
  // do some preparation
  rdd.operation { elem => ... }
  ...
  // close/clean/report
}
So the code within the foreachRDD closure executes on the driver, *but* the
code within the rdd.operation{...} closure is a Spark operation and runs
distributed on the executors.
One must be careful not to mix the two scopes, in particular when holding
on to local state.
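If the cleanup really has to happen next to the per-record work rather than on the driver, one option is to open and close the external service inside the executor-side closure itself (in Spark: rdd.foreachPartition). A plain-Scala mimic of that contract, with a stub service and hypothetical names throughout:

```scala
import scala.collection.mutable.ListBuffer

// Stub standing in for the poster's ExternalService; records lifecycle calls.
class FakeService(log: ListBuffer[String], partitionId: Int) {
  def use(elem: String): Unit = log += s"use-$partitionId-$elem"
  def cleanup(): Unit = log += s"cleanup-$partitionId"
}

object PartitionDemo {
  // Mimics rdd.foreachPartition: in real Spark each invocation of the body
  // runs on an executor, so a resource opened inside it can also be closed
  // inside it, after the partition's records have been processed.
  def foreachPartitionLike[A](partitions: Seq[Seq[A]])(
      body: (Int, Iterator[A]) => Unit): Unit =
    partitions.zipWithIndex.foreach { case (p, i) => body(i, p.iterator) }

  def run(): ListBuffer[String] = {
    val log = ListBuffer.empty[String]
    foreachPartitionLike(Seq(Seq("a", "b"), Seq("c"))) { (i, records) =>
      val service = new FakeService(log, i) // opened per partition, "executor" side
      records.foreach(service.use)
      service.cleanup()                     // closed where it was opened
    }
    log
  }

  def main(args: Array[String]): Unit = println(run().mkString(", "))
}
```

Each partition gets its own service instance and its own cleanup, so no state needs to travel back to the driver.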



On Wed, Jun 7, 2017 at 1:08 AM, David Rosenstrauch <daro...@gmail.com>
wrote:

> We have some code we've written using stateful streaming (mapWithState)
> which works well for the most part.  The stateful streaming runs, processes
> the RDD of input data, calls the state spec function for each input record,
> and does all proper adding and removing from the state cache.
>
> However, I have a need to do some cleanup after stateful streaming
> processes the input data RDD, and I can't seem to find any place where we
> can put that code where it will run when it's supposed to.
>
> Essentially our state spec function needs to a) call out to an external
> service, b) hold some data from that service, and then c) inform the
> service to clean up the remaining data once the RDD is complete.
>
> I've gotten to the point where the code looks approximately like this:
>
>
> val eventStream = incomingStream.transform(...)
>
> val stateUpdater = new StateUpdater
> val stateUpdateStream =
>   eventStream.mapWithState(stateUpdater.stateUpdateFunction _)
>
> stateUpdateStream.foreachRDD(...) {
>   ...
> }
> stateUpdater.cleanupExternalService    // DOES NOT WORK!
>
>
> class StateUpdater extends Serializable {
>
>   def stateUpdateFunction(key, value, state) {
>     if (!state.initialized) {
>       state.initialize(externalService)
>     }
>     ...
>   }
>
>   def cleanupExternalService {
>     externalService.cleanup  // writes some data back to the external service
>   }
>
>   @transient lazy val externalService = new ExternalService
> }
>
>
> Note that the ExternalService object is holding onto a small bit of state
> that it needs to write back to the external service once we have completed
> running the stateUpdateFunction on every record in the RDD.  However this
> code doesn't actually work.  Because of the way Spark serializes objects on
> the driver and then deserializes them onto the executor, there's no way for
> me to get a hold of the ExternalService object that is being used on each
> RDD partition and clean up its leftover data.  Those objects seem to be
> held internally somewhere in the bowels of stateful streaming (where it
> processes an RDD of incoming data and applies it to the state).  And back
> in the main code where I'm trying to call the cleanup method, I'm actually
> calling it on a totally different object than the one that ran in the RDD
> partitions.  And stateful streaming doesn't provide me with any opportunity
> to perform any cleanup processing - say by calling some "rddDone" method to
> notify me that it just finished doing state processing on an RDD.  It just
> calls only the statespec function over and over, once for every record, and
> never notifying me that we've ended processing an RDD or started a new one.
>
>
> Is there any way out of this conundrum?  I've tried to avoid the problem
> by moving my interactions with the external service outside of the state
> spec function.  But that didn't work:  the interaction with the external
> service is really needed inside of the state spec function, and I caused a
> bug in our code when I tried to move it.
>
> Any suggestions that I might not have thought of on how to fix this issue?
>
> Thanks,
>
> DR
>
