updateStateByKey can be supplied with an initialRDD to populate the initial state.
Per the code (provided here for your convenience):

  /**
   * Return a new "state" DStream where the state for each key is updated by applying
   * the given function on the previous state of the key and the new values of the key.
   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If `this` function returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
   *                    DStream.
   * @param initialRDD initial state value of each key.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]
    ): DStream[(K, S)] = ssc.withScope {
    val cleanedUpdateF = sparkContext.clean(updateFunc)
    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
      iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
    }
    updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
  }

Simple example by Aniket Bhatnagar from an earlier thread on the forum.

// Defined inside the PrintEventCountsByInterval object.
def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long] = {
  val prevCount = prevStateOpt.getOrElse(0L)
  val newCount = prevCount + events.size
  Some(newCount)
}

val interval = 60 * 1000
val initialRDD = sparkContext.makeRDD(Array(1L, 2L, 3L, 4L, 5L))
  .map(_ * interval)
  .map(n => (n % interval, n / interval))
// `eventStream` is an assumed name for the input DStream[Event];
// the original name was lost from this message.
val counts = eventStream.map(event => {
  (event.timestamp - event.timestamp % interval, event)
}).updateStateByKey[Long](PrintEventCountsByInterval.counter _,
  new HashPartitioner(3), initialRDD = initialRDD)
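
For the string->string dictionary in the question, the update function could look roughly like the sketch below. This is only an illustration under my own assumptions: DictionaryState, update and applyBatch are hypothetical names, and applyBatch is a plain-Scala stand-in for what one batch of updateStateByKey does to the state, so it runs without Spark.

```scala
object DictionaryState {
  // Same shape as the updateFunc parameter of updateStateByKey:
  // the newest value seen in the batch wins; if the batch has no
  // value for the key, the previous state is kept unchanged.
  def update(newValues: Seq[String], prev: Option[String]): Option[String] =
    newValues.lastOption.orElse(prev)

  // Plain-Scala stand-in for one batch (assumption: illustration only).
  // Every key present in the state or in the batch goes through `update`;
  // a None result would drop the key, as the Scaladoc above describes.
  def applyBatch(state: Map[String, String],
                 batch: Seq[(String, String)]): Map[String, String] = {
    val grouped = batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ grouped.keySet).flatMap { k =>
      update(grouped.getOrElse(k, Seq.empty), state.get(k)).map(v => k -> v)
    }.toMap
  }
}
```

With Spark, the same function would presumably be passed as input.updateStateByKey[String](DictionaryState.update _, new HashPartitioner(3), initialRDD), where initialRDD is an RDD[(String, String)] loaded once from Hive, so no join is needed on each batch.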



On Thu, Mar 10, 2016 at 1:35 AM, Zalzberg, Idan (Agoda) wrote:

> Hi,
> I have a spark-streaming application that basically keeps track of a
> string->string dictionary.
> So I have messages coming in with updates, like:
> "A" -> "B"
> And I need to update the dictionary.
> This seems like a simple use case for the updateStateByKey method.
> However, my issue is that when the app starts I need to “initialize” the
> dictionary with data from a hive table, that has all the historical
> key/values with the dictionary.
> The only way I could think of is doing something like:
> val rdd = … // get data from hive
> def process(input: DStream[(String, String)]) = {
>   input.join(rdd).updateStateByKey(update)
> }
> So the join operation will be done on every incoming buffer, where in fact
> I only need it on initialization.
> Any idea how to achieve that?
> Thanks
