GitHub user tdas opened a pull request:

    https://github.com/apache/spark/pull/16758

    Arbitrary stateful operations with MapGroupsWithState

    ## What changes were proposed in this pull request?
    
    `mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState`.
    
    *Requirements*
    - Users should be able to specify a function that can do the following:
      - Access the input rows corresponding to a key
      - Access the previous state corresponding to a key
      - Optionally, update or remove the state
      - Output any number of new rows (or none at all)
    
    *Proposed API*
    ```
    // ------------ New methods on KeyValueGroupedDataset ------------
    class KeyValueGroupedDataset[K, V] {
        // Scala friendly
        def mapGroupsWithState[S: Encoder, U: Encoder](
            func: (K, Iterator[V], State[S]) => U): Dataset[U]
        def flatMapGroupsWithState[S: Encoder, U: Encoder](
            func: (K, Iterator[V], State[S]) => Iterator[U]): Dataset[U]

        // Java friendly
        def mapGroupsWithState[S, U](
            func: MapGroupsWithStateFunction[K, V, S, U],
            stateEncoder: Encoder[S],
            resultEncoder: Encoder[U]): Dataset[U]
        def flatMapGroupsWithState[S, U](
            func: FlatMapGroupsWithStateFunction[K, V, S, U],
            stateEncoder: Encoder[S],
            resultEncoder: Encoder[U]): Dataset[U]
    }
    
    // ------------------- New Java-friendly function classes -------------------
    public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
      R call(K key, Iterator<V> values, State<S> state) throws Exception;
    }
    public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
      Iterator<R> call(K key, Iterator<V> values, State<S> state) throws Exception;
    }
    
    // ---------------------- Wrapper class for state data ----------------------
    trait State[S] {
        def exists(): Boolean
        def get(): S                    // throws Exception if state does not exist
        def getOption(): Option[S]
        def update(newState: S): Unit
        def remove(): Unit              // exists() will be false after this
    }
    ```
    
    *Key semantics of the State class*
    - The state can be null.
    - If state.remove() is called, then state.exists() will return false and getOption will return None.
    - After state.update(newState) is called, state.exists() will return true and getOption will return Some(...).
    - None of the operations are thread-safe. This is to avoid memory barriers.
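    
    The semantics listed above can be illustrated with a small in-memory sketch. This is not the implementation in this patch: `InMemoryState` is a hypothetical name, the trait is re-declared here only so the block is self-contained, and throwing `NoSuchElementException` from `get()` is an assumption (the proposal only says that `get()` throws when the state does not exist).
    
    ```scala
    // Stand-in for the proposed State[S] trait, re-declared for self-containment.
    trait State[S] {
      def exists(): Boolean
      def get(): S
      def getOption(): Option[S]
      def update(newState: S): Unit
      def remove(): Unit
    }
    
    // Hypothetical in-memory implementation; deliberately not thread-safe,
    // matching the "no thread safety" note in the proposal.
    final class InMemoryState[S](initial: Option[S]) extends State[S] {
      private var value: Option[S] = initial
    
      def exists(): Boolean = value.isDefined
    
      // Assumption: the concrete exception type is not specified by the proposal.
      def get(): S =
        value.getOrElse(throw new NoSuchElementException("State does not exist"))
    
      def getOption(): Option[S] = value
    
      // After update, exists() is true and getOption() is Some(newState).
      def update(newState: S): Unit = { value = Some(newState) }
    
      // After remove, exists() is false and getOption() is None.
      def remove(): Unit = { value = None }
    }
    ```
    
    For example, a fresh `new InMemoryState[Long](None)` reports `exists() == false`; after `update(5L)` it reports `getOption() == Some(5L)`; after `remove()` it reports `exists() == false` again.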
    
    *Usage*
    ```
    val stateFunc = (word: String, words: Iterator[String], runningCount: State[Long]) => {
        val newCount = words.size + runningCount.getOption.getOrElse(0L)
        runningCount.update(newCount)
        (word, newCount)
    }
    
    dataset                                                     // type is Dataset[String]
      .groupByKey[String](w => w)                               // generates KeyValueGroupedDataset[String, String]
      .mapGroupsWithState[Long, (String, Long)](stateFunc)      // returns Dataset[(String, Long)]
    ```
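    
    To show how the running count above would carry over between batches, here is a rough simulation in plain Scala collections, without Spark. `SimpleState`, `RunningCountSketch`, `processBatch`, and the `store` map are all hypothetical names introduced for illustration. The real operator keeps state in Structured Streaming's state store, which this sketch does not model.
    
    ```scala
    import scala.collection.mutable
    
    // Hypothetical, simplified stand-in for the proposed State[S] wrapper.
    final class SimpleState[S](private var value: Option[S]) {
      def exists(): Boolean = value.isDefined
      def getOption(): Option[S] = value
      def update(newState: S): Unit = { value = Some(newState) }
      def remove(): Unit = { value = None }
    }
    
    object RunningCountSketch {
      // Same logic as the stateFunc in the usage example: add this batch's
      // occurrences to the previous count and store the new total.
      val stateFunc: (String, Iterator[String], SimpleState[Long]) => (String, Long) =
        (word, words, runningCount) => {
          val newCount = words.size + runningCount.getOption().getOrElse(0L)
          runningCount.update(newCount)
          (word, newCount)
        }
    
      // Hypothetical driver: groups one batch by key, calls stateFunc once per
      // key seen in the batch, and keeps each key's state in `store`.
      def processBatch(
          batch: Seq[String],
          store: mutable.Map[String, Long]): Map[String, Long] =
        batch.groupBy(identity).map { case (key, values) =>
          val state = new SimpleState[Long](store.get(key))
          val result = stateFunc(key, values.iterator, state)
          // Persist the state if it still exists, otherwise drop it.
          state.getOption() match {
            case Some(s) => store.update(key, s)
            case None    => store.remove(key)
          }
          result
        }
    }
    ```
    
    With an empty `store`, processing `Seq("a", "b", "a")` yields `Map("a" -> 2, "b" -> 1)`, and then processing `Seq("a", "c")` yields `Map("a" -> 3, "c" -> 1)`. The function is only invoked for keys present in the current batch, so the state for `"b"` stays at 1 in `store`.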
    
    ## How was this patch tested?
    New unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tdas/spark mapWithState

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16758
    
----
commit d10da2700a970c82537b678db34b2d80cebebcc8
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-18T00:45:54Z

    Prototype - almost working

commit 78cd1853033a091b62cb350879bb6c3a0b6c8641
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-18T01:05:51Z

    Renamed to mapGroupsWithState

commit 0c22e08a8f9ad66a49bc939652fee14577f9bd4b
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-18T01:56:07Z

    Fixed bugs

commit 52e14e479ffa1e38c9efc5b063a95831caab6997
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-18T10:52:20Z

    Removed prints

commit 529aefe6d7cb9cd54e20c0cdaa11cec90a4f16be
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-18T10:58:27Z

    Test state remove

commit 57f5e8d2e8a74a8269667a9d4d89971eb9107c07
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-18T20:26:53Z

    Test restart, and test with metrics

commit 3e0d8dcfa81d58ae6ca6754cc54c19383179802a
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-21T02:25:29Z

    Fixed everything

commit b54fa230eda141316713e3b1d1c56d8a28fd3a6c
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-30T02:06:00Z

    Refactored, added java APIs and tests

commit ab3cb6c961f0d861a24c2146dcb9dc0380c8adc9
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-30T05:07:19Z

    Refactored

commit ddf4550b765af89a4ed7d80edabfe3370cbd1e23
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-30T21:29:44Z

    Added more test

commit 3133f83195ca562e866f6565cdfbd59e118cb6aa
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-31T18:53:58Z

    Merge remote-tracking branch 'apache-github/master' into mapWithState

commit 6fab7a5fde75309198d1e73e66948d82d0f590e6
Author: Tathagata Das <tathagata.das1...@gmail.com>
Date:   2017-01-31T19:16:16Z

    Added docs

----

