GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/17179
[SPARK-19067][SS] Processing-time-based timeout in MapGroupsWithState ## What changes were proposed in this pull request? When a key does not get any new data in `mapGroupsWithState`, the mapping function is never called on it. So we need a timeout feature that calls the function again in such cases, so that the user can decide whether to continue waiting or clean up (remove state, save stuff externally, etc.). Timeouts can be either based on processing time or event time. This JIRA is for processing time, but defines the high level API design for both. The usage would look like this. ``` def stateFunction(key: K, value: Iterator[V], state: KeyedState[S]): U = { ... state.setTimeoutDuration(10000) ... } dataset // type is Dataset[T] .groupByKey[K](keyingFunc) // generates KeyValueGroupedDataset[K, T] .mapGroupsWithState[S, U]( func = stateFunction, timeout = KeyedStateTimeout.withProcessingTime) // returns Dataset[U] ``` Note the following design aspects. - The timeout type is provided as a param in mapGroupsWithState as a parameter global to all the keys. This is so that the planner knows this at planning time, and accordingly optimize the execution based on whether to saves extra info in state or not (e.g. timeout durations or timestamps). - The exact timeout duration is provided inside the function call so that it can be customized on a per key basis. - When the timeout occurs for a key, the function is called with no values, and KeyedState.isTimingOut() set to true. - The timeout is reset for key every time the function is called on the key, that is, when the key has new data, or the key has timed out. So the user has to set the timeout duration everytime the function is called, otherwise there will not be any timeout set. Implementation details: - Added new param to `mapGroupsWithState` for timeout - Refactored logic of `MapGroupsWithStateExec` to save timeout info to state store, and process timeouts *after* data has been processed in a batch. - Added new method to StateStore to filter data based on timeout timestamp ## How was this patch tested? New unit tests in - MapGroupsWithStateSuite for timeouts. - StateStoreSuite for new APIs in StateStore. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark mapgroupwithstate-timeout Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17179.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17179 ---- ---- --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org