[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903532#comment-15903532 ]

Amit Sela commented on SPARK-19067:
-----------------------------------

[~tdas] I just read the PR, and I'm very excited for Spark to provide such a 
powerful stateful operator!
I added a few comments in the PR based on my first impressions; I hope you don't 
mind.
I assume that for event-time timeouts you'd look at the watermark time instead 
of the wall-clock time, correct? How would that work? If I understand it right, 
everything is represented as a table, so would the "Watermark Manager" constantly 
write updates to a "Watermark Column" in that table?
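One possible way to picture an event-time timeout is sketched below. It is a hypothetical, plain-Scala model (no Spark involved), not how Spark implements it: it assumes each key records a timeout timestamp (its latest event time plus a gap), that the watermark is the maximum event time seen minus an allowed delay, and that a key times out once the watermark passes its timeout timestamp. The names `WatermarkModel`, `Tracker`, `advance`, `allowedDelayMs` and `gapMs` are illustrative only.

```scala
object WatermarkModel {
  // Hypothetical bookkeeping: key -> event-time timestamp (ms) after which the key times out
  final case class Tracker(
      allowedDelayMs: Long,
      gapMs: Long,
      maxEventTimeMs: Long = Long.MinValue,
      timeouts: Map[String, Long] = Map.empty) {

    // Watermark = max event time seen so far minus the allowed lateness
    def watermark: Long =
      if (maxEventTimeMs == Long.MinValue) Long.MinValue else maxEventTimeMs - allowedDelayMs

    // Ingest one batch of (key, eventTimeMs) pairs; return the updated tracker and the
    // keys whose timeout timestamp is now strictly below the new watermark
    def advance(events: Seq[(String, Long)]): (Tracker, Set[String]) = {
      val newMax = (maxEventTimeMs +: events.map(_._2)).max
      val newTimeouts = events.foldLeft(timeouts) { case (acc, (key, ts)) =>
        // A new event pushes the key's timeout further into event time
        acc.updated(key, math.max(acc.getOrElse(key, Long.MinValue), ts + gapMs))
      }
      val next = copy(maxEventTimeMs = newMax, timeouts = newTimeouts)
      val wm = next.watermark
      val expired = newTimeouts.collect { case (k, t) if t < wm => k }.toSet
      (next.copy(timeouts = newTimeouts -- expired), expired)
    }
  }
}
```

In this model the only thing that moves the timeout clock is event time carried by the data, so a key never times out during a period in which no new data at all arrives for any key.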

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming 
> (similar to DStream.mapWithState)
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19067
>                 URL: https://issues.apache.org/jira/browse/SPARK-19067
>             Project: Spark
>          Issue Type: New Feature
>          Components: Structured Streaming
>            Reporter: Michael Armbrust
>            Assignee: Tathagata Das
>            Priority: Critical
>
> Right now the only way to do stateful operations is with an Aggregator or a 
> UDAF. However, this does not give users control over the emission or 
> expiration of state, making it hard to implement things like sessionization. 
> We should add a more general construct (probably similar to 
> {{DStream.mapWithState}}) to Structured Streaming. Here is the design.
> *Requirements*
> - Users should be able to specify a function that can do the following:
> -- Access the input rows corresponding to a key
> -- Access the previous state corresponding to a key
> -- Optionally, update or remove the state
> -- Output any number of new rows (or none at all)
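The four requirements can also be read as a pure function contract. The sketch below is only an illustration, not the proposed API: it assumes a hypothetical `StateUpdate` result type in which the function returns the new state (`None` meaning "remove") together with zero or more output rows, and it shows a sessionization-style function that emits nothing until an `"end"` event arrives.

```scala
object RequirementsModel {
  // Hypothetical result of one invocation: the new state (None = remove it) plus any outputs
  final case class StateUpdate[S, U](newState: Option[S], outputs: Seq[U])

  // The requirements as a type: the key, the new input rows for it, and the previous state
  type StatefulFunc[K, V, S, U] = (K, Iterator[V], Option[S]) => StateUpdate[S, U]

  // Example: buffer a user's events; when "end" arrives, emit one (user, eventCount) row
  // and drop the state. Events after "end" in the same batch are ignored in this sketch.
  val session: StatefulFunc[String, String, List[String], (String, Int)] =
    (user, events, prev) => {
      val all = prev.getOrElse(Nil) ++ events.toList
      val (beforeEnd, rest) = all.span(_ != "end")
      if (rest.isEmpty) StateUpdate(Some(all), Nil)        // update state, output nothing
      else StateUpdate(None, Seq(user -> beforeEnd.size))  // remove state, output one row
    }
}
```

Returning the new state explicitly keeps the model easy to test; the proposal instead hands the function a mutable state wrapper, which avoids allocating a result object per key.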
> *Proposed API*
> {code}
> // ------------ New methods on KeyValueGroupedDataset ------------
> class KeyValueGroupedDataset[K, V] {
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U]
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], KeyedState[S]) => Iterator[U]): Dataset[U]
>
>   // Java friendly
>   def mapGroupsWithState[S, U](
>       func: MapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S],
>       resultEncoder: Encoder[U]): Dataset[U]
>   def flatMapGroupsWithState[S, U](
>       func: FlatMapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S],
>       resultEncoder: Encoder[U]): Dataset[U]
> }
>
> // ------------------- New Java-friendly function classes -------------------
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
> }
>
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
> }
>
> // ---------------------- Wrapper class for state data ----------------------
> trait KeyedState[S] {
>   def exists(): Boolean
>   def get(): S                  // throws an exception if the state does not exist
>   def getOption(): Option[S]
>   def update(newState: S): Unit
>   def remove(): Unit            // exists() will be false after this
> }
> {code}
> Key semantics of the KeyedState class:
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and 
> getOption will return None.
> - After state.update(newState) is called, state.exists() will return true, 
> and getOption will return Some(...).
> - None of the operations are thread-safe. This is to avoid memory barriers.
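To make these semantics concrete, here is a minimal single-threaded, in-memory sketch of the trait as proposed. The class name `SimpleKeyedState` and its `Option`-backed field are assumptions for illustration, not Spark's implementation. It is deliberately unsynchronized, matching the note that none of the operations are thread-safe.

```scala
// The proposed trait, restated so this sketch compiles on its own
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
}

// Hypothetical implementation backed by a plain var; no locking or volatile fields
final class SimpleKeyedState[S](initial: Option[S]) extends KeyedState[S] {
  private var value: Option[S] = initial

  override def exists(): Boolean = value.isDefined

  // Throws if the state was never set or has been removed
  override def get(): S =
    value.getOrElse(throw new NoSuchElementException("State is not defined or has been removed"))

  override def getOption(): Option[S] = value

  // Some(newState) rather than Option(newState): a null state still counts as existing
  override def update(newState: S): Unit = value = Some(newState)

  override def remove(): Unit = value = None
}
```

Storing `Some(newState)` instead of `Option(newState)` is one way to honor "the state can be null" while keeping `exists()` true after an update; whether the real design intends that is an assumption here.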
> *Usage*
> {code}
> import spark.implicits._  // assumes `spark` is the active SparkSession; provides the encoders
>
> val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
>   val newCount = words.size + runningCount.getOption().getOrElse(0L)
>   runningCount.update(newCount)
>   (word, newCount)
> }
>
> dataset                                                 // type is Dataset[String]
>   .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
> {code}
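Outside Spark, the effect of this query across micro-batches can be simulated with a plain `Map` standing in for the state store. This is only a model of the intended semantics, assuming each trigger delivers one batch of words and that state persists between batches; `WordCountModel` and `simulate` are hypothetical names, and the `Option`-based state replaces the proposed `KeyedState` wrapper.

```scala
object WordCountModel {
  // Same logic as stateFunc, with the state passed in and the new state returned explicitly
  def stateFunc(word: String, words: Iterator[String], running: Option[Long]): (Long, (String, Long)) = {
    val newCount = words.size + running.getOrElse(0L)
    (newCount, (word, newCount))
  }

  // Run micro-batches in order; return each batch's output rows (sorted by word) and the final store
  def simulate(batches: Seq[Seq[String]]): (Seq[Seq[(String, Long)]], Map[String, Long]) =
    batches.foldLeft((Vector.empty[Seq[(String, Long)]], Map.empty[String, Long])) {
      case ((outputs, store), batch) =>
        // groupByKey(w => w): one group per distinct word in this batch
        val results = batch.groupBy(identity).toSeq.map { case (word, ws) =>
          word -> stateFunc(word, ws.iterator, store.get(word))
        }
        // Keys absent from this batch keep their old state and produce no output
        val newStore = store ++ results.map { case (word, (count, _)) => word -> count }
        (outputs :+ results.map(_._2._2).sortBy(_._1), newStore)
    }
}
```

For example, the batches `Seq("a", "b", "a")` and then `Seq("a", "c")` yield `(a,2), (b,1)` followed by `(a,3), (c,1)`: `b` is not emitted in the second batch because the function is only invoked for keys that received data.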
> *Future Directions*
> - Timeout-based expiration of state that has not received data for a while
> - General expression-based expiration
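Both future directions can be thought of as an eviction pass over the state store. The following is a hypothetical sketch, not a planned API: it assumes each entry records when it last received data, expresses general expiration as an arbitrary predicate, and treats the timeout case as one particular predicate. `ExpirationModel`, `Entry`, `expire` and `idleTimeout` are illustrative names.

```scala
object ExpirationModel {
  // Hypothetical state-store entry: the user state plus the time it last received data
  final case class Entry[S](state: S, lastUpdatedMs: Long)

  // General expression-based expiration: remove every entry the predicate selects.
  // Returns (surviving entries, expired entries) so callers could emit final results.
  def expire[K, S](store: Map[K, Entry[S]])(
      shouldExpire: (K, Entry[S]) => Boolean): (Map[K, Entry[S]], Map[K, Entry[S]]) = {
    val (expired, kept) = store.partition { case (k, e) => shouldExpire(k, e) }
    (kept, expired)
  }

  // Timeout-based expiration as a special case: no data for longer than timeoutMs
  def idleTimeout[K, S](nowMs: Long, timeoutMs: Long): (K, Entry[S]) => Boolean =
    (_, e) => nowMs - e.lastUpdatedMs > timeoutMs
}
```

Returning the expired entries rather than silently dropping them matters for sessionization, where the end of a session is exactly when a final row should be emitted.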



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
