updateStateByKey can be supplied an initialRDD to populate the initial state. See the implementation at https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445.
Here it is for your convenience:

    /**
     * Return a new "state" DStream where the state for each key is updated by applying
     * the given function on the previous state of the key and the new values of the key.
     * org.apache.spark.Partitioner is used to control the partitioning of each RDD.
     * @param updateFunc State update function. If `this` function returns None, then
     *                   corresponding state key-value pair will be eliminated.
     * @param partitioner Partitioner for controlling the partitioning of each RDD in the new
     *                    DStream.
     * @param initialRDD initial state value of each key.
     * @tparam S State type
     */
    def updateStateByKey[S: ClassTag](
        updateFunc: (Seq[V], Option[S]) => Option[S],
        partitioner: Partitioner,
        initialRDD: RDD[(K, S)]
      ): DStream[(K, S)] = ssc.withScope {
      val cleanedUpdateF = sparkContext.clean(updateFunc)
      val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
        iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
      }
      updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
    }

And here is a simple example by Aniket Bhatnagar from an earlier thread on this list:

    def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long] = {
      val prevCount = prevStateOpt.getOrElse(0L)
      val newCount = prevCount + events.size
      Some(newCount)
    }

    val interval = 60 * 1000
    val initialRDD = sparkContext.makeRDD(Array(1L, 2L, 3L, 4L, 5L))
      .map(_ * interval)
      .map(n => (n % interval, n / interval))

    val counts = eventsStream
      .map(event => (event.timestamp - event.timestamp % interval, event))
      .updateStateByKey[Long](PrintEventCountsByInterval.counter _,
        new HashPartitioner(3), initialRDD = initialRDD)

    counts.print()

HTH.

-Todd

On Thu, Mar 10, 2016 at 1:35 AM, Zalzberg, Idan (Agoda) <idan.zalzb...@agoda.com> wrote:

> Hi,
>
> I have a spark-streaming application that basically keeps track of a
> string->string dictionary.
>
> So I have messages coming in with updates, like:
>
>     “A” -> “B”
>
> and I need to update the dictionary.
>
> This seems like a simple use case for the updateStateByKey method.
>
> However, my issue is that when the app starts I need to “initialize” the
> dictionary with data from a Hive table that has all the historical
> key/values for the dictionary.
>
> The only way I could think of is doing something like:
>
>     val rdd = … // get data from hive
>
>     def process(input: DStream[(String, String)]) = {
>       input.join(rdd).updateStateByKey(update)
>     }
>
> so the join operation will be done on every incoming buffer, where in fact
> I only need it on initialization.
>
> Any idea how to achieve that?
>
> Thanks
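
For the use case above specifically: rather than joining on every batch, you can load the dictionary from Hive once at startup and pass it to updateStateByKey as the initialRDD, so the state is seeded only on initialization. A rough sketch of what that could look like (the hiveContext query, the "dictionary" table name, and the last-value-wins update function are my illustrative assumptions, not code from this thread):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.dstream.DStream

    // Load the historical key/values once, at startup (hypothetical table name).
    val initialRDD: RDD[(String, String)] =
      hiveContext.sql("SELECT key, value FROM dictionary")
        .map(row => (row.getString(0), row.getString(1)))

    // Keep the most recent value seen for a key; fall back to the previous state.
    def update(newValues: Seq[String], state: Option[String]): Option[String] =
      newValues.lastOption.orElse(state)

    def process(input: DStream[(String, String)]): DStream[(String, String)] =
      input.updateStateByKey[String](update _, new HashPartitioner(3), initialRDD)

Note that stateful transformations like updateStateByKey also require a checkpoint directory to be set via ssc.checkpoint(...).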