Could you write your update func like this?

    val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])])
=> {
      iterator.flatMap { case (key, values, stateOption) =>
        if (values.isEmpty) {
          // don't access database
        } else {
          // update to new state and save to database
        // return new state

and use this overload:

def updateStateByKey[S: ClassTag](
      updateFunc: (Seq[V], Option[S]) => Option[S],
      partitioner: Partitioner
    ): DStream[(K, S)]

There is a JIRA: but
doesn't have a doc now...

Best Regards,
Shixiong Zhu

2015-09-24 17:26 GMT+08:00 Bin Wang <>:

> Data that are not updated should be saved earlier: while the data added to
> the DStream at the first time, it should be considered as updated. So save
> the same data again is a waste.
> What are the community is doing? Is there any doc or discussion that I can
> look for? Thanks.
> Shixiong Zhu <>于2015年9月24日周四 下午4:27写道:
>> For data that are not updated, where do you save? Or do you only want to
>> avoid accessing database for those that are not updated?
>> Besides,  the community is working on optimizing "updateStateBykey"'s
>> performance. Hope it will be delivered soon.
>> Best Regards,
>> Shixiong Zhu
>> 2015-09-24 13:45 GMT+08:00 Bin Wang <>:
>>> I've read the source code and it seems to be impossible, but I'd like to
>>> confirm it.
>>> It is a very useful feature. For example, I need to store the state of
>>> DStream into my database, in order to recovery them from next redeploy. But
>>> I only need to save the updated ones. Save all keys into database is a lot
>>> of waste.
>>> Through the source code, I think it could be add easily: StateDStream
>>> can get prevStateRDD so that it can make a diff. Is there any chance to add
>>> this as an API of StateDStream? If so, I can work on this feature.
>>> If not possible, is there any work around or hack to do this by myself?

Reply via email to