Rahul Shivu Mahadev created SPARK-35897:
-------------------------------------------

             Summary: Support user defined initial state with 
flatMapGroupsWithState in Structured Streaming
                 Key: SPARK-35897
                 URL: https://issues.apache.org/jira/browse/SPARK-35897
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.1.2
            Reporter: Rahul Shivu Mahadev
             Fix For: 3.2.0


Structured Streaming supports arbitrary stateful processing through the 
mapGroupsWithState and flatMapGroupsWithState operators. Today the state is 
built up only from the data that arrives in each batch. This API improvement 
will allow users to supply an initial state, which is applied when the first 
batch executes.
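
To make the intended semantics concrete, here is a minimal plain-Scala sketch that models keyed state as an in-memory Map. It is only a model and says nothing about how Spark stores state: `runBatch`, `updateCount`, and this `RunningCount` definition are assumptions made for illustration. It shows how seeding the state before the first batch changes what the state function sees for a key.

```scala
// Simplified in-memory model of keyed state; NOT Spark's implementation.
object InitialStateSketch {
  // Assumed shape of the state; the issue does not define RunningCount.
  final case class RunningCount(count: Long)

  // Hypothetical state function: add this batch's events to the previous count.
  def updateCount(previous: Option[RunningCount], events: Seq[String]): RunningCount =
    RunningCount(previous.fold(0L)(_.count) + events.size)

  // Apply one micro-batch to the state map, using each event's value as its key.
  def runBatch(
      state: Map[String, RunningCount],
      batch: Seq[String]): Map[String, RunningCount] =
    batch.groupBy(identity).foldLeft(state) { case (acc, (key, events)) =>
      acc.updated(key, updateCount(acc.get(key), events))
    }

  def main(args: Array[String]): Unit = {
    val initialState = Map("a" -> RunningCount(1), "b" -> RunningCount(1))
    val firstBatch = Seq("a", "a", "c")

    // With an initial state, key "a" starts from 1; without it, from 0.
    val seeded = runBatch(initialState, firstBatch)
    val unseeded = runBatch(Map.empty[String, RunningCount], firstBatch)

    println(s"seeded a = ${seeded("a").count}, unseeded a = ${unseeded("a").count}")
  }
}
```

In this model, key "b" keeps its seeded value even though the first batch carries no data for it; whether and how the real operator surfaces such keys is up to the implementation.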


h2. Proposed new APIs (Scala)

  def mapGroupsWithState[S: Encoder, U: Encoder](
      timeoutConf: GroupStateTimeout,
      initialState: Dataset[(K, S)])(
      func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

  def flatMapGroupsWithState[S: Encoder, U: Encoder](
      outputMode: OutputMode,
      timeoutConf: GroupStateTimeout,
      initialState: Dataset[(K, S)])(
      func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

h2. Proposed new APIs (Java)

  def mapGroupsWithState[S, U](
      func: MapGroupsWithStateFunction[K, V, S, U],
      stateEncoder: Encoder[S],
      outputEncoder: Encoder[U],
      timeoutConf: GroupStateTimeout,
      initialState: Dataset[(K, S)]): Dataset[U]

  def flatMapGroupsWithState[S, U](
      func: FlatMapGroupsWithStateFunction[K, V, S, U],
      outputMode: OutputMode,
      stateEncoder: Encoder[S],
      outputEncoder: Encoder[U],
      timeoutConf: GroupStateTimeout,
      initialState: Dataset[(K, S)]): Dataset[U]

h2. Example Usage

  val initialState: Dataset[(String, RunningCount)] = Seq(
    ("a", new RunningCount(1)),
    ("b", new RunningCount(1))
  ).toDS()

  val inputData = MemoryStream[String]
  val result =
    inputData.toDS()
      .groupByKey(x => x)
      .mapGroupsWithState(timeoutConf, initialState)(stateFunc)
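
The example above leaves `RunningCount`, `timeoutConf`, and `stateFunc` undefined. One possible set of definitions is sketched below, assuming a per-key event counter and no timeout; these are illustrative guesses and are not taken from the issue. Only `GroupState.getOption`, `GroupState.update`, and `GroupStateTimeout.NoTimeout` are existing Spark APIs here.

```scala
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

object RunningCountExample {
  // Assumed state type: the number of events seen so far for a key.
  final case class RunningCount(count: Long)

  // Assumed timeout configuration: state never times out.
  val timeoutConf: GroupStateTimeout = GroupStateTimeout.NoTimeout

  // Hypothetical state function: adds the number of new events to the stored
  // count (0 if the key has no state yet) and emits (key, updated count).
  def stateFunc(
      key: String,
      values: Iterator[String],
      state: GroupState[RunningCount]): (String, Long) = {
    val previous = state.getOption.map(_.count).getOrElse(0L)
    val updated = RunningCount(previous + values.size)
    state.update(updated)
    (key, updated.count)
  }
}
```

With definitions like these, a key present in the initial state would presumably start counting from its seeded value rather than from zero when the first batch is processed.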


