Thanks much for the suggestion.  Reading over your mail though, I realize
that I may not have made something clear:  I don't just have a single
external service object; the idea is that I have one per executor, so that
the code running on each executor accesses the external service
independently.  (And performs its cleanup independently as well.)  So I
actually don't want stateUpdater.cleanupExternalService to run on the
driver, but rather to run on each executor.

So to be a bit more explicit, using words rather than code, what I'm trying
to do is:

In the driver (a rough sketch follows this list):
* Start reading the incoming stream of data (event strings, in my case)
* Run stateful streaming to ingest the stream of incoming data and update
state objects with it (user summaries, in my case)
* Output all the completed user summaries (to Kafka, in my case) for
further downstream processing
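
To make that concrete, here's a rough sketch of the driver-side wiring.
(Everything here is simplified/hypothetical -- the UserSummary type, the key
parsing, and the println standing in for the Kafka write are placeholders,
not our actual code.)

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical stand-in for our real user summary type.
case class UserSummary(userId: String, eventCount: Int, notes: Seq[String] = Nil)

// incomingStream: the raw event strings; updateFn: the state spec function
// (sketched after the next list), which returns Some(summary) once a user
// summary is complete.
def buildPipeline(
    incomingStream: DStream[String],
    updateFn: (String, Option[String], State[UserSummary]) => Option[UserSummary]): Unit = {

  // Key each event string by user id (the parsing here is just a placeholder).
  val eventStream: DStream[(String, String)] =
    incomingStream.map(line => (line.takeWhile(_ != ','), line))

  // Stateful streaming: fold events into per-user state, emit completed summaries.
  val summaryStream: DStream[UserSummary] =
    eventStream
      .mapWithState(StateSpec.function(updateFn))
      .flatMap(_.toList)

  // Send completed summaries downstream (Kafka in our case; producer wiring omitted).
  summaryStream.foreachRDD { rdd =>
    rdd.foreachPartition { summaries =>
      summaries.foreach(summary => println(summary))  // placeholder for the Kafka write
    }
  }
}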

In each executor (i.e., in each RDD partition it processes), using stateful
streaming / the state spec function (a rough sketch follows this list):
* Process a batch of incoming event strings
* Fold each event string into the user summary
* In the process of building the user summary, retrieve some data from the
external service (lazily initialized)
* Emit each completed user summary as output from the stateful streaming
state spec function
* After all event strings in the batch have been processed, perform cleanup
on this executor's interaction with the external service by writing some
data back to the service
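
And here's a rough sketch of the state spec function itself, with the external
service lazily initialized per executor.  Again, ExternalService, the
enrichment lookup, and the isComplete check are hypothetical placeholders
(UserSummary is the placeholder type from the sketch above):

import org.apache.spark.streaming.State

// Hypothetical client for the external service; it accumulates a little data
// that eventually needs to be written back via flush().
class ExternalService {
  def lookup(userId: String): String = "..."   // placeholder enrichment lookup
  def flush(): Unit = ()                       // writes the held data back to the service
}

class StateUpdater extends Serializable {

  // Lazily initialized on each executor the first time the state spec
  // function runs there; never serialized from the driver.
  @transient lazy val externalService = new ExternalService

  // Called once per incoming (userId, event string) record.
  def stateUpdateFunction(userId: String,
                          event: Option[String],
                          state: State[UserSummary]): Option[UserSummary] = {
    val current = if (state.exists) state.get else UserSummary(userId, 0)

    // Fold the event into the summary, enriching it with data from the service.
    val enriched = event.map(e => e + " | " + externalService.lookup(userId))
    val updated = current.copy(
      eventCount = current.eventCount + 1,
      notes      = current.notes ++ enriched.toList)

    if (isComplete(updated)) {       // hypothetical completion check
      state.remove()
      Some(updated)                  // emit the completed summary
    } else {
      state.update(updated)
      None
    }
  }

  private def isComplete(summary: UserSummary): Boolean = summary.eventCount >= 10

  // What I'm missing: somewhere to call externalService.flush() on this
  // executor once the whole batch has been processed (the last bullet above).
}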

I'm able to get all of this working except for that last bullet point.

There doesn't seem to be any way to access the external service connection
object being used in my state spec function and then, after the RDD batch
is done, tell it to write back the data it's holding.  Stateful streaming
doesn't seem to give one much to work with in this regard.  It doesn't
notify you in any way that a batch is complete; it just repeatedly calls
your state spec function for each incoming record, without any indication
of when one batch ends and another begins.  And the instances of the
objects that each executor deserializes and calls your state spec
function on don't seem to be accessible from anywhere other than inside
the state spec function itself.
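
To illustrate, the kind of hook I'm looking for would be something like the
following -- purely hypothetical, as far as I can tell nothing like this
exists in the mapWithState / StateSpec API:

// Hypothetical, does NOT exist -- a per-executor callback after the state spec
// function has been applied to the last record of the batch on that executor:
//
//   StateSpec.function(stateUpdater.stateUpdateFunction _)
//     .onBatchCompleted { () =>                // <-- no such method
//       stateUpdater.externalService.flush()   // per-executor cleanup
//     }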

Any ideas on how what I'm trying to do might be achieved using stateful
streaming?

Thanks,

DR


On Tue, Jun 6, 2017 at 7:31 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> It looks like the clean up should go into the foreachRDD function:
>
> stateUpdateStream.foreachRDD { rdd =>
>   // do stuff with the rdd
>
>   stateUpdater.cleanupExternalService    // should work in this position
> }
>
> Code within the foreachRDD(*) executes on the driver, so you can keep the
> state of the object there.
>
> What will not work is to update the stateUpdater state from a side effect
> of the stateUpdateFunction used in the mapWithState transformation and
> expect those changes to be visible at the call site sketched above.
>
> kr, Gerard.
>
> (*) a typical construct found in the wild is:
> dstream.foreachRDD { rdd =>
>   // do some preparation
>   rdd.operation { elem => ... }
>   ...
>   // close/clean/report
> }
> So the code within the foreachRDD closure executes on the driver, *but*
> the code within the rdd.operation{...} closure is a spark operation and
> executes distributed on the executors.
> One must be careful of not incorrectly mixing the scopes, in particular
> when holding on to local state.
>
>
>
> On Wed, Jun 7, 2017 at 1:08 AM, David Rosenstrauch <daro...@gmail.com>
> wrote:
>
>> We have some code we've written using stateful streaming (mapWithState)
>> which works well for the most part.  The stateful streaming runs, processes
>> the RDD of input data, calls the state spec function for each input record,
>> and does all proper adding and removing from the state cache.
>>
>> However, I have a need to do some cleanup after stateful streaming
>> processes the input data RDD, and I can't seem to find any place where we
>> can put that code where it will run when it's supposed to.
>>
>> Essentially our state spec function needs to a) call out to an external
>> service, b) hold some data from that service, and then c) inform the
>> service to clean up the remaining data once the RDD is complete.
>>
>> I've gotten to the point where the code looks approximately like this:
>>
>>
>> val eventStream = incomingStream.transform(...)
>>
>> val stateUpdater = new StateUpdater
>> val stateUpdateStream =
>>   eventStream.mapWithState(StateSpec.function(stateUpdater.stateUpdateFunction _))
>>
>> stateUpdateStream.foreachRDD { rdd =>
>>   ...
>> }
>> stateUpdater.cleanupExternalService    // DOES NOT WORK!
>>
>>
>> class StateUpdater extends Serializable {
>>
>>   def stateUpdateFunction(key, value, state) {
>>     if (!state.initialized) {
>>       state.initialize(externalService)
>>     }
>>     ...
>>   }
>>
>>   def cleanupExternalService {
>>     externalService.cleanup  // writes some data back to the external service
>>   }
>>
>>   @transient lazy val externalService = new ExternalService
>> }
>>
>>
>> Note that the ExternalService object is holding onto a small bit of state
>> that it needs to write back to the external service once we have completed
>> running the stateUpdateFunction on every record in the RDD.  However this
>> code doesn't actually work.  Because of the way Spark serializes objects on
>> the driver and then deserializes them onto the executor, there's no way for
>> me to get a hold of the ExternalService object that is being used on each
>> RDD partition and clean up its leftover data.  Those objects seem to be
>> held internally somewhere in the bowels of stateful streaming (where it
>> processes an RDD of incoming data and applies it to the state).  And back
>> in the main code where I'm trying to call the cleanup method, I'm actually
>> calling it on a totally different object than the one that ran in the RDD
>> partitions.  And stateful streaming doesn't provide me with any opportunity
>> to perform any cleanup processing - say by calling some "rddDone" method to
>> notify me that it just finished doing state processing on an RDD.  It just
>> calls the state spec function over and over, once for every record, never
>> notifying me that we've ended processing one RDD or started a new one.
>>
>>
>> Is there any way out of this conundrum?  I've tried to avoid the problem
>> by moving my interactions with the external service outside of the state
>> spec function.  But that didn't work:  the interaction with the external
>> service is really needed inside of the state spec function, and I caused a
>> bug in our code when I tried to move it.
>>
>> Any suggestions that I might not have thought of on how to fix this issue?
>>
>> Thanks,
>>
>> DR
>>
>
>
