Hello Lubo,

Timeouts are a best-effort, last-resort mechanism for processing a key that has not received data for a while. With a processing-time timeout of 1 minute, the system guarantees only that the key will not time out until at least 1 minute has passed. Defining precisely when the timeout is triggered is really hard for many reasons (distributed system, lack of precise clock synchronization, need for deterministic re-execution for fault tolerance, etc.).

We made a design decision to process timed-out keys after processing the data in a batch, so a key that receives new data in a batch is updated rather than timed out. That choice should not affect your business logic if the logic is constructed the right way: it should set loosely defined timeout durations and not depend on the exact timing of when the timeouts are hit.
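To make the ordering concrete, here is a minimal sketch in plain Scala of one micro-batch that handles data first and timeouts second. It is a toy model under stated assumptions, not Spark's implementation: `TimeoutOrderModel`, `KeyState`, `BatchResult`, and `runBatch` are hypothetical names, and the clock is passed in explicitly as `nowMs`.

```scala
// Toy model of one micro-batch. This is NOT Spark's code; all names here are
// hypothetical and exist only to illustrate "data first, timeouts second".
object TimeoutOrderModel {
  // Per-key state: a running count and the time the key last received data.
  final case class KeyState(count: Int, lastSeenMs: Long)

  final case class BatchResult(
      state: Map[String, KeyState], // state carried into the next batch
      updated: Set[String],         // keys called with data (hasTimedOut would be false)
      expired: Set[String])         // keys called on timeout (hasTimedOut would be true)

  def runBatch(
      state: Map[String, KeyState],
      words: Seq[String],
      nowMs: Long,
      timeoutMs: Long): BatchResult = {
    // Step 1: process every key that received data in this batch.
    // Updating a key also refreshes its last-seen time, i.e. resets its timeout.
    val counts = words.groupBy(identity).map { case (w, ws) => w -> ws.size }
    val afterData = counts.foldLeft(state) { case (acc, (w, n)) =>
      val previous = acc.get(w).map(_.count).getOrElse(0)
      acc.updated(w, KeyState(previous + n, nowMs))
    }

    // Step 2: only now time out keys that have been idle for at least timeoutMs.
    // Keys touched in step 1 have lastSeenMs == nowMs, so they cannot expire here.
    val expired = afterData.collect {
      case (w, s) if nowMs - s.lastSeenMs >= timeoutMs => w
    }.toSet

    BatchResult(afterData -- expired, counts.keySet, expired)
  }
}
```

In this model, feeding `apache` and `spark` at time 0 and then `hadoop` 70 seconds later with a 60-second timeout expires both earlier keys, while feeding `apache` again instead expires only `spark` and leaves `apache` with a count of 2. That matches the behavior described in the question; logic that treats a late update and a timeout as equally acceptable outcomes is unaffected by the ordering.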
TD

On Wed, Jul 26, 2017 at 1:54 AM, Zhang, Lubo <lubo.zh...@intel.com> wrote:

> Hi all,
>
> I have a question about the stateful operations [map/flatMap]GroupsWithState in Structured Streaming. The issue is as follows.
>
> Take the StructuredSessionization example: first I input two words, apache and spark, in batch 0, then input another word, Hadoop, in batch 1 once the timeout happens (the timeout type here is ProcessingTimeTimeout). I can see that both apache and spark are output, since each group state has timed out. But if in batch 1 I input the word apache, which already existed in batch 0, the result shows that only spark has expired. I dug into this and found that the code at
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala#L131
> deals with the group state update: it first processes the new group data and sets the flag hasTimedOut to false in the update function, which results in the already timed-out key simply being updated. I know the timeout function call will not occur until new data triggers it, but I am wondering why we don't process timed-out keys first, so that the user-given function can retrieve the expired data from batch 0:
>
> def statefunc(key: K, values: Iterator[V], state: GroupState[S]): U = {
>   if (state.hasTimedOut) { // If called when timing out, remove the state
>     // TODO
>     state.remove()
>   } else if (state.exists) {
>   }
> }
>
> Thanks,
> Lubo