You can create the connection inside the update function, like this:
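val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
  // Runs on the worker, once per partition.
  val dbConnection = createDbConnection() // placeholder: create a db connection here
  // Register the cleanup first, so the lazy iterator is still the return value.
  TaskContext.get().addTaskCompletionListener(_ => dbConnection.disconnect())
  iterator.flatMap { case (key, values, stateOption) =>
    if (values.isEmpty) {
      // don't access database
    } else {
      // update to new state and save to database
    }
    // return new state as Some((key, newState)), or None to drop the key
  }
}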
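
To make the empty-values check concrete, here is a minimal sketch in plain
Scala, without Spark, that you can run locally. InMemoryDb and UpdateSketch are
made-up names standing in for a real database client, and summing the values
into the state is only an assumption for illustration. In a real job the
connection would be created inside the function and closed through the
TaskContext listener, as in the snippet above.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a real database client; not part of Spark.
final class InMemoryDb {
  val rows = mutable.Map.empty[String, Int]
  var writes = 0
  var closed = false
  def save(key: String, state: Int): Unit = { rows(key) = state; writes += 1 }
  def disconnect(): Unit = { closed = true }
}

object UpdateSketch {
  // Same shape as the iterator-based update function: one call per partition.
  def update(db: InMemoryDb)(
      iterator: Iterator[(String, Seq[Int], Option[Int])]): Iterator[(String, Int)] =
    iterator.flatMap { case (key, values, stateOption) =>
      if (values.isEmpty) {
        // No new data for this key in the batch: keep the old state, skip the write.
        stateOption.map(state => (key, state))
      } else {
        // Assumed state logic for illustration: a running sum per key.
        val newState = stateOption.getOrElse(0) + values.sum
        db.save(key, newState)
        Some((key, newState))
      }
    }
}
```

Only keys that received values in the current batch reach save; keys with an
empty Seq pass through with their previous state and cost no database access.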
Best Regards,
Shixiong Zhu
2015-09-24 17:42 GMT+08:00 Bin Wang <[email protected]>:
> It seems like a workaround, but I don't know how to get the database
> connection on the worker nodes.
>
> Shixiong Zhu <[email protected]> wrote on Thu, Sep 24, 2015 at 5:37 PM:
>
>> Could you write your update func like this?
>>
>> val updateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
>>   iterator.flatMap { case (key, values, stateOption) =>
>>     if (values.isEmpty) {
>>       // don't access database
>>     } else {
>>       // update to new state and save to database
>>     }
>>     // return new state
>>   }
>> }
>>
>> and use this overload:
>>
>> def updateStateByKey[S: ClassTag](
>>     updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
>>     partitioner: Partitioner,
>>     rememberPartitioner: Boolean
>>   ): DStream[(K, S)]
>>
>> There is a JIRA for this, https://issues.apache.org/jira/browse/SPARK-2629,
>> but it doesn't have a doc yet.
>>
>>
>> Best Regards,
>> Shixiong Zhu
>>
>> 2015-09-24 17:26 GMT+08:00 Bin Wang <[email protected]>:
>>
>>> Data that is not updated has already been saved earlier: when the data
>>> is first added to the DStream, it counts as updated and is saved then. So
>>> saving the same data again is a waste.
>>>
>>> What is the community working on? Is there any doc or discussion that I
>>> can look at? Thanks.
>>>
>>>
>>>
>>> Shixiong Zhu <[email protected]> wrote on Thu, Sep 24, 2015 at 4:27 PM:
>>>
>>>> For data that is not updated, where do you save it? Or do you only want
>>>> to avoid accessing the database for keys that are not updated?
>>>>
>>>> Besides, the community is working on optimizing the performance of
>>>> "updateStateByKey". Hopefully it will be delivered soon.
>>>>
>>>> Best Regards,
>>>> Shixiong Zhu
>>>>
>>>> 2015-09-24 13:45 GMT+08:00 Bin Wang <[email protected]>:
>>>>
>>>>> I've read the source code and it seems to be impossible, but I'd like
>>>>> to confirm it.
>>>>>
>>>>> It would be a very useful feature. For example, I need to store the
>>>>> state of the DStream in my database in order to recover it after the
>>>>> next redeploy. But I only need to save the updated keys. Saving all keys
>>>>> to the database is a lot of waste.
>>>>>
>>>>> From the source code, I think it could be added easily: StateDStream
>>>>> can get prevStateRDD, so it can make a diff. Is there any chance to add
>>>>> this as an API of StateDStream? If so, I can work on this feature.
>>>>>
>>>>> If that is not possible, is there any workaround or hack to do this myself?
>>>>>
>>>>
>>>>
>>