The updateStateByKey can be supplied an initialRDD to populate it with.
Per code (
https://github.com/apache/spark/blob/v1.4.0/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L435-L445
).

Provided here for your convenience.


  /**
   * Return a new "state" DStream where the state for each key is
updated by applying
   * the given function on the previous state of the key and the new
values of the key.
   * org.apache.spark.Partitioner is used to control the partitioning
of each RDD.
   * @param updateFunc State update function. If `this` function
returns None, then
   *                   corresponding state key-value pair will be eliminated.
   * @param partitioner Partitioner for controlling the partitioning
of each RDD in the new
   *                    DStream.
   * @param initialRDD initial state value of each key.
   * @tparam S State type
   */
  def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner,
      initialRDD: RDD[(K, S)]
    ): DStream[(K, S)] = ssc.withScope {
    val cleanedUpdateF = sparkContext.clean(updateFunc)
    val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {
      iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))
    }
    updateStateByKey(newUpdateFunc, partitioner, true, initialRDD)
  }

Simple example by Aniket Bhatnagar from an earlier thread on the forum.

def counter(events: Seq[Event], prevStateOpt: Option[Long]): Option[Long] = {
  val prevCount = prevStateOpt.getOrElse(0L)
  val newCount = prevCount + events.size
  Some(newCount)
}
val interval = 60 * 1000
val initialRDD = sparkContext.makeRDD(Array(1L, 2L, 3L, 4L, 5L)).map(_
* interval).map(n => (n % interval, n / interval))
val counts = eventsStream.map(event => {
  (event.timestamp - event.timestamp % interval, event)
}).updateStateByKey[Long](PrintEventCountsByInterval.counter _, new
HashPartitioner(3), initialRDD = initialRDD)
counts.print()


HTH.

-Todd


On Thu, Mar 10, 2016 at 1:35 AM, Zalzberg, Idan (Agoda) <
idan.zalzb...@agoda.com> wrote:

> Hi,
>
>
>
> I have a spark-streaming application that basically keeps track of a
> string->string dictionary.
>
>
>
> So I have messages coming in with updates, like:
>
> “A”->”B”
>
> And I need to update the dictionary.
>
>
>
> This seems like a simple use case for the updateStateByKey method.
>
>
>
> However, my issue is that when the app starts I need to “initialize” the
> dictionary with data from a hive table, that has all the historical
> key/values with the dictionary.
>
>
>
> The only way I could think of is doing something like:
>
>
>
> val rdd =… //get data from hive
>
> *def *process(input: DStream[(String, String)]) = {
>     input.join(rdd).updateStateByKey(*update*)
>   }
>
> So the join operation will be done on every incoming buffer, where in fact
> I only need it on initialization.
>
>
>
> Any idea how to achieve that?
>
>
>
> Thanks
>
> ------------------------------
> This message is confidential and is for the sole use of the intended
> recipient(s). It may also be privileged or otherwise protected by copyright
> or other legal rules. If you have received it by mistake please let us know
> by reply email and delete it from your system. It is prohibited to copy
> this message or disclose its content to anyone. Any confidentiality or
> privilege is not waived or lost by any mistaken delivery or unauthorized
> disclosure of the message. All messages sent to and from Agoda may be
> monitored to ensure compliance with company policies, to protect the
> company's interests and to remove potential malware. Electronic messages
> may be intercepted, amended, lost or deleted, or contain viruses.
>

Reply via email to