GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/16758
Arbitrary stateful operations with MapGroupsWithState

## What changes were proposed in this pull request?

`mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`.

*Requirements*

- Users should be able to specify a function that can do the following:
  - Access the input rows corresponding to a key
  - Access the previous state corresponding to a key
  - Optionally, update or remove the state
  - Output any number of new rows (or none at all)

*Proposed API*

```
// ------------ New methods on KeyValueGroupedDataset ------------
class KeyValueGroupedDataset[K, V] {
  // Scala friendly
  def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => U): Dataset[U]
  def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => Iterator[U]): Dataset[U]

  // Java friendly
  def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]): Dataset[U]
  def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]): Dataset[U]
}

// ------------------- New Java-friendly function classes -------------------
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, State<S> state) throws Exception;
}

public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  Iterator<R> call(K key, Iterator<V> values, State<S> state) throws Exception;
}

// ---------------------- Wrapper class for state data ----------------------
trait State[S] {
  def exists(): Boolean
  def get(): S                   // throws an exception if the state does not exist
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit             // exists() will be false after this
}
```

*Key Semantics of the State class*

- The state can be null.
- If state.remove() is called, then state.exists() will return false, and getOption will return None.
- After state.update(newState) is called, state.exists() will return true, and getOption will return Some(...).
- None of the operations are thread-safe. This is to avoid memory barriers.

*Usage*

```
val stateFunc = (word: String, words: Iterator[String], runningCount: State[Long]) => {
  val newCount = words.size + runningCount.getOption.getOrElse(0L)
  runningCount.update(newCount)
  (word, newCount)
}

dataset                                                 // type is Dataset[String]
  .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
  .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
```

## How was this patch tested?

New unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark mapWithState

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16758

----
commit d10da2700a970c82537b678db34b2d80cebebcc8
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-18T00:45:54Z

    Prototype - almost working

commit 78cd1853033a091b62cb350879bb6c3a0b6c8641
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-18T01:05:51Z

    Renamed to mapGroupsWithState

commit 0c22e08a8f9ad66a49bc939652fee14577f9bd4b
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-18T01:56:07Z

    Fixed bugs

commit 52e14e479ffa1e38c9efc5b063a95831caab6997
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-18T10:52:20Z

    Removed prints

commit 529aefe6d7cb9cd54e20c0cdaa11cec90a4f16be
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-18T10:58:27Z

    Test state remove

commit 57f5e8d2e8a74a8269667a9d4d89971eb9107c07
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-18T20:26:53Z

    Test restart, and test with metrics

commit 3e0d8dcfa81d58ae6ca6754cc54c19383179802a
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-21T02:25:29Z

    Fixed everything

commit b54fa230eda141316713e3b1d1c56d8a28fd3a6c
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-30T02:06:00Z

    Refactored, added java APIs and tests

commit ab3cb6c961f0d861a24c2146dcb9dc0380c8adc9
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-30T05:07:19Z

    Refactored

commit ddf4550b765af89a4ed7d80edabfe3370cbd1e23
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-30T21:29:44Z

    Added more test

commit 3133f83195ca562e866f6565cdfbd59e118cb6aa
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-31T18:53:58Z

    Merge remote-tracking branch 'apache-github/master' into mapWithState

commit 6fab7a5fde75309198d1e73e66948d82d0f590e6
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date: 2017-01-31T19:16:16Z

    Added docs

----
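The `State` semantics in the PR description (exists/get/getOption/update/remove, with state carried from one trigger to the next) can be illustrated without Spark. The following is a minimal, single-threaded sketch under stated assumptions: `State` is a local copy of the proposed trait, while `SimpleState`, `runBatch`, and `MapGroupsWithStateSketch` are hypothetical names for illustration only, not classes added by this PR. It ignores Encoders, checkpointing, and distribution. Each `runBatch` call stands in for one micro-batch: values are grouped by key, the user function sees that key's previous state, and whatever the function leaves in the state is kept for the next batch (or dropped if it called `remove()`).

```scala
import scala.collection.mutable

// Local copy of the proposed State[S] trait (for this sketch only).
trait State[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
}

// Hypothetical in-memory State following the key semantics above.
// Like the proposal, it is deliberately not thread-safe.
final class SimpleState[S](initial: Option[S]) extends State[S] {
  private var value: Option[S] = initial
  def exists(): Boolean = value.isDefined
  def get(): S = value.getOrElse(throw new NoSuchElementException("State is not set"))
  def getOption(): Option[S] = value
  def update(newState: S): Unit = { value = Some(newState) }
  def remove(): Unit = { value = None }
}

object MapGroupsWithStateSketch {
  // Hypothetical driver for one micro-batch: `store` holds per-key state
  // across calls, playing the role of Spark's state store.
  def runBatch[K, V, S, U](
      batch: Seq[V],
      keyFunc: V => K,
      store: mutable.Map[K, S],
      func: (K, Iterator[V], State[S]) => U): Seq[U] = {
    batch.groupBy(keyFunc).toSeq.map { case (key, values) =>
      val state = new SimpleState[S](store.get(key))
      val out = func(key, values.iterator, state)
      // Persist the updated state, or drop it if func called remove().
      state.getOption() match {
        case Some(s) => store(key) = s
        case None    => store.remove(key)
      }
      out
    }
  }

  // The running word-count function from the usage example in the PR.
  val stateFunc = (word: String, words: Iterator[String], runningCount: State[Long]) => {
    val newCount = words.size + runningCount.getOption().getOrElse(0L)
    runningCount.update(newCount)
    (word, newCount)
  }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[String, Long]
    println(runBatch(Seq("a", "b", "a"), (w: String) => w, store, stateFunc))
    println(runBatch(Seq("a", "c"), (w: String) => w, store, stateFunc))
    println(store)
  }
}
```

With these inputs the first batch produces the counts (a, 2) and (b, 1). The second batch produces (a, 3) and (c, 1), because the count of 2 for "a" was carried over through `store`. Keys that receive no data in a batch ("b" in the second batch) are not passed to the function in this sketch, so their state is simply left as it was.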