[ https://issues.apache.org/jira/browse/SPARK-35897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17369625#comment-17369625 ]
Apache Spark commented on SPARK-35897:
--------------------------------------

User 'rahulsmahadev' has created a pull request for this issue:
https://github.com/apache/spark/pull/33093

> Support user defined initial state with flatMapGroupsWithState in Structured Streaming
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-35897
>                 URL: https://issues.apache.org/jira/browse/SPARK-35897
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Rahul Shivu Mahadev
>            Priority: Major
>             Fix For: 3.2.0
>
>
> Structured Streaming supports arbitrary stateful processing through the
> mapGroupsWithState and flatMapGroupsWithState operators. Currently, the state
> is created only by processing the data that arrives with each batch. This API
> improvement will allow users to specify an initial state, which is applied
> when the first batch is executed.
>
> h2. Proposed new APIs (Scala)
>
> def mapGroupsWithState[S: Encoder, U: Encoder](
>     timeoutConf: GroupStateTimeout,
>     initialState: Dataset[(K, S)])(
>     func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
>
> def flatMapGroupsWithState[S: Encoder, U: Encoder](
>     outputMode: OutputMode,
>     timeoutConf: GroupStateTimeout,
>     initialState: Dataset[(K, S)])(
>     func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
>
> h2. Proposed new APIs (Java)
>
> def mapGroupsWithState[S, U](
>     func: MapGroupsWithStateFunction[K, V, S, U],
>     stateEncoder: Encoder[S],
>     outputEncoder: Encoder[U],
>     timeoutConf: GroupStateTimeout,
>     initialState: Dataset[(K, S)]): Dataset[U]
>
> def flatMapGroupsWithState[S, U](
>     func: FlatMapGroupsWithStateFunction[K, V, S, U],
>     outputMode: OutputMode,
>     stateEncoder: Encoder[S],
>     outputEncoder: Encoder[U],
>     timeoutConf: GroupStateTimeout,
>     initialState: Dataset[(K, S)]): Dataset[U]
>
> h2. Example Usage
>
> val initialState: Dataset[(String, RunningCount)] = Seq(
>   ("a", new RunningCount(1)),
>   ("b", new RunningCount(1))
> ).toDS()
>
> val inputData = MemoryStream[String]
> val result =
>   inputData.toDS()
>     .groupByKey(x => x)
>     .mapGroupsWithState(timeoutConf, initialState)(stateFunc)
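
To make the intended semantics concrete (state seeded before the first batch, then updated by each later batch), here is a minimal plain-Scala sketch. It does not use Spark and is not the implementation in the pull request. The names RunningCount (modeled here as a case class), stateFunc, and runBatches are hypothetical, and Option[S] stands in for GroupState[S].

```scala
// Plain-Scala model of "initial state applied at the first batch".
// Assumption: this only mimics the semantics described in the issue;
// it is not Spark code and does not use any Spark API.
final case class RunningCount(count: Long)

object InitialStateSketch {

  // Hypothetical state function. It mirrors the shape
  // (K, Iterator[V], GroupState[S]) => U, except that the previous state
  // arrives as an Option and the updated state is returned explicitly.
  def stateFunc(
      key: String,
      values: Iterator[String],
      state: Option[RunningCount]): (RunningCount, (String, Long)) = {
    val previous = state.map(_.count).getOrElse(0L)
    val updated = RunningCount(previous + values.size)
    (updated, (key, updated.count))
  }

  // Processes micro-batches in order against a state store that is
  // seeded with the user-supplied initial state before the first batch.
  def runBatches(
      initialState: Map[String, RunningCount],
      batches: Seq[Seq[String]]): Seq[Map[String, Long]] = {
    var store = initialState
    batches.map { batch =>
      batch.groupBy(identity).map { case (key, values) =>
        val (updated, output) = stateFunc(key, values.iterator, store.get(key))
        store = store.updated(key, updated)
        output
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val initialState = Map("a" -> RunningCount(1), "b" -> RunningCount(1))
    val results = runBatches(initialState, Seq(Seq("a", "a", "c"), Seq("b")))

    // "a" starts from its initial count of 1; "c" has no initial state.
    assert(results(0) == Map("a" -> 3L, "c" -> 1L))
    // "b" was absent from the first batch but still keeps its initial state.
    assert(results(1) == Map("b" -> 2L))
  }
}
```

In this model, a key that appears only in the initial state ("b") produces no output until data for it arrives, at which point its count continues from the seeded value. Whether the real operators behave the same way in every case (for example, with timeouts) is not stated in the issue text.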