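Could you write your update function like this?

  val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
    iterator.flatMap { case (key, values, stateOption) =>
      val newState: Option[Int] =
        if (values.isEmpty) {
          // No new values for this key: keep the old state, don't access the database
          stateOption
        } else {
          // Update to the new state (summing is just an example rule)
          // and save it to the database here
          Some(stateOption.getOrElse(0) + values.sum)
        }
      // Return the new state as a (key, state) pair; None drops the key
      newState.map(state => (key, state))
    }
  }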
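To check the idea without a cluster, here is a minimal plain-Scala simulation of one batch, assuming the same iterator-based shape. `makeUpdateFunc` and the `save` callback are hypothetical names (the callback stands in for a real database client), and summing the values is only an example update rule. It shows that only keys that received new values are written.

```scala
import scala.collection.mutable.ArrayBuffer

// Builds an iterator-based update function. `save` is a hypothetical
// callback standing in for the real database write.
def makeUpdateFunc(save: (String, Int) => Unit)
    : Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] =
  iterator =>
    iterator.flatMap { case (key, values, stateOption) =>
      if (values.isEmpty) {
        // Key not updated in this batch: keep the old state, skip the database
        stateOption.map(state => (key, state))
      } else {
        // Key updated: compute the new state and persist only this key
        val newState = stateOption.getOrElse(0) + values.sum
        save(key, newState)
        Some((key, newState))
      }
    }

// Fake "database" that records every write, so we can see which keys hit it.
val writes = ArrayBuffer.empty[(String, Int)]
val updateFunc = makeUpdateFunc((key, state) => writes += ((key, state)))

// One simulated batch: "a" gets new values, "b" only has old state,
// "c" is a brand-new key.
val batch: Iterator[(String, Seq[Int], Option[Int])] = Iterator(
  ("a", Seq(1, 2), Some(10)),
  ("b", Seq.empty[Int], Some(5)),
  ("c", Seq(7), None)
)

// Consuming the iterator is what triggers the update logic and the writes.
val newStates = updateFunc(batch).toMap
println(newStates)
println(writes)
```

Because `Iterator.flatMap` is lazy, `save` runs only when the returned iterator is consumed (here by `toMap`); in Spark that happens when the state RDD partition is computed. If a partition is recomputed (for example after a task failure), the writes may run again, so an idempotent upsert is the safer choice.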
and use this overload:

  def updateStateByKey[S: ClassTag](
      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
      partitioner: Partitioner,
      rememberPartitioner: Boolean
  ): DStream[(K, S)]

There is a JIRA for this, https://issues.apache.org/jira/browse/SPARK-2629, but there is no doc for it yet...

Best Regards,
Shixiong Zhu

2015-09-24 17:26 GMT+08:00 Bin Wang <wbi...@gmail.com>:

> Data that is not updated should already have been saved earlier: when the
> data is added to the DStream for the first time, it should be considered
> updated. So saving the same data again is a waste.
>
> What is the community doing? Is there any doc or discussion that I can
> look at? Thanks.
>
> On Thu, Sep 24, 2015 at 4:27 PM, Shixiong Zhu <zsxw...@gmail.com> wrote:
>
>> For data that is not updated, where do you save it? Or do you only want to
>> avoid accessing the database for keys that are not updated?
>>
>> Besides, the community is working on optimizing the performance of
>> "updateStateByKey". Hopefully it will be delivered soon.
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-09-24 13:45 GMT+08:00 Bin Wang <wbi...@gmail.com>:
>>
>>> I've read the source code and it seems to be impossible, but I'd like to
>>> confirm it.
>>>
>>> It would be a very useful feature. For example, I need to store the state
>>> of a DStream in my database so that I can recover it after the next
>>> redeploy. But I only need to save the updated keys. Saving all keys to the
>>> database is a lot of waste.
>>>
>>> From the source code, I think it could be added easily: StateDStream
>>> can get prevStateRDD, so it can compute a diff. Is there any chance to add
>>> this as an API of StateDStream? If so, I can work on this feature.
>>>
>>> If not, is there any workaround or hack to do this myself?
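The diff idea from the original question (comparing prevStateRDD with the new state of the same batch) can be illustrated on plain Scala collections. This is only a sketch under stated assumptions: `changedStates` and `removedKeys` are hypothetical helpers, not part of StateDStream, and immutable Maps stand in for the two state RDDs.

```scala
// Hypothetical helpers: plain Maps stand in for prevStateRDD and the new
// state RDD of one batch.

// Entries that are new, or whose state differs from the previous batch.
def changedStates[K, S](prev: Map[K, S], curr: Map[K, S]): Map[K, S] =
  curr.filter { case (key, state) => !prev.get(key).contains(state) }

// Keys whose state disappeared (e.g. the update function returned None).
def removedKeys[K, S](prev: Map[K, S], curr: Map[K, S]): Set[K] =
  prev.keySet -- curr.keySet

val prev = Map("a" -> 10, "b" -> 5, "x" -> 1)
val curr = Map("a" -> 13, "b" -> 5, "c" -> 7)

println(changedStates(prev, curr))
println(removedKeys(prev, curr))
```

Note the semantic difference from the update-function approach: a key that received new values but ended up with an equal state is not reported by the diff, and computing the diff needs a pass over both snapshots every batch, whereas the update function already knows which keys got new values.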