[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903532#comment-15903532 ]
Amit Sela commented on SPARK-19067:
-----------------------------------

[~tdas] I just read the PR, and I'm very excited for Spark to provide such a powerful stateful operator! I added a few comments in the PR based on my first impressions; I hope you don't mind.
I assume that for event-time timeouts you'd look at the watermark time instead of wall time, correct? How would that work? If I get it right, it's all represented as a table, so would the "Watermark Manager" constantly write updates to the table in the "Watermark Column"?

> mapGroupsWithState - arbitrary stateful operations with Structured Streaming
> (similar to DStream.mapWithState)
> --------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-19067
> URL: https://issues.apache.org/jira/browse/SPARK-19067
> Project: Spark
> Issue Type: New Feature
> Components: Structured Streaming
> Reporter: Michael Armbrust
> Assignee: Tathagata Das
> Priority: Critical
>
> Right now the only way to do stateful operations is with Aggregator or
> UDAF. However, this does not give users control of emission or expiration of
> state, making it hard to implement things like sessionization. We should add
> a more general construct (probably similar to {{DStream.mapWithState}}) to
> structured streaming. Here is the design.
>
> *Requirements*
> - Users should be able to specify a function that can do the following:
>   - Access the input rows corresponding to a key
>   - Access the previous state corresponding to a key
>   - Optionally, update or remove the state
>   - Output any number of new rows (or none at all)
>
> *Proposed API*
> {code}
> // ------------ New methods on KeyValueGroupedDataset ------------
> class KeyValueGroupedDataset[K, V] {
>   // Scala friendly
>   def mapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U]
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](
>       func: (K, Iterator[V], KeyedState[S]) => Iterator[U]): Dataset[U]
>
>   // Java friendly
>   def mapGroupsWithState[S, U](
>       func: MapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S],
>       resultEncoder: Encoder[U]): Dataset[U]
>   def flatMapGroupsWithState[S, U](
>       func: FlatMapGroupsWithStateFunction[K, V, S, U],
>       stateEncoder: Encoder[S],
>       resultEncoder: Encoder[U]): Dataset[U]
> }
>
> // ------------------- New Java-friendly function classes -------------------
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
> }
> public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
> }
>
> // ---------------------- Wrapper class for state data ----------------------
> trait KeyedState[S] {
>   def exists(): Boolean
>   def get(): S                   // throws an exception if the state does not exist
>   def getOption(): Option[S]
>   def update(newState: S): Unit
>   def remove(): Unit             // exists() will be false after this
> }
> {code}
>
> Key semantics of the KeyedState class
> - The state can be null.
> - If state.remove() is called, then state.exists() will return false, and
> getOption will return None.
> - After state.update(newState) is called, state.exists() will return true,
> and getOption will return Some(...).
> - None of the operations are thread-safe. This is to avoid memory barriers.
>
> *Usage*
> {code}
> val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
>   val newCount = words.size + runningCount.getOption.getOrElse(0L)
>   runningCount.update(newCount)
>   (word, newCount)
> }
>
> dataset                                                  // type is Dataset[String]
>   .groupByKey[String](w => w)                            // generates KeyValueGroupedDataset[String, String]
>   .mapGroupsWithState[Long, (String, Long)](stateFunc)   // returns Dataset[(String, Long)]
> {code}
>
> *Future Directions*
> - Timeout-based state expiration (for state that has not received data for a while)
> - General expression-based expiration

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
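The KeyedState semantics and the running word count in the design can be tried without a cluster. The following is a minimal Scala sketch, assuming a plain mutable map is an acceptable stand-in for Spark's state store. `LocalKeyedState`, `runBatch`, and `demo` are hypothetical names introduced only for illustration; they are not Spark APIs, and `runBatch` only approximates what mapGroupsWithState is proposed to do on each trigger.

```scala
import scala.collection.mutable

object StatefulSketch {
  // Hypothetical local stand-in for the proposed KeyedState[S]; NOT Spark's class.
  final class LocalKeyedState[S](initial: Option[S]) {
    private var value: Option[S] = initial
    def exists: Boolean = value.isDefined
    // Throws when no state exists, mirroring the proposal's get().
    def get: S = value.getOrElse(throw new NoSuchElementException("state does not exist"))
    def getOption: Option[S] = value
    def update(newState: S): Unit = { value = Some(newState) }
    // After remove(), exists is false and getOption is None.
    def remove(): Unit = { value = None }
  }

  // Hypothetical driver for one micro-batch: group the values by key, call func
  // once per key with that key's previous state, then keep or drop the state.
  def runBatch[K, V, S, U](batch: Seq[V], store: mutable.Map[K, S])(keyOf: V => K)(
      func: (K, Iterator[V], LocalKeyedState[S]) => U): Seq[U] =
    batch.groupBy(keyOf).toSeq.map { case (key, values) =>
      val state = new LocalKeyedState[S](store.get(key))
      val out = func(key, values.iterator, state)
      state.getOption match {
        case Some(s) => store(key) = s      // state updated or left in place
        case None    => store.remove(key)   // state removed (or never created)
      }
      out
    }

  // The running word count from the proposal's usage example.
  val stateFunc = (word: String, words: Iterator[String], runningCount: LocalKeyedState[Long]) => {
    val newCount = words.size + runningCount.getOption.getOrElse(0L)
    runningCount.update(newCount)
    (word, newCount)
  }

  // Feeds two micro-batches through the same store; results are sorted because
  // groupBy does not preserve key order.
  def demo(): (Seq[(String, Long)], Seq[(String, Long)]) = {
    val store = mutable.Map.empty[String, Long]
    val first = runBatch(Seq("a", "b", "a"), store)(identity[String])(stateFunc).sortBy(_._1)
    val second = runBatch(Seq("a", "c"), store)(identity[String])(stateFunc).sortBy(_._1)
    (first, second)
  }
}
```

With these inputs the first batch yields (a,2) and (b,1), and the second batch, which reuses the same store, yields (a,3) and (c,1). Calling remove() inside the function drops the key from the store, which is the hook a sessionization function would rely on.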
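The timeout-based expiration listed under Future Directions, and the event-time question raised in the comment, can be illustrated with a small hypothetical sketch. It is not the design in this issue, which leaves timeouts for later; it only shows one possible rule, assuming each key records the time of its last input and that the comparison clock is either wall-clock time (a processing-time timeout) or the current watermark (an event-time timeout). `Entry`, `touch`, and `expireIdle` are names introduced here for illustration.

```scala
import scala.collection.mutable

object TimeoutSketch {
  // Hypothetical per-key record: the user state plus the time of its last input.
  final case class Entry[S](state: S, lastSeen: Long)

  // Records new input for a key at time `at` (processing time or event time).
  def touch[K, S](store: mutable.Map[K, Entry[S]], key: K, state: S, at: Long): Unit =
    store(key) = Entry(state, at)

  // Drops every key whose last input is more than `timeout` older than `clock`.
  // `clock` would be wall-clock time for a processing-time timeout, or the
  // current watermark for an event-time timeout. Returns the expired keys.
  def expireIdle[K, S](store: mutable.Map[K, Entry[S]], clock: Long, timeout: Long): Set[K] = {
    val expired = store.iterator.collect {
      case (k, e) if clock - e.lastSeen > timeout => k
    }.toSet                                  // materialize before mutating the map
    expired.foreach(k => store.remove(k))
    expired
  }
}
```

Passing a watermark as `clock` is what would make such a rule event-time based; the sketch says nothing about how Spark would compute or store that watermark.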