GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/17179

    [SPARK-19067][SS] Processing-time-based timeout in MapGroupsWithState

    ## What changes were proposed in this pull request?
    
    When a key does not get any new data in `mapGroupsWithState`, the mapping
    function is never called on it. We therefore need a timeout feature that
    calls the function again in such cases, so that the user can decide whether
    to continue waiting or clean up (remove state, save results externally, etc.).
    Timeouts can be based on either processing time or event time. This JIRA
    covers processing time, but defines the high-level API design for both. The
    usage would look like this.
    ```
    def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = {
      ...
      state.setTimeoutDuration(10000)
      ...
    }
    
    dataset                                                   // type is Dataset[T]
      .groupByKey[K](keyingFunc)                              // generates KeyValueGroupedDataset[K, T]
      .mapGroupsWithState[S, U](
         func = stateFunction,
         timeout = KeyedStateTimeout.withProcessingTime)      // returns Dataset[U]
    ```
    
    Note the following design aspects.
    
    - The timeout type is passed to `mapGroupsWithState` as a parameter that is
    global to all keys. This lets the planner know the timeout type at planning
    time and optimize the execution accordingly, for example by deciding whether
    to save extra info in state (e.g. timeout durations or timestamps).
    
    - The exact timeout duration is provided inside the function call so that
    it can be customized on a per-key basis.
    
    - When the timeout occurs for a key, the function is called with no values
    and with `KeyedState.isTimingOut()` set to true.
    
    - The timeout is reset for a key every time the function is called on that
    key, that is, when the key has new data or when the key has timed out. The
    user therefore has to set the timeout duration every time the function is
    called; otherwise no timeout will be set.
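
    The semantics in the list above can be illustrated with a small standalone
    sketch that does not depend on Spark. `SketchState` is a hypothetical
    stand-in for `KeyedState[S]` (its `getOption`, `update`, `remove` and
    `timeoutDuration` members are assumptions made for this example, not the
    API added by this patch), and the driver in `main` only imitates how the
    function might be invoked on data and on timeout.

    ```scala
    // Hypothetical stand-in for KeyedState[S]; NOT the Spark class.
    final class SketchState[S](initial: Option[S], val isTimingOut: Boolean) {
      private var value: Option[S] = initial
      // Starts empty on every call, so the function must set it again each time.
      private var timeoutMs: Option[Long] = None

      def getOption: Option[S] = value
      def update(s: S): Unit = { value = Some(s) }
      def remove(): Unit = { value = None }
      def setTimeoutDuration(ms: Long): Unit = { timeoutMs = Some(ms) }
      def timeoutDuration: Option[Long] = timeoutMs
    }

    object TimeoutSemanticsSketch {
      // Counts values per key. On timeout it reports the final count and drops the state.
      def stateFunction(key: String, values: Iterator[Int], state: SketchState[Long]): String = {
        if (state.isTimingOut) {
          // Called with no values: decide to clean up instead of waiting longer.
          val finalCount = state.getOption.getOrElse(0L)
          state.remove()
          s"$key expired with count $finalCount"
        } else {
          val count = state.getOption.getOrElse(0L) + values.size
          state.update(count)
          state.setTimeoutDuration(10000) // must be set on every call to keep a timeout armed
          s"$key count $count"
        }
      }

      def main(args: Array[String]): Unit = {
        // Call triggered by new data for key "a".
        val onData = new SketchState[Long](None, isTimingOut = false)
        println(stateFunction("a", Iterator(1, 2, 3), onData))

        // Call triggered by the timeout: no values, isTimingOut is true.
        val onTimeout = new SketchState[Long](onData.getOption, isTimingOut = true)
        println(stateFunction("a", Iterator.empty, onTimeout))
      }
    }
    ```

    Because the timeout branch does not call `setTimeoutDuration`, no further
    timeout is armed for the key after it expires, which matches the last
    design point above.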
    
    Implementation details:
    - Added new param to `mapGroupsWithState` for timeout
    - Refactored logic of `MapGroupsWithStateExec` to save timeout info to 
state store, and process timeouts *after* data has been processed in a batch. 
    - Added new method to StateStore to filter data based on timeout timestamp
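
    As a rough illustration of the batch ordering described above (data first,
    then timeouts), here is a minimal in-memory sketch. It is not the code in
    `MapGroupsWithStateExec` or `StateStore`: the `Stored` record, the
    `timedOut` filter and `runBatch` are hypothetical names, an immutable `Map`
    stands in for the state store, and the sketch assumes a positive timeout
    duration and simply evicts expired keys.

    ```scala
    object BatchOrderSketch {
      // Hypothetical stored record: user state plus an absolute timeout timestamp (ms), if any.
      final case class Stored(count: Long, timeoutAtMs: Option[Long])

      // Assumption: plays the role of a "filter by timeout timestamp" lookup on the store.
      def timedOut(store: Map[String, Stored], nowMs: Long): Map[String, Stored] =
        store.filter { case (_, s) => s.timeoutAtMs.exists(_ <= nowMs) }

      // Runs one batch and returns the new store plus the outputs produced by timeouts.
      def runBatch(
          store: Map[String, Stored],
          data: Map[String, Seq[Int]],
          nowMs: Long,
          timeoutMs: Long): (Map[String, Stored], Seq[String]) = {
        // Step 1: process keys that received data; each call re-arms the key's timeout.
        val afterData = data.foldLeft(store) { case (acc, (key, values)) =>
          val count = acc.get(key).map(_.count).getOrElse(0L) + values.size
          acc.updated(key, Stored(count, Some(nowMs + timeoutMs)))
        }
        // Step 2: only after the data, handle keys whose timeout timestamp has passed.
        val expired = timedOut(afterData, nowMs)
        val outputs = expired.toSeq.sortBy(_._1).map { case (k, s) =>
          s"$k expired with count ${s.count}"
        }
        (afterData.filterNot { case (k, _) => expired.contains(k) }, outputs)
      }
    }
    ```

    Processing data before timeouts means that a key which receives data in the
    same batch in which it would have expired is re-armed in step 1 and is
    therefore not picked up by the filter in step 2.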
    
    ## How was this patch tested?
    New unit tests in 
    - MapGroupsWithStateSuite for timeouts.
    - StateStoreSuite for new APIs in StateStore.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark mapgroupwithstate-timeout

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17179.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17179
    