Hi all, I'm a complete newbie to Spark and Spark Streaming, so the question may seem obvious; sorry for that. Is it okay to store a Seq[Data] in state when using updateStateByKey? I have a function with the signature

def saveState(values: Seq[Msg], value: Option[Iterable[Msg]]): Option[Iterable[Msg]]

that stores about 200k records in the iterable. Most examples I've seen keep some small accumulator in state, so I'm wondering whether holding a whole collection is a normal use case.
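To make the question concrete, here is a minimal sketch of the shape of state I mean, in plain Scala (Msg is reduced to a hypothetical case class here; the real function runs inside updateStateByKey):

```scala
// Hypothetical, simplified Msg; the real one comes from Kafka.
case class Msg(key: Int, payload: String)

// Same shape as my saveState: the entire collection of messages seen so
// far for a key is kept as the state value, with each batch's new
// values appended to it.
def appendToState(newValues: Seq[Msg],
                  state: Option[Iterable[Msg]]): Option[Iterable[Msg]] =
  Some(state.getOrElse(Seq.empty[Msg]) ++ newValues)
```

So the state value for a key just keeps growing until something flushes it, which is why I'm asking whether 200k records per key in there is reasonable.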
Maybe you can suggest how to solve my task without this mutable state. I have a Kafka topic that generates about 20k messages/sec. I need to group messages by some key and send the groups to another topic. A group should be sent either when its message count exceeds some threshold N, or when a predefined time T has passed since the first message in the group arrived, even if the group still holds fewer than N messages.

Window functions were the first thing that came to mind, but the problem is that I need to send a group as soon as N messages have arrived, not wait until the window duration has passed. So I set the batch interval to 0.5 sec and T to about 3 sec. On each batch I first take the groups that already have enough messages and send them; the remaining messages go into shared state. In updateStateByKey I then have all the messages that have not been sent yet, so I group them again and send the groups that now have enough messages. This way I check messages with a latency of 0.5 sec instead of 3 sec.

One thing I haven't figured out yet: when can values.size be greater than 1 in the update function? For now I just throw if that happens. The update function itself is included in the full code below.

The whole code:

val batchSize = 5 // will "persist" speed anything up here?
val grouped = inputStream.map(msg => (msg.key, msg)).groupByKey().persist()

def ifSend(x: (Int, Iterable[_])) = x._2.size >= batchSize
def ifNotSend(x: (Int, Iterable[_])) = !ifSend(x)

val readyToSend = grouped.filter(ifSend _)
readyToSend.foreachRDD(rdd => {
  // send to Kafka
})

// these should not be sent immediately but combined with state from previous batches
val incomplete = grouped.filter(ifNotSend _)

/**
 * Returns (Iterable[Msg], Iterable[Msg]):
 *   _1 - messages that should not be sent yet and are preserved for the next batch
 *   _2 - messages that should be sent in this batch
 */
def saveState(values: Seq[Iterable[Msg]],
              value: Option[(Iterable[Msg], Iterable[Msg])]): Option[(Iterable[Msg], Iterable[Msg])] = {
  if (values.size > 1) {
    throw new NullPointerException
  }
  // notSent - those that have not been sent yet; the already-sent part is discarded
  val (notSent, _) = value.getOrElse((List(), List()))
  // here goes more complex logic that checks whether a message should be
  // sent based on its arrival time; for now it is simplified
  val all = notSent ++ values(0)
  val result = all.groupBy(_.key).partition(ifNotSend _)
  Some((result._1.values.flatten, result._2.values.flatten))
}

val state = incomplete.updateStateByKey(saveState _)
state.foreachRDD(rdd => {
  val messagesToSend = rdd.filter(x => x._2._2.nonEmpty).map(x => x._2._2)
  println(messagesToSend.collect().flatten.mkString(","))
  println("----------------")
})

Maybe you could suggest a better/more efficient solution? Thanks in advance.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Storing-a-lot-of-state-with-updateStateByKey-tp22890.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
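P.S. The arrival-time check I simplified away in saveState would look roughly like this in plain Scala. This is only a sketch; arrivalMs and maxAgeMs are names I'm inventing here, not fields the real Msg necessarily has:

```scala
// Hypothetical Msg carrying its arrival timestamp in epoch millis.
case class Msg(key: Int, arrivalMs: Long)

val batchSize = 5      // N: flush when a group reaches this many messages
val maxAgeMs  = 3000L  // T: flush when the oldest message is this old

// A group is flushed when it is big enough OR its oldest message has
// been waiting at least T, whichever comes first.
def shouldFlush(group: Iterable[Msg], nowMs: Long): Boolean =
  group.size >= batchSize ||
    (group.nonEmpty && nowMs - group.map(_.arrivalMs).min >= maxAgeMs)
```

With the 0.5 sec batch interval, this predicate would be evaluated against each key's group on every batch, so a group is flushed at most ~0.5 sec after either condition becomes true.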