I got an error:

    Cannot update the state that is timing out

It happens because I set a timeout:

    val newStateDstream = newActionDstream.mapWithState(
      StateSpec.function(mappingFunc).timeout(Seconds(3600)).initialState(initialRDD))

In the Spark code:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/State.scala

the Scaladoc example for the mapping function shows:

    /**
     * :: Experimental ::
     * Abstract class for getting and updating the state in mapping function used in the `mapWithState`
     * operation of a [[org.apache.spark.streaming.dstream.PairDStreamFunctions pair DStream]] (Scala)
     * or a [[org.apache.spark.streaming.api.java.JavaPairDStream JavaPairDStream]] (Java).
     *
     * Scala example of using `State`:
     * {{{
     *    // A mapping function that maintains an integer state and returns a String
     *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
     *      // Check if state exists
     *      if (state.exists) {
     *        val existingState = state.get  // Get the existing state
     *        val shouldRemove = ...         // Decide whether to remove the state
     *        if (shouldRemove) {
     *          state.remove()     // Remove the state
     *        } else {
     *          val newState = ...
     *          state.update(newState)    // Set the new state
     *        }
     *      } else {
     *        val initialState = ...
     *        state.update(initialState)  // Set the initial state
     *      }
     *      ... // return something
     *    }
     *
     * }}}

But update will throw an exception in the timeout batch:

    /**
     * Update the state with a new value.
     *
     * State cannot be updated if it has been already removed (that is, `remove()` has already been
     * called) or it is going to be removed due to timeout (that is, `isTimingOut()` is `true`).
     *
     * @throws java.lang.IllegalArgumentException If the state has already been removed, or is
     *                                            going to be removed
     */
    def update(newState: S): Unit

How can I handle the timeout in mappingFunc without losing the current batch's data?

-- 
http://www.cnblogs.com/hustlijian/
https://github.com/hustlijian
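
One approach I am considering, based on the `isTimingOut()` note in the Scaladoc above, is to check `state.isTimingOut()` before calling `update`. Below is a minimal sketch, not a confirmed fix. It assumes String keys, Int values and a running sum as the state, which are placeholders for my real types, and `addToSum` is a hypothetical helper used only for illustration. As far as I understand, the timeout call arrives with `value` set to `None`, so there should be no new record to lose in that call, but I have not verified this.

```scala
import org.apache.spark.streaming.State

// Hypothetical helper: combine the stored sum (if any) with the new value (if any).
def addToSum(current: Option[Int], value: Option[Int]): Int =
  current.getOrElse(0) + value.getOrElse(0)

// Sketch of a mapping function that never calls update() on a timing-out state.
def mappingFunc(key: String, value: Option[Int], state: State[Int]): Option[(String, Int)] = {
  if (state.isTimingOut()) {
    // Final call for an idle key: the state can still be read but not updated,
    // so only emit the last known value before Spark removes it.
    Some((key, state.get()))
  } else {
    // Normal call: fold the new value into the state and emit the new sum.
    val sum = addToSum(state.getOption(), value)
    state.update(sum)
    Some((key, sum))
  }
}
```

This would be passed to `StateSpec.function(mappingFunc)` exactly as in my original call; only the body of the function changes.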