[ https://issues.apache.org/jira/browse/SPARK-35897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17369625#comment-17369625 ]

Apache Spark commented on SPARK-35897:
--------------------------------------

User 'rahulsmahadev' has created a pull request for this issue:
https://github.com/apache/spark/pull/33093

> Support user defined initial state with flatMapGroupsWithState in Structured Streaming
> --------------------------------------------------------------------------------------
>
>                 Key: SPARK-35897
>                 URL: https://issues.apache.org/jira/browse/SPARK-35897
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 3.1.2
>            Reporter: Rahul Shivu Mahadev
>            Priority: Major
>             Fix For: 3.2.0
>
>
> Structured Streaming supports arbitrary stateful processing using the 
> mapGroupsWithState and flatMapGroupsWithState operators. Today the state for 
> each key is built up only from the data that arrives in each batch. This API 
> improvement will allow users to specify an initial state that is applied when 
> the first batch is executed.
>  
> h2. Proposed new APIs (Scala)
>  
>  
>   def mapGroupsWithState[S: Encoder, U: Encoder](
>       timeoutConf: GroupStateTimeout,
>       initialState: Dataset[(K, S)])(
>       func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] 
>  
>   def flatMapGroupsWithState[S: Encoder, U: Encoder](
>       outputMode: OutputMode,
>       timeoutConf: GroupStateTimeout,
>       initialState: Dataset[(K, S)])(
>       func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]
>  
> h2. Proposed new APIs (Java)
>   
>     def mapGroupsWithState[S, U](
>         func: MapGroupsWithStateFunction[K, V, S, U],
>         stateEncoder: Encoder[S],
>         outputEncoder: Encoder[U],
>         timeoutConf: GroupStateTimeout,
>         initialState: Dataset[(K, S)]): Dataset[U]
>
>     def flatMapGroupsWithState[S, U](
>         func: FlatMapGroupsWithStateFunction[K, V, S, U],
>         outputMode: OutputMode,
>         stateEncoder: Encoder[S],
>         outputEncoder: Encoder[U],
>         timeoutConf: GroupStateTimeout,
>         initialState: Dataset[(K, S)]): Dataset[U]
>  
>    
> h2. Example Usage
>    
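>     // Assumes a SparkSession `spark`, an implicit SQLContext for MemoryStream,
>     // and that RunningCount, timeoutConf and stateFunc are defined elsewhere.
>     import org.apache.spark.sql.Dataset
>     import org.apache.spark.sql.execution.streaming.MemoryStream
>     import spark.implicits._
>
>     val initialState: Dataset[(String, RunningCount)] = Seq(
>       ("a", new RunningCount(1)),
>       ("b", new RunningCount(1))
>     ).toDS()
>  
>     val inputData = MemoryStream[String]
>     val result =
>       inputData.toDS()
>         .groupByKey(x => x)
>         // Argument order follows the proposed signature: timeoutConf, then initialState
>         .mapGroupsWithState(timeoutConf, initialState)(stateFunc)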
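
The initial-state idea in the issue can be illustrated without Spark. The plain-Scala sketch below is a simplified model, not Spark's implementation: a mutable map stands in for the state store, is seeded once from the initial state, and is then updated batch by batch. `newStore`, `stateFunc` and `runBatch` are hypothetical names, and the model does not cover timeouts, output modes, or how Spark treats initial-state keys that receive no data in the first batch.

```scala
import scala.collection.mutable

// Illustrative stand-in for the RunningCount type used in the example.
case class RunningCount(count: Long)

// Seed a per-key state store once, before the first batch, from the
// user-supplied initial state (plays the role of `initialState`).
def newStore(initialState: Seq[(String, RunningCount)]): mutable.Map[String, RunningCount] =
  mutable.Map(initialState: _*)

// Hypothetical update function: add the number of new values for a key to its
// running count; keys with no prior state start from 0.
def stateFunc(key: String, values: Iterator[String], state: Option[RunningCount]): RunningCount =
  RunningCount(state.map(_.count).getOrElse(0L) + values.size)

// Process one batch: group values by key, call stateFunc once per key that has
// data in this batch, save the new state, and emit one output per group
// (mirroring the one-output-per-group contract of mapGroupsWithState).
def runBatch(store: mutable.Map[String, RunningCount], batch: Seq[String]): Map[String, Long] =
  batch.groupBy(identity).map { case (key, values) =>
    val updated = stateFunc(key, values.iterator, store.get(key))
    store(key) = updated
    key -> updated.count
  }
```

In this model, with `a` and `b` seeded to 1, a first batch of `a, a, c` yields `a -> 3` and `c -> 1`, while `b` keeps its seeded count until data for it arrives.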

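The flatMapGroupsWithState variant differs in that its function returns an `Iterator[U]`, so each group may emit zero or more records per batch. The plain-Scala sketch below models that contract with a state store seeded from an initial state; it is not Spark code, the names `flatStateFunc` and `runFlatBatch` are hypothetical, the state is a bare `Long` count, and timeouts and output modes are not modeled.

```scala
import scala.collection.mutable

// Hypothetical update function for a flatMapGroupsWithState-style operator:
// emit one (key, runningCount) record per new value, so a group can produce
// zero or more outputs. Returns the new state together with the outputs.
def flatStateFunc(key: String, values: Iterator[String], state: Option[Long]): (Long, Iterator[(String, Long)]) = {
  val start = state.getOrElse(0L)
  val newValues = values.toVector
  val outputs = newValues.indices.map(i => key -> (start + i + 1))
  (start + newValues.size, outputs.iterator)
}

// Process one batch against a store seeded from the initial state; keys are
// visited in sorted order only to make the output deterministic.
def runFlatBatch(store: mutable.Map[String, Long], batch: Seq[String]): Seq[(String, Long)] =
  batch.groupBy(identity).toSeq.sortBy(_._1).flatMap { case (key, values) =>
    val (newState, out) = flatStateFunc(key, values.iterator, store.get(key))
    store(key) = newState
    out
  }
```

Seeding `a` with 1 and processing `a, a, c` emits `(a, 2)`, `(a, 3)` and `(c, 1)`: the seeded count is picked up on the first batch, and `a` produces two records for its two new values.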


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
