I am now trying to use mapWithState in the following way, based on some example
code. However, looking at the DAG, it does not seem to checkpoint the state,
and when the application is restarted from the checkpoint, it re-partitions all
of the previous batches' data from Kafka.

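// Imports used by this snippet (Spark 1.6 Java API, where Optional is Guava's)
import com.google.common.base.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;
import scala.Tuple2;

static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
    new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
      @Override
      public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
                                         State<MyClass> state) {
        MyClass nullObj = new MyClass();
        nullObj.setImprLog(null);
        nullObj.setNotifyLog(null);
        // one is absent when the key is timing out; or() never returns null here
        MyClass current = one.or(nullObj);

        if (current.getImprLog() != null && current.getMyClassType() == 1) {
          // First half of the pair: keep it in the state until the matching
          // record arrives. Without state.update(...) nothing is ever stored.
          state.update(current);
          return new Tuple2<>(key, null);
        } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
          MyClass oldState = state.exists() ? state.get() : nullObj;
          // The stored object is the impression, so check getImprLog()
          // (checking getNotifyLog() here would skip it or hit a NullPointerException)
          if (oldState.getImprLog() != null) {
            oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
            state.remove();  // pair completed; drop the stored half
            return new Tuple2<>(key, oldState);
          } else {
            return new Tuple2<>(key, null);
          }
        } else {
          return new Tuple2<>(key, null);
        }
      }
    };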
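One way to see the effect of `state.update()` is a small, Spark-free model of the per-key contract the mapping function relies on. This is a hypothetical sketch, not Spark code: `SimpleState` imitates only `exists()`, `get()`, `update()` and `remove()` of `State`, and the hypothetical `Event` record stands in for `MyClass` with a type code (1 = impression, 3 = notification) and a `crtd` timestamp. A key only has state in a later batch if the function called `update()` for it earlier.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapWithStateModel {
    // Hypothetical stand-in for MyClass: type 1 = impression, type 3 = notification.
    record Event(int type, long crtd) {}

    // Hypothetical stand-in for Spark's State<S>; only the methods used below.
    static final class SimpleState<S> {
        private S value;
        boolean exists() { return value != null; }
        S get() { return value; }
        void update(S newValue) { value = newValue; }
        void remove() { value = null; }
    }

    // One state slot per key, kept across batches.
    private final Map<String, SimpleState<Event>> states = new HashMap<>();

    // Mirrors the corrected mapping function: store the impression, complete it on the notification.
    static Event map(String key, Event current, SimpleState<Event> state) {
        if (current.type() == 1) {
            state.update(current);          // without this call nothing is remembered
            return null;
        }
        if (current.type() == 3 && state.exists()) {
            Event impression = state.get();
            state.remove();                 // pair completed; drop the stored half
            return new Event(impression.type(), current.crtd());
        }
        return null;
    }

    // Calls map() once per record with that key's state, roughly as mapWithState does.
    List<String> processBatch(List<Map.Entry<String, Event>> batch) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Event> record : batch) {
            SimpleState<Event> state =
                states.computeIfAbsent(record.getKey(), k -> new SimpleState<>());
            Event result = map(record.getKey(), record.getValue(), state);
            if (result != null) {
                out.add(record.getKey() + ":" + result.crtd());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        MapWithStateModel model = new MapWithStateModel();
        System.out.println(model.processBatch(List.of(Map.entry("k1", new Event(1, 100L)))));
        System.out.println(model.processBatch(List.of(Map.entry("k1", new Event(3, 250L)))));
    }
}
```

Running `main` prints `[]` for the impression batch and `[k1:250]` for the notification batch. Deleting the `state.update(current)` line makes the second batch print `[]` as well, which is the behaviour the original function would show.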


Please let me know whether this is the proper approach or whether I am doing something wrong.


Thanks !!
Abhi

On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com>
wrote:

> If you don't want to upgrade, your only option will be updateStateByKey then.
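For Spark 1.4.0, an updateStateByKey function receives a key's new values for the batch plus its previous state and returns the new state, where an absent result drops the key. The hypothetical sketch below shows that shape in plain Java, using `java.util.Optional` in place of the Guava `Optional` that the Spark 1.x Java API expects. Because updateStateByKey has no built-in timeout and applies the function to every stored key on every batch, the state here counts its own age in batches; the 120-batch limit assumes the 1-minute slide interval from the original question, and the code assumes a key's values arrive in order within a batch.

```java
import java.util.List;
import java.util.Optional;

public class UpdateStateSketch {
    // Hypothetical value type from the example in this thread: int a, String b.
    record Value(int a, String b) {}

    // Per-key state: the held first value, how many batches it has waited,
    // and the merged result once the partner has arrived (null until then).
    record PairState(Value first, int ageInBatches, Value completed) {}

    // Assumption: 1-minute batches, so 120 batches is roughly the 2-hour limit.
    static final int MAX_AGE_IN_BATCHES = 120;

    // Same shape as an updateStateByKey function; Optional.empty() drops the key.
    static Optional<PairState> update(List<Value> newValues, Optional<PairState> previous) {
        // A pair completed in the previous batch has already been emitted; start afresh.
        Optional<PairState> prior =
            previous.filter(s -> s.completed() == null);

        Value first = prior.map(PairState::first).orElse(null);
        int age = prior.map(s -> s.ageInBatches() + 1).orElse(0);

        for (Value v : newValues) {
            if (first == null) {
                first = v;  // first half of the pair
            } else {
                // Keep field a from the earlier value and take field b from the later one.
                return Optional.of(new PairState(first, age, new Value(first.a(), v.b())));
            }
        }
        if (first == null || age > MAX_AGE_IN_BATCHES) {
            return Optional.empty();  // nothing held, or the partner never arrived in time
        }
        return Optional.of(new PairState(first, age, null));
    }
}
```

A completed pair stays in the state stream for exactly one batch, so the caller would keep only entries whose `completed()` is non-null and write those to the database.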
> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>
>> mapWithState supports checkpointing.
>>
>> There have been some bug fixes since the release of 1.6.0, e.g.
>>   SPARK-12591: NullPointerException using checkpointed mapWithState with
>>   KryoSerializer
>>
>> which will be in the upcoming 1.6.1.
>>
>> Cheers
>>
>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>
>>> Does mapWithState checkpoint the data?
>>>
>>> When my application goes down and is restarted from the checkpoint, will
>>> mapWithState need to recompute the previous batches' data?
>>>
>>> Also, to use mapWithState I will need to upgrade my application, as I am
>>> using version 1.4.0 and mapWithState isn't supported there. Is there any
>>> other workaround?
>>>
>>> Cheers!!
>>> Abhi
>>>
>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian....@gmail.com>
>>> wrote:
>>>
>>>> Looks like mapWithState could help you?
>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have a use case like the following in my production environment, where I
>>>>> am listening to Kafka with a slideInterval of 1 min and a windowLength of 2
>>>>> hours.
>>>>>
>>>>> I have a JavaPairDStream in which the same key arrives a second time with
>>>>> a different value, either in the same batch or in a later batch.
>>>>>
>>>>> When the key appears the second time, I need to update a field in the
>>>>> earlier value with a field from the later value. Keys whose matching
>>>>> second record does not arrive should be discarded after 2 hours.
>>>>>
>>>>> At the end of each second I need to output the result to an external
>>>>> database.
>>>>>
>>>>> For example:
>>>>>
>>>>> Suppose valueX is an object of MyClass with fields int a and String b.
>>>>> At t=1 sec I receive:
>>>>> key0,value0(0,"prev0")
>>>>> key1,value1(1,"prev1")
>>>>> key2,value2(2,"prev2")
>>>>> key2,value3(3,"next2")
>>>>>
>>>>> Output to the database after 1 sec:
>>>>> key2,newValue(2,"next2")
>>>>>
>>>>> At t=2 sec I receive:
>>>>> key3,value4(4,"prev3")
>>>>> key1,value5(5,"next1")
>>>>>
>>>>> Output to the database after 2 sec:
>>>>> key1,newValue(1,"next1")
>>>>>
>>>>> At t=3 sec I receive:
>>>>> key4,value6(6,"prev4")
>>>>> key3,value7(7,"next3")
>>>>> key5,value8(8,"prev5")
>>>>> key5,value9(9,"next5")
>>>>> key0,value10(10,"next0")
>>>>>
>>>>> Output to the database after 3 sec:
>>>>> key0,newValue(0,"next0")
>>>>> key3,newValue(4,"next3")
>>>>> key5,newValue(8,"next5")
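The expected output above can be reproduced with a small, Spark-free sketch of the pairing rule: the first record for a key is held, the second completes the pair (field a from the earlier value, field b from the later one), and held records older than 2 hours are discarded. `KeyPairer` and its `Value` record are hypothetical names and times are plain seconds; a real job would keep the held records in Spark state (for example via mapWithState with a timeout) rather than in a local map.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyPairer {
    // Hypothetical value type from the example: int a, String b.
    record Value(int a, String b) {}

    // The first half of a pair together with the time it arrived.
    private record Pending(Value value, long arrivedAtSec) {}

    private final long timeoutSec;
    private final Map<String, Pending> pending = new LinkedHashMap<>();

    KeyPairer(long timeoutSec) {
        this.timeoutSec = timeoutSec;
    }

    // Processes one batch received at nowSec and returns the completed pairs to write out.
    Map<String, Value> processBatch(long nowSec, List<Map.Entry<String, Value>> batch) {
        // Drop held records whose partner did not arrive within the timeout.
        pending.values().removeIf(p -> nowSec - p.arrivedAtSec() > timeoutSec);

        Map<String, Value> completed = new LinkedHashMap<>();
        for (Map.Entry<String, Value> record : batch) {
            Pending first = pending.remove(record.getKey());
            if (first == null) {
                // First time this key is seen: hold it.
                pending.put(record.getKey(), new Pending(record.getValue(), nowSec));
            } else {
                // Keep field a from the earlier value and take field b from the later one.
                completed.put(record.getKey(),
                    new Value(first.value().a(), record.getValue().b()));
            }
        }
        return completed;
    }
}
```

Feeding the three batches at t=1, 2 and 3 into `new KeyPairer(2 * 60 * 60)` yields key2, then key1, then key0, key3 and key5 with the new values listed above.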
>>>>>
>>>>>
>>>>> Please suggest how this can be achieved.
>>>>>
>>>>>
>>>>> Thanks a lot !!!!
>>>>> Abhi
>>>>>
>>>>>
>>>>>
>>>
>>
