Tathagata Das created SPARK-19838:
-------------------------------------

             Summary: Adding Processing Time based timeout
                 Key: SPARK-19838
                 URL: https://issues.apache.org/jira/browse/SPARK-19838
             Project: Spark
          Issue Type: Sub-task
          Components: Structured Streaming
    Affects Versions: 2.1.0
            Reporter: Tathagata Das
            Assignee: Tathagata Das


When a key does not receive any new data in mapGroupsWithState, the mapping
function is never called on it. We therefore need a timeout feature that calls
the function even in that case, so that the user can decide whether to keep
waiting or to clean up (remove the state, save data externally, etc.).

Timeouts can be based on either processing time or event time. This JIRA is
for processing time, but it defines the high-level API design for both. The
usage would look like this:

{code}
// Called on a key when it has new data, and again (with no values)
// when the key's timeout occurs.
def stateFunction(key: K, values: Iterator[V], state: KeyedState[S]): U = {
  ...
  state.setTimeoutDuration(10000)   // per-key timeout; must be set again on every call
  ...
}

dataset                                    // type is Dataset[T]
  .groupByKey[K](keyingFunc)               // generates KeyValueGroupedDataset[K, T]
  .mapGroupsWithState[S, U](
    func = stateFunction,
    timeout = KeyedStateTimeout.withProcessingTime)   // returns Dataset[U]
{code}
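The per-key function pattern can be sketched in self-contained Scala, without Spark. This is an illustration under assumptions, not Spark code: {{KeyedState}} here is a hypothetical stand-in trait in which only {{setTimeoutDuration}} and {{isTimingOut}} come from this proposal, while {{getOption}}, {{update}}, {{remove}}, {{SimpleKeyedState}} and {{countWithTimeout}} are invented for illustration, and the duration is assumed to be in milliseconds. The function keeps a running count per key, sets the timeout again on every call with new data, and on timeout emits the final count and removes the state.

{code}
// Hypothetical stand-in for the proposed KeyedState[S]; not Spark's class.
// setTimeoutDuration and isTimingOut come from this proposal;
// getOption, update and remove are assumed accessors for illustration.
trait KeyedState[S] {
  def getOption: Option[S]
  def update(newState: S): Unit
  def remove(): Unit
  def setTimeoutDuration(durationMs: Long): Unit // unit assumed to be milliseconds
  def isTimingOut(): Boolean
}

// Hypothetical in-memory implementation used only to exercise the function.
final class SimpleKeyedState[S](initial: Option[S], timingOut: Boolean) extends KeyedState[S] {
  private var value: Option[S] = initial
  var timeoutMs: Option[Long] = None // None: no timeout was set during this call
  def getOption: Option[S] = value
  def update(newState: S): Unit = value = Some(newState)
  def remove(): Unit = value = None
  def setTimeoutDuration(durationMs: Long): Unit = timeoutMs = Some(durationMs)
  def isTimingOut(): Boolean = timingOut
}

// Per-key state function: keeps a running count of values for each key.
// Returns (key, count, timedOut).
def countWithTimeout(
    key: String,
    values: Iterator[Int],
    state: KeyedState[Long]): (String, Long, Boolean) = {
  if (state.isTimingOut()) {
    // Called with no values: emit the final count and clean up. No new
    // timeout is set, so the key is not called again until new data arrives.
    val finalCount = state.getOption.getOrElse(0L)
    state.remove()
    (key, finalCount, true)
  } else {
    val newCount = state.getOption.getOrElse(0L) + values.size
    state.update(newCount)
    // The timeout is reset on every call, so it must be set again here.
    state.setTimeoutDuration(10000L)
    (key, newCount, false)
  }
}
{code}

Keeping the timeout-handling branch first makes the clean-up path explicit, and re-arming the timeout only on the data path means an idle key is called at most once more before its state is dropped.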

Note the following design aspects:

- The timeout type is passed to mapGroupsWithState as a single parameter that
applies globally to all keys. This lets the planner know it at planning time and
optimize execution accordingly, for example by deciding whether extra
information such as timeout durations or timestamps needs to be saved in the
state.

- The exact timeout duration is set inside the function call, so that it can
be customized on a per-key basis.

- When the timeout occurs for a key, the function is called with no values,
and with {{KeyedState.isTimingOut()}} returning {{true}}.

- The timeout for a key is reset every time the function is called on that
key, that is, when the key has new data or when it has timed out. The user
therefore has to set the timeout duration on every call; otherwise no timeout
will be set.
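The timeout semantics in the design aspects above can be modeled with a small single-process simulator. This is a hypothetical illustration, not how Spark implements it: {{SimState}}, {{TimeoutSimulator}}, {{runBatch}} and {{countFn}} are invented names, durations are assumed to be milliseconds, and processing time is passed in explicitly as {{nowMs}}. The {{timeoutEnabled}} flag plays the role of the global timeout type chosen at planning time, and each call clears the key's deadline before invoking the function, so a key only times out again if the function sets a new duration.

{code}
import scala.collection.mutable

// Hypothetical per-call state handle (not Spark's API). It starts each call
// with no timeout, mirroring "the timeout is reset every time the function is called".
final class SimState[S](var value: Option[S], val isTimingOut: Boolean) {
  private var timeoutMs: Option[Long] = None
  def setTimeoutDuration(durationMs: Long): Unit = timeoutMs = Some(durationMs)
  def timeoutDuration: Option[Long] = timeoutMs
}

// Hypothetical single-process model of processing-time timeouts.
// `timeoutEnabled` stands in for the global timeout type fixed at planning time:
// when it is false, no deadlines are tracked at all.
final class TimeoutSimulator[K, V, S, U](
    func: (K, Iterator[V], SimState[S]) => U,
    timeoutEnabled: Boolean) {

  private val states = mutable.Map.empty[K, S]
  private val deadlines = mutable.Map.empty[K, Long] // key -> deadline in ms

  private def invoke(nowMs: Long, key: K, values: Iterator[V], timingOut: Boolean): U = {
    deadlines.remove(key) // any previous timeout is cleared on every call
    val st = new SimState[S](states.get(key), timingOut)
    val out = func(key, values, st)
    st.value match {
      case Some(s) => states(key) = s
      case None    => states.remove(key)
    }
    // A new deadline exists only if the function set a duration during this call.
    if (timeoutEnabled) st.timeoutDuration.foreach(d => deadlines(key) = nowMs + d)
    out
  }

  // Runs one trigger at processing time `nowMs`; `batch` holds the new data per key.
  def runBatch(nowMs: Long, batch: Map[K, Seq[V]]): Seq[U] = {
    // 1. Keys with new data are called with their values.
    val withData = batch.toSeq.map { case (k, vs) => invoke(nowMs, k, vs.iterator, timingOut = false) }
    // 2. Keys without new data whose deadline has passed are called with no values.
    val expired = deadlines.toList.collect { case (k, d) if d <= nowMs && !batch.contains(k) => k }
    val timedOut = expired.map(k => invoke(nowMs, k, Iterator.empty, timingOut = true))
    withData ++ timedOut
  }
}

// Example per-key function: running count, re-arming a 10000 ms timeout on new
// data and removing the state when the key times out.
val countFn: (String, Iterator[Int], SimState[Long]) => (String, Long, Boolean) =
  (key, values, st) =>
    if (st.isTimingOut) {
      val c = st.value.getOrElse(0L)
      st.value = None
      (key, c, true)
    } else {
      val c = st.value.getOrElse(0L) + values.size
      st.value = Some(c)
      st.setTimeoutDuration(10000L)
      (key, c, false)
    }
{code}

With {{countFn}}, a key that receives data at time 0 and nothing afterwards is passed to the function again, with no values and {{isTimingOut}} true, in the first trigger at or after time 10000. Because the function then removes the state and sets no new duration, the key is never called again unless new data arrives.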

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
