Hi all,
I'm a complete newbie to Spark and Spark Streaming, so the question may seem
obvious; sorry for that.
Is it okay to store a Seq[Data] in state when using 'updateStateByKey'? I
have an update function with the (simplified; full code below) signature
def saveState(values: Seq[Msg], value: Option[Iterable[Msg]]):
Option[Iterable[Msg]], and it keeps about 200k records in the iterable. Most
examples I've seen keep some kind of accumulator in state, so I'm wondering
whether holding a whole collection is a normal use case.

Maybe you can suggest how to solve my task without this mutable state. I
have a Kafka topic that produces about 20k messages/sec. I need to group
messages by some key and send the groups to another topic. A group should be
sent either when its message count exceeds some count N, OR when a
predefined time T has passed since the first message in the group arrived,
even if the group still holds fewer than N messages. Window functions come
to mind first, but the problem is that I need to send a group as soon as N
messages have arrived, not wait until the window duration has passed.
I decided to set the batch interval to 0.5 sec, with T at about 3 sec. On
each batch I first take the groups that already have enough messages and
send them. The remaining messages go into state. Inside updateStateByKey I
have all the messages that have not been sent yet; I group them again and
send the groups that have reached the threshold. This way messages are
checked with a latency of 0.5 sec instead of 3 sec.
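
For concreteness, the time-based check (simplified out of the code below)
would look roughly like this. Everything in Msg except key is an assumption
on my side:

    // sketch only: assumed message shape; the code below uses only `key`
    case class Msg(key: Int, payload: String, arrivalTime: Long)

    val maxAgeMs = 3000L // T = 3 sec

    // a group must also be flushed once its oldest message is older than T
    def ifSendByTime(group: Iterable[Msg], now: Long): Boolean =
      group.nonEmpty && now - group.map(_.arrivalTime).min >= maxAgeMs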

The whole code:
    val batchSize = 5 // N: the group-size threshold (not the batch interval)

    // will persist() speed up anything here?
    val grouped = inputStream.map(msg => (msg.key, msg)).groupByKey().persist()

    def ifSend(x: (Int, Iterable[_])) = x._2.size >= batchSize
    def ifNotSend(x: (Int, Iterable[_])) = !ifSend(x)

    val readyToSend = grouped.filter(ifSend _)
    readyToSend.foreachRDD(rdd => {
      // send to kafka
    })
    // these should not be sent immediately but combined with the leftovers
    // kept in state from previous batches
    val incomplete = grouped.filter(ifNotSend _)

    /**
     * Returns (Iterable[Msg], Iterable[Msg]):
     * _1 - messages that should not be sent yet and are preserved for the
     *      next batch execution
     * _2 - messages that should be sent in this batch
     */
    def saveState(values: Seq[Iterable[Msg]],
                  state: Option[(Iterable[Msg], Iterable[Msg])])
        : Option[(Iterable[Msg], Iterable[Msg])] = {
      // when does values.size become greater than 1? I didn't get into that yet.
      if (values.size > 1) {
        throw new IllegalStateException("expected at most one group per key per batch")
      }

      // notSent - messages that have not been sent yet; the already-sent part
      // of the previous state is discarded
      val (notSent, _) = state.getOrElse((List.empty[Msg], List.empty[Msg]))

      // here goes more complex logic that checks whether a message should be
      // sent based on its arrival time; for now it is simplified
      // values.flatten also covers batches that bring no new data for the key
      val all = notSent ++ values.flatten
      val (keep, send) = all.groupBy(_.key).partition(ifNotSend _)

      Some((keep.values.flatten, send.values.flatten))
    }

    val state = incomplete.updateStateByKey(saveState _)
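    // debug output only: collect() pulls all pending messages to the driver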
    state.foreachRDD(rdd => {
      val messagesToSend = rdd.filter(x => x._2._2.nonEmpty)
        .map(x => x._2._2)
      println(messagesToSend.collect().flatten.mkString(","))
      println("----------------")
    })
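
For completeness, the surrounding setup looks roughly like this (a sketch:
the app name and checkpoint path are placeholders, and the Kafka source is
elided to a plain DStream[Msg]). Note that updateStateByKey requires
checkpointing to be enabled:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream

    val conf = new SparkConf().setAppName("message-grouping") // placeholder name
    val ssc = new StreamingContext(conf, Milliseconds(500))   // the 0.5 sec batches

    // state is kept across batches, so a checkpoint directory is mandatory
    ssc.checkpoint("/tmp/grouping-checkpoint") // placeholder path

    // in the real job this is a Kafka stream; elided here
    val inputStream: DStream[Msg] = ???

    // ... the code above goes here ...

    ssc.start()
    ssc.awaitTermination()

And by "// send to kafka" I mean roughly the usual per-partition producer
pattern (topic name and producer config are placeholders):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    readyToSend.foreachRDD { rdd =>
      rdd.foreachPartition { groups =>
        // create the producer inside the partition so it is never serialized
        val props = new Properties()
        props.put("bootstrap.servers", "localhost:9092") // placeholder
        props.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)
        groups.foreach { case (key, msgs) =>
          producer.send(new ProducerRecord("output-topic", key.toString,
            msgs.mkString(",")))
        }
        producer.close()
      }
    }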


Maybe you could suggest a better/more efficient solution?

Thanks in advance.


