The fix for SPARK-6847 is not in branch-1.6.

Should the fix be backported to branch-1.6?

Thanks

> On Feb 22, 2016, at 11:55 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> 
> wrote:
> 
> Hey Abhi,
> 
> Could you post how you use mapWithState? By default, it should do 
> checkpointing every 10 batches.
> However, there is a known issue that prevents mapWithState from checkpointing 
> in some special cases: https://issues.apache.org/jira/browse/SPARK-6847
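> 
> As a minimal sketch (placeholder names only: `jssc` for the JavaStreamingContext and 
> `stateStream` for the stream returned by mapWithState), the checkpoint setup can be 
> made explicit instead of relying on the default:
> 
>     // checkpointing requires a checkpoint directory on the context
>     jssc.checkpoint("hdfs:///tmp/streaming-checkpoints");   // placeholder path
>     // override the default checkpoint interval (10 batches) of the state stream
>     stateStream.checkpoint(Durations.minutes(5));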
> 
>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com> 
>> wrote:
>> Any insights on this one?
>> 
>> 
>> Thanks !!!
>> Abhi 
>> 
>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com> 
>>> wrote:
>>> I am now trying to use mapWithState in the following way, based on some 
>>> example code. But, looking at the DAG, it does not seem to checkpoint the 
>>> state, and when the application is restarted from the checkpoint, it 
>>> re-partitions all the previous batches' data from Kafka.
>>> 
>>> static Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>> mappingFunc =
>>>         new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>>     @Override
>>>     public Tuple2<String, MyClass> call(String key, Optional<MyClass> one, State<MyClass> state) {
>>>         MyClass nullObj = new MyClass();
>>>         nullObj.setImprLog(null);
>>>         nullObj.setNotifyLog(null);
>>>         MyClass current = one.or(nullObj);
>>> 
>>>         if (current != null && current.getImprLog() != null && current.getMyClassType() == 1) {
>>>             return new Tuple2<>(key, null);
>>>         } else if (current.getNotifyLog() != null && current.getMyClassType() == 3) {
>>>             MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>             if (oldState != null && oldState.getNotifyLog() != null) {
>>>                 oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>                 return new Tuple2<>(key, oldState);
>>>             } else {
>>>                 return new Tuple2<>(key, null);
>>>             }
>>>         } else {
>>>             return new Tuple2<>(key, null);
>>>         }
>>>     }
>>> };
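>>> 
>>> The function above is wired up roughly like this (a sketch only; `kafkaPairStream` is a 
>>> placeholder name for the input JavaPairDStream<String, MyClass>, and the 2-hour timeout 
>>> is taken from the 2-hour requirement in the original question below):
>>> 
>>> import org.apache.spark.streaming.Durations;
>>> import org.apache.spark.streaming.StateSpec;
>>> import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
>>> 
>>> // Build the StateSpec from the mapping function and expire idle keys after 2 hours.
>>> JavaMapWithStateDStream<String, MyClass, MyClass, Tuple2<String, MyClass>> stateStream =
>>>         kafkaPairStream.mapWithState(
>>>                 StateSpec.function(mappingFunc)
>>>                          .timeout(Durations.minutes(120)));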
>>> 
>>> 
>>> Please suggest whether this is the proper way to do it, or whether I am doing something wrong.
>>> 
>>> 
>>> Thanks !!
>>> Abhi 
>>> 
>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com> 
>>>> wrote:
>>>> If you don't want to upgrade, your only option then will be updateStateByKey.
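>>>> 
>>>> On 1.4.x that would look roughly like this (a sketch only; it uses Guava's Optional, 
>>>> which is what the 1.x Java API expects, and `mergePrevAndNext` / `kafkaPairStream` 
>>>> are placeholders for your own merge logic and input stream):
>>>> 
>>>> import java.util.List;
>>>> import com.google.common.base.Optional;
>>>> import org.apache.spark.api.java.function.Function2;
>>>> import org.apache.spark.streaming.api.java.JavaPairDStream;
>>>> 
>>>> Function2<List<MyClass>, Optional<MyClass>, Optional<MyClass>> updateFunc =
>>>>         new Function2<List<MyClass>, Optional<MyClass>, Optional<MyClass>>() {
>>>>     @Override
>>>>     public Optional<MyClass> call(List<MyClass> newValues, Optional<MyClass> state) {
>>>>         MyClass merged = state.orNull();
>>>>         for (MyClass v : newValues) {
>>>>             merged = mergePrevAndNext(merged, v);   // placeholder merge logic
>>>>         }
>>>>         // Returning Optional.absent() drops the key from the state.
>>>>         return Optional.fromNullable(merged);
>>>>     }
>>>> };
>>>> 
>>>> JavaPairDStream<String, MyClass> stateStream = kafkaPairStream.updateStateByKey(updateFunc);
>>>> 
>>>> Note that updateStateByKey has no built-in timeout, so to drop unmatched keys after 
>>>> 2 hours you would keep a timestamp inside the state and return Optional.absent() once 
>>>> it is too old; it also touches every key on every batch, so it scales worse than 
>>>> mapWithState for large state.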
>>>> 
>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>> mapWithState supports checkpointing.
>>>>> 
>>>>> There have been some bug fixes since the release of 1.6.0, e.g.
>>>>>   SPARK-12591 NullPointerException using checkpointed mapWithState with 
>>>>>   KryoSerializer
>>>>> 
>>>>> which will be in the upcoming 1.6.1.
>>>>> 
>>>>> Cheers
>>>>> 
>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand 
>>>>>> <abhis.anan...@gmail.com> wrote:
>>>>>> Does mapWithState checkpoint the data?
>>>>>> 
>>>>>> When my application goes down and is restarted from the checkpoint, will 
>>>>>> mapWithState need to recompute the previous batches' data?
>>>>>> 
>>>>>> Also, to use mapWithState I would need to upgrade my application, as I am 
>>>>>> using version 1.4.0 and mapWithState isn't supported there. Is there any 
>>>>>> other workaround?
>>>>>> 
>>>>>> Cheers!!
>>>>>> Abhi
>>>>>> 
>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu 
>>>>>>> <sebastian....@gmail.com> wrote:
>>>>>>> Looks like mapWithState could help you?
>>>>>>> 
>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com> 
>>>>>>>> wrote:
>>>>>>>> Hi All,
>>>>>>>> 
>>>>>>>> I have a use case as follows in my production environment, where I am 
>>>>>>>> listening to Kafka with a slideInterval of 1 min and a windowLength of 
>>>>>>>> 2 hours.
>>>>>>>> 
>>>>>>>> I have a JavaPairDStream where, for each key, I am getting the same key 
>>>>>>>> again but with a different value, which might appear in the same batch 
>>>>>>>> or in some later batch.
>>>>>>>> 
>>>>>>>> When the key appears a second time, I need to update a field in the value 
>>>>>>>> of the earlier occurrence with a field from the later one. Keys for which 
>>>>>>>> the matching counterpart never arrives should be discarded after 2 hours.
>>>>>>>> 
>>>>>>>> At the end of each second I need to output the result to an external 
>>>>>>>> database (a rough sketch of this write-out follows the example below).
>>>>>>>> 
>>>>>>>> For example :
>>>>>>>> 
>>>>>>>> Suppose each valueX is an object of MyClass with fields int a, String b.
>>>>>>>> At t=1 sec I am getting 
>>>>>>>> key0,value0(0,"prev0") 
>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>> key2,value3 (3, "next2")
>>>>>>>> 
>>>>>>>> Output to database after 1 sec 
>>>>>>>> key2, newValue (2,"next2")
>>>>>>>> 
>>>>>>>> At t=2 sec getting 
>>>>>>>> key3,value4(4,"prev3")
>>>>>>>> key1,value5(5,"next1")
>>>>>>>> 
>>>>>>>> Output to database after 2 sec 
>>>>>>>> key1,newValue(1,"next1")
>>>>>>>> 
>>>>>>>> At t=3 sec 
>>>>>>>> key4,value6(6,"prev4")
>>>>>>>> key3,value7(7,"next3")
>>>>>>>> key5,value5(8,"prev5")
>>>>>>>> key5,value5(9,"next5")
>>>>>>>> key0,value0(10,"next0")
>>>>>>>> 
>>>>>>>> Output to database after 3 sec
>>>>>>>> key0,newValue(0,"next0")
>>>>>>>> key3,newValue(4,"next3")
>>>>>>>> key5,newValue(8,"next5")
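>>>>>>>> 
>>>>>>>> The per-batch write could look roughly like this (a sketch only; `matchedStream` 
>>>>>>>> stands for whatever DStream ends up holding the matched (key, newValue) pairs, 
>>>>>>>> and writeToDatabase is a placeholder, assumed to be a static helper):
>>>>>>>> 
>>>>>>>> import java.util.Iterator;
>>>>>>>> import org.apache.spark.api.java.JavaPairRDD;
>>>>>>>> import org.apache.spark.api.java.function.Function;
>>>>>>>> import org.apache.spark.api.java.function.VoidFunction;
>>>>>>>> import scala.Tuple2;
>>>>>>>> 
>>>>>>>> // On 1.4.x foreachRDD takes a Function that returns Void.
>>>>>>>> matchedStream.foreachRDD(new Function<JavaPairRDD<String, MyClass>, Void>() {
>>>>>>>>     @Override
>>>>>>>>     public Void call(JavaPairRDD<String, MyClass> rdd) {
>>>>>>>>         rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, MyClass>>>() {
>>>>>>>>             @Override
>>>>>>>>             public void call(Iterator<Tuple2<String, MyClass>> records) {
>>>>>>>>                 // Open one database connection per partition, not per record.
>>>>>>>>                 while (records.hasNext()) {
>>>>>>>>                     Tuple2<String, MyClass> r = records.next();
>>>>>>>>                     writeToDatabase(r._1(), r._2());   // placeholder helper
>>>>>>>>                 }
>>>>>>>>             }
>>>>>>>>         });
>>>>>>>>         return null;
>>>>>>>>     }
>>>>>>>> });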
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Please suggest how this can be achieved.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks a lot !!!!
>>>>>>>> Abhi 
> 
