Hi Ryan, I am using mapWithState after doing reduceByKey.
I am now using mapWithState as you suggested and triggering the count manually, but I am still unable to see any checkpointing taking place. In the DAG I can see that the reduceByKey operations for the previous batches are also being computed.

Thanks
Abhi

On Tue, Feb 23, 2016 at 2:36 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:

> Hey Abhi,
>
> Using reduceByKeyAndWindow and mapWithState will trigger the bug
> in SPARK-6847. Here is a workaround to trigger checkpoint manually:
>
>     JavaMapWithStateDStream<...> stateDStream =
>         myPairDstream.mapWithState(StateSpec.function(mappingFunc));
>     stateDStream.foreachRDD(new Function1<...>() {
>       @Override
>       public Void call(JavaRDD<...> rdd) throws Exception {
>         rdd.count();
>         return null; // a Void return type still needs an explicit return
>       }
>     });
>     return stateDStream.stateSnapshots();
>
> On Mon, Feb 22, 2016 at 12:25 PM, Abhishek Anand <abhis.anan...@gmail.com>
> wrote:
>
>> Hi Ryan,
>>
>> Reposting the code.
>>
>> Basically my use case is something like this: I am receiving the web
>> impression logs and may get the notify (listening from Kafka) for those
>> impressions in the same interval (for me it's 1 min) or any later interval
>> (up to 2 hours). When I receive the notify for a particular impression, I
>> need to swap the date field in the impression with the date field in the
>> notify logs. The notify for an impression has the same key as the impression.
>>
>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>     Tuple2<String, MyClass>> mappingFunc =
>>     new Function3<String, Optional<MyClass>, State<MyClass>, Tuple2<String, MyClass>>() {
>>   @Override
>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>       State<MyClass> state) {
>>     MyClass nullObj = new MyClass();
>>     nullObj.setImprLog(null);
>>     nullObj.setNotifyLog(null);
>>     MyClass current = one.or(nullObj);
>>
>>     if (current != null && current.getImprLog() != null &&
>>         current.getMyClassType() == 1 /* this is impression */) {
>>       return new Tuple2<>(key, null);
>>     }
>>     else if (current.getNotifyLog() != null && current.getMyClassType() == 3
>>         /* notify for the impression received */) {
>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>         // swapping the dates
>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>         return new Tuple2<>(key, oldState);
>>       }
>>       else {
>>         return new Tuple2<>(key, null);
>>       }
>>     }
>>     else {
>>       return new Tuple2<>(key, null);
>>     }
>>   }
>> };
>>
>> return myPairDstream.mapWithState(StateSpec.function(mappingFunc)).stateSnapshots();
>>
>> Currently I am using reduceByKeyAndWindow without the inverse function
>> and I am able to get the correct data. But the issue that might arise is when I
>> have to restart my application from checkpoint and it repartitions and
>> computes the previous 120 partitions, which delays the incoming batches.
>>
>> Thanks !!
>> Abhi
>>
>> On Tue, Feb 23, 2016 at 1:25 AM, Shixiong(Ryan) Zhu <shixi...@databricks.com> wrote:
>>
>>> Hey Abhi,
>>>
>>> Could you post how you use mapWithState? By default, it should do
>>> checkpointing every 10 batches.
>>> However, there is a known issue that prevents mapWithState from
>>> checkpointing in some special cases:
>>> https://issues.apache.org/jira/browse/SPARK-6847
>>>
>>> On Mon, Feb 22, 2016 at 5:47 AM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>
>>>> Any Insights on this one ?
>>>>
>>>> Thanks !!!
>>>> Abhi
>>>>
>>>> On Mon, Feb 15, 2016 at 11:08 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>
>>>>> I am now trying to use mapWithState in the following way using some
>>>>> example codes. But, by looking at the DAG it does not seem to checkpoint
>>>>> the state and when restarting the application from checkpoint, it
>>>>> re-partitions all the previous batches data from kafka.
>>>>>
>>>>> static Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>     Tuple2<String, MyClass>> mappingFunc =
>>>>>     new Function3<String, Optional<MyClass>, State<MyClass>,
>>>>>         Tuple2<String, MyClass>>() {
>>>>>   @Override
>>>>>   public Tuple2<String, MyClass> call(String key, Optional<MyClass> one,
>>>>>       State<MyClass> state) {
>>>>>     MyClass nullObj = new MyClass();
>>>>>     nullObj.setImprLog(null);
>>>>>     nullObj.setNotifyLog(null);
>>>>>     MyClass current = one.or(nullObj);
>>>>>
>>>>>     if (current != null && current.getImprLog() != null &&
>>>>>         current.getMyClassType() == 1) {
>>>>>       return new Tuple2<>(key, null);
>>>>>     }
>>>>>     else if (current.getNotifyLog() != null && current.getMyClassType()
>>>>>         == 3) {
>>>>>       MyClass oldState = (state.exists() ? state.get() : nullObj);
>>>>>       if (oldState != null && oldState.getNotifyLog() != null) {
>>>>>         oldState.getImprLog().setCrtd(current.getNotifyLog().getCrtd());
>>>>>         return new Tuple2<>(key, oldState);
>>>>>       }
>>>>>       else {
>>>>>         return new Tuple2<>(key, null);
>>>>>       }
>>>>>     }
>>>>>     else {
>>>>>       return new Tuple2<>(key, null);
>>>>>     }
>>>>>   }
>>>>> };
>>>>>
>>>>> Please suggest if this is the proper way or am I doing something wrong.
>>>>>
>>>>> Thanks !!
>>>>> Abhi
>>>>>
>>>>> On Sun, Feb 14, 2016 at 2:26 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>
>>>>>> If you don't want to update, your only option will be updateStateByKey
>>>>>> then.
>>>>>> On 13 Feb 2016 8:48 p.m., "Ted Yu" <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> mapWithState supports checkpoint.
>>>>>>>
>>>>>>> There have been some bug fixes since the release of 1.6.0,
>>>>>>> e.g.
>>>>>>> SPARK-12591 NullPointerException using checkpointed mapWithState
>>>>>>> with KryoSerializer
>>>>>>>
>>>>>>> which is in the upcoming 1.6.1
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Sat, Feb 13, 2016 at 12:05 PM, Abhishek Anand <abhis.anan...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Does mapWithState checkpoint the data?
>>>>>>>>
>>>>>>>> When my application goes down and is restarted from checkpoint,
>>>>>>>> will mapWithState need to recompute the previous batches' data?
>>>>>>>>
>>>>>>>> Also, to use mapWithState I will need to upgrade my application, as
>>>>>>>> I am using version 1.4.0 and mapWithState isn't supported there. Is
>>>>>>>> there any other workaround?
>>>>>>>>
>>>>>>>> Cheers!!
>>>>>>>> Abhi
>>>>>>>>
>>>>>>>> On Fri, Feb 12, 2016 at 3:33 AM, Sebastian Piu <sebastian....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Looks like mapWithState could help you?
>>>>>>>>> On 11 Feb 2016 8:40 p.m., "Abhishek Anand" <abhis.anan...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have a use case as follows in my production environment,
>>>>>>>>>> where I am listening from Kafka with a slideInterval of 1 min and
>>>>>>>>>> a windowLength of 2 hours.
>>>>>>>>>>
>>>>>>>>>> I have a JavaPairDStream where for each key I am getting the same
>>>>>>>>>> key but with a different value, which might appear in the same batch
>>>>>>>>>> or some later batch.
>>>>>>>>>>
>>>>>>>>>> When the key appears a second time I need to update a field in the
>>>>>>>>>> value of the previous key with a field in the later key.
>>>>>>>>>> The keys for which the combination keys do not come should be
>>>>>>>>>> rejected after 2 hours.
>>>>>>>>>>
>>>>>>>>>> At the end of each second I need to output the result to external
>>>>>>>>>> database.
>>>>>>>>>>
>>>>>>>>>> For example :
>>>>>>>>>>
>>>>>>>>>> Suppose valueX is object of MyClass with fields int a, String b
>>>>>>>>>> At t=1sec I am getting
>>>>>>>>>> key0,value0(0,"prev0")
>>>>>>>>>> key1,value1 (1, "prev1")
>>>>>>>>>> key2,value2 (2,"prev2")
>>>>>>>>>> key2,value3 (3, "next2")
>>>>>>>>>>
>>>>>>>>>> Output to database after 1 sec
>>>>>>>>>> key2, newValue (2,"next2")
>>>>>>>>>>
>>>>>>>>>> At t=2 sec getting
>>>>>>>>>> key3,value4(4,"prev3")
>>>>>>>>>> key1,value5(5,"next1")
>>>>>>>>>>
>>>>>>>>>> Output to database after 2 sec
>>>>>>>>>> key1,newValue(1,"next1")
>>>>>>>>>>
>>>>>>>>>> At t=3 sec
>>>>>>>>>> key4,value6(6,"prev4")
>>>>>>>>>> key3,value7(7,"next3")
>>>>>>>>>> key5,value5(8,"prev5")
>>>>>>>>>> key5,value5(9,"next5")
>>>>>>>>>> key0,value0(10,"next0")
>>>>>>>>>>
>>>>>>>>>> Output to database after 3 sec
>>>>>>>>>> key0,newValue(0,"next0")
>>>>>>>>>> key3,newValue(4,"next3")
>>>>>>>>>> key5,newValue(8,"next5")
>>>>>>>>>>
>>>>>>>>>> Please suggest how this can be achieved.
>>>>>>>>>>
>>>>>>>>>> Thanks a lot !!!!
>>>>>>>>>> Abhi
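
The pairing rule in the original example (first value waits, second value completes it, unmatched keys are dropped after 2 hours) can be checked with a small plain-Java simulation. This is only a sketch and not Spark code: PairingSketch, Value, offer and expire are hypothetical names, and it assumes the merged record keeps field a from the first value and field b from the second, which is what the example outputs suggest.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java simulation of the pairing rule from the example; no Spark involved. */
class PairingSketch {

    /** Hypothetical stand-in for MyClass with fields int a, String b. */
    static final class Value {
        final int a;
        final String b;
        Value(int a, String b) { this.a = a; this.b = b; }
        @Override public String toString() { return "(" + a + ",\"" + b + "\")"; }
    }

    /** A first sighting that is still waiting for its partner. */
    private static final class Pending {
        final Value value;
        final long arrivedAt;
        Pending(Value value, long arrivedAt) { this.value = value; this.arrivedAt = arrivedAt; }
    }

    private final Map<String, Pending> pending = new HashMap<>();
    private final long ttlSeconds;

    PairingSketch(long ttlSeconds) { this.ttlSeconds = ttlSeconds; }

    /** Returns the merged output line when the key completes a pair, otherwise null. */
    String offer(long nowSeconds, String key, Value v) {
        Pending first = pending.remove(key);
        if (first == null || nowSeconds - first.arrivedAt > ttlSeconds) {
            // First sighting (or the earlier one is too old): remember it, emit nothing.
            pending.put(key, new Pending(v, nowSeconds));
            return null;
        }
        // Second sighting: keep a from the first value, take b from the second (assumption).
        return key + "," + new Value(first.value.a, v.b);
    }

    /** Drops keys whose partner did not arrive within the TTL; returns how many were dropped. */
    int expire(long nowSeconds) {
        int dropped = 0;
        Iterator<Pending> it = pending.values().iterator();
        while (it.hasNext()) {
            if (nowSeconds - it.next().arrivedAt > ttlSeconds) {
                it.remove();
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        PairingSketch s = new PairingSketch(2 * 60 * 60); // 2 hours, as in the use case
        s.offer(1, "key1", new Value(1, "prev1"));
        s.offer(1, "key2", new Value(2, "prev2"));
        System.out.println(s.offer(1, "key2", new Value(3, "next2"))); // key2,(2,"next2")
        System.out.println(s.offer(2, "key1", new Value(5, "next1"))); // key1,(1,"next1")
    }
}
```

In a streaming job the map would roughly correspond to the per-key state that mapWithState keeps, and expire to a state timeout; how that maps onto the actual job depends on the Spark version in use.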