Rahul Shivu Mahadev created SPARK-35897:
-------------------------------------------

             Summary: Support user defined initial state with flatMapGroupsWithState in Structured Streaming
                 Key: SPARK-35897
                 URL: https://issues.apache.org/jira/browse/SPARK-35897
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 3.1.2
            Reporter: Rahul Shivu Mahadev
             Fix For: 3.2.0

Structured Streaming supports arbitrary stateful processing through the mapGroupsWithState and flatMapGroupsWithState operators. Today the state can only be built up by processing the data that arrives with each batch. This API improvement will allow users to specify an initial state, which is applied when the first batch is executed.

h2. Proposed new APIs (Scala)

    def mapGroupsWithState[S: Encoder, U: Encoder](
        timeoutConf: GroupStateTimeout,
        initialState: Dataset[(K, S)])(
        func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]

    def flatMapGroupsWithState[S: Encoder, U: Encoder](
        outputMode: OutputMode,
        timeoutConf: GroupStateTimeout,
        initialState: Dataset[(K, S)])(
        func: (K, Iterator[V], GroupState[S]) => Iterator[U]): Dataset[U]

h2. Proposed new APIs (Java)

    def mapGroupsWithState[S, U](
        func: MapGroupsWithStateFunction[K, V, S, U],
        stateEncoder: Encoder[S],
        outputEncoder: Encoder[U],
        timeoutConf: GroupStateTimeout,
        initialState: Dataset[(K, S)]): Dataset[U]

    def flatMapGroupsWithState[S, U](
        func: FlatMapGroupsWithStateFunction[K, V, S, U],
        outputMode: OutputMode,
        stateEncoder: Encoder[S],
        outputEncoder: Encoder[U],
        timeoutConf: GroupStateTimeout,
        initialState: Dataset[(K, S)]): Dataset[U]

h2. Example Usage

    // RunningCount, timeoutConf and stateFunc are assumed to be defined elsewhere.
    // Keys "a" and "b" start with a count of 1 before any streaming data arrives.
    val initialState: Dataset[(String, RunningCount)] = Seq(
      ("a", new RunningCount(1)),
      ("b", new RunningCount(1))
    ).toDS()

    val inputData = MemoryStream[String]

    // Argument order follows the proposed signature: timeoutConf first, then initialState.
    val result =
      inputData.toDS()
        .groupByKey(x => x)
        .mapGroupsWithState(timeoutConf, initialState)(stateFunc)
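
To illustrate what "applied at the time of executing the first batch" could mean in practice, here is a minimal in-memory sketch in plain Scala. It is not Spark code and not the proposed implementation: SimpleState is a hypothetical stand-in for GroupState[S], RunningCount is assumed to be a simple counter, and run is an invented helper that models batches as in-memory sequences. Timeouts, encoders, and the handling of keys that appear only in the initial state are deliberately left out.

```scala
// Simplified model of the proposed semantics; none of these types come from Spark.
final case class RunningCount(count: Long)

// Hypothetical stand-in for GroupState[S]: holds an optional per-key state value.
final class SimpleState[S](private var value: Option[S]) {
  def getOption: Option[S] = value
  def update(newValue: S): Unit = { value = Some(newValue) }
}

object InitialStateSketch {
  // Same shape as the func argument in the proposal: (K, Iterator[V], state) => U.
  def stateFunc(
      key: String,
      values: Iterator[String],
      state: SimpleState[RunningCount]): (String, Long) = {
    val previous = state.getOption.map(_.count).getOrElse(0L)
    val updated = RunningCount(previous + values.size)
    state.update(updated)
    (key, updated.count)
  }

  // Seeds the state store from initialState, then processes the batches in order.
  def run(
      initialState: Seq[(String, RunningCount)],
      batches: Seq[Seq[String]]): Seq[Map[String, Long]] = {
    val store = scala.collection.mutable.Map[String, SimpleState[RunningCount]]()
    initialState.foreach { case (k, s) => store(k) = new SimpleState(Some(s)) }
    batches.map { batch =>
      batch.groupBy(identity).map { case (key, values) =>
        // Keys without seeded or previously stored state start out empty.
        val state = store.getOrElseUpdate(key, new SimpleState[RunningCount](None))
        stateFunc(key, values.iterator, state)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val initial = Seq("a" -> RunningCount(1), "b" -> RunningCount(1))
    val out = run(initial, Seq(Seq("a", "a", "c"), Seq("b")))
    println(out)
  }
}
```

In this model, key "a" reaches a count of 3 after the first batch (1 from the initial state plus 2 records), "c" starts from nothing and reaches 1, and "b" reaches 2 in the second batch. Without the initial state, "a" and "b" would have started from zero.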