Hello Lubo,

The idea of timeouts is to make a best-effort and last-resort effort to
process a key, when it has not received data for a while. With processing
time timeout is 1 minute, the system guarantees that it will not timeout
unless at least 1 minute has passed. Defining a precise timing on when the
timeout is triggered, is really hard for many reasons (distributed system,
lack of precise clock-synch, need for deterministic re-executions for
fault-tolerance, etc.). We made a design decision to process timed out data
after processing the data in a batch, but that choice should not affect
your business logic if your logic is constructed the right way. So your
business logic should set loosely defined timeout durations, and not depend
on the exactly timing of when the timeouts are hit.

TD

On Wed, Jul 26, 2017 at 1:54 AM, Zhang, Lubo <lubo.zh...@intel.com> wrote:

> Hi all
>
>
>
> I have a question about the Stateful  operations
> [map/flatmap]GroupsWithState in Structured streaming. Issue are as follows:
>
>
>
> Take StructuredSessionization case for example, first I input two words
> like apache and spark in batch 0, then input another word Hadoop in batch 1
> until timeout happens (here the timeout type is ProcessingTimeout). So I
> can see both words apache and spark are outputed since each group state is
> timedout. But if I input the same word apache in batch 1 which already
> existed in batch 0, the result shows only spark is expired.  I deep into
> this and find the code https://github.com/apache/
> spark/blob/master/sql/core/src/main/scala/org/apache/
> spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L131  deal
> with the group state update, it first process new group data and set the
> flag hasTimedout to be false in update func , which result the key already
> timedout to be just update. I know the timeout function call will not
> occur until there is new data to trigger, but  I am wondering why don’t
> we first process timeout keys, so we can retrieve the expired data exist in
> batch 0 in user-given function
>
>
>
> def statefunc(key: K, values: Iterator [V],state: GroupState [S]): U = {
>
>     *if* (state.hasTimedOut) {                // If called when timing out, 
> remove the state
>
>       ToDO;
>
>       state.remove()
>
>
>
> } *else* *if* (state.exists) {
>
> }
>
> }
>
>
>
>
>
> Thanks
>
> Lubo
>
>
>

Reply via email to